From f8c8c21727bd616d8ab9cdc41bcf7d247d1a2446 Mon Sep 17 00:00:00 2001 From: hzhang Date: Sun, 7 Jun 2026 21:04:38 +0100 Subject: [PATCH] feat(gateway): register fabric.register gateway method for live no-restart registration Recruitment's register-agent step is a plain shell step (no LLM turn), so it cannot invoke the `fabric-register` TOOL (tool only fires inside an agent turn) and there is no `openclaw tools call` CLI. It previously fell back to the standalone bootstrap binary, which writes fabric-identity.json but cannot notify the running plugin -> the new agent's inbound socket only came up after a gateway restart. This adds an in-process gateway method `fabric.register` (scope: operator.write) whose handler runs inbound.addAccount: validates the key, persists identity, and brings the inbound socket up immediately. The script now calls `openclaw gateway call fabric.register --params ...` and only falls back to the bootstrap binary if the method is unavailable. Also resyncs committed dist/ to source (sub-discussion-hook/store + tools/ inbound were source-committed but their dist artifacts were stale). Co-Authored-By: Claude Opus 4.8 (1M context) --- dist/fabric/index.js | 60 +++- dist/fabric/src/channel.js | 1 + dist/fabric/src/inbound.js | 115 ++++++- dist/fabric/src/sub-discussion-hook.js | 59 ++++ dist/fabric/src/sub-discussion-store.js | 47 +++ dist/fabric/src/tools.js | 437 +++++++++++++++++++++++- index.ts | 39 +++ 7 files changed, 737 insertions(+), 21 deletions(-) create mode 100644 dist/fabric/src/sub-discussion-hook.js create mode 100644 dist/fabric/src/sub-discussion-store.js diff --git a/dist/fabric/index.js b/dist/fabric/index.js index 2e120b9..d576ff7 100644 --- a/dist/fabric/index.js +++ b/dist/fabric/index.js @@ -14,6 +14,8 @@ import { FabricClient } from './src/fabric-client.js'; import { IdentityRegistry } from './src/identity.js'; import { syncFabricCommands } from './src/command-sync.js'; import { PresenceSync } from './src/presence-sync.js'; +import { SubDiscussionStore } from './src/sub-discussion-store.js'; +import { registerSubDiscussionHook } from './src/sub-discussion-hook.js'; import path from 'node:path'; import os from 'node:os'; let runtimeRef = null; @@ -40,10 +42,17 @@ export default defineChannelPluginEntry({ const centerApiBase = cfg.channels?.fabric?.centerApiBase ?? 'http://localhost:7001/api'; const idFile = api.pluginConfig?.identityFilePath ?? path.join(os.homedir(), '.openclaw', 'fabric-identity.json'); + const subDiscussionFile = path.join(os.homedir(), '.openclaw', 'fabric-sub-discussion.json'); // tools operate against a default Center; per-account keys come from config const client = new FabricClient(centerApiBase); const identity = new IdentityRegistry(idFile); - registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity); + const subDiscussion = new SubDiscussionStore(subDiscussionFile); + registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity, subDiscussion, cfg); + // Per-(agent, channel) prompt injection for sub-discussion channels. + // Runs as a sibling to PrismFacet's before_prompt_build hook (and + // ClawPrompts' fabric-chat-injector); openclaw composes + // appendSystemContext from all registered handlers. + registerSubDiscussionHook({ on: api.on, logger: api.logger }, subDiscussion, identity); // Cross-plugin API: globalThis.__fabric // Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt // injection to DM-typed channels only. The channel-meta cache is @@ -56,7 +65,25 @@ export default defineChannelPluginEntry({ // fall back to "assume DM" — fail closed on unknown. { const _G = globalThis; - _G['__fabric'] = { getChannelType }; + _G['__fabric'] = { + getChannelType, + // Dynamic-subscription bridges: tools (notably `fabric-register`) + // call these to add/remove an account's inbound socket without + // a gateway restart. Both delegate to the live FabricInbound + // instance via the module-level `inbound` closure variable; the + // closures stay valid across gateway_start / gateway_stop + // because we re-assign the variable, not the property. + addAccount: async (entry) => { + if (!inbound) + throw new Error('fabric inbound not ready yet (gateway not started?)'); + await inbound.addAccount(entry); + }, + removeAccount: (agentId) => { + if (!inbound) + return; + inbound.removeAccount(agentId); + }, + }; // Flush channel-meta cache when the gateway shuts down so // recently-recorded xType entries don't get lost. api.on('gateway_stop', () => { @@ -65,8 +92,35 @@ export default defineChannelPluginEntry({ } catch { /* ignore */ } }); - api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)'); + api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)'); } + // CLI-invocable live registration, callable from a shell script via + // openclaw gateway call fabric.register --params '{"agentId":"…","apiKey":"fak_…"}' + // The `fabric-register` TOOL only fires inside an agent turn, and there is + // no `openclaw tools call` CLI — so recruitment's `register-agent` script + // (a plain shell step, no LLM turn) had to fall back to the standalone + // binary, which can't notify the running plugin → needed a gateway + // restart. This gateway method runs in-process: inbound.addAccount + // validates the key, persists identity, and brings the socket up live — + // no restart. + api.registerGatewayMethod('fabric.register', async ({ params, respond }) => { + const p = (params ?? {}); + if (!p.agentId || !p.apiKey) { + respond(false, { ok: false }, { code: 'INVALID_REQUEST', message: 'agentId and apiKey required' }); + return; + } + if (!inbound) { + respond(false, { ok: false }, { code: 'UNAVAILABLE', message: 'fabric inbound not ready (gateway still starting?)' }); + return; + } + try { + await inbound.addAccount({ agentId: p.agentId, fabricApiKey: p.apiKey }); + respond(true, { ok: true, agentId: p.agentId }); + } + catch (err) { + respond(false, { ok: false }, { code: 'UNAVAILABLE', message: `fabric-register failed: ${String(err)}` }); + } + }, { scope: 'operator.write' }); api.on('gateway_start', () => { const _G = globalThis; if (_G._fabricInboundStarted) diff --git a/dist/fabric/src/channel.js b/dist/fabric/src/channel.js index a5f841d..620a6fa 100644 --- a/dist/fabric/src/channel.js +++ b/dist/fabric/src/channel.js @@ -128,6 +128,7 @@ export const fabricChannelPlugin = createChatChannelPlugin({ // Mirror isConfigured here so the snapshot truthfully reports false for // any account without a fabricApiKey. describeAccount: (account) => ({ + accountId: account.accountId, configured: Boolean(account.fabricApiKey), }), }, diff --git a/dist/fabric/src/inbound.js b/dist/fabric/src/inbound.js index 03bb9c1..4a9e312 100644 --- a/dist/fabric/src/inbound.js +++ b/dist/fabric/src/inbound.js @@ -7,6 +7,25 @@ import { resolveCoalesce } from './accounts.js'; import { fabricPeerRoutingForXType } from './channel.js'; import { recordChannelType } from './channel-meta.js'; import { enqueueDelivery, flushFabricForChannel } from './coalesce.js'; +// Walk cfg.bindings for the entry that ties `agentId` to a fabric account. +// Returns the binding's match.accountId (the slot label routing keys on); +// returns undefined when the agent has no explicit fabric binding so the +// caller can fall back to agentId without changing pre-existing semantics +// for agents whose binding accountId == agent_id anyway. +function findFabricBindingAccountId(cfg, agentId) { + const bindings = cfg?.bindings; + if (!Array.isArray(bindings)) + return undefined; + for (const b of bindings) { + if (b?.agentId === agentId && + b?.match?.channel === 'fabric' && + typeof b?.match?.accountId === 'string' && + b.match.accountId.length > 0) { + return b.match.accountId; + } + } + return undefined; +} export class FabricInbound { core; cfg; @@ -15,6 +34,12 @@ export class FabricInbound { log; accounts; sockets = []; + // Per-agent socket + timer tracking. Enables `removeAccount(agentId)` + // to tear down ONE agent without restarting the whole inbound. New + // sockets get appended on `connectAgent`; both maps are emptied by + // `stop()`. + socketsByAgent = new Map(); + timersByAgent = new Map(); seen = new Set(); // Timers that periodically re-sync channel membership per (agent, guild). // Without this, the agent's socket.io subscriptions are a snapshot taken @@ -210,6 +235,74 @@ export class FabricInbound { for (const s of this.sockets) s.disconnect(); this.sockets = []; + this.socketsByAgent.clear(); + this.timersByAgent.clear(); + } + /** + * Bring up ONE new account at runtime (no gateway restart). + * + * Mirrors what `start()` does per entry: login to Center, upsert the + * identity registry, open the socket(s). Idempotent: re-calling with + * the same agentId tears down the previous socket(s) first so the + * fresh apikey replaces the stale one (recruitment onboard rotates + * the agent from the shared `interviewee` placeholder to a real + * per-agent apikey — the old `interviewee` socket must drop before + * the new one comes up or the agent ends up subscribed to both users + * at once). + * + * Used by the `fabric-register` openclaw tool to make recruitment + * end-to-end without a gateway restart between `new-agent` and the + * interview's sub-discussion dispatch. + */ + async addAccount(entry) { + if (this.socketsByAgent.has(entry.agentId)) { + this.removeAccount(entry.agentId); + } + const session = await this.client.agentLogin(entry.fabricApiKey); + this.identity.upsert({ + agentId: entry.agentId, + fabricApiKey: entry.fabricApiKey, + fabricUserId: session.user.id, + displayName: session.user.name, + }); + await this.connectAgent(entry.agentId, session); + this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`); + } + /** + * Tear down ONE account's sockets + timers without touching others. + * Caller is responsible for any identity-registry cleanup; this only + * drops the live socket subscription so the agent stops receiving + * Fabric pushes. + */ + removeAccount(agentId) { + const sockets = this.socketsByAgent.get(agentId); + if (sockets) { + for (const s of sockets) { + try { + s.disconnect(); + } + catch { /* socket already dead */ } + // Also remove from the flat list so `stop()` doesn't double-close. + const idx = this.sockets.indexOf(s); + if (idx !== -1) + this.sockets.splice(idx, 1); + } + this.socketsByAgent.delete(agentId); + } + const timers = this.timersByAgent.get(agentId); + if (timers) { + for (const t of timers) { + clearInterval(t); + const idx = this.channelSyncTimers.indexOf(t); + if (idx !== -1) + this.channelSyncTimers.splice(idx, 1); + } + this.timersByAgent.delete(agentId); + } + this.firstGuildByAgent.delete(agentId); + this.tokenCache.delete(agentId); + this.agentStatusCache.delete(agentId); + this.log.info(`fabric: agent ${agentId} dynamically removed`); } /** * Per-account metadata harvested during `start()` — used by @@ -360,6 +453,9 @@ export class FabricInbound { }); const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS); this.channelSyncTimers.push(syncTimer); + const agentTimers = this.timersByAgent.get(agentId) ?? []; + agentTimers.push(syncTimer); + this.timersByAgent.set(agentId, agentTimers); socket.on('message.created', (m) => { const channelId = m.channelId ?? ''; if (!channelId) @@ -407,6 +503,11 @@ export class FabricInbound { }); socket.connect(); this.sockets.push(socket); + // Track per-agent so addAccount/removeAccount can teardown + // independently without disturbing other agents. + const agentSockets = this.socketsByAgent.get(agentId) ?? []; + agentSockets.push(socket); + this.socketsByAgent.set(agentId, agentSockets); } } // Download a message's attachments to a temp dir using the agent's guild @@ -463,10 +564,22 @@ export class FabricInbound { // (commands-handlers `isDirectMessage` checks ChatType==='direct') // misclassifies the turn. const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType); + // resolveAgentRoute needs the *binding* accountId (the channel-side + // slot name) — not the openclaw agentId. For most agents the binding + // is `{agentId: X, match: {channel: fabric, accountId: X}}` so the + // two coincide; but for shared-placeholder cases (e.g. the recruitment + // `interviewee` slot bound to multiple agents over its lifetime) the + // binding accountId is the slot label ("interviewee", "Neon", …) not + // the agent_id. Passing agentId there returned bindings=0 and silently + // fell back to `main`, hijacking sub-discussion turns. Look up the + // agent's fabric binding accountId here; fall back to agentId when no + // explicit binding exists (preserves prior behavior for agents with + // no fabric binding declared). + const bindingAccountId = findFabricBindingAccountId(this.cfg, agentId) ?? agentId; const route = core.channel.routing.resolveAgentRoute({ cfg: this.cfg, channel: 'fabric', - accountId: agentId, + accountId: bindingAccountId, peer: { kind: peerKind, id: channelId }, }); const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { diff --git a/dist/fabric/src/sub-discussion-hook.js b/dist/fabric/src/sub-discussion-hook.js new file mode 100644 index 0000000..b643305 --- /dev/null +++ b/dist/fabric/src/sub-discussion-hook.js @@ -0,0 +1,59 @@ +// Plugin-local before_prompt_build hook that injects per-(agent, channel) +// guides for sub-discussion channels created via the `create-sub-discussion` +// tool. Mirrors the pattern used by ClawPrompts' fabric-chat-injector +// (channelId-aware injection) but with content dynamically supplied at +// channel-creation time instead of read from static files via PrismFacet's +// router/rule registry. +// +// Match logic per turn: +// ctx.channelId → store.find() → sub-discussion entry +// ctx.agentId → identity.findByAgentId().fabricUserId +// ─ matches entry.hostUserId → inject hostGuide +// ─ matches entry.guestUserIds → inject guestGuide +// ─ neither → no injection +// +// Fail-closed on unknown agentId/channelId — we never inject "the wrong" +// guide, only the right one or nothing. +const _G = globalThis; +const DEDUP_KEY = '_fabricSubDiscussionHookDedup'; +export function registerSubDiscussionHook(api, store, identity) { + if (!(_G[DEDUP_KEY] instanceof WeakSet)) + _G[DEDUP_KEY] = new WeakSet(); + const dedup = _G[DEDUP_KEY]; + api.on('before_prompt_build', async (...args) => { + const event = args[0]; + const ctx = (args[1] ?? {}); + // The hook fires both for fabric-driven turns (channelId set) and + // for other triggers (HF wake, exec-event, etc.) — drop those. + if (typeof event === 'object' && event !== null) { + if (dedup.has(event)) + return undefined; + dedup.add(event); + } + const agentId = (ctx.agentId ?? '').trim(); + const channelId = (ctx.channelId ?? '').trim(); + if (!agentId || !channelId) + return undefined; + const provider = (ctx.messageProvider ?? '').toLowerCase(); + if (provider && provider !== 'fabric') + return undefined; + const entry = store.find(channelId); + if (!entry) + return undefined; + const ident = identity.findByAgentId(agentId); + const myUserId = (ident?.fabricUserId ?? '').trim(); + if (!myUserId) { + // identity registry caches fabricUserId after the first agentLogin + // in inbound.ts. If it's missing here, the agent likely hasn't + // completed login yet — skip rather than guess. + return undefined; + } + if (myUserId === entry.hostUserId) { + return { appendSystemContext: entry.hostGuide }; + } + if (entry.guestUserIds.includes(myUserId)) { + return { appendSystemContext: entry.guestGuide }; + } + return undefined; + }); +} diff --git a/dist/fabric/src/sub-discussion-store.js b/dist/fabric/src/sub-discussion-store.js new file mode 100644 index 0000000..7d574f5 --- /dev/null +++ b/dist/fabric/src/sub-discussion-store.js @@ -0,0 +1,47 @@ +import { readFileSync, writeFileSync, existsSync, mkdirSync } from 'node:fs'; +import { dirname } from 'node:path'; +export class SubDiscussionStore { + filePath; + byChannelId = new Map(); + constructor(filePath) { + this.filePath = filePath; + this.load(); + } + load() { + if (!existsSync(this.filePath)) + return; + try { + const data = JSON.parse(readFileSync(this.filePath, 'utf8')); + for (const e of data.entries ?? []) { + if (e?.subChannelId) + this.byChannelId.set(e.subChannelId, e); + } + } + catch { + // Corrupt file → start empty; first mutation rewrites cleanly. + } + } + persist() { + mkdirSync(dirname(this.filePath), { recursive: true }); + const data = { entries: [...this.byChannelId.values()] }; + writeFileSync(this.filePath, JSON.stringify(data, null, 2)); + } + list() { + return [...this.byChannelId.values()]; + } + find(subChannelId) { + return this.byChannelId.get(subChannelId); + } + add(entry) { + this.byChannelId.set(entry.subChannelId, entry); + this.persist(); + } + remove(subChannelId) { + const e = this.byChannelId.get(subChannelId); + if (!e) + return undefined; + this.byChannelId.delete(subChannelId); + this.persist(); + return e; + } +} diff --git a/dist/fabric/src/tools.js b/dist/fabric/src/tools.js index 723813f..64c7f4e 100644 --- a/dist/fabric/src/tools.js +++ b/dist/fabric/src/tools.js @@ -1,17 +1,71 @@ +import { resolveCommandsSyncKey } from './accounts.js'; const X_BY_KIND = { chat: 'general', work: 'work', report: 'report', discussion: 'discuss', }; -export function registerFabricTools(api, client, identity) { +// Delay between create-sub-discussion's channel create and greeting post. +// Backend pushes channel.joined to invitee sockets on create; that push +// has to traverse socket.io rooms before the guest plugin can sub the +// channel: room. If the greeting is posted before that, the guest's +// turn-activation wakeup misses (the socket isn't in the room yet). +// 500ms is empirically slack enough on local sim + production t3, and +// short enough not to feel laggy from the host's tool-result POV. Bump +// via FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS env if needed. +const GREETING_DELAY_MS = (() => { + const v = Number.parseInt(process.env.FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS ?? '', 10); + return Number.isFinite(v) && v >= 0 ? v : 500; +})(); +// PaddedCell tools-cache integration (cross-runtime alignment with +// Plexum decision #37; openclaw lacks a before_outgoing_tools hook so we +// opt in per-plugin). All Fabric tools are gate-able by default — agents +// must `dynamic-cache-tools` them before the model sees them. +// `gatedRegister` wraps openclaw's api.registerTool: each factory invocation +// (1) registers the tool's name+description into PaddedCell's catalog +// so dynamic-list-tools / dynamic-search-tools surface it +// (2) returns null if the per-session cache doesn't include the name +// Buffer-drain pattern handles plugin load order — if PaddedCell hasn't +// loaded yet, the stub default-allows + queues catalog entries; PaddedCell +// drains the queue when it installs the real API. +function ensurePaddedStub() { + const g = globalThis; + if (g.__padded) + return; + const buf = []; + g.__padded = { + _pendingCatalog: buf, + registerCatalogEntry(name, description) { + buf.push({ name, description }); + }, + allowTool: () => true, // fail-open until PaddedCell installs the real API + }; +} +export function registerFabricTools(api, client, identity, store, cfg) { + ensurePaddedStub(); + const seenForCatalog = new Set(); + const gatedRegister = (factory) => { + api.registerTool((ctx) => { + const tool = factory(ctx); + if (!tool || !tool.name) + return tool; + const padded = globalThis.__padded; + if (padded?.registerCatalogEntry && !seenForCatalog.has(tool.name)) { + padded.registerCatalogEntry(tool.name, tool.description ?? ''); + seenForCatalog.add(tool.name); + } + if (padded?.allowTool && !padded.allowTool(tool.name, ctx)) + return null; + return tool; + }); + }; // Resolve the calling agent's Fabric session + a guild's token/endpoint. const ctxGuild = async (agentId, guildNodeId) => { const entry = identity.findByAgentId(agentId); if (!entry) - throw new Error(`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` + - `~/.openclaw/bin/fabric-register --api-key (or set ` + - `channels.fabric.accounts.${agentId}); then restart the gateway`); + throw new Error(`agent ${agentId} not registered — call the openclaw \`fabric-register\` ` + + `tool (apiKey: , agentId: ${agentId}); the dynamic-subscription ` + + `path brings the socket up immediately, no gateway restart needed`); const session = await client.agentLogin(entry.fabricApiKey); const guild = session.guilds.find((g) => g.nodeId === guildNodeId); const token = session.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token; @@ -19,11 +73,72 @@ export function registerFabricTools(api, client, identity) { throw new Error(`agent not a member of guild ${guildNodeId}`); return { session, guild, token }; }; - // NOTE: binding an agent's Fabric API key is intentionally NOT a tool. - // It's a one-time step done out-of-band via the installed script - // ~/.openclaw/bin/fabric-register --api-key (AGENT_ID or --agent-id) - // or via static config (channels.fabric.accounts.). - const makeCreate = (kind) => api.registerTool((ctx) => ({ + // Bind an agent's Fabric API key — validates the key against Center, + // upserts ~/.openclaw/fabric-identity.json, AND brings up the inbound + // socket immediately via the live FabricInbound instance (no gateway + // restart). The standalone binary `~/.openclaw/bin/fabric-register` + // still exists for one-time bootstrap before the gateway runs, but + // recruitment's `register-agent` script should prefer this tool path + // so the new agent's socket is live before `interviewer` fires. + gatedRegister((ctx) => ({ + name: 'fabric-register', + description: 'Bind an agent to a Fabric Center API key. Validates the key, writes ' + + 'the entry to ~/.openclaw/fabric-identity.json, and starts a live ' + + 'inbound socket immediately so the agent receives Fabric pushes ' + + 'without a gateway restart. Caller defaults to the current agent; ' + + 'pass `agentId` to bind on behalf of another agent (recruitment use).', + parameters: { + type: 'object', + additionalProperties: false, + required: ['apiKey'], + properties: { + apiKey: { type: 'string', description: 'Fabric Center API key (`fak_…`)' }, + agentId: { + type: 'string', + description: 'Agent to register. Defaults to the calling agent (ctx.agentId). ' + + 'Recruitment onboarding may override this when wiring a freshly ' + + 'created agent before that agent has a session of its own.', + }, + }, + }, + execute: async (_id, p) => { + const agentId = p.agentId ?? ctx.agentId; + if (!agentId) + return { ok: false, error: 'no agent context (pass agentId)' }; + if (!p.apiKey || typeof p.apiKey !== 'string') { + return { ok: false, error: 'apiKey required' }; + } + // Delegate to FabricInbound.addAccount via the cross-plugin bridge. + // The bridge is installed in index.ts when inbound spins up; if it's + // not present yet, the gateway is still starting and the caller should + // retry (rare path — only hit during the gateway_start window). + const fabricApi = globalThis['__fabric']; + if (!fabricApi?.addAccount) { + return { + ok: false, + error: 'fabric inbound not ready (gateway still starting?). Fall back to ' + + '~/.openclaw/bin/fabric-register or retry after a few seconds.', + }; + } + try { + await fabricApi.addAccount({ agentId, fabricApiKey: p.apiKey }); + } + catch (err) { + return { + ok: false, + error: `fabric-register failed: ${String(err)}`, + }; + } + const entry = identity.findByAgentId(agentId); + return { + ok: true, + agentId, + fabricUserId: entry?.fabricUserId, + displayName: entry?.displayName, + }; + }, + })); + const makeCreate = (kind) => gatedRegister((ctx) => ({ name: `create-${kind}-channel`, description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}). ` + 'Optionally pass `purpose` to describe what this channel is for — ' + @@ -70,7 +185,7 @@ export function registerFabricTools(api, client, identity) { makeCreate('discussion'); // discussion-complete: post a summary then close the channel (Guild // /channels/:id/close — history stays readable, new posts -> 409). - api.registerTool((ctx) => ({ + gatedRegister((ctx) => ({ name: 'discussion-complete', description: 'Conclude a discussion: post a summary then close the channel.', parameters: { @@ -99,10 +214,223 @@ export function registerFabricTools(api, client, identity) { return { ok: true, closed: true }; }, })); + // ─────────────────────────────────────────────────────────────────── + // create-sub-discussion: open a discuss-type sub-channel hanging off + // the caller's current channel. Designed for host-driven multi-turn + // exchanges (interview, brainstorm, narrow Q&A) where the guests are + // either fresh agents without workflow capability (recruitment + // interviewee) or peers that just need a short scoped chat without + // entering their own subflow. + // + // What it does on top of plain create-discussion-channel: + // 1. Persists a store entry indexed by the new sub channelId, carrying: + // host agent + userId, guest userIds, host/guest guide texts, + // callback (parent) channel info. + // 2. Auto-posts `greetingMsg` using the host's own Fabric account so + // turn rotation's activation rule (first author → newOrder[0], + // currentSpeaker → newOrder[1], wakeup → newOrder[1]) puts the + // first guest on the spot immediately — no race where host posts + // before guest's socket subs the channel room (we wait + // GREETING_DELAY_MS for backend's channel.joined push to land). + // 3. The accompanying before_prompt_build hook (sub-discussion-hook + // registered from index.ts) then injects `hostGuideMsg` into the + // host's session prompt and `guestGuideMsg` into each guest's + // session prompt whenever a turn in this channel fires — so the + // two roles see different instructions, no shared guide file. + // ─────────────────────────────────────────────────────────────────── + gatedRegister((ctx) => ({ + name: 'create-sub-discussion', + description: 'Open a host-driven sub-discussion channel (x_type=discuss) hanging off your current channel, ' + + 'with role-specific system-prompt guides for host and guests. Use this for interviews / scoped ' + + 'Q&A where you stay in control of when the conversation ends. Returns the sub channelId; ' + + 'reach it via fabric-send-message in the rotating turn order. Close with close-sub-discussion ' + + 'to write a callback back into the parent channel.', + parameters: { + type: 'object', + additionalProperties: false, + required: [ + 'guildNodeId', + 'currentChannelId', + 'channelName', + 'greetingMsg', + 'hostGuideMsg', + 'guestGuideMsg', + 'guests', + ], + properties: { + guildNodeId: { type: 'string', description: 'Fabric guild node id (same guild for parent + sub).' }, + currentChannelId: { + type: 'string', + description: 'Channel id you are currently in (parent). Used as the callback target on close.', + }, + channelName: { type: 'string', description: 'Display name for the new sub-discussion channel.' }, + greetingMsg: { + type: 'string', + description: 'First message posted by YOU (the host) in the sub channel. Triggers turn rotation so ' + + "the first guest's session wakes immediately with both your greeting in history and the " + + 'guest guide injected as system prompt.', + }, + hostGuideMsg: { + type: 'string', + description: "Appended to YOUR session's system prompt whenever a turn fires in this sub channel. " + + 'Use it to remind yourself of the procedure (what to ask, when to call close-sub-discussion).', + }, + guestGuideMsg: { + type: 'string', + description: "Appended to EACH GUEST's session system prompt for turns in this sub channel. Use it to " + + 'orient guests with no prior workflow context (e.g. a fresh interviewee). Keep it short; ' + + 'long guides bloat every turn.', + }, + guests: { + type: 'array', + items: { type: 'string' }, + minItems: 1, + description: 'Fabric Center userIds invited as guests. Resolve via fabric-channel-list members or the ' + + '@.hangman-lab.top email convention.', + }, + purpose: { type: 'string', description: 'Optional channel.purpose for discoverability.' }, + }, + }, + execute: async (_id, p) => { + const agentId = ctx.agentId; + if (!agentId) + return { ok: false, error: 'no agent context' }; + if (!Array.isArray(p.guests) || p.guests.length === 0) { + return { ok: false, error: 'guests must be a non-empty array of Fabric userIds' }; + } + const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId); + const ch = await client.createChannel(guild.endpoint, token, { + guildId: p.guildNodeId, + name: p.channelName, + xType: 'discuss', + isPublic: false, + memberUserIds: p.guests, + ...(p.purpose !== undefined ? { purpose: p.purpose } : {}), + }); + store.add({ + subChannelId: ch.id, + hostAgentId: agentId, + hostUserId: session.user.id, + guestUserIds: [...p.guests], + hostGuide: p.hostGuideMsg, + guestGuide: p.guestGuideMsg, + callbackGuildNodeId: p.guildNodeId, + callbackChannelId: p.currentChannelId, + createdAt: new Date().toISOString(), + }); + // Let backend's channel.joined push reach guest sockets before our + // greeting fires — otherwise the wakeup emitted by turn-activation + // races a not-yet-subscribed socket.io room. + if (GREETING_DELAY_MS > 0) { + await new Promise((r) => setTimeout(r, GREETING_DELAY_MS)); + } + try { + await client.postMessage(guild.endpoint, token, ch.id, p.greetingMsg, session.user.id); + } + catch (err) { + api.logger.warn(`fabric: create-sub-discussion greeting post failed channel=${ch.id} err=${String(err)}`); + } + return { ok: true, subChannelId: ch.id }; + }, + })); + // ─────────────────────────────────────────────────────────────────── + // close-sub-discussion: post a system-authored callback into the + // parent channel + close the sub-discussion channel. Only the original + // host can call this. Uses the Guild's x-fabric-system-key path (shared + // secret = commandsSyncKey) so the callback lands as a guild/system + // author, not the host's personal account — and can wake the host on + // the parent channel to continue whatever workflow opened the sub. + // ─────────────────────────────────────────────────────────────────── + gatedRegister((ctx) => ({ + name: 'close-sub-discussion', + description: 'Close a sub-discussion channel you opened (host-only) and write a callback to the parent ' + + 'channel as a system message. Pass `wakeupHost: false` to land the callback silently in ' + + 'history without waking yourself.', + parameters: { + type: 'object', + additionalProperties: false, + required: ['subChannelId', 'callbackMsg'], + properties: { + subChannelId: { type: 'string', description: 'The sub-discussion channelId returned by create-sub-discussion.' }, + callbackMsg: { + type: 'string', + description: 'Content to post into the parent channel as a system-authored message. Typical content: ' + + 'the conclusion / extracted data from the sub-discussion, so the next turn on the parent ' + + 'channel can act on it.', + }, + wakeupHost: { + type: 'boolean', + description: 'Whether to wake YOU (the host) on the parent channel. Default true — for recruitment ' + + 'interview flow where the next workflow step needs to run immediately. Pass false for ' + + 'fire-and-forget logging.', + }, + }, + }, + execute: async (_id, p) => { + const agentId = ctx.agentId; + if (!agentId) + return { ok: false, error: 'no agent context' }; + const entry = store.find(p.subChannelId); + if (!entry) { + return { ok: false, error: `sub-discussion not found: ${p.subChannelId}` }; + } + if (entry.hostAgentId !== agentId) { + return { + ok: false, + error: `only the host (${entry.hostAgentId}) may close this sub-discussion`, + }; + } + const systemKey = resolveCommandsSyncKey(cfg); + if (!systemKey) { + return { + ok: false, + error: 'channels.fabric.commandsSyncKey is not configured — close-sub-discussion needs it for ' + + 'the x-fabric-system-key callback. Configure via openclaw config.', + }; + } + const { guild, token } = await ctxGuild(agentId, entry.callbackGuildNodeId); + const wakeup = p.wakeupHost !== false; + // 1) Post callback into parent channel via the system-key path. + const url = `${guild.endpoint}/api/channels/${encodeURIComponent(entry.callbackChannelId)}/messages`; + const res = await fetch(url, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-fabric-system-key': systemKey, + }, + body: JSON.stringify({ + content: p.callbackMsg, + wakeupUserId: wakeup ? entry.hostUserId : null, + }), + }).catch((err) => { + throw new Error(`callback POST failed: ${String(err)}`); + }); + if (!res.ok) { + const text = await res.text().catch(() => ''); + return { + ok: false, + error: `callback POST ${url} -> ${res.status} ${text}`, + }; + } + // 2) Close the sub channel using the host's own bearer (the host is + // a member of the channel — channel.close auth is per-member). + try { + await client.closeChannel(guild.endpoint, token, entry.subChannelId); + } + catch (err) { + api.logger.warn(`fabric: close-sub-discussion: sub channel close failed channel=${entry.subChannelId} err=${String(err)}`); + } + // 3) Drop the store entry so the prompt-injection hook stops firing + // for this channel (the sub is closed; any straggler turns would + // just hit the closed-channel write reject downstream). + store.remove(entry.subChannelId); + return { ok: true, closed: true }; + }, + })); // fabric-canvas: share / update / read / close the channel's single // pinned canvas document (one tool, four actions). update/close are // sharer-only server-side (the guild returns 403 otherwise). - api.registerTool((ctx) => ({ + gatedRegister((ctx) => ({ name: 'fabric-canvas', description: "Manage a channel's pinned canvas document. action: " + "read (current canvas or null) | share (create/replace; you become " + @@ -174,7 +502,7 @@ export function registerFabricTools(api, client, identity) { }, })); // fabric-channel: channel membership (one tool, three actions). - api.registerTool((ctx) => ({ + gatedRegister((ctx) => ({ name: 'fabric-channel', description: 'Channel membership. action: members (list channel member userIds) | ' + 'join (this agent joins the channel) | leave (this agent leaves).', @@ -221,7 +549,7 @@ export function registerFabricTools(api, client, identity) { // workload to #agents-room, or triage agent following up on an // already-routed task by commenting in #updates. // ----------------------------------------------------------------- - api.registerTool((ctx) => ({ + gatedRegister((ctx) => ({ name: 'fabric-send-message', description: 'Send a text message into a specific Fabric channel. Author is the calling agent. ' + 'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.', @@ -244,13 +572,88 @@ export function registerFabricTools(api, client, identity) { return { ok: true, messageId: res.messageId, seq: res.seq }; }, })); + // ─────────────────────────────────────────────────────────────────── + // fabric-send-sys-msg: post a system-authored message (author = + // sentinel UUID 0000…, not the calling agent) using the Guild's + // x-fabric-system-key path. Use for cross-agent broadcasts where you + // don't want the message tied to one agent's identity — Dialectic + // topic announcements / lifecycle events, host-system advisories, + // etc. Caller doesn't need to be a member of the channel (the + // backend isSystem branch skips assertParticipant), but must be a + // member of the guild (their session resolves the guild endpoint). + // + // Shared secret: reads channels.fabric.commandsSyncKey (same value + // as the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY env). Empty + // config → tool returns ok:false with a clear error, no fall-through + // to regular agent posting. + // ─────────────────────────────────────────────────────────────────── + gatedRegister((ctx) => ({ + name: 'fabric-send-sys-msg', + description: 'Send a SYSTEM-AUTHORED message into a Fabric channel (author = guild sentinel, not you). ' + + 'Use for cross-agent broadcasts that should not be attributed to a single agent — ' + + 'Dialectic announce-channel topic broadcasts, lifecycle events, system advisories. ' + + 'Optionally precise-wake one recipient via wakeupUserId; otherwise the message lands ' + + 'silently in history (no wake).', + parameters: { + type: 'object', + additionalProperties: false, + required: ['guildNodeId', 'channelId', 'content'], + properties: { + guildNodeId: { type: 'string' }, + channelId: { type: 'string' }, + content: { type: 'string', description: 'Message body (markdown supported by the renderer).' }, + wakeupUserId: { + type: 'string', + description: "Optional: a single Fabric userId to wake with this message (everyone else in the " + + 'channel sees it but with wakeup=false). Omit for fully silent broadcast.', + }, + }, + }, + execute: async (_id, p) => { + const agentId = ctx.agentId; + if (!agentId) + return { ok: false, error: 'no agent context' }; + const systemKey = resolveCommandsSyncKey(cfg); + if (!systemKey) { + return { + ok: false, + error: 'channels.fabric.commandsSyncKey is not configured — fabric-send-sys-msg needs it for ' + + 'the x-fabric-system-key header. Configure via openclaw config.', + }; + } + const { guild } = await ctxGuild(agentId, p.guildNodeId); + const url = `${guild.endpoint}/api/channels/${encodeURIComponent(p.channelId)}/messages`; + const wakeup = typeof p.wakeupUserId === 'string' && p.wakeupUserId.trim() + ? p.wakeupUserId.trim() + : null; + const res = await fetch(url, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-fabric-system-key': systemKey, + }, + body: JSON.stringify({ content: p.content, wakeupUserId: wakeup }), + }); + if (!res.ok) { + const text = await res.text().catch(() => ''); + return { ok: false, error: `POST ${url} -> ${res.status} ${text}` }; + } + const json = (await res.json().catch(() => null)); + return { + ok: true, + messageId: json?.messageId, + seq: json?.seq, + authorUserId: json?.authorUserId, + }; + }, + })); // ----------------------------------------------------------------- // fabric-channel-list: enumerate channels the calling agent can see // in a given guild. Backend filters to public channels + channels the // agent is a member of. Returns id / name / xType per channel so the // agent can pick a channelId for fabric-send-message etc. // ----------------------------------------------------------------- - api.registerTool((ctx) => ({ + gatedRegister((ctx) => ({ name: 'fabric-channel-list', description: 'List channels visible to the calling agent in a guild. Optional ' + 'nameFilter does a case-insensitive substring match client-side. ' + @@ -308,7 +711,7 @@ export function registerFabricTools(api, client, identity) { // workflow says "find the right guild for X" — pick by purpose, // then fabric-channel-list to find the right channel inside it. // ----------------------------------------------------------------- - api.registerTool((ctx) => ({ + gatedRegister((ctx) => ({ name: 'fabric-guild-list', description: 'List guilds the calling agent is a member of. Returns ' + '{nodeId, name, purpose, status} per row. ' + @@ -370,7 +773,7 @@ export function registerFabricTools(api, client, identity) { // channel must be public). Use this to backfill purpose on existing // channels, or to refine it after a channel's role evolves. // ----------------------------------------------------------------- - api.registerTool((ctx) => ({ + gatedRegister((ctx) => ({ name: 'fabric-channel-set-purpose', description: "Set or update a channel's free-form purpose description. " + 'Channel membership required (or the channel must be public). ' + @@ -407,7 +810,7 @@ export function registerFabricTools(api, client, identity) { // gated; verify a previous message went through; lookup recent // duplicates before opening a new task in triage. // ----------------------------------------------------------------- - api.registerTool((ctx) => ({ + gatedRegister((ctx) => ({ name: 'fabric-message-history', description: "Read a channel's recent message history. Omit seqFrom/seqTo to " + 'tail (last `limit` messages, default 20, max 200). Backend ' + diff --git a/index.ts b/index.ts index d3e55fc..1dfab28 100644 --- a/index.ts +++ b/index.ts @@ -49,6 +49,18 @@ export default defineChannelPluginEntry({ logger: { info: (m: string) => void; warn: (m: string) => void }; on: (ev: string, fn: (...args: unknown[]) => unknown) => void; registerTool: (d: unknown) => void; + registerGatewayMethod: ( + method: string, + handler: (req: { + params?: unknown; + respond: ( + ok: boolean, + data?: unknown, + error?: { code: string; message: string }, + ) => void; + }) => void | Promise, + opts?: { scope?: string }, + ) => void; }; const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string; commandsSyncKey?: string } }; @@ -121,6 +133,33 @@ export default defineChannelPluginEntry({ api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)'); } + // CLI-invocable live registration, callable from a shell script via + // openclaw gateway call fabric.register --params '{"agentId":"…","apiKey":"fak_…"}' + // The `fabric-register` TOOL only fires inside an agent turn, and there is + // no `openclaw tools call` CLI — so recruitment's `register-agent` script + // (a plain shell step, no LLM turn) had to fall back to the standalone + // binary, which can't notify the running plugin → needed a gateway + // restart. This gateway method runs in-process: inbound.addAccount + // validates the key, persists identity, and brings the socket up live — + // no restart. + api.registerGatewayMethod('fabric.register', async ({ params, respond }) => { + const p = (params ?? {}) as { agentId?: string; apiKey?: string }; + if (!p.agentId || !p.apiKey) { + respond(false, { ok: false }, { code: 'INVALID_REQUEST', message: 'agentId and apiKey required' }); + return; + } + if (!inbound) { + respond(false, { ok: false }, { code: 'UNAVAILABLE', message: 'fabric inbound not ready (gateway still starting?)' }); + return; + } + try { + await inbound.addAccount({ agentId: p.agentId, fabricApiKey: p.apiKey }); + respond(true, { ok: true, agentId: p.agentId }); + } catch (err) { + respond(false, { ok: false }, { code: 'UNAVAILABLE', message: `fabric-register failed: ${String(err)}` }); + } + }, { scope: 'operator.write' }); + api.on('gateway_start', () => { const _G = globalThis as Record; if (_G._fabricInboundStarted) return;