diff --git a/index.ts b/index.ts index 7185e16..33b09a3 100644 --- a/index.ts +++ b/index.ts @@ -15,6 +15,8 @@ import { FabricClient } from './src/fabric-client.js'; import { IdentityRegistry } from './src/identity.js'; import { syncFabricCommands } from './src/command-sync.js'; import { PresenceSync } from './src/presence-sync.js'; +import { SubDiscussionStore } from './src/sub-discussion-store.js'; +import { registerSubDiscussionHook } from './src/sub-discussion-hook.js'; import path from 'node:path'; import os from 'node:os'; @@ -48,19 +50,38 @@ export default defineChannelPluginEntry({ on: (ev: string, fn: (...args: unknown[]) => unknown) => void; registerTool: (d: unknown) => void; }; - const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } }; + const cfg = (api.config ?? {}) as { + channels?: { fabric?: { centerApiBase?: string; commandsSyncKey?: string } }; + }; const centerApiBase = cfg.channels?.fabric?.centerApiBase ?? 'http://localhost:7001/api'; const idFile = api.pluginConfig?.identityFilePath ?? path.join(os.homedir(), '.openclaw', 'fabric-identity.json'); + const subDiscussionFile = path.join( + os.homedir(), + '.openclaw', + 'fabric-sub-discussion.json', + ); // tools operate against a default Center; per-account keys come from config const client = new FabricClient(centerApiBase); const identity = new IdentityRegistry(idFile); + const subDiscussion = new SubDiscussionStore(subDiscussionFile); registerFabricTools( { registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity, + subDiscussion, + cfg, + ); + // Per-(agent, channel) prompt injection for sub-discussion channels. + // Runs as a sibling to PrismFacet's before_prompt_build hook (and + // ClawPrompts' fabric-chat-injector); openclaw composes + // appendSystemContext from all registered handlers. + registerSubDiscussionHook( + { on: api.on, logger: api.logger }, + subDiscussion, + identity, ); // Cross-plugin API: globalThis.__fabric diff --git a/openclaw.plugin.json b/openclaw.plugin.json index 21ac613..8218aa9 100644 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -14,7 +14,9 @@ "create-work-channel", "create-report-channel", "create-discussion-channel", + "create-sub-discussion", "discussion-complete", + "close-sub-discussion", "fabric-canvas", "fabric-channel", "fabric-send-message", diff --git a/src/sub-discussion-hook.ts b/src/sub-discussion-hook.ts new file mode 100644 index 0000000..7ff6479 --- /dev/null +++ b/src/sub-discussion-hook.ts @@ -0,0 +1,76 @@ +import type { IdentityRegistry } from './identity.js'; +import type { SubDiscussionStore } from './sub-discussion-store.js'; + +// Plugin-local before_prompt_build hook that injects per-(agent, channel) +// guides for sub-discussion channels created via the `create-sub-discussion` +// tool. Mirrors the pattern used by ClawPrompts' fabric-chat-injector +// (channelId-aware injection) but with content dynamically supplied at +// channel-creation time instead of read from static files via PrismFacet's +// router/rule registry. +// +// Match logic per turn: +// ctx.channelId → store.find() → sub-discussion entry +// ctx.agentId → identity.findByAgentId().fabricUserId +// ─ matches entry.hostUserId → inject hostGuide +// ─ matches entry.guestUserIds → inject guestGuide +// ─ neither → no injection +// +// Fail-closed on unknown agentId/channelId — we never inject "the wrong" +// guide, only the right one or nothing. + +const _G = globalThis as Record; +const DEDUP_KEY = '_fabricSubDiscussionHookDedup'; + +type PromptCtx = { + agentId?: string; + channelId?: string; + messageProvider?: string; +}; + +export function registerSubDiscussionHook( + api: { + on: (hook: string, handler: (...args: unknown[]) => unknown) => void; + logger: { info: (m: string) => void; warn: (m: string) => void }; + }, + store: SubDiscussionStore, + identity: IdentityRegistry, +): void { + if (!(_G[DEDUP_KEY] instanceof WeakSet)) _G[DEDUP_KEY] = new WeakSet(); + const dedup = _G[DEDUP_KEY] as WeakSet; + + api.on('before_prompt_build', async (...args: unknown[]) => { + const event = args[0]; + const ctx = (args[1] ?? {}) as PromptCtx; + // The hook fires both for fabric-driven turns (channelId set) and + // for other triggers (HF wake, exec-event, etc.) — drop those. + if (typeof event === 'object' && event !== null) { + if (dedup.has(event)) return undefined; + dedup.add(event); + } + const agentId = (ctx.agentId ?? '').trim(); + const channelId = (ctx.channelId ?? '').trim(); + if (!agentId || !channelId) return undefined; + const provider = (ctx.messageProvider ?? '').toLowerCase(); + if (provider && provider !== 'fabric') return undefined; + + const entry = store.find(channelId); + if (!entry) return undefined; + + const ident = identity.findByAgentId(agentId); + const myUserId = (ident?.fabricUserId ?? '').trim(); + if (!myUserId) { + // identity registry caches fabricUserId after the first agentLogin + // in inbound.ts. If it's missing here, the agent likely hasn't + // completed login yet — skip rather than guess. + return undefined; + } + + if (myUserId === entry.hostUserId) { + return { appendSystemContext: entry.hostGuide }; + } + if (entry.guestUserIds.includes(myUserId)) { + return { appendSystemContext: entry.guestGuide }; + } + return undefined; + }); +} diff --git a/src/sub-discussion-store.ts b/src/sub-discussion-store.ts new file mode 100644 index 0000000..b0740ea --- /dev/null +++ b/src/sub-discussion-store.ts @@ -0,0 +1,76 @@ +import { readFileSync, writeFileSync, existsSync, mkdirSync } from 'node:fs'; +import { dirname } from 'node:path'; + +// Records per-(sub-discussion-channel) state created by the +// `create-sub-discussion` tool and consumed by: +// 1. `before_prompt_build` hook — looks up by (agentId, channelId) to +// inject the host or guest guide as appendSystemContext. +// 2. `close-sub-discussion` tool — looks up by sub channelId to find +// the parent channel to post the callback to and to validate that +// the caller is the original host. +// +// One sub-discussion = one channel. Lifetime: from create-sub-discussion +// return until close-sub-discussion (or gateway-stop / disk corruption). +// We persist to a small JSON file so a gateway restart mid-interview +// doesn't strand both host and guest with no guide. + +export type SubDiscussionEntry = { + subChannelId: string; + hostAgentId: string; // the openclaw agentId that called create-sub-discussion + hostUserId: string; // the host's Fabric Center userId (session.user.id) + guestUserIds: string[]; // Fabric Center userIds invited as guests + hostGuide: string; // appended to host's session system prompt while in this channel + guestGuide: string; // appended to each guest's session system prompt while in this channel + callbackGuildNodeId: string; // where to post the callback when close is called + callbackChannelId: string; // parent channel to post callback to (system msg) + createdAt: string; // ISO timestamp +}; + +type StoreFile = { entries: SubDiscussionEntry[] }; + +export class SubDiscussionStore { + private byChannelId = new Map(); + + constructor(private readonly filePath: string) { + this.load(); + } + + private load(): void { + if (!existsSync(this.filePath)) return; + try { + const data = JSON.parse(readFileSync(this.filePath, 'utf8')) as StoreFile; + for (const e of data.entries ?? []) { + if (e?.subChannelId) this.byChannelId.set(e.subChannelId, e); + } + } catch { + // Corrupt file → start empty; first mutation rewrites cleanly. + } + } + + private persist(): void { + mkdirSync(dirname(this.filePath), { recursive: true }); + const data: StoreFile = { entries: [...this.byChannelId.values()] }; + writeFileSync(this.filePath, JSON.stringify(data, null, 2)); + } + + list(): SubDiscussionEntry[] { + return [...this.byChannelId.values()]; + } + + find(subChannelId: string): SubDiscussionEntry | undefined { + return this.byChannelId.get(subChannelId); + } + + add(entry: SubDiscussionEntry): void { + this.byChannelId.set(entry.subChannelId, entry); + this.persist(); + } + + remove(subChannelId: string): SubDiscussionEntry | undefined { + const e = this.byChannelId.get(subChannelId); + if (!e) return undefined; + this.byChannelId.delete(subChannelId); + this.persist(); + return e; + } +} diff --git a/src/tools.ts b/src/tools.ts index 5d7f018..a1d2806 100644 --- a/src/tools.ts +++ b/src/tools.ts @@ -1,5 +1,7 @@ import type { FabricClient } from './fabric-client.js'; import type { IdentityRegistry } from './identity.js'; +import type { SubDiscussionStore } from './sub-discussion-store.js'; +import { resolveCommandsSyncKey } from './accounts.js'; // OpenClaw tool registration api (loose typing — concrete shape from // openclaw/plugin-sdk/core at host SDK version). @@ -10,6 +12,9 @@ type ToolApi = { type Ctx = { agentId?: string }; +// Loose config shape — just the fields we read here. +type ToolsCfg = { channels?: { fabric?: { commandsSyncKey?: string } }; [k: string]: unknown }; + const X_BY_KIND: Record = { chat: 'general', work: 'work', @@ -17,10 +22,25 @@ const X_BY_KIND: Record = { discussion: 'discuss', }; +// Delay between create-sub-discussion's channel create and greeting post. +// Backend pushes channel.joined to invitee sockets on create; that push +// has to traverse socket.io rooms before the guest plugin can sub the +// channel: room. If the greeting is posted before that, the guest's +// turn-activation wakeup misses (the socket isn't in the room yet). +// 500ms is empirically slack enough on local sim + production t3, and +// short enough not to feel laggy from the host's tool-result POV. Bump +// via FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS env if needed. +const GREETING_DELAY_MS = (() => { + const v = Number.parseInt(process.env.FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS ?? '', 10); + return Number.isFinite(v) && v >= 0 ? v : 500; +})(); + export function registerFabricTools( api: ToolApi, client: FabricClient, identity: IdentityRegistry, + store: SubDiscussionStore, + cfg: ToolsCfg, ): void { // Resolve the calling agent's Fabric session + a guild's token/endpoint. const ctxGuild = async (agentId: string, guildNodeId: string) => { @@ -134,6 +154,245 @@ export function registerFabricTools( }, })); + // ─────────────────────────────────────────────────────────────────── + // create-sub-discussion: open a discuss-type sub-channel hanging off + // the caller's current channel. Designed for host-driven multi-turn + // exchanges (interview, brainstorm, narrow Q&A) where the guests are + // either fresh agents without workflow capability (recruitment + // interviewee) or peers that just need a short scoped chat without + // entering their own subflow. + // + // What it does on top of plain create-discussion-channel: + // 1. Persists a store entry indexed by the new sub channelId, carrying: + // host agent + userId, guest userIds, host/guest guide texts, + // callback (parent) channel info. + // 2. Auto-posts `greetingMsg` using the host's own Fabric account so + // turn rotation's activation rule (first author → newOrder[0], + // currentSpeaker → newOrder[1], wakeup → newOrder[1]) puts the + // first guest on the spot immediately — no race where host posts + // before guest's socket subs the channel room (we wait + // GREETING_DELAY_MS for backend's channel.joined push to land). + // 3. The accompanying before_prompt_build hook (sub-discussion-hook + // registered from index.ts) then injects `hostGuideMsg` into the + // host's session prompt and `guestGuideMsg` into each guest's + // session prompt whenever a turn in this channel fires — so the + // two roles see different instructions, no shared guide file. + // ─────────────────────────────────────────────────────────────────── + api.registerTool((ctx: Ctx) => ({ + name: 'create-sub-discussion', + description: + 'Open a host-driven sub-discussion channel (x_type=discuss) hanging off your current channel, ' + + 'with role-specific system-prompt guides for host and guests. Use this for interviews / scoped ' + + 'Q&A where you stay in control of when the conversation ends. Returns the sub channelId; ' + + 'reach it via fabric-send-message in the rotating turn order. Close with close-sub-discussion ' + + 'to write a callback back into the parent channel.', + parameters: { + type: 'object', + additionalProperties: false, + required: [ + 'guildNodeId', + 'currentChannelId', + 'channelName', + 'greetingMsg', + 'hostGuideMsg', + 'guestGuideMsg', + 'guests', + ], + properties: { + guildNodeId: { type: 'string', description: 'Fabric guild node id (same guild for parent + sub).' }, + currentChannelId: { + type: 'string', + description: 'Channel id you are currently in (parent). Used as the callback target on close.', + }, + channelName: { type: 'string', description: 'Display name for the new sub-discussion channel.' }, + greetingMsg: { + type: 'string', + description: + 'First message posted by YOU (the host) in the sub channel. Triggers turn rotation so ' + + "the first guest's session wakes immediately with both your greeting in history and the " + + 'guest guide injected as system prompt.', + }, + hostGuideMsg: { + type: 'string', + description: + "Appended to YOUR session's system prompt whenever a turn fires in this sub channel. " + + 'Use it to remind yourself of the procedure (what to ask, when to call close-sub-discussion).', + }, + guestGuideMsg: { + type: 'string', + description: + "Appended to EACH GUEST's session system prompt for turns in this sub channel. Use it to " + + 'orient guests with no prior workflow context (e.g. a fresh interviewee). Keep it short; ' + + 'long guides bloat every turn.', + }, + guests: { + type: 'array', + items: { type: 'string' }, + minItems: 1, + description: + 'Fabric Center userIds invited as guests. Resolve via fabric-channel-list members or the ' + + '@.hangman-lab.top email convention.', + }, + purpose: { type: 'string', description: 'Optional channel.purpose for discoverability.' }, + }, + }, + execute: async ( + _id: string, + p: { + guildNodeId: string; + currentChannelId: string; + channelName: string; + greetingMsg: string; + hostGuideMsg: string; + guestGuideMsg: string; + guests: string[]; + purpose?: string; + }, + ) => { + const agentId = ctx.agentId; + if (!agentId) return { ok: false, error: 'no agent context' }; + if (!Array.isArray(p.guests) || p.guests.length === 0) { + return { ok: false, error: 'guests must be a non-empty array of Fabric userIds' }; + } + const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId); + const ch = await client.createChannel(guild.endpoint, token, { + guildId: p.guildNodeId, + name: p.channelName, + xType: 'discuss', + isPublic: false, + memberUserIds: p.guests, + ...(p.purpose !== undefined ? { purpose: p.purpose } : {}), + }); + store.add({ + subChannelId: ch.id, + hostAgentId: agentId, + hostUserId: session.user.id, + guestUserIds: [...p.guests], + hostGuide: p.hostGuideMsg, + guestGuide: p.guestGuideMsg, + callbackGuildNodeId: p.guildNodeId, + callbackChannelId: p.currentChannelId, + createdAt: new Date().toISOString(), + }); + // Let backend's channel.joined push reach guest sockets before our + // greeting fires — otherwise the wakeup emitted by turn-activation + // races a not-yet-subscribed socket.io room. + if (GREETING_DELAY_MS > 0) { + await new Promise((r) => setTimeout(r, GREETING_DELAY_MS)); + } + try { + await client.postMessage(guild.endpoint, token, ch.id, p.greetingMsg, session.user.id); + } catch (err) { + api.logger.warn( + `fabric: create-sub-discussion greeting post failed channel=${ch.id} err=${String(err)}`, + ); + } + return { ok: true, subChannelId: ch.id }; + }, + })); + + // ─────────────────────────────────────────────────────────────────── + // close-sub-discussion: post a system-authored callback into the + // parent channel + close the sub-discussion channel. Only the original + // host can call this. Uses the Guild's x-fabric-system-key path (shared + // secret = commandsSyncKey) so the callback lands as a guild/system + // author, not the host's personal account — and can wake the host on + // the parent channel to continue whatever workflow opened the sub. + // ─────────────────────────────────────────────────────────────────── + api.registerTool((ctx: Ctx) => ({ + name: 'close-sub-discussion', + description: + 'Close a sub-discussion channel you opened (host-only) and write a callback to the parent ' + + 'channel as a system message. Pass `wakeupHost: false` to land the callback silently in ' + + 'history without waking yourself.', + parameters: { + type: 'object', + additionalProperties: false, + required: ['subChannelId', 'callbackMsg'], + properties: { + subChannelId: { type: 'string', description: 'The sub-discussion channelId returned by create-sub-discussion.' }, + callbackMsg: { + type: 'string', + description: + 'Content to post into the parent channel as a system-authored message. Typical content: ' + + 'the conclusion / extracted data from the sub-discussion, so the next turn on the parent ' + + 'channel can act on it.', + }, + wakeupHost: { + type: 'boolean', + description: + 'Whether to wake YOU (the host) on the parent channel. Default true — for recruitment ' + + 'interview flow where the next workflow step needs to run immediately. Pass false for ' + + 'fire-and-forget logging.', + }, + }, + }, + execute: async ( + _id: string, + p: { subChannelId: string; callbackMsg: string; wakeupHost?: boolean }, + ) => { + const agentId = ctx.agentId; + if (!agentId) return { ok: false, error: 'no agent context' }; + const entry = store.find(p.subChannelId); + if (!entry) { + return { ok: false, error: `sub-discussion not found: ${p.subChannelId}` }; + } + if (entry.hostAgentId !== agentId) { + return { + ok: false, + error: `only the host (${entry.hostAgentId}) may close this sub-discussion`, + }; + } + const systemKey = resolveCommandsSyncKey(cfg); + if (!systemKey) { + return { + ok: false, + error: + 'channels.fabric.commandsSyncKey is not configured — close-sub-discussion needs it for ' + + 'the x-fabric-system-key callback. Configure via openclaw config.', + }; + } + const { guild, token } = await ctxGuild(agentId, entry.callbackGuildNodeId); + const wakeup = p.wakeupHost !== false; + // 1) Post callback into parent channel via the system-key path. + const url = `${guild.endpoint}/api/channels/${encodeURIComponent(entry.callbackChannelId)}/messages`; + const res = await fetch(url, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-fabric-system-key': systemKey, + }, + body: JSON.stringify({ + content: p.callbackMsg, + wakeupUserId: wakeup ? entry.hostUserId : null, + }), + }).catch((err) => { + throw new Error(`callback POST failed: ${String(err)}`); + }); + if (!res.ok) { + const text = await res.text().catch(() => ''); + return { + ok: false, + error: `callback POST ${url} -> ${res.status} ${text}`, + }; + } + // 2) Close the sub channel using the host's own bearer (the host is + // a member of the channel — channel.close auth is per-member). + try { + await client.closeChannel(guild.endpoint, token, entry.subChannelId); + } catch (err) { + api.logger.warn( + `fabric: close-sub-discussion: sub channel close failed channel=${entry.subChannelId} err=${String(err)}`, + ); + } + // 3) Drop the store entry so the prompt-injection hook stops firing + // for this channel (the sub is closed; any straggler turns would + // just hit the closed-channel write reject downstream). + store.remove(entry.subChannelId); + return { ok: true, closed: true }; + }, + })); + // fabric-canvas: share / update / read / close the channel's single // pinned canvas document (one tool, four actions). update/close are // sharer-only server-side (the guild returns 403 otherwise).