OpenClaw delivers an agent turn whose blocks are text -> thinking/tool -> text via multiple inbound deliver() calls (a non-text block is a delivery boundary), so one turn became N Fabric messages. Fix: buffer deliver() segments per channel (src/coalesce.ts) and flush them as ONE postMessage at a deterministic boundary — the finally after dispatchInboundReplyWithBase() resolves, which provably runs only after every deliver() of the turn (verified: deliver,deliver -> dispatch returned -> flush). No hooks, no timers, no idle guessing. The agent_end hook was rejected: it fires BEFORE deliver(). gateway_stop flushes any leftover; a long safety timeout is a leak-guard only. channels.fabric.coalesce=false restores raw per-segment posting. Verified on local openclaw + Fabric with a fake text/thinking/text model: single trigger -> exactly one merged message. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
101 lines
3.8 KiB
TypeScript
101 lines
3.8 KiB
TypeScript
// Fabric channel plugin entry.
|
|
// COMPAT NOTE (openclaw v2026.5.7): defineChannelPluginEntry signature
|
|
// { id, name, description, plugin, setRuntime?, registerFull? }. setRuntime
|
|
// receives the PluginRuntime (has channel.turn kernel); registerFull receives
|
|
// the OpenClawPluginApi for runtime startup (transport + tools).
|
|
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
|
|
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
|
|
import { fabricChannelPlugin } from './src/channel.js';
|
|
import { flushAllFabric } from './src/coalesce.js';
|
|
import { FabricInbound } from './src/inbound.js';
|
|
import { listEnabledFabricAccounts } from './src/accounts.js';
|
|
import { registerFabricTools } from './src/tools.js';
|
|
import { FabricClient } from './src/fabric-client.js';
|
|
import { IdentityRegistry } from './src/identity.js';
|
|
import { syncFabricCommands } from './src/command-sync.js';
|
|
import path from 'node:path';
|
|
import os from 'node:os';
|
|
|
|
let runtimeRef: unknown = null;
|
|
let inbound: FabricInbound | null = null;
|
|
|
|
export { fabricChannelPlugin } from './src/channel.js';
|
|
|
|
export default defineChannelPluginEntry({
|
|
id: 'fabric',
|
|
name: 'Fabric',
|
|
description: 'Fabric channel plugin — OpenClaw agents speak in Fabric guilds',
|
|
plugin: fabricChannelPlugin,
|
|
|
|
setRuntime(runtime: unknown) {
|
|
runtimeRef = runtime;
|
|
},
|
|
|
|
registerFull(apiRaw: OpenClawPluginApi) {
|
|
// COMPAT: access the subset we use through a loose view so SDK type
|
|
// drift in unrelated api members doesn't break the build.
|
|
const api = apiRaw as unknown as {
|
|
config?: unknown;
|
|
pluginConfig?: { identityFilePath?: string };
|
|
logger: { info: (m: string) => void; warn: (m: string) => void };
|
|
on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
|
|
registerTool: (d: unknown) => void;
|
|
};
|
|
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
|
|
const centerApiBase = cfg.channels?.fabric?.centerApiBase ?? 'http://localhost:7001/api';
|
|
const idFile =
|
|
api.pluginConfig?.identityFilePath ??
|
|
path.join(os.homedir(), '.openclaw', 'fabric-identity.json');
|
|
|
|
// tools operate against a default Center; per-account keys come from config
|
|
const client = new FabricClient(centerApiBase);
|
|
const identity = new IdentityRegistry(idFile);
|
|
registerFabricTools(
|
|
{ registerTool: (d) => api.registerTool(d), logger: api.logger },
|
|
client,
|
|
identity,
|
|
);
|
|
|
|
api.on('gateway_start', () => {
|
|
const _G = globalThis as Record<string, unknown>;
|
|
if (_G._fabricInboundStarted) return;
|
|
_G._fabricInboundStarted = true;
|
|
const accounts = listEnabledFabricAccounts(cfg as never).map((a) => ({
|
|
agentId: a.accountId,
|
|
fabricApiKey: a.fabricApiKey,
|
|
}));
|
|
// also include any tool-registered identities
|
|
for (const e of identity.list()) {
|
|
if (!accounts.some((x) => x.agentId === e.agentId)) {
|
|
accounts.push({ agentId: e.agentId, fabricApiKey: e.fabricApiKey });
|
|
}
|
|
}
|
|
if (!runtimeRef) {
|
|
api.logger.warn('fabric: runtime not set; inbound disabled');
|
|
return;
|
|
}
|
|
inbound = new FabricInbound(
|
|
runtimeRef,
|
|
api.config,
|
|
client,
|
|
identity,
|
|
api.logger,
|
|
accounts,
|
|
);
|
|
void inbound.start();
|
|
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
|
|
void syncFabricCommands(client, cfg, accounts, api.logger);
|
|
});
|
|
|
|
// Note: the per-turn coalesce flush happens deterministically in
|
|
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
|
|
// is the real "all deliveries done" boundary; the agent_end hook fires
|
|
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
|
|
api.on('gateway_stop', () => {
|
|
void flushAllFabric();
|
|
inbound?.stop();
|
|
inbound = null;
|
|
});
|
|
},
|
|
});
|