OpenClaw delivers an agent turn whose blocks are text -> thinking/tool -> text via multiple inbound deliver() calls (a non-text block is a delivery boundary), so one turn became N Fabric messages. Fix: buffer deliver() segments per channel (src/coalesce.ts) and flush them as ONE postMessage at a deterministic boundary — the finally after dispatchInboundReplyWithBase() resolves, which provably runs only after every deliver() of the turn (verified: deliver,deliver -> dispatch returned -> flush). No hooks, no timers, no idle guessing. The agent_end hook was rejected: it fires BEFORE deliver(). gateway_stop flushes any leftover; a long safety timeout is a leak-guard only. channels.fabric.coalesce=false restores raw per-segment posting. Verified on local openclaw + Fabric with a fake text/thinking/text model: single trigger -> exactly one merged message. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
76 lines
2.9 KiB
JavaScript
76 lines
2.9 KiB
JavaScript
// Deterministic turn coalescer.
|
|
//
|
|
// OpenClaw calls the Fabric `deliver` callback once per assistant text
|
|
// segment; a thinking/tool block between two text blocks is a delivery
|
|
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
|
|
// multiple deliver() calls. There is no turn id on the delivery, so we
|
|
// BUFFER segments by Fabric channelId and post the merged message when the
|
|
// turn truly ends. The flush is driven by inbound.ts right after
|
|
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
|
|
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
|
|
// agent_end hook, which fires before deliver()). `coalesce=false` posts
|
|
// each segment immediately.
|
|
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
|
|
export function normChannelId(x) {
|
|
const s = String(x ?? '');
|
|
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
|
|
}
|
|
const pendingByChannel = new Map();
|
|
async function flushChannel(channelId, reason) {
|
|
const p = pendingByChannel.get(channelId);
|
|
if (!p)
|
|
return;
|
|
pendingByChannel.delete(channelId);
|
|
clearTimeout(p.safety);
|
|
const text = p.parts.join('\n\n').trim();
|
|
if (!text)
|
|
return;
|
|
try {
|
|
await p.post(text);
|
|
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
|
|
}
|
|
catch (e) {
|
|
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
|
|
}
|
|
}
|
|
// Buffer one delivered segment (or send immediately when coalesce=false).
|
|
// `post` performs the real Fabric postMessage with the caller's already
|
|
// resolved guild/token; on flush it is called once with the merged text.
|
|
export async function enqueueDelivery(params) {
|
|
const cid = normChannelId(params.channelId);
|
|
const text = (params.text ?? '').trim();
|
|
if (!text)
|
|
return;
|
|
if (!params.coalesce) {
|
|
await params.post(text);
|
|
return;
|
|
}
|
|
const existing = pendingByChannel.get(cid);
|
|
if (existing) {
|
|
existing.parts.push(text);
|
|
existing.post = params.post; // freshest guild/token closure
|
|
existing.log = params.log;
|
|
}
|
|
else {
|
|
pendingByChannel.set(cid, {
|
|
parts: [text],
|
|
post: params.post,
|
|
log: params.log,
|
|
safety: setTimeout(() => void flushChannel(cid, 'safety-timeout'), SAFETY_FLUSH_MS),
|
|
});
|
|
}
|
|
}
|
|
// Called by the agent_end hook with the hook ctx's channelId (bare or
|
|
// fabric:-prefixed). Deterministic per-turn boundary.
|
|
export async function flushFabricForChannel(rawChannelId) {
|
|
const cid = normChannelId(rawChannelId);
|
|
if (cid)
|
|
await flushChannel(cid, 'dispatch-end');
|
|
}
|
|
// gateway_stop: flush anything still buffered.
|
|
export async function flushAllFabric() {
|
|
for (const cid of [...pendingByChannel.keys()]) {
|
|
await flushChannel(cid, 'gateway_stop');
|
|
}
|
|
}
|