13 Commits

Author SHA1 Message Date
cd36d1b9e2 feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history
Plugin previously had no way for an agent to send text into a specific
channel proactively — outbound went only through the channel-reply
path (responds to the channel that woke the agent). discussion-complete
internally called client.postMessage but only for the close-time
summary, no general-purpose surface.

Three new tools (+ declare existing fabric-canvas / fabric-channel that
were registered but missing from contracts.tools so agents couldn't
see them per the openclaw plugin contract):

  * fabric-send-message {guildNodeId, channelId, content}
      → {ok, messageId, seq}
    Author = calling agent. Use for ARD broadcasts, follow-ups in a
    different channel, etc.

  * fabric-channel-list {guildNodeId, nameFilter?, xType?, includeClosed?}
      → {ok, count, channels[]}
    Backend filters to public + member channels; nameFilter is client-
    side case-insensitive substring; xType / includeClosed apply post-
    fetch. Returns id/name/xType/lastSeq so callers can pipe into the
    other tools.

  * fabric-message-history {guildNodeId, channelId, seqFrom?, seqTo?, limit?}
      → {ok, page, messages[]}
    Tail-by-default: omit seqFrom/seqTo and the tool fetches the
    channel head from listChannels then asks for [head-limit+1, head].
    Limit default 20, max 200. Backend rejects non-participants.

Plus 3 supporting client methods (listChannels, listMessages — both
GET via existing req helper).

contracts.tools updated to declare these 5 (3 new + 2 previously-
silent ones). Verified earlier in sim restart logs: openclaw warned
'plugin tool is undeclared (fabric): fabric-canvas / fabric-channel'
so agents couldn't use them despite registerTool firing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 23:11:01 +01:00
9c910f082b Merge pull request 'feat(triage): per-channel serial queue + HF on_call gate + observer skip' (#3) from feat/triage-on-call-gate-and-queue into main 2026-05-22 21:59:23 +00:00
c5fd091f5a fix(triage): resolve claw_identifier via openclaw config (HF plugin's identifier)
os.hostname() fallback is wrong in sim where container hostname (server.t2)
doesn't match the HF agent row's claw_identifier (sim-t2). Add intermediate
fallback that reads openclaw config plugins.harbor-forge.identifier — the
same value the HF plugin uses for its outbound HF calls — keeping plugin
and HF agent state aligned without a per-service-unit HF_CLAW_IDENTIFIER
env override.

Priority:
  1. HF_CLAW_IDENTIFIER env (operator override)
  2. openclaw config plugins.harbor-forge.identifier (NEW)
  3. os.hostname() last-resort

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:49:31 +01:00
c5a33c33ec feat(triage): per-channel serial queue + HF on_call gate + observer skip
Three behavioral changes to inbound message handling to support the
new triage flow:

## 1. Per-channel serial queue

Replaces `void this.dispatch(...)` (fire-and-forget) with a per-channel
chain so consecutive messages on the same channel are processed strictly
in order — no concurrent model turns for the same channel. Other
channels remain independent (parallelism preserved across channels).

Implementation: `Map<channelId, Promise>` where each new task awaits
the previous. The map entry self-cleans when the chain settles AND
no newer task has overwritten it.

## 2. HF on_call gate (triage + wake=true only)

Before dispatching a triage wake to the on-duty agent, hit HF
`GET /calendar/agent/status?agent_id=...`. If the agent isn't
currently on_call, the message is pushed to a per-agent gated queue
instead of dispatched — no model turn fires.

Status check is cached for 5s to amortise across rapid triage bursts.

When a subsequent triage message arrives and the agent IS on_call by
that point, the gated queue drains FIFO (re-enqueued through the same
per-channel chain so order is kept) before the new message dispatches.

Drained queue is in-memory only; on gateway restart the underlying
Fabric messages get re-fetched via the connect-time history sweep.

## 3. Triage observer skip (wake=false)

Triage messages that arrive with wakeup=false are admin observers — by
spec they MUST NOT enter the agent's session history. Skipped entirely
(no recordInboundSession call). The next time this agent legitimately
wakes for triage, their context contains only past wakeups + their own
outgoing messages — no observer-side chatter from other agents.

For NON-triage channels the legacy "record-as-history" stays — those
keep their full channel conversation available for later wakes.

## Env

- HF_API_BASE_URL  — defaults `https://monitor.hangman-lab.top`
- HF_CLAW_IDENTIFIER — defaults to `os.hostname()`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:17:39 +01:00
28f5083679 Merge pull request 'feat(inbound): listen for backend-pushed channel.joined/left events' (#2) from feat/inbound-listen-push-events into main 2026-05-21 07:12:51 +00:00
a060ff98a2 feat(inbound): listen for backend-pushed channel.joined/left events
Companion to nav/Fabric.Backend.Guild#<TBD> which adds the server-side
emitToUser broadcast on channel membership changes. Before, the inbound
only learned about new channels via the 60s polling resync (worst-case
60s lag). Now the backend tells us directly so sub/unsub is realtime.

socket.on('channel.joined', evt) → join the socket.io room for evt.channelId
                                    and add to the local 'joined' set.
socket.on('channel.left',   evt) → leave + remove from 'joined'.

Both events are idempotent (`if (joined.has(id))` / `if (!joined.has(id))`)
so duplicate emits from server are safe. Polling resync still runs every
60s as a safety net for transient socket drops between emit and
reconnect, partial server failures, etc.

When backend lacks this support (older deployments), nothing breaks —
the event simply never fires and polling carries the load as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:08:33 +01:00
b9a5456d57 Merge pull request 'fix: dynamically sync inbound channel subscriptions' (#1) from fix/inbound-dynamic-channel-sync into main 2026-05-21 06:56:49 +00:00
d1d5ad10ca fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
92945b777d feat(fabric): dm channels deliver any non-self message (no wakeup gate)
inbound: FabricMessage gains xType; the wakeup gate is bypassed when
xType==='dm' (self messages are already filtered upstream), so a 1:1
dm always reaches the model regardless of wakeup metadata.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 09:18:20 +01:00
8774cfd7cc feat(fabric): coalesce a split agent turn into ONE message (deterministic)
OpenClaw delivers an agent turn whose blocks are text -> thinking/tool
-> text via multiple inbound deliver() calls (a non-text block is a
delivery boundary), so one turn became N Fabric messages.

Fix: buffer deliver() segments per channel (src/coalesce.ts) and flush
them as ONE postMessage at a deterministic boundary — the finally after
dispatchInboundReplyWithBase() resolves, which provably runs only after
every deliver() of the turn (verified: deliver,deliver -> dispatch
returned -> flush). No hooks, no timers, no idle guessing. The
agent_end hook was rejected: it fires BEFORE deliver(). gateway_stop
flushes any leftover; a long safety timeout is a leak-guard only.
channels.fabric.coalesce=false restores raw per-segment posting.

Verified on local openclaw + Fabric with a fake text/thinking/text
model: single trigger -> exactly one merged message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 22:15:46 +01:00
ab126825ef feat(security): commandsSyncKey is a required channel-config field (Guild C-2)
The slash-command sync secret now comes from
channels.fabric.commandsSyncKey (configSchema marks it required) and
is no longer read from FABRIC_COMMANDS_SYNC_KEY env. command-sync
resolves it from config and threads it into client.syncCommands;
when absent, sync is skipped with a clear warning. README updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:44:25 +01:00
bb63a57384 feat(security): send x-commands-sync-key when configured (Guild C-2)
syncCommands attaches the FABRIC_COMMANDS_SYNC_KEY header when the
operator sets it, so the guild can restrict slash-command catalog
writes to this plugin. No-op / backward compatible when unset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:47:14 +01:00
fc6edaabfd docs: slash-command catalog section
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:15:03 +01:00
17 changed files with 935 additions and 34 deletions

View File

@@ -68,6 +68,16 @@ Two ways, both write the same identity registry the transport reads:
## Config
- `channels.fabric.centerApiBase` — e.g. `http://localhost:7001/api`
- `channels.fabric.commandsSyncKey` — **required**; must equal the guild's
`FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY` (Guild C-2). Read it from the
guild with `docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js`.
Sourced from config only — never from the environment.
- `channels.fabric.coalesce` — default `true`. OpenClaw splits one agent
turn whose blocks are `text → thinking/tool → text` into multiple
`deliver()` calls; this buffers them and posts ONE Fabric message at the
deterministic turn boundary (right after the inbound reply dispatch
resolves — no hooks, no timers, no idle guessing). `false` = raw
per-segment posting.
- `channels.fabric.accounts.<agentId>` = `{ fabricApiKey, enabled }`
(**agent = account**; the account id is the OpenClaw agentId)
- plugin `identityFilePath` — default `~/.openclaw/fabric-identity.json`
@@ -100,6 +110,20 @@ Then `openclaw gateway restart`.
`members` (list the channel's member userIds) · `join` (this agent
joins) · `leave` (this agent leaves).
## Slash-command catalog
On `gateway_start` the plugin syncs OpenClaw's native-command catalog to
each guild (`command-sync.ts`: `listNativeCommandSpecsForConfig` +
`findCommandByNativeName`, dynamic arg `choices` snapshotted via
`resolveCommandArgChoices`) → `PUT /api/commands`. This is exactly the data
Discord registers as slash commands; Fabric's frontend uses it for `/`
autocomplete. Fabric deliberately does **not** advertise the
`nativeCommands` channel capability — it stays a text-command surface, so a
`/<cmd>` message is delivered normally and OpenClaw's command system
executes it (text-command + command session + auth), reusing the standard
inbound path. Only `/no-reply` and `/force-proceed` stay
server-intercepted by the guild.
## Install / build
```bash

View File

@@ -5,6 +5,7 @@
// the OpenClawPluginApi for runtime startup (transport + tools).
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
@@ -60,7 +61,12 @@ export default defineChannelPluginEntry({
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
// Note: the per-turn coalesce flush happens deterministically in
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
// is the real "all deliveries done" boundary; the agent_end hook fires
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
inbound?.stop();
inbound = null;
});

View File

@@ -7,6 +7,17 @@ const DEFAULT_CENTER = 'http://localhost:7001/api';
function section(cfg) {
return cfg.channels?.fabric ?? {};
}
// The commands-sync shared secret (channel-level only). Empty string when
// unconfigured — callers decide how to handle (slash-command sync is then
// rejected by the guild).
export function resolveCommandsSyncKey(cfg) {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg) {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg) {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -137,7 +137,10 @@ export const fabricChannelPlugin = createChatChannelPlugin({
attachedResults: {
channel: 'fabric',
sendText: async (ctx) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {});
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

75
dist/fabric/src/coalesce.js vendored Normal file
View File

@@ -0,0 +1,75 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x) {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
const pendingByChannel = new Map();
async function flushChannel(channelId, reason) {
const p = pendingByChannel.get(channelId);
if (!p)
return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text)
return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
}
catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params) {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text)
return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
}
else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(() => void flushChannel(cid, 'safety-timeout'), SAFETY_FLUSH_MS),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(rawChannelId) {
const cid = normChannelId(rawChannelId);
if (cid)
await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric() {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

View File

@@ -6,6 +6,7 @@
// dynamic arg `choices` to a static snapshot here (like Discord does at
// registration time).
import { listNativeCommandSpecsForConfig, findCommandByNativeName, resolveCommandArgChoices, } from 'openclaw/plugin-sdk/native-command-registry';
import { resolveCommandsSyncKey } from './accounts.js';
function normChoice(c) {
if (typeof c === 'string')
return { value: c, label: c };
@@ -62,6 +63,14 @@ export function buildFabricCommandSpecs(cfg) {
// Push the catalog to every guild the known agents belong to (idempotent;
// the catalog is OpenClaw-global, so one PUT per guild is enough).
export async function syncFabricCommands(client, cfg, accounts, log) {
// Guild C-2: the sync key comes from the channel config only (schema
// marks it required). Without it the guild rejects the catalog write.
const syncKey = resolveCommandsSyncKey(cfg);
if (!syncKey) {
log.warn('fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)');
return;
}
let specs;
try {
specs = buildFabricCommandSpecs(cfg);
@@ -88,7 +97,7 @@ export async function syncFabricCommands(client, cfg, accounts, log) {
if (!tok)
continue;
try {
await client.syncCommands(g.endpoint, tok, specs);
await client.syncCommands(g.endpoint, tok, specs, syncKey);
done.add(g.nodeId);
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
}

View File

@@ -23,12 +23,13 @@ export class FabricClient {
}
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
// (Fabric returns an empty body when a channel has no canvas).
async req(method, url, auth, body) {
async req(method, url, auth, body, extraHeaders) {
const res = await fetch(url, {
method,
headers: {
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
...(auth ? { authorization: `Bearer ${auth}` } : {}),
...(extraHeaders ?? {}),
},
body: body !== undefined ? JSON.stringify(body) : undefined,
});
@@ -74,8 +75,11 @@ export class FabricClient {
// Register the OpenClaw slash-command catalog with this guild (idempotent
// full replace). The frontend GETs it for `/` autocomplete; execution
// still flows as a normal /<cmd> message into OpenClaw's command system.
syncCommands(guildEndpoint, guildToken, commands) {
return this.req('PUT', `${guildEndpoint}/api/commands`, guildToken, { commands });
syncCommands(guildEndpoint, guildToken, commands, syncKey) {
// Guild C-2: the shared key is sourced from the channel config
// (channels.fabric.commandsSyncKey) and must equal the guild's
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
return this.req('PUT', `${guildEndpoint}/api/commands`, guildToken, { commands }, syncKey ? { 'x-commands-sync-key': syncKey } : undefined);
}
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
channelMembers(guildEndpoint, guildToken, channelId) {

View File

@@ -3,6 +3,8 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
export class FabricInbound {
core;
cfg;
@@ -12,6 +14,14 @@ export class FabricInbound {
accounts;
sockets = [];
seen = new Set();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
channelSyncTimers = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
@@ -73,6 +83,9 @@ export class FabricInbound {
}
}
stop() {
for (const t of this.channelSyncTimers)
clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets)
s.disconnect();
this.sockets = [];
@@ -88,19 +101,83 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set();
const syncChannels = async (kind) => {
let freshTok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
const channels = res.ok ? (await res.json()) : [];
for (const c of channels)
socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
}
catch {
/* best effort */
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
if (!res.ok)
return;
const channels = (await res.json());
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
}
else if (added > 0 || removed > 0) {
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
}
}
catch {
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt) => {
const id = evt?.channelId;
if (!id || joined.has(id))
return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt) => {
const id = evt?.channelId;
if (!id || !joined.has(id))
return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => {
const channelId = m.channelId ?? '';
if (!channelId)
@@ -199,7 +276,11 @@ export class FabricInbound {
// the woken speaker emits a normal message or /no-reply). We still
// record the message into the agent's session so it has the full
// channel conversation as context whenever it IS later woken.
if (m.wakeup !== true) {
//
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,
@@ -244,8 +325,16 @@ export class FabricInbound {
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt)
return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg),
post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id),
log: (m) => this.log.info(m),
});
},
onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
@@ -270,5 +359,12 @@ export class FabricInbound {
catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
}
finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}

View File

@@ -6,6 +6,7 @@
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
@@ -37,7 +38,7 @@ export default defineChannelPluginEntry({
config?: unknown;
pluginConfig?: { identityFilePath?: string };
logger: { info: (m: string) => void; warn: (m: string) => void };
on: (ev: string, fn: () => void) => void;
on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
registerTool: (d: unknown) => void;
};
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
@@ -86,7 +87,12 @@ export default defineChannelPluginEntry({
void syncFabricCommands(client, cfg, accounts, api.logger);
});
// Note: the per-turn coalesce flush happens deterministically in
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
// is the real "all deliveries done" boundary; the agent_end hook fires
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
inbound?.stop();
inbound = null;
});

View File

@@ -14,7 +14,12 @@
"create-work-channel",
"create-report-channel",
"create-discussion-channel",
"discussion-complete"
"discussion-complete",
"fabric-canvas",
"fabric-channel",
"fabric-send-message",
"fabric-channel-list",
"fabric-message-history"
]
},
"configSchema": {
@@ -39,6 +44,15 @@
"type": "string",
"description": "Fabric Center API base, e.g. http://localhost:7001/api"
},
"commandsSyncKey": {
"type": "string",
"minLength": 1,
"description": "Shared secret that must equal the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY. Required to register the slash-command catalog (Guild C-2). Read it from the guild via: docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js"
},
"coalesce": {
"type": "boolean",
"description": "Merge a split agent turn (text → thinking/tool → text) into ONE Fabric message. Flushed deterministically on the agent_end hook. Default true; false = raw per-segment posting."
},
"dmSecurity": { "type": "string" },
"dmPolicy": { "type": "string" },
"enabled": { "type": "boolean" },
@@ -59,10 +73,12 @@
}
}
}
}
},
"required": ["commandsSyncKey"]
},
"uiHints": {
"centerApiBase": { "label": "Center API base" }
"centerApiBase": { "label": "Center API base" },
"commandsSyncKey": { "label": "Commands sync key" }
}
}
}

View File

@@ -14,6 +14,15 @@ export type FabricAccountConfig = {
export type FabricChannelConfig = {
centerApiBase?: string;
// Shared secret matching the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY
// (Guild C-2). Required by the channel config schema; sourced from config
// only — never from the environment.
commandsSyncKey?: string;
// Coalesce an agent turn that OpenClaw split into multiple deliveries
// (text → thinking/tool → text => N sendText calls) into ONE Fabric
// message. The flush boundary is the deterministic `agent_end` hook (not
// a timer). Default true; set false for raw per-segment posting.
coalesce?: boolean;
accounts?: Record<string, FabricAccountConfig>;
defaultAccount?: string;
} & FabricAccountConfig;
@@ -35,6 +44,19 @@ function section(cfg: Cfg): FabricChannelConfig {
return cfg.channels?.fabric ?? {};
}
// The commands-sync shared secret (channel-level only). Empty string when
// unconfigured — callers decide how to handle (slash-command sync is then
// rejected by the guild).
export function resolveCommandsSyncKey(cfg: Cfg): string {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg: Cfg): boolean {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg: Cfg): string[] {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -159,7 +159,10 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
cfg?: unknown;
config?: unknown;
}) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg;
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

93
src/coalesce.ts Normal file
View File

@@ -0,0 +1,93 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x: string | null | undefined): string {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
type Pending = {
parts: string[];
post: (text: string) => Promise<void>;
log?: (m: string) => void;
safety: ReturnType<typeof setTimeout>;
};
const pendingByChannel = new Map<string, Pending>();
async function flushChannel(channelId: string, reason: string): Promise<void> {
const p = pendingByChannel.get(channelId);
if (!p) return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text) return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
} catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params: {
channelId: string;
text: string;
coalesce: boolean;
post: (text: string) => Promise<void>;
log?: (m: string) => void;
}): Promise<void> {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text) return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
} else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(
() => void flushChannel(cid, 'safety-timeout'),
SAFETY_FLUSH_MS,
),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(
rawChannelId: string | null | undefined,
): Promise<void> {
const cid = normChannelId(rawChannelId);
if (cid) await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric(): Promise<void> {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

View File

@@ -11,6 +11,7 @@ import {
resolveCommandArgChoices,
} from 'openclaw/plugin-sdk/native-command-registry';
import type { FabricClient } from './fabric-client.js';
import { resolveCommandsSyncKey } from './accounts.js';
type Logger = { info: (m: string) => void; warn: (m: string) => void };
@@ -102,6 +103,17 @@ export async function syncFabricCommands(
accounts: Array<{ agentId: string; fabricApiKey: string }>,
log: Logger,
): Promise<void> {
// Guild C-2: the sync key comes from the channel config only (schema
// marks it required). Without it the guild rejects the catalog write.
const syncKey = resolveCommandsSyncKey(cfg as never);
if (!syncKey) {
log.warn(
'fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)',
);
return;
}
let specs: FabricCommand[];
try {
specs = buildFabricCommandSpecs(cfg);
@@ -126,7 +138,7 @@ export async function syncFabricCommands(
)?.token;
if (!tok) continue;
try {
await client.syncCommands(g.endpoint, tok, specs);
await client.syncCommands(g.endpoint, tok, specs, syncKey);
done.add(g.nodeId);
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
} catch (err) {

View File

@@ -36,12 +36,14 @@ export class FabricClient {
url: string,
auth?: string,
body?: unknown,
extraHeaders?: Record<string, string>,
): Promise<T> {
const res = await fetch(url, {
method,
headers: {
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
...(auth ? { authorization: `Bearer ${auth}` } : {}),
...(extraHeaders ?? {}),
},
body: body !== undefined ? JSON.stringify(body) : undefined,
});
@@ -122,8 +124,18 @@ export class FabricClient {
guildEndpoint: string,
guildToken: string,
commands: unknown[],
syncKey: string,
): Promise<unknown> {
return this.req('PUT', `${guildEndpoint}/api/commands`, guildToken, { commands });
// Guild C-2: the shared key is sourced from the channel config
// (channels.fabric.commandsSyncKey) and must equal the guild's
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
return this.req(
'PUT',
`${guildEndpoint}/api/commands`,
guildToken,
{ commands },
syncKey ? { 'x-commands-sync-key': syncKey } : undefined,
);
}
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
@@ -178,6 +190,76 @@ export class FabricClient {
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
// ---- channel discovery + message read (used by the agent-facing
// fabric-channel-list / fabric-message-history tools) ----
/**
* List channels in a guild visible to the calling user. Backend
* filters to public + channels the user is a member of.
*/
listChannels(
guildEndpoint: string,
guildToken: string,
guildNodeId: string,
): Promise<Array<{
id: string;
guildId: string;
name: string;
xType: string;
kind: string;
isPublic: boolean;
closed: boolean;
lastSeq: number;
createdAt: string;
}>> {
return this.req(
'GET',
`${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`,
guildToken,
);
}
/**
* Page through a channel's message history by `seq`.
*
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
* tail. Page metadata in the response describes what to ask next.
*/
listMessages(
guildEndpoint: string,
guildToken: string,
channelId: string,
opts: { seqFrom?: number; seqTo?: number; limit?: number } = {},
): Promise<{
items: Array<{
messageId: string;
seq: number;
content: string;
authorUserId: string;
createdAt: string;
editedAt: string | null;
deletedAt: string | null;
isDeleted: boolean;
}>;
page: {
seqFrom: number;
seqTo: number;
limit: number;
returned: number;
hasMore: boolean;
nextExpectedSeq: number;
highestCommittedSeq: number;
};
}> {
const qs = new URLSearchParams();
if (opts.seqFrom !== undefined) qs.set('seq_from', String(opts.seqFrom));
if (opts.seqTo !== undefined) qs.set('seq_to', String(opts.seqTo));
if (opts.limit !== undefined) qs.set('limit', String(opts.limit));
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
return this.req('GET', url, guildToken);
}
}
export type CanvasFormat = 'md' | 'html' | 'text';

View File

@@ -5,6 +5,8 @@ import { io, type Socket } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
// channels (nextcloud-talk) drive the kernel:
@@ -42,11 +44,22 @@ type FabricMessage = {
channelId?: string;
attachments?: FabricAttachment[];
wakeup?: boolean;
// x-type of the channel (sent on message.created). 'dm' bypasses the
// wakeup gate: any message that isn't the agent's own is delivered.
xType?: string;
};
export class FabricInbound {
private sockets: Socket[] = [];
private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
private channelSyncTimers: NodeJS.Timeout[] = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
@@ -54,6 +67,136 @@ export class FabricInbound {
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
// Per-channel serial work queue. Every inbound socket message for a
// channel awaits the previous task for that same channel, so model
// turns never interleave. Map key = channelId; value is the tail of
// the chain (an in-flight promise the next task awaits).
//
// Why per-channel and not per-agent: a single agent may sit in
// several triage / general channels; we want each channel to flow at
// its own speed but the SAME channel's traffic to be strictly serial.
// For dm and discuss the queue also serialises but those traditionally
// had at-most-one-in-flight anyway via the turn engine.
private channelChains = new Map<string, Promise<void>>();
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
// agent/status round-trip off the hot path for back-to-back triage
// messages. Short TTL because status flips are rare-but-meaningful.
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
private static readonly AGENT_STATUS_TTL_MS = 5_000;
// Triage messages that arrived while the on-duty agent wasn't on_call
// — sit here until either (a) the agent becomes on_call and the next
// triage arrival drains them, or (b) the gateway restarts (lost; ok
// because the underlying Fabric messages are persisted and re-fetched
// on agent reconnect's history sweep).
private pendingTriageGated: Array<{
agentId: string;
g: { nodeId: string; endpoint: string };
channelId: string;
m: FabricMessage;
session: FabricSession;
}> = [];
// Schedule `task` to run after every previous task on the same
// channel has completed. Returns the promise so callers can await
// their own result if they need to; the chain itself is fire-and-
// forget from the socket.on handler.
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
const next = prev.then(task).catch((err) => {
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
});
this.channelChains.set(channelId, next);
// Best-effort cleanup so the Map doesn't grow without bound for
// long-running gateways: drop the entry when the chain settles, but
// only if it's still the latest reference (newer enqueue may have
// overwritten it in the meantime).
void next.finally(() => {
if (this.channelChains.get(channelId) === next) {
this.channelChains.delete(channelId);
}
});
return next;
}
// Hit HF backend to check whether `agentId` is currently on_call.
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
// on_call" — triage stays gated rather than risking a confused wake.
private async checkAgentOnCall(agentId: string): Promise<boolean> {
const cached = this.agentStatusCache.get(agentId);
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
return cached.onCall;
}
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
// CLAW_IDENTIFIER resolution priority:
// 1. HF_CLAW_IDENTIFIER env (operator override)
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
// plugin itself uses — keeps the two in sync without an extra
// env per service unit)
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
// container hostname is `server.t2` but HF agent row has
// `claw_identifier=sim-t2`; matching is mandatory for the HF
// backend's _require_agent() check)
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
if (!claw) {
try {
// openclaw config shape (verified in sim):
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
const cfg = this.cfg as {
plugins?: { entries?: Record<string, { config?: { identifier?: string } }> };
};
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
claw = fromCfg.trim();
}
} catch {
/* fall through to hostname */
}
}
if (!claw) {
claw = (await import('os')).hostname();
}
let onCall = false;
try {
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
const res = await fetch(url, {
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
});
if (res.ok) {
const data = (await res.json()) as { status?: string };
onCall = (data.status ?? '').toLowerCase() === 'on_call';
}
} catch (err) {
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
}
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
return onCall;
}
// FIFO drain of all triage-gated messages for `agentId` (called when
// we just learned they're on_call). Each drained message is dispatched
// through its own channel chain so per-channel serial order is kept.
private async drainGatedFor(agentId: string): Promise<void> {
const keep: typeof this.pendingTriageGated = [];
const drain: typeof this.pendingTriageGated = [];
for (const item of this.pendingTriageGated) {
if (item.agentId === agentId) drain.push(item);
else keep.push(item);
}
if (drain.length === 0) return;
this.pendingTriageGated = keep;
for (const item of drain) {
this.log.info(
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
);
// Re-enqueue via the per-channel chain so ordering is preserved.
this.enqueueChannelTask(item.channelId, async () => {
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
});
}
}
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
@@ -114,6 +257,8 @@ export class FabricInbound {
}
stop(): void {
for (const t of this.channelSyncTimers) clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets) s.disconnect();
this.sockets = [];
}
@@ -128,20 +273,87 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set<string>();
const syncChannels = async (kind: 'initial' | 'resync') => {
let freshTok: string | undefined;
try {
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
} catch {
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
{ headers: { authorization: `Bearer ${tok}` } },
{ headers: { authorization: `Bearer ${authTok}` } },
);
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
if (!res.ok) return;
const channels = (await res.json()) as Array<{ id: string }>;
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
);
} else if (added > 0 || removed > 0) {
this.log.info(
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
);
}
} catch {
/* best effort */
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || joined.has(id)) return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || !joined.has(id)) return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(
() => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? '';
if (!channelId) return;
@@ -150,7 +362,33 @@ export class FabricInbound {
if (this.seen.has(key)) return;
this.seen.add(key);
if (this.seen.size > 5000) this.seen.clear();
void this.dispatch(agentId, g, channelId, m, session);
// Per-channel serial queue. Prevents concurrent model turns for
// the same channel — important for triage where a second wake
// arriving mid-reply would interleave with the in-flight one.
this.enqueueChannelTask(channelId, async () => {
// Triage on_call gate: if the on-duty agent isn't currently
// on_call per HF, don't dispatch yet — just sit on the
// per-channel queue. Subsequent triage messages will recheck;
// when the agent becomes on_call, the next arrival drains.
//
// Also handles: triage + wake=true must verify status before
// committing to a model turn. Non-triage and triage observer
// (wake=false) skip the gate.
if (m.xType === 'triage' && m.wakeup === true) {
const onCall = await this.checkAgentOnCall(agentId);
if (!onCall) {
this.log.info(
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
);
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
return;
}
// Drain any previously-gated messages (FIFO) before this one,
// now that we know the agent is on_call.
await this.drainGatedFor(agentId);
}
await this.dispatch(agentId, g, channelId, m, session);
});
});
socket.connect();
this.sockets.push(socket);
@@ -250,7 +488,24 @@ export class FabricInbound {
// the woken speaker emits a normal message or /no-reply). We still
// record the message into the agent's session so it has the full
// channel conversation as context whenever it IS later woken.
if (m.wakeup !== true) {
//
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
// Triage exception: non-wake messages (admin observer) MUST NOT
// enter the agent's session at all. The next time the agent
// wakes for a triage message, their context should contain only
// their own past wakeups + their own outgoing messages — never
// the observer-only chatter from other agents. For non-triage
// channels keep the legacy "record-as-history" so a later wake
// sees the full channel conversation.
if (m.xType === 'triage') {
this.log.info(
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
);
return;
}
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,
@@ -301,8 +556,17 @@ export class FabricInbound {
const text = (payload?.text ?? '').trim();
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt) return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg as never),
post: (t) =>
this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id) as Promise<void>,
log: (m) => this.log.info(m),
});
},
onRecordError: (err: unknown) =>
this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
@@ -327,6 +591,12 @@ export class FabricInbound {
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
} catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
} finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}

View File

@@ -243,4 +243,173 @@ export function registerFabricTools(
}
},
}));
// -----------------------------------------------------------------
// fabric-send-message: post a message into a specific channel.
//
// Unlike a normal channel reply (which goes back to whatever channel
// woke the agent), this lets the agent proactively initiate text into
// any channel they are a member of — e.g. ARD broadcasting daily
// workload to #agents-room, or triage agent following up on an
// already-routed task by commenting in #updates.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-send-message',
description:
'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
},
},
execute: async (p: { guildNodeId: string; channelId: string; content: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = (await client.postMessage(
guild.endpoint,
token,
p.channelId,
p.content,
session.user.id,
)) as { messageId?: string; seq?: number };
return { ok: true, messageId: res.messageId, seq: res.seq };
},
}));
// -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the
// agent is a member of. Returns id / name / xType per channel so the
// agent can pick a channelId for fabric-send-message etc.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel-list',
description:
'List channels visible to the calling agent in a guild. Optional ' +
'nameFilter does a case-insensitive substring match client-side. ' +
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId'],
properties: {
guildNodeId: { type: 'string' },
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
xType: {
type: 'string',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
description: 'optional filter by x_type',
},
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
},
},
execute: async (p: {
guildNodeId: string;
nameFilter?: string;
xType?: string;
includeClosed?: boolean;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const needle = (p.nameFilter ?? '').toLowerCase();
const filtered = all.filter((c) => {
if (!p.includeClosed && c.closed) return false;
if (p.xType && c.xType !== p.xType) return false;
if (needle && !c.name.toLowerCase().includes(needle)) return false;
return true;
});
return {
ok: true,
count: filtered.length,
channels: filtered.map((c) => ({
id: c.id,
name: c.name,
xType: c.xType,
isPublic: c.isPublic,
closed: c.closed,
lastSeq: c.lastSeq,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-message-history: read a channel's recent message history by
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
// the last `limit` messages (limit defaults to 20, max 200).
//
// Use cases: catch-up on a channel that was muted while the agent was
// gated; verify a previous message went through; lookup recent
// duplicates before opening a new task in triage.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-message-history',
description:
"Read a channel's recent message history. Omit seqFrom/seqTo to " +
'tail (last `limit` messages, default 20, max 200). Backend ' +
'requires the calling agent to be a channel participant.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
},
},
execute: async (p: {
guildNodeId: string;
channelId: string;
seqFrom?: number;
seqTo?: number;
limit?: number;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const limit = p.limit ?? 20;
// Tail mode: discover channel head via channel listing, then ask
// for [head-limit+1, head]. Avoids needing the agent to know seq.
let seqFrom = p.seqFrom;
let seqTo = p.seqTo;
if (seqFrom === undefined && seqTo === undefined) {
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const ch = channels.find((c) => c.id === p.channelId);
const head = ch?.lastSeq ?? 0;
seqFrom = Math.max(1, head - limit + 1);
seqTo = head;
}
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
seqFrom,
seqTo,
limit,
});
return {
ok: true,
page: res.page,
messages: res.items.map((m) => ({
messageId: m.messageId,
seq: m.seq,
authorUserId: m.authorUserId,
content: m.content,
createdAt: m.createdAt,
isDeleted: m.isDeleted,
})),
};
},
}));
}