From 8774cfd7ccdb85f41484506ffa3430ca090ac95c Mon Sep 17 00:00:00 2001 From: hzhang Date: Sat, 16 May 2026 22:15:46 +0100 Subject: [PATCH] feat(fabric): coalesce a split agent turn into ONE message (deterministic) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OpenClaw delivers an agent turn whose blocks are text -> thinking/tool -> text via multiple inbound deliver() calls (a non-text block is a delivery boundary), so one turn became N Fabric messages. Fix: buffer deliver() segments per channel (src/coalesce.ts) and flush them as ONE postMessage at a deterministic boundary — the finally after dispatchInboundReplyWithBase() resolves, which provably runs only after every deliver() of the turn (verified: deliver,deliver -> dispatch returned -> flush). No hooks, no timers, no idle guessing. The agent_end hook was rejected: it fires BEFORE deliver(). gateway_stop flushes any leftover; a long safety timeout is a leak-guard only. channels.fabric.coalesce=false restores raw per-segment posting. Verified on local openclaw + Fabric with a fake text/thinking/text model: single trigger -> exactly one merged message. Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 6 +++ dist/fabric/index.js | 6 +++ dist/fabric/src/accounts.js | 5 ++ dist/fabric/src/channel.js | 5 +- dist/fabric/src/coalesce.js | 75 ++++++++++++++++++++++++++++++ dist/fabric/src/inbound.js | 21 ++++++++- index.ts | 8 +++- openclaw.plugin.json | 4 ++ src/accounts.ts | 11 +++++ src/channel.ts | 5 +- src/coalesce.ts | 93 +++++++++++++++++++++++++++++++++++++ src/inbound.ts | 21 ++++++++- 12 files changed, 253 insertions(+), 7 deletions(-) create mode 100644 dist/fabric/src/coalesce.js create mode 100644 src/coalesce.ts diff --git a/README.md b/README.md index de641d0..7e02b95 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,12 @@ Two ways, both write the same identity registry the transport reads: `FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY` (Guild C-2). Read it from the guild with `docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js`. Sourced from config only — never from the environment. +- `channels.fabric.coalesce` — default `true`. OpenClaw splits one agent + turn whose blocks are `text → thinking/tool → text` into multiple + `deliver()` calls; this buffers them and posts ONE Fabric message at the + deterministic turn boundary (right after the inbound reply dispatch + resolves — no hooks, no timers, no idle guessing). `false` = raw + per-segment posting. - `channels.fabric.accounts.` = `{ fabricApiKey, enabled }` (**agent = account**; the account id is the OpenClaw agentId) - plugin `identityFilePath` — default `~/.openclaw/fabric-identity.json` diff --git a/dist/fabric/index.js b/dist/fabric/index.js index d99db3b..247dbed 100644 --- a/dist/fabric/index.js +++ b/dist/fabric/index.js @@ -5,6 +5,7 @@ // the OpenClawPluginApi for runtime startup (transport + tools). import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core'; import { fabricChannelPlugin } from './src/channel.js'; +import { flushAllFabric } from './src/coalesce.js'; import { FabricInbound } from './src/inbound.js'; import { listEnabledFabricAccounts } from './src/accounts.js'; import { registerFabricTools } from './src/tools.js'; @@ -60,7 +61,12 @@ export default defineChannelPluginEntry({ api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`); void syncFabricCommands(client, cfg, accounts, api.logger); }); + // Note: the per-turn coalesce flush happens deterministically in + // inbound.ts right after dispatchInboundReplyWithBase resolves (that + // is the real "all deliveries done" boundary; the agent_end hook fires + // BEFORE deliver()). gateway_stop only flushes any leftover buffer. api.on('gateway_stop', () => { + void flushAllFabric(); inbound?.stop(); inbound = null; }); diff --git a/dist/fabric/src/accounts.js b/dist/fabric/src/accounts.js index d60f1a9..70f452b 100644 --- a/dist/fabric/src/accounts.js +++ b/dist/fabric/src/accounts.js @@ -13,6 +13,11 @@ function section(cfg) { export function resolveCommandsSyncKey(cfg) { return (section(cfg).commandsSyncKey ?? '').trim(); } +// Whether to coalesce a split agent turn into one Fabric message +// (channel-level). Default true. +export function resolveCoalesce(cfg) { + return (cfg.channels?.fabric ?? {}).coalesce !== false; +} export function listFabricAccountIds(cfg) { const accts = section(cfg).accounts ?? {}; const ids = Object.keys(accts); diff --git a/dist/fabric/src/channel.js b/dist/fabric/src/channel.js index 4496ab6..cb06c38 100644 --- a/dist/fabric/src/channel.js +++ b/dist/fabric/src/channel.js @@ -137,7 +137,10 @@ export const fabricChannelPlugin = createChatChannelPlugin({ attachedResults: { channel: 'fabric', sendText: async (ctx) => { - // openclaw passes config under cfg or config depending on path + // openclaw passes config under cfg or config depending on path. + // Note: inbound agent replies go through inbound.ts `deliver` + // (where turn coalescing happens). This path is for any direct + // outbound sends and posts immediately. const cfg = (ctx.cfg ?? ctx.config ?? {}); try { const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text); diff --git a/dist/fabric/src/coalesce.js b/dist/fabric/src/coalesce.js new file mode 100644 index 0000000..048200d --- /dev/null +++ b/dist/fabric/src/coalesce.js @@ -0,0 +1,75 @@ +// Deterministic turn coalescer. +// +// OpenClaw calls the Fabric `deliver` callback once per assistant text +// segment; a thinking/tool block between two text blocks is a delivery +// boundary, so one agent turn of `text → thinking/tool → text` arrives as +// multiple deliver() calls. There is no turn id on the delivery, so we +// BUFFER segments by Fabric channelId and post the merged message when the +// turn truly ends. The flush is driven by inbound.ts right after +// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every +// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the +// agent_end hook, which fires before deliver()). `coalesce=false` posts +// each segment immediately. +const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism +export function normChannelId(x) { + const s = String(x ?? ''); + return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s; +} +const pendingByChannel = new Map(); +async function flushChannel(channelId, reason) { + const p = pendingByChannel.get(channelId); + if (!p) + return; + pendingByChannel.delete(channelId); + clearTimeout(p.safety); + const text = p.parts.join('\n\n').trim(); + if (!text) + return; + try { + await p.post(text); + p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`); + } + catch (e) { + p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`); + } +} +// Buffer one delivered segment (or send immediately when coalesce=false). +// `post` performs the real Fabric postMessage with the caller's already +// resolved guild/token; on flush it is called once with the merged text. +export async function enqueueDelivery(params) { + const cid = normChannelId(params.channelId); + const text = (params.text ?? '').trim(); + if (!text) + return; + if (!params.coalesce) { + await params.post(text); + return; + } + const existing = pendingByChannel.get(cid); + if (existing) { + existing.parts.push(text); + existing.post = params.post; // freshest guild/token closure + existing.log = params.log; + } + else { + pendingByChannel.set(cid, { + parts: [text], + post: params.post, + log: params.log, + safety: setTimeout(() => void flushChannel(cid, 'safety-timeout'), SAFETY_FLUSH_MS), + }); + } +} +// Called by the agent_end hook with the hook ctx's channelId (bare or +// fabric:-prefixed). Deterministic per-turn boundary. +export async function flushFabricForChannel(rawChannelId) { + const cid = normChannelId(rawChannelId); + if (cid) + await flushChannel(cid, 'dispatch-end'); +} +// gateway_stop: flush anything still buffered. +export async function flushAllFabric() { + for (const cid of [...pendingByChannel.keys()]) { + await flushChannel(cid, 'gateway_stop'); + } +} diff --git a/dist/fabric/src/inbound.js b/dist/fabric/src/inbound.js index 5e82e79..54df8b2 100644 --- a/dist/fabric/src/inbound.js +++ b/dist/fabric/src/inbound.js @@ -3,6 +3,8 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { io } from 'socket.io-client'; import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch'; +import { resolveCoalesce } from './accounts.js'; +import { enqueueDelivery, flushFabricForChannel } from './coalesce.js'; export class FabricInbound { core; cfg; @@ -244,8 +246,16 @@ export class FabricInbound { this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`); if (!text || !gt) return; - await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id); - this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`); + // Buffer segments; the merged message is posted right after + // dispatch returns (the deterministic turn boundary, see the + // finally below). Disable per channel: channels.fabric.coalesce. + await enqueueDelivery({ + channelId, + text, + coalesce: resolveCoalesce(this.cfg), + post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id), + log: (m) => this.log.info(m), + }); }, onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`), onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`), @@ -270,5 +280,12 @@ export class FabricInbound { catch (err) { this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`); } + finally { + // Deterministic per-turn boundary: dispatchInboundReplyWithBase only + // resolves AFTER every deliver() call of this turn has run, so the + // buffer now holds all segments — flush them as ONE Fabric message. + // No hooks, no timers, no idle guessing. + await flushFabricForChannel(channelId); + } } } diff --git a/index.ts b/index.ts index 4b15dbd..a744a5a 100644 --- a/index.ts +++ b/index.ts @@ -6,6 +6,7 @@ import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core'; import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core'; import { fabricChannelPlugin } from './src/channel.js'; +import { flushAllFabric } from './src/coalesce.js'; import { FabricInbound } from './src/inbound.js'; import { listEnabledFabricAccounts } from './src/accounts.js'; import { registerFabricTools } from './src/tools.js'; @@ -37,7 +38,7 @@ export default defineChannelPluginEntry({ config?: unknown; pluginConfig?: { identityFilePath?: string }; logger: { info: (m: string) => void; warn: (m: string) => void }; - on: (ev: string, fn: () => void) => void; + on: (ev: string, fn: (...args: unknown[]) => unknown) => void; registerTool: (d: unknown) => void; }; const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } }; @@ -86,7 +87,12 @@ export default defineChannelPluginEntry({ void syncFabricCommands(client, cfg, accounts, api.logger); }); + // Note: the per-turn coalesce flush happens deterministically in + // inbound.ts right after dispatchInboundReplyWithBase resolves (that + // is the real "all deliveries done" boundary; the agent_end hook fires + // BEFORE deliver()). gateway_stop only flushes any leftover buffer. api.on('gateway_stop', () => { + void flushAllFabric(); inbound?.stop(); inbound = null; }); diff --git a/openclaw.plugin.json b/openclaw.plugin.json index 6841548..7cfd092 100644 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -44,6 +44,10 @@ "minLength": 1, "description": "Shared secret that must equal the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY. Required to register the slash-command catalog (Guild C-2). Read it from the guild via: docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js" }, + "coalesce": { + "type": "boolean", + "description": "Merge a split agent turn (text → thinking/tool → text) into ONE Fabric message. Flushed deterministically on the agent_end hook. Default true; false = raw per-segment posting." + }, "dmSecurity": { "type": "string" }, "dmPolicy": { "type": "string" }, "enabled": { "type": "boolean" }, diff --git a/src/accounts.ts b/src/accounts.ts index c532673..2bd1261 100644 --- a/src/accounts.ts +++ b/src/accounts.ts @@ -18,6 +18,11 @@ export type FabricChannelConfig = { // (Guild C-2). Required by the channel config schema; sourced from config // only — never from the environment. commandsSyncKey?: string; + // Coalesce an agent turn that OpenClaw split into multiple deliveries + // (text → thinking/tool → text => N sendText calls) into ONE Fabric + // message. The flush boundary is the deterministic `agent_end` hook (not + // a timer). Default true; set false for raw per-segment posting. + coalesce?: boolean; accounts?: Record; defaultAccount?: string; } & FabricAccountConfig; @@ -46,6 +51,12 @@ export function resolveCommandsSyncKey(cfg: Cfg): string { return (section(cfg).commandsSyncKey ?? '').trim(); } +// Whether to coalesce a split agent turn into one Fabric message +// (channel-level). Default true. +export function resolveCoalesce(cfg: Cfg): boolean { + return (cfg.channels?.fabric ?? {}).coalesce !== false; +} + export function listFabricAccountIds(cfg: Cfg): string[] { const accts = section(cfg).accounts ?? {}; const ids = Object.keys(accts); diff --git a/src/channel.ts b/src/channel.ts index 18d4f84..38603d1 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -159,7 +159,10 @@ export const fabricChannelPlugin = createChatChannelPlugin { - // openclaw passes config under cfg or config depending on path + // openclaw passes config under cfg or config depending on path. + // Note: inbound agent replies go through inbound.ts `deliver` + // (where turn coalescing happens). This path is for any direct + // outbound sends and posts immediately. const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg; try { const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text); diff --git a/src/coalesce.ts b/src/coalesce.ts new file mode 100644 index 0000000..3c9315c --- /dev/null +++ b/src/coalesce.ts @@ -0,0 +1,93 @@ +// Deterministic turn coalescer. +// +// OpenClaw calls the Fabric `deliver` callback once per assistant text +// segment; a thinking/tool block between two text blocks is a delivery +// boundary, so one agent turn of `text → thinking/tool → text` arrives as +// multiple deliver() calls. There is no turn id on the delivery, so we +// BUFFER segments by Fabric channelId and post the merged message when the +// turn truly ends. The flush is driven by inbound.ts right after +// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every +// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the +// agent_end hook, which fires before deliver()). `coalesce=false` posts +// each segment immediately. + +const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism + +export function normChannelId(x: string | null | undefined): string { + const s = String(x ?? ''); + return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s; +} + +type Pending = { + parts: string[]; + post: (text: string) => Promise; + log?: (m: string) => void; + safety: ReturnType; +}; +const pendingByChannel = new Map(); + +async function flushChannel(channelId: string, reason: string): Promise { + const p = pendingByChannel.get(channelId); + if (!p) return; + pendingByChannel.delete(channelId); + clearTimeout(p.safety); + const text = p.parts.join('\n\n').trim(); + if (!text) return; + try { + await p.post(text); + p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`); + } catch (e) { + p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`); + } +} + +// Buffer one delivered segment (or send immediately when coalesce=false). +// `post` performs the real Fabric postMessage with the caller's already +// resolved guild/token; on flush it is called once with the merged text. +export async function enqueueDelivery(params: { + channelId: string; + text: string; + coalesce: boolean; + post: (text: string) => Promise; + log?: (m: string) => void; +}): Promise { + const cid = normChannelId(params.channelId); + const text = (params.text ?? '').trim(); + if (!text) return; + if (!params.coalesce) { + await params.post(text); + return; + } + const existing = pendingByChannel.get(cid); + if (existing) { + existing.parts.push(text); + existing.post = params.post; // freshest guild/token closure + existing.log = params.log; + } else { + pendingByChannel.set(cid, { + parts: [text], + post: params.post, + log: params.log, + safety: setTimeout( + () => void flushChannel(cid, 'safety-timeout'), + SAFETY_FLUSH_MS, + ), + }); + } +} + +// Called by the agent_end hook with the hook ctx's channelId (bare or +// fabric:-prefixed). Deterministic per-turn boundary. +export async function flushFabricForChannel( + rawChannelId: string | null | undefined, +): Promise { + const cid = normChannelId(rawChannelId); + if (cid) await flushChannel(cid, 'dispatch-end'); +} + +// gateway_stop: flush anything still buffered. +export async function flushAllFabric(): Promise { + for (const cid of [...pendingByChannel.keys()]) { + await flushChannel(cid, 'gateway_stop'); + } +} diff --git a/src/inbound.ts b/src/inbound.ts index 65ef4d5..efce9e4 100644 --- a/src/inbound.ts +++ b/src/inbound.ts @@ -5,6 +5,8 @@ import { io, type Socket } from 'socket.io-client'; import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch'; import type { FabricClient, FabricSession } from './fabric-client.js'; import type { IdentityRegistry } from './identity.js'; +import { resolveCoalesce } from './accounts.js'; +import { enqueueDelivery, flushFabricForChannel } from './coalesce.js'; // COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled // channels (nextcloud-talk) drive the kernel: @@ -301,8 +303,17 @@ export class FabricInbound { const text = (payload?.text ?? '').trim(); this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`); if (!text || !gt) return; - await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id); - this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`); + // Buffer segments; the merged message is posted right after + // dispatch returns (the deterministic turn boundary, see the + // finally below). Disable per channel: channels.fabric.coalesce. + await enqueueDelivery({ + channelId, + text, + coalesce: resolveCoalesce(this.cfg as never), + post: (t) => + this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id) as Promise, + log: (m) => this.log.info(m), + }); }, onRecordError: (err: unknown) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`), @@ -327,6 +338,12 @@ export class FabricInbound { this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`); } catch (err) { this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`); + } finally { + // Deterministic per-turn boundary: dispatchInboundReplyWithBase only + // resolves AFTER every deliver() call of this turn has run, so the + // buffer now holds all segments — flush them as ONE Fabric message. + // No hooks, no timers, no idle guessing. + await flushFabricForChannel(channelId); } } }