feat(fabric): coalesce a split agent turn into ONE message (deterministic)

OpenClaw delivers an agent turn whose blocks are text -> thinking/tool
-> text via multiple inbound deliver() calls (a non-text block is a
delivery boundary), so one turn became N Fabric messages.

Fix: buffer deliver() segments per channel (src/coalesce.ts) and flush
them as ONE postMessage at a deterministic boundary — the finally after
dispatchInboundReplyWithBase() resolves, which provably runs only after
every deliver() of the turn (verified: deliver,deliver -> dispatch
returned -> flush). No hooks, no timers, no idle guessing. The
agent_end hook was rejected: it fires BEFORE deliver(). gateway_stop
flushes any leftover; a long safety timeout is a leak-guard only.
channels.fabric.coalesce=false restores raw per-segment posting.

Verified on local openclaw + Fabric with a fake text/thinking/text
model: single trigger -> exactly one merged message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
h z
2026-05-16 22:15:46 +01:00
parent ab126825ef
commit 8774cfd7cc
12 changed files with 253 additions and 7 deletions

View File

@@ -18,6 +18,11 @@ export type FabricChannelConfig = {
// (Guild C-2). Required by the channel config schema; sourced from config
// only — never from the environment.
commandsSyncKey?: string;
// Coalesce an agent turn that OpenClaw split into multiple deliveries
// (text → thinking/tool → text => N sendText calls) into ONE Fabric
// message. The flush boundary is the deterministic `agent_end` hook (not
// a timer). Default true; set false for raw per-segment posting.
coalesce?: boolean;
accounts?: Record<string, FabricAccountConfig>;
defaultAccount?: string;
} & FabricAccountConfig;
@@ -46,6 +51,12 @@ export function resolveCommandsSyncKey(cfg: Cfg): string {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg: Cfg): boolean {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg: Cfg): string[] {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -159,7 +159,10 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
cfg?: unknown;
config?: unknown;
}) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg;
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

93
src/coalesce.ts Normal file
View File

@@ -0,0 +1,93 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x: string | null | undefined): string {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
type Pending = {
parts: string[];
post: (text: string) => Promise<void>;
log?: (m: string) => void;
safety: ReturnType<typeof setTimeout>;
};
const pendingByChannel = new Map<string, Pending>();
async function flushChannel(channelId: string, reason: string): Promise<void> {
const p = pendingByChannel.get(channelId);
if (!p) return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text) return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
} catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params: {
channelId: string;
text: string;
coalesce: boolean;
post: (text: string) => Promise<void>;
log?: (m: string) => void;
}): Promise<void> {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text) return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
} else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(
() => void flushChannel(cid, 'safety-timeout'),
SAFETY_FLUSH_MS,
),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(
rawChannelId: string | null | undefined,
): Promise<void> {
const cid = normChannelId(rawChannelId);
if (cid) await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric(): Promise<void> {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

View File

@@ -5,6 +5,8 @@ import { io, type Socket } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
// channels (nextcloud-talk) drive the kernel:
@@ -301,8 +303,17 @@ export class FabricInbound {
const text = (payload?.text ?? '').trim();
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt) return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg as never),
post: (t) =>
this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id) as Promise<void>,
log: (m) => this.log.info(m),
});
},
onRecordError: (err: unknown) =>
this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
@@ -327,6 +338,12 @@ export class FabricInbound {
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
} catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
} finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}