From 893b93198dde23ff4c3463e8d6f39b1fba3b3207 Mon Sep 17 00:00:00 2001 From: hzhang Date: Mon, 1 Jun 2026 08:56:53 +0100 Subject: [PATCH] feat(fabric): dynamic-subscription via fabric-register openclaw tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The plugin manifest declared `fabric-register` as a tool name but tools.ts never registered it — recruitment fell through to the standalone `/root/.openclaw/bin/fabric-register` binary, which writes ~/.openclaw/fabric-identity.json correctly but exits without notifying the running plugin. That left fabric inbound's subscription as a connect-time snapshot: every new agent required a gateway restart between `new-agent` and the interview's sub-discussion or the message had no socket to dispatch on. Wire the full path: - `FabricInbound.addAccount(entry)` — login, upsert identity, open socket(s), track per-agent so removeAccount can teardown cleanly. Idempotent: a second call replaces the previous socket (used post-onboard when the agent rotates off the shared `interviewee` placeholder onto its own apikey). - `FabricInbound.removeAccount(agentId)` — disconnect sockets, clear timers + per-agent caches. - `__fabric.addAccount` / `removeAccount` — cross-plugin bridge so the `fabric-register` tool can reach the live FabricInbound instance from its tool handler context. - `fabric-register` openclaw tool — validates apiKey, calls `__fabric.addAccount`, returns `{ok, fabricUserId, displayName}`. Accepts `agentId` arg so recruitment can bind on behalf of a freshly-created agent before that agent has a session of its own. Removes the "restart the gateway" advice from the ctxGuild "agent not registered" error message — operators should now call the tool path instead. After this lands + the ClawSkills register-agent script flip (separate commit), `recruitment.new-agent` -> interviewer sub-discussion runs without a gateway restart in between. --- index.ts | 20 +++++++++++-- src/inbound.ts | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/tools.ts | 78 ++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 168 insertions(+), 9 deletions(-) diff --git a/index.ts b/index.ts index 33b09a3..d3e55fc 100644 --- a/index.ts +++ b/index.ts @@ -96,13 +96,29 @@ export default defineChannelPluginEntry({ // fall back to "assume DM" — fail closed on unknown. { const _G = globalThis as Record; - _G['__fabric'] = { getChannelType }; + _G['__fabric'] = { + getChannelType, + // Dynamic-subscription bridges: tools (notably `fabric-register`) + // call these to add/remove an account's inbound socket without + // a gateway restart. Both delegate to the live FabricInbound + // instance via the module-level `inbound` closure variable; the + // closures stay valid across gateway_start / gateway_stop + // because we re-assign the variable, not the property. + addAccount: async (entry: { agentId: string; fabricApiKey: string }) => { + if (!inbound) throw new Error('fabric inbound not ready yet (gateway not started?)'); + await inbound.addAccount(entry); + }, + removeAccount: (agentId: string) => { + if (!inbound) return; + inbound.removeAccount(agentId); + }, + }; // Flush channel-meta cache when the gateway shuts down so // recently-recorded xType entries don't get lost. api.on('gateway_stop', () => { try { flushChannelMeta(); } catch { /* ignore */ } }); - api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)'); + api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)'); } api.on('gateway_start', () => { diff --git a/src/inbound.ts b/src/inbound.ts index bbeefb8..37472f5 100644 --- a/src/inbound.ts +++ b/src/inbound.ts @@ -77,6 +77,12 @@ function findFabricBindingAccountId(cfg: unknown, agentId: string): string | und export class FabricInbound { private sockets: Socket[] = []; + // Per-agent socket + timer tracking. Enables `removeAccount(agentId)` + // to tear down ONE agent without restarting the whole inbound. New + // sockets get appended on `connectAgent`; both maps are emptied by + // `stop()`. + private socketsByAgent = new Map(); + private timersByAgent = new Map(); private seen = new Set(); // Timers that periodically re-sync channel membership per (agent, guild). // Without this, the agent's socket.io subscriptions are a snapshot taken @@ -287,6 +293,71 @@ export class FabricInbound { this.channelSyncTimers = []; for (const s of this.sockets) s.disconnect(); this.sockets = []; + this.socketsByAgent.clear(); + this.timersByAgent.clear(); + } + + /** + * Bring up ONE new account at runtime (no gateway restart). + * + * Mirrors what `start()` does per entry: login to Center, upsert the + * identity registry, open the socket(s). Idempotent: re-calling with + * the same agentId tears down the previous socket(s) first so the + * fresh apikey replaces the stale one (recruitment onboard rotates + * the agent from the shared `interviewee` placeholder to a real + * per-agent apikey — the old `interviewee` socket must drop before + * the new one comes up or the agent ends up subscribed to both users + * at once). + * + * Used by the `fabric-register` openclaw tool to make recruitment + * end-to-end without a gateway restart between `new-agent` and the + * interview's sub-discussion dispatch. + */ + async addAccount(entry: { agentId: string; fabricApiKey: string }): Promise { + if (this.socketsByAgent.has(entry.agentId)) { + this.removeAccount(entry.agentId); + } + const session = await this.client.agentLogin(entry.fabricApiKey); + this.identity.upsert({ + agentId: entry.agentId, + fabricApiKey: entry.fabricApiKey, + fabricUserId: session.user.id, + displayName: session.user.name, + }); + await this.connectAgent(entry.agentId, session); + this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`); + } + + /** + * Tear down ONE account's sockets + timers without touching others. + * Caller is responsible for any identity-registry cleanup; this only + * drops the live socket subscription so the agent stops receiving + * Fabric pushes. + */ + removeAccount(agentId: string): void { + const sockets = this.socketsByAgent.get(agentId); + if (sockets) { + for (const s of sockets) { + try { s.disconnect(); } catch { /* socket already dead */ } + // Also remove from the flat list so `stop()` doesn't double-close. + const idx = this.sockets.indexOf(s); + if (idx !== -1) this.sockets.splice(idx, 1); + } + this.socketsByAgent.delete(agentId); + } + const timers = this.timersByAgent.get(agentId); + if (timers) { + for (const t of timers) { + clearInterval(t); + const idx = this.channelSyncTimers.indexOf(t); + if (idx !== -1) this.channelSyncTimers.splice(idx, 1); + } + this.timersByAgent.delete(agentId); + } + this.firstGuildByAgent.delete(agentId); + this.tokenCache.delete(agentId); + this.agentStatusCache.delete(agentId); + this.log.info(`fabric: agent ${agentId} dynamically removed`); } /** @@ -452,6 +523,9 @@ export class FabricInbound { FabricInbound.CHANNEL_SYNC_INTERVAL_MS, ); this.channelSyncTimers.push(syncTimer); + const agentTimers = this.timersByAgent.get(agentId) ?? []; + agentTimers.push(syncTimer); + this.timersByAgent.set(agentId, agentTimers); socket.on('message.created', (m: FabricMessage) => { const channelId = m.channelId ?? ''; if (!channelId) return; @@ -497,6 +571,11 @@ export class FabricInbound { }); socket.connect(); this.sockets.push(socket); + // Track per-agent so addAccount/removeAccount can teardown + // independently without disturbing other agents. + const agentSockets = this.socketsByAgent.get(agentId) ?? []; + agentSockets.push(socket); + this.socketsByAgent.set(agentId, agentSockets); } } diff --git a/src/tools.ts b/src/tools.ts index cf0c906..7acc293 100644 --- a/src/tools.ts +++ b/src/tools.ts @@ -47,9 +47,9 @@ export function registerFabricTools( const entry = identity.findByAgentId(agentId); if (!entry) throw new Error( - `agent ${agentId} not registered — run: AGENT_ID=${agentId} ` + - `~/.openclaw/bin/fabric-register --api-key (or set ` + - `channels.fabric.accounts.${agentId}); then restart the gateway`, + `agent ${agentId} not registered — call the openclaw \`fabric-register\` ` + + `tool (apiKey: , agentId: ${agentId}); the dynamic-subscription ` + + `path brings the socket up immediately, no gateway restart needed`, ); const session = await client.agentLogin(entry.fabricApiKey); const guild = session.guilds.find((g) => g.nodeId === guildNodeId); @@ -58,10 +58,74 @@ export function registerFabricTools( return { session, guild, token }; }; - // NOTE: binding an agent's Fabric API key is intentionally NOT a tool. - // It's a one-time step done out-of-band via the installed script - // ~/.openclaw/bin/fabric-register --api-key (AGENT_ID or --agent-id) - // or via static config (channels.fabric.accounts.). + // Bind an agent's Fabric API key — validates the key against Center, + // upserts ~/.openclaw/fabric-identity.json, AND brings up the inbound + // socket immediately via the live FabricInbound instance (no gateway + // restart). The standalone binary `~/.openclaw/bin/fabric-register` + // still exists for one-time bootstrap before the gateway runs, but + // recruitment's `register-agent` script should prefer this tool path + // so the new agent's socket is live before `interviewer` fires. + api.registerTool((ctx: Ctx) => ({ + name: 'fabric-register', + description: + 'Bind an agent to a Fabric Center API key. Validates the key, writes ' + + 'the entry to ~/.openclaw/fabric-identity.json, and starts a live ' + + 'inbound socket immediately so the agent receives Fabric pushes ' + + 'without a gateway restart. Caller defaults to the current agent; ' + + 'pass `agentId` to bind on behalf of another agent (recruitment use).', + parameters: { + type: 'object', + additionalProperties: false, + required: ['apiKey'], + properties: { + apiKey: { type: 'string', description: 'Fabric Center API key (`fak_…`)' }, + agentId: { + type: 'string', + description: + 'Agent to register. Defaults to the calling agent (ctx.agentId). ' + + 'Recruitment onboarding may override this when wiring a freshly ' + + 'created agent before that agent has a session of its own.', + }, + }, + }, + execute: async (_id: string, p: { apiKey: string; agentId?: string }) => { + const agentId = p.agentId ?? ctx.agentId; + if (!agentId) return { ok: false, error: 'no agent context (pass agentId)' }; + if (!p.apiKey || typeof p.apiKey !== 'string') { + return { ok: false, error: 'apiKey required' }; + } + // Delegate to FabricInbound.addAccount via the cross-plugin bridge. + // The bridge is installed in index.ts when inbound spins up; if it's + // not present yet, the gateway is still starting and the caller should + // retry (rare path — only hit during the gateway_start window). + const fabricApi = (globalThis as Record)['__fabric'] as + | { addAccount?: (entry: { agentId: string; fabricApiKey: string }) => Promise } + | undefined; + if (!fabricApi?.addAccount) { + return { + ok: false, + error: + 'fabric inbound not ready (gateway still starting?). Fall back to ' + + '~/.openclaw/bin/fabric-register or retry after a few seconds.', + }; + } + try { + await fabricApi.addAccount({ agentId, fabricApiKey: p.apiKey }); + } catch (err) { + return { + ok: false, + error: `fabric-register failed: ${String(err)}`, + }; + } + const entry = identity.findByAgentId(agentId); + return { + ok: true, + agentId, + fabricUserId: entry?.fabricUserId, + displayName: entry?.displayName, + }; + }, + })); const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') => api.registerTool((ctx: Ctx) => ({ -- 2.49.1