From 9cb262367e272ad6b0361a6c693287a007d3adf9 Mon Sep 17 00:00:00 2001 From: hzhang Date: Fri, 15 May 2026 18:24:35 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20working=20v1=20=E2=80=94=20full=20Fabri?= =?UTF-8?q?c<->openclaw=20round-trip=20verified?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Real channel-turn dispatch (resolveAgentRoute + finalizeInboundContext + dispatchInboundReplyWithBase), wakeup->drop/dispatch, messaging target grammar (fabric:) + outbound.sendText, tools use execute/parameters. Verified live: human msg in Fabric -> wakeup -> openclaw agent runs -> reply posted back into the Fabric channel as the agent. Co-Authored-By: Claude Opus 4.7 (1M context) --- dist/fabric/index.js | 2 +- dist/fabric/src/channel.js | 120 ++++++++++++++++++++-------- dist/fabric/src/inbound.js | 132 ++++++++++++++++--------------- dist/fabric/src/tools.js | 12 +-- index.ts | 9 ++- openclaw.plugin.json | 20 ++++- src/channel.ts | 79 ++++++++++++++++--- src/inbound.ts | 158 ++++++++++++++++++++----------------- src/tools.ts | 12 +-- 9 files changed, 349 insertions(+), 195 deletions(-) diff --git a/dist/fabric/index.js b/dist/fabric/index.js index b1949ee..3cfd5fe 100644 --- a/dist/fabric/index.js +++ b/dist/fabric/index.js @@ -54,7 +54,7 @@ export default defineChannelPluginEntry({ api.logger.warn('fabric: runtime not set; inbound disabled'); return; } - inbound = new FabricInbound(runtimeRef, client, identity, api.logger, accounts); + inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts); void inbound.start(); api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`); }); diff --git a/dist/fabric/src/channel.js b/dist/fabric/src/channel.js index 979a58a..8ecd4d6 100644 --- a/dist/fabric/src/channel.js +++ b/dist/fabric/src/channel.js @@ -8,13 +8,52 @@ // "channel"> (so `messageId: string` is required) // Casts at the createChatChannelPlugin boundary are intentional and // localized; keep them here so upgrades touch one file. -import { createChatChannelPlugin, createChannelPluginBase, } from 'openclaw/plugin-sdk/core'; +import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core'; import { FabricClient } from './fabric-client.js'; import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js'; +// ---- target grammar: fabric: ---- +export function stripFabricTargetPrefix(raw) { + let s = (raw ?? '').trim(); + if (!s) + return undefined; + if (s.toLowerCase().startsWith('fabric:')) + s = s.slice('fabric:'.length).trim(); + if (s.toLowerCase().startsWith('channel:')) + s = s.slice('channel:'.length).trim(); + return s || undefined; +} +export function normalizeFabricTarget(raw) { + const id = stripFabricTargetPrefix(raw); + return id ? `fabric:${id}`.toLowerCase() : undefined; +} +export function looksLikeFabricTargetId(raw) { + const t = (raw ?? '').trim(); + if (!t) + return false; + if (/^fabric:/i.test(t)) + return true; + return /^[a-z0-9-]{8,}$/i.test(t); +} +export function resolveFabricOutboundSessionRoute(params) { + const id = stripFabricTargetPrefix(params.target); + if (!id) + return null; + return buildChannelOutboundSessionRoute({ + cfg: params.cfg, + agentId: params.agentId, + channel: 'fabric', + accountId: params.accountId, + peer: { kind: 'group', id }, + chatType: 'group', + from: `fabric:channel:${id}`, + to: `fabric:${id}`, + }); +} // Posts an agent's reply to Fabric. `to` is the Fabric channelId; `accountId` // is the agentId (= Fabric identity). One auth concept: account apiKey -> // agent/login -> guild token -> POST message. async function sendToFabric(cfg, accountId, to, text) { + const channelId = stripFabricTargetPrefix(to) ?? to; const acc = resolveFabricAccount(cfg, accountId); if (!acc.fabricApiKey) throw new Error(`fabric account ${acc.accountId} has no fabricApiKey`); @@ -29,46 +68,54 @@ async function sendToFabric(cfg, accountId, to, text) { headers: { authorization: `Bearer ${gt}` }, }); const channels = res.ok ? (await res.json()) : []; - if (channels.some((c) => c.id === to)) { - await client.postMessage(g.endpoint, gt, to, text, session.user.id); - return { messageId: `${to}:${Date.now()}` }; + if (channels.some((c) => c.id === channelId)) { + await client.postMessage(g.endpoint, gt, channelId, text, session.user.id); + return { messageId: `${channelId}:${Date.now()}` }; } } // fallback: first guild const g = session.guilds[0]; const gt = session.guildAccessTokens.find((t) => t.guildNodeId === g?.nodeId)?.token; if (g && gt) { - await client.postMessage(g.endpoint, gt, to, text, session.user.id); - return { messageId: `${to}:${Date.now()}` }; + await client.postMessage(g.endpoint, gt, channelId, text, session.user.id); + return { messageId: `${channelId}:${Date.now()}` }; } throw new Error('fabric: no guild available to deliver'); } export const fabricChannelPlugin = createChatChannelPlugin({ - base: createChannelPluginBase({ - id: 'fabric', - meta: { id: 'fabric', label: 'Fabric', blurb: 'Connect OpenClaw agents to a Fabric guild.' }, - capabilities: { - chatTypes: ['channel', 'group', 'direct'], - reactions: false, - threads: false, - media: false, - nativeCommands: false, - blockStreaming: true, + base: { + ...createChannelPluginBase({ + id: 'fabric', + meta: { id: 'fabric', label: 'Fabric', blurb: 'Connect OpenClaw agents to a Fabric guild.' }, + capabilities: { + chatTypes: ['channel', 'group', 'direct'], + reactions: false, + threads: false, + media: false, + nativeCommands: false, + blockStreaming: true, + }, + reload: { configPrefixes: ['channels.fabric'] }, + config: { + listAccountIds: (cfg) => listFabricAccountIds(cfg), + resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId), + defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg), + isConfigured: (account) => Boolean(account.fabricApiKey), + }, + // Minimal setup adapter: Fabric is configured directly under + // channels.fabric.* (no interactive wizard). applyAccountConfig is the + // only required member. + setup: { + applyAccountConfig: ({ cfg }) => cfg, + }, + }), + messaging: { + normalizeTarget: normalizeFabricTarget, + resolveSessionTarget: ({ id }) => normalizeFabricTarget(`fabric:${id}`), + resolveOutboundSessionRoute: (params) => resolveFabricOutboundSessionRoute(params), + targetResolver: { looksLikeId: looksLikeFabricTargetId, hint: '' }, }, - reload: { configPrefixes: ['channels.fabric'] }, - config: { - listAccountIds: (cfg) => listFabricAccountIds(cfg), - resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId), - defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg), - isConfigured: (account) => Boolean(account.fabricApiKey), - }, - // Minimal setup adapter: Fabric is configured directly under - // channels.fabric.* (no interactive wizard). applyAccountConfig is the - // only required member. - setup: { - applyAccountConfig: ({ cfg }) => cfg, - }, - }), + }, security: { dm: { channelKey: 'fabric', @@ -83,8 +130,17 @@ export const fabricChannelPlugin = createChatChannelPlugin({ attachedResults: { channel: 'fabric', sendText: async (ctx) => { - const cfg = (ctx.cfg ?? {}); - return sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text); + // openclaw passes config under cfg or config depending on path + const cfg = (ctx.cfg ?? ctx.config ?? {}); + try { + const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text); + console.log(`[fabric] outbound.sendText -> ${ctx.to} ok`); + return r; + } + catch (e) { + console.log(`[fabric] outbound.sendText FAILED to=${ctx.to}: ${String(e)}`); + throw e; + } }, }, }, diff --git a/dist/fabric/src/inbound.js b/dist/fabric/src/inbound.js index dc5fde3..4f52d09 100644 --- a/dist/fabric/src/inbound.js +++ b/dist/fabric/src/inbound.js @@ -1,18 +1,19 @@ import { io } from 'socket.io-client'; -// One live Fabric connection per agent identity (Phase 1 = B1). Lives in the -// channel-plugin runtime (no separate sidecar). Firehose (B2) would replace -// this class behind the same dispatch() call. +import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch'; export class FabricInbound { - runtime; + core; + cfg; client; identity; log; accounts; sockets = []; - timers = []; seen = new Set(); - constructor(runtime, client, identity, log, accounts = []) { - this.runtime = runtime; + constructor(core, // PluginRuntime + cfg, // OpenClawConfig + client, identity, log, accounts = []) { + this.core = core; + this.cfg = cfg; this.client = client; this.identity = identity; this.log = log; @@ -40,12 +41,9 @@ export class FabricInbound { } } stop() { - for (const t of this.timers) - clearInterval(t); for (const s of this.sockets) s.disconnect(); this.sockets = []; - this.timers = []; } async connectAgent(agentId, session) { const selfUserId = session.user.id; @@ -60,12 +58,11 @@ export class FabricInbound { }); const joinAll = async () => { try { - const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { - headers: { authorization: `Bearer ${tok}` }, - }); + const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } }); const channels = res.ok ? (await res.json()) : []; for (const c of channels) socket.emit('join_channel', { channelId: c.id }); + this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`); } catch { /* best effort */ @@ -76,7 +73,6 @@ export class FabricInbound { const channelId = m.channelId ?? ''; if (!channelId) return; - // self-echo guard + dedupe if (m.authorUserId && m.authorUserId === selfUserId) return; const key = `${agentId}:${m.messageId}`; @@ -85,67 +81,75 @@ export class FabricInbound { this.seen.add(key); if (this.seen.size > 5000) this.seen.clear(); - void this.dispatch(agentId, g, channelId, m); + void this.dispatch(agentId, g, channelId, m, session); }); socket.connect(); this.sockets.push(socket); } } - // Hand the inbound Fabric message to OpenClaw's channel-turn kernel. - // wakeup === true -> dispatch (agent runs, may reply) - // wakeup !== true -> drop but keep as group history/context - async dispatch(agentId, guild, channelId, m) { - const admit = m.wakeup === true; + async dispatch(agentId, guild, channelId, m, session) { + // wakeup === false -> drop (Fabric already decided this agent is silent) + if (m.wakeup !== true) { + this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`); + return; + } + this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`); + const core = this.core; + const cfg = this.cfg; try { - await this.runtime.channel.turn.run({ + const route = core.channel.routing.resolveAgentRoute({ + cfg: this.cfg, channel: 'fabric', accountId: agentId, - raw: m, - adapter: { - ingest: (raw) => ({ - id: raw.messageId, - timestamp: raw.createdAt ? Date.parse(raw.createdAt) : Date.now(), - rawText: raw.content, - textForAgent: raw.content, - }), - classify: () => ({ kind: 'message', canStartAgentTurn: admit }), - preflight: () => admit ? {} : { admission: { kind: 'drop', reason: 'no-wakeup', recordHistory: true } }, - resolveTurn: (input) => ({ - route: { - agentId, - routeSessionKey: `agent:${agentId}:fabric:channel:${channelId}`, - createIfMissing: true, - }, - conversation: { kind: 'channel', id: channelId, label: `fabric:${guild.nodeId}` }, - reply: { to: channelId, nativeChannelId: channelId }, - message: { - body: m.content, - rawBody: m.content, - bodyForAgent: m.content, - envelopeFrom: m.authorUserId ?? 'fabric', - }, - delivery: { - deliver: async (payload) => { - const text = typeof payload?.text === 'string' ? payload.text : ''; - if (!text.trim()) - return { visibleReplySent: false }; - const entry = this.identity.findByAgentId(agentId); - const session = entry ? await this.client.agentLogin(entry.fabricApiKey) : null; - const gt = session?.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token; - if (!session || !gt) - return { visibleReplySent: false }; - await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id); - return { visibleReplySent: true }; - }, - }, - meta: { admission: admit ? { kind: 'dispatch' } : { kind: 'drop', recordHistory: true } }, - }), - }, - log: (e) => this.runtime.log?.debug?.(`fabric.turn.${e?.stage}`), + peer: { kind: 'group', id: channelId }, }); + const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + const ctxPayload = core.channel.reply.finalizeInboundContext({ + Body: m.content, + BodyForAgent: m.content, + RawBody: m.content, + CommandBody: m.content, + From: `fabric:channel:${channelId}`, + To: `fabric:${channelId}`, + SessionKey: route.sessionKey, + AccountId: route.accountId ?? agentId, + ChatType: 'group', + ConversationLabel: `fabric:${guild.nodeId}`, + SenderId: m.authorUserId ?? 'fabric', + Provider: 'fabric', + Surface: 'fabric', + MessageSid: m.messageId, + Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(), + OriginatingChannel: 'fabric', + OriginatingTo: `fabric:${channelId}`, + }); + const gt = session.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token; + await dispatchInboundReplyWithBase({ + cfg: this.cfg, + channel: 'fabric', + accountId: agentId, + route, + storePath, + ctxPayload: ctxPayload, + core: this.core, + deliver: async (payload) => { + const text = (payload?.text ?? '').trim(); + this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`); + if (!text || !gt) + return; + await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id); + this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`); + }, + onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`), + onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`), + replyOptions: {}, + }); + this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`); } catch (err) { - this.log.warn(`fabric: turn.run failed agent=${agentId} channel=${channelId}: ${String(err)}`); + this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`); } } } diff --git a/dist/fabric/src/tools.js b/dist/fabric/src/tools.js index 1c1b2a5..102dd0c 100644 --- a/dist/fabric/src/tools.js +++ b/dist/fabric/src/tools.js @@ -21,7 +21,7 @@ export function registerFabricTools(api, client, identity) { api.registerTool((ctx) => ({ name: 'fabric-register', description: "Register this agent's Fabric API key (minted via Center CLI `user apikey`).", - inputSchema: { + parameters: { type: 'object', additionalProperties: false, required: ['fabricApiKey'], @@ -29,7 +29,7 @@ export function registerFabricTools(api, client, identity) { fabricApiKey: { type: 'string', description: 'Fabric Center API key (fak_…)' }, }, }, - handler: async (params) => { + execute: async (params) => { const agentId = ctx.agentId; if (!agentId) return { ok: false, error: 'no agent context' }; @@ -46,7 +46,7 @@ export function registerFabricTools(api, client, identity) { const makeCreate = (kind) => api.registerTool((ctx) => ({ name: `create-${kind}-channel`, description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`, - inputSchema: { + parameters: { type: 'object', additionalProperties: false, required: ['guildNodeId', 'name'], @@ -59,7 +59,7 @@ export function registerFabricTools(api, client, identity) { listeners: { type: 'array', items: { type: 'string' } }, }, }, - handler: async (p) => { + execute: async (p) => { const agentId = ctx.agentId; if (!agentId) return { ok: false, error: 'no agent context' }; @@ -83,7 +83,7 @@ export function registerFabricTools(api, client, identity) { api.registerTool((ctx) => ({ name: 'discussion-complete', description: 'Conclude a discussion: post a summary then close the channel.', - inputSchema: { + parameters: { type: 'object', additionalProperties: false, required: ['guildNodeId', 'channelId', 'summary'], @@ -94,7 +94,7 @@ export function registerFabricTools(api, client, identity) { callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' }, }, }, - handler: async (p) => { + execute: async (p) => { const agentId = ctx.agentId; if (!agentId) return { ok: false, error: 'no agent context' }; diff --git a/index.ts b/index.ts index d04f127..f46b21c 100644 --- a/index.ts +++ b/index.ts @@ -72,7 +72,14 @@ export default defineChannelPluginEntry({ api.logger.warn('fabric: runtime not set; inbound disabled'); return; } - inbound = new FabricInbound(runtimeRef as never, client, identity, api.logger, accounts); + inbound = new FabricInbound( + runtimeRef, + api.config, + client, + identity, + api.logger, + accounts, + ); void inbound.start(); api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`); }); diff --git a/openclaw.plugin.json b/openclaw.plugin.json index 6040725..43edf54 100644 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -40,7 +40,25 @@ "description": "Fabric Center API base, e.g. http://localhost:7001/api" }, "dmSecurity": { "type": "string" }, - "allowFrom": { "type": "array", "items": { "type": "string" } } + "dmPolicy": { "type": "string" }, + "enabled": { "type": "boolean" }, + "allowFrom": { "type": "array", "items": { "type": "string" } }, + "defaultAccount": { "type": "string" }, + "accounts": { + "type": "object", + "description": "agent = account; key is the openclaw agentId", + "additionalProperties": { + "type": "object", + "additionalProperties": false, + "properties": { + "fabricApiKey": { "type": "string" }, + "centerApiBase": { "type": "string" }, + "enabled": { "type": "boolean" }, + "dmPolicy": { "type": "string" }, + "allowFrom": { "type": "array", "items": { "type": "string" } } + } + } + } } }, "uiHints": { diff --git a/src/channel.ts b/src/channel.ts index 28078ca..64f5668 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -11,6 +11,8 @@ import { createChatChannelPlugin, createChannelPluginBase, + buildChannelOutboundSessionRoute, + type ChannelOutboundSessionRouteParams, } from 'openclaw/plugin-sdk/core'; import { FabricClient } from './fabric-client.js'; import { @@ -22,6 +24,39 @@ import { type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown }; +// ---- target grammar: fabric: ---- +export function stripFabricTargetPrefix(raw: string): string | undefined { + let s = (raw ?? '').trim(); + if (!s) return undefined; + if (s.toLowerCase().startsWith('fabric:')) s = s.slice('fabric:'.length).trim(); + if (s.toLowerCase().startsWith('channel:')) s = s.slice('channel:'.length).trim(); + return s || undefined; +} +export function normalizeFabricTarget(raw: string): string | undefined { + const id = stripFabricTargetPrefix(raw); + return id ? `fabric:${id}`.toLowerCase() : undefined; +} +export function looksLikeFabricTargetId(raw: string): boolean { + const t = (raw ?? '').trim(); + if (!t) return false; + if (/^fabric:/i.test(t)) return true; + return /^[a-z0-9-]{8,}$/i.test(t); +} +export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) { + const id = stripFabricTargetPrefix(params.target); + if (!id) return null; + return buildChannelOutboundSessionRoute({ + cfg: params.cfg, + agentId: params.agentId, + channel: 'fabric', + accountId: params.accountId, + peer: { kind: 'group', id }, + chatType: 'group', + from: `fabric:channel:${id}`, + to: `fabric:${id}`, + }); +} + // Posts an agent's reply to Fabric. `to` is the Fabric channelId; `accountId` // is the agentId (= Fabric identity). One auth concept: account apiKey -> // agent/login -> guild token -> POST message. @@ -31,6 +66,7 @@ async function sendToFabric( to: string, text: string, ): Promise<{ messageId: string }> { + const channelId = stripFabricTargetPrefix(to) ?? to; const acc = resolveFabricAccount(cfg as never, accountId); if (!acc.fabricApiKey) throw new Error(`fabric account ${acc.accountId} has no fabricApiKey`); const client = new FabricClient(acc.centerApiBase); @@ -43,23 +79,24 @@ async function sendToFabric( headers: { authorization: `Bearer ${gt}` }, }); const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : []; - if (channels.some((c) => c.id === to)) { - await client.postMessage(g.endpoint, gt, to, text, session.user.id); - return { messageId: `${to}:${Date.now()}` }; + if (channels.some((c) => c.id === channelId)) { + await client.postMessage(g.endpoint, gt, channelId, text, session.user.id); + return { messageId: `${channelId}:${Date.now()}` }; } } // fallback: first guild const g = session.guilds[0]; const gt = session.guildAccessTokens.find((t) => t.guildNodeId === g?.nodeId)?.token; if (g && gt) { - await client.postMessage(g.endpoint, gt, to, text, session.user.id); - return { messageId: `${to}:${Date.now()}` }; + await client.postMessage(g.endpoint, gt, channelId, text, session.user.id); + return { messageId: `${channelId}:${Date.now()}` }; } throw new Error('fabric: no guild available to deliver'); } export const fabricChannelPlugin = createChatChannelPlugin({ - base: createChannelPluginBase({ + base: { + ...createChannelPluginBase({ id: 'fabric', meta: { id: 'fabric', label: 'Fabric', blurb: 'Connect OpenClaw agents to a Fabric guild.' }, capabilities: { @@ -83,7 +120,15 @@ export const fabricChannelPlugin = createChatChannelPlugin cfg as never, } as never, - }) as never, + }), + messaging: { + normalizeTarget: normalizeFabricTarget, + resolveSessionTarget: ({ id }: { id: string }) => normalizeFabricTarget(`fabric:${id}`), + resolveOutboundSessionRoute: (params: ChannelOutboundSessionRouteParams) => + resolveFabricOutboundSessionRoute(params), + targetResolver: { looksLikeId: looksLikeFabricTargetId, hint: '' }, + }, + } as never, security: { dm: { @@ -100,9 +145,23 @@ export const fabricChannelPlugin = createChatChannelPlugin { - const cfg = (ctx.cfg ?? {}) as AnyCfg; - return sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text); + sendText: async (ctx: { + accountId?: string | null; + to: string; + text: string; + cfg?: unknown; + config?: unknown; + }) => { + // openclaw passes config under cfg or config depending on path + const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg; + try { + const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text); + console.log(`[fabric] outbound.sendText -> ${ctx.to} ok`); + return r; + } catch (e) { + console.log(`[fabric] outbound.sendText FAILED to=${ctx.to}: ${String(e)}`); + throw e; + } }, }, } as never, diff --git a/src/inbound.ts b/src/inbound.ts index 6863222..9dcd422 100644 --- a/src/inbound.ts +++ b/src/inbound.ts @@ -1,20 +1,24 @@ import { io, type Socket } from 'socket.io-client'; +import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch'; import type { FabricClient, FabricSession } from './fabric-client.js'; import type { IdentityRegistry } from './identity.js'; -// OpenClaw plugin runtime — only the channel-turn kernel surface we use. -// Typed loosely on purpose: the concrete shapes come from -// openclaw/plugin-sdk/core at the host's SDK version. -type PluginRuntime = { +// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled +// channels (nextcloud-talk) drive the kernel: +// core = PluginRuntime (from setRuntime) +// route = core.channel.routing.resolveAgentRoute(...) +// ctx = core.channel.reply.finalizeInboundContext(...) // has SessionKey +// dispatch= dispatchInboundReplyWithBase({ cfg, route, ctxPayload, core, deliver }) +// `core.channel.*` is accessed loosely so unrelated SDK drift won't break us. +type Core = { channel: { - turn: { - run(args: unknown): Promise; - }; + routing: { resolveAgentRoute: (p: unknown) => { agentId: string; sessionKey: string; accountId?: string } }; + session: { resolveStorePath: (store: unknown, o: { agentId: string }) => string }; + reply: { finalizeInboundContext: (p: Record) => unknown }; }; - log?: { debug?: (m: string, x?: unknown) => void }; }; -type Logger = { info: (m: string) => void; warn: (m: string) => void }; +type Logger = { info: (m: string) => void; warn: (m: string) => void; error?: (m: string) => void }; type FabricMessage = { messageId: string; @@ -22,20 +26,17 @@ type FabricMessage = { content: string; authorUserId?: string; createdAt?: string; - // per-recipient metadata Fabric attaches at push time (this agent's view) + channelId?: string; wakeup?: boolean; }; -// One live Fabric connection per agent identity (Phase 1 = B1). Lives in the -// channel-plugin runtime (no separate sidecar). Firehose (B2) would replace -// this class behind the same dispatch() call. export class FabricInbound { private sockets: Socket[] = []; - private timers: NodeJS.Timeout[] = []; private seen = new Set(); constructor( - private readonly runtime: PluginRuntime, + private readonly core: unknown, // PluginRuntime + private readonly cfg: unknown, // OpenClawConfig private readonly client: FabricClient, private readonly identity: IdentityRegistry, private readonly log: Logger, @@ -65,10 +66,8 @@ export class FabricInbound { } stop(): void { - for (const t of this.timers) clearInterval(t); for (const s of this.sockets) s.disconnect(); this.sockets = []; - this.timers = []; } private async connectAgent(agentId: string, session: FabricSession): Promise { @@ -76,101 +75,112 @@ export class FabricInbound { for (const g of session.guilds) { const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; if (!tok) continue; - const socket = io(`${g.endpoint}/realtime`, { transports: ['websocket'], auth: { token: tok }, autoConnect: false, }); - const joinAll = async () => { try { - const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { - headers: { authorization: `Bearer ${tok}` }, - }); + const res = await fetch( + `${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, + { headers: { authorization: `Bearer ${tok}` } }, + ); const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : []; for (const c of channels) socket.emit('join_channel', { channelId: c.id }); + this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`); } catch { /* best effort */ } }; - socket.on('connect', () => void joinAll()); - socket.on('message.created', (m: FabricMessage & { channelId?: string }) => { + socket.on('message.created', (m: FabricMessage) => { const channelId = m.channelId ?? ''; if (!channelId) return; - // self-echo guard + dedupe if (m.authorUserId && m.authorUserId === selfUserId) return; const key = `${agentId}:${m.messageId}`; if (this.seen.has(key)) return; this.seen.add(key); if (this.seen.size > 5000) this.seen.clear(); - void this.dispatch(agentId, g, channelId, m); + void this.dispatch(agentId, g, channelId, m, session); }); - socket.connect(); this.sockets.push(socket); } } - // Hand the inbound Fabric message to OpenClaw's channel-turn kernel. - // wakeup === true -> dispatch (agent runs, may reply) - // wakeup !== true -> drop but keep as group history/context private async dispatch( agentId: string, guild: { nodeId: string; endpoint: string }, channelId: string, m: FabricMessage, + session: FabricSession, ): Promise { - const admit = m.wakeup === true; + // wakeup === false -> drop (Fabric already decided this agent is silent) + if (m.wakeup !== true) { + this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`); + return; + } + this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`); + + const core = this.core as Core & Record; + const cfg = this.cfg as { session?: { store?: unknown } }; try { - await this.runtime.channel.turn.run({ + const route = core.channel.routing.resolveAgentRoute({ + cfg: this.cfg, channel: 'fabric', accountId: agentId, - raw: m, - adapter: { - ingest: (raw: FabricMessage) => ({ - id: raw.messageId, - timestamp: raw.createdAt ? Date.parse(raw.createdAt) : Date.now(), - rawText: raw.content, - textForAgent: raw.content, - }), - classify: () => ({ kind: 'message', canStartAgentTurn: admit }), - preflight: () => - admit ? {} : { admission: { kind: 'drop', reason: 'no-wakeup', recordHistory: true } }, - resolveTurn: (input: { id: string }) => ({ - route: { - agentId, - routeSessionKey: `agent:${agentId}:fabric:channel:${channelId}`, - createIfMissing: true, - }, - conversation: { kind: 'channel', id: channelId, label: `fabric:${guild.nodeId}` }, - reply: { to: channelId, nativeChannelId: channelId }, - message: { - body: m.content, - rawBody: m.content, - bodyForAgent: m.content, - envelopeFrom: m.authorUserId ?? 'fabric', - }, - delivery: { - deliver: async (payload: { text?: string }) => { - const text = typeof payload?.text === 'string' ? payload.text : ''; - if (!text.trim()) return { visibleReplySent: false }; - const entry = this.identity.findByAgentId(agentId); - const session = entry ? await this.client.agentLogin(entry.fabricApiKey) : null; - const gt = session?.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token; - if (!session || !gt) return { visibleReplySent: false }; - await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id); - return { visibleReplySent: true }; - }, - }, - meta: { admission: admit ? { kind: 'dispatch' } : { kind: 'drop', recordHistory: true } }, - }), - }, - log: (e: { stage?: string }) => this.runtime.log?.debug?.(`fabric.turn.${e?.stage}`), + peer: { kind: 'group', id: channelId }, }); + const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + const ctxPayload = core.channel.reply.finalizeInboundContext({ + Body: m.content, + BodyForAgent: m.content, + RawBody: m.content, + CommandBody: m.content, + From: `fabric:channel:${channelId}`, + To: `fabric:${channelId}`, + SessionKey: route.sessionKey, + AccountId: route.accountId ?? agentId, + ChatType: 'group', + ConversationLabel: `fabric:${guild.nodeId}`, + SenderId: m.authorUserId ?? 'fabric', + Provider: 'fabric', + Surface: 'fabric', + MessageSid: m.messageId, + Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(), + OriginatingChannel: 'fabric', + OriginatingTo: `fabric:${channelId}`, + }); + + const gt = session.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token; + + await dispatchInboundReplyWithBase({ + cfg: this.cfg as never, + channel: 'fabric', + accountId: agentId, + route, + storePath, + ctxPayload: ctxPayload as never, + core: this.core as never, + deliver: async (payload: { text?: string }) => { + const text = (payload?.text ?? '').trim(); + this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`); + if (!text || !gt) return; + await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id); + this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`); + }, + onRecordError: (err: unknown) => + this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`), + onDispatchError: (err: unknown, info: { kind: string }) => + this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`), + replyOptions: {}, + }); + this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`); } catch (err) { - this.log.warn(`fabric: turn.run failed agent=${agentId} channel=${channelId}: ${String(err)}`); + this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`); } } } diff --git a/src/tools.ts b/src/tools.ts index cbd90d2..ae9cf47 100644 --- a/src/tools.ts +++ b/src/tools.ts @@ -37,7 +37,7 @@ export function registerFabricTools( api.registerTool((ctx: Ctx) => ({ name: 'fabric-register', description: "Register this agent's Fabric API key (minted via Center CLI `user apikey`).", - inputSchema: { + parameters: { type: 'object', additionalProperties: false, required: ['fabricApiKey'], @@ -45,7 +45,7 @@ export function registerFabricTools( fabricApiKey: { type: 'string', description: 'Fabric Center API key (fak_…)' }, }, }, - handler: async (params: { fabricApiKey: string }) => { + execute: async (params: { fabricApiKey: string }) => { const agentId = ctx.agentId; if (!agentId) return { ok: false, error: 'no agent context' }; const session = await client.agentLogin(params.fabricApiKey); @@ -63,7 +63,7 @@ export function registerFabricTools( api.registerTool((ctx: Ctx) => ({ name: `create-${kind}-channel`, description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`, - inputSchema: { + parameters: { type: 'object', additionalProperties: false, required: ['guildNodeId', 'name'], @@ -76,7 +76,7 @@ export function registerFabricTools( listeners: { type: 'array', items: { type: 'string' } }, }, }, - handler: async (p: { + execute: async (p: { guildNodeId: string; name: string; isPublic?: boolean; @@ -106,7 +106,7 @@ export function registerFabricTools( api.registerTool((ctx: Ctx) => ({ name: 'discussion-complete', description: 'Conclude a discussion: post a summary then close the channel.', - inputSchema: { + parameters: { type: 'object', additionalProperties: false, required: ['guildNodeId', 'channelId', 'summary'], @@ -117,7 +117,7 @@ export function registerFabricTools( callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' }, }, }, - handler: async (p: { + execute: async (p: { guildNodeId: string; channelId: string; summary: string;