9 Commits

Author SHA1 Message Date
h z
86e8d2ac8d Merge pull request 'feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history' (#4) from feat/fabric-send-and-discover-tools into main 2026-05-22 22:11:06 +00:00
hanghang zhang
121c7c78bd feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history
Plugin previously had no way for an agent to send text into a specific
channel proactively — outbound went only through the channel-reply
path (responds to the channel that woke the agent). discussion-complete
internally called client.postMessage but only for the close-time
summary, no general-purpose surface.

Three new tools (+ declare existing fabric-canvas / fabric-channel that
were registered but missing from contracts.tools so agents couldn't
see them per the openclaw plugin contract):

  * fabric-send-message {guildNodeId, channelId, content}
      → {ok, messageId, seq}
    Author = calling agent. Use for ARD broadcasts, follow-ups in a
    different channel, etc.

  * fabric-channel-list {guildNodeId, nameFilter?, xType?, includeClosed?}
      → {ok, count, channels[]}
    Backend filters to public + member channels; nameFilter is client-
    side case-insensitive substring; xType / includeClosed apply post-
    fetch. Returns id/name/xType/lastSeq so callers can pipe into the
    other tools.

  * fabric-message-history {guildNodeId, channelId, seqFrom?, seqTo?, limit?}
      → {ok, page, messages[]}
    Tail-by-default: omit seqFrom/seqTo and the tool fetches the
    channel head from listChannels then asks for [head-limit+1, head].
    Limit default 20, max 200. Backend rejects non-participants.

Plus 3 supporting client methods (listChannels, listMessages — both
GET via existing req helper).

contracts.tools updated to declare these 5 (3 new + 2 previously-
silent ones). Verified earlier in sim restart logs: openclaw warned
'plugin tool is undeclared (fabric): fabric-canvas / fabric-channel'
so agents couldn't use them despite registerTool firing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 23:11:01 +01:00
h z
73bfbd7594 Merge pull request 'feat(triage): per-channel serial queue + HF on_call gate + observer skip' (#3) from feat/triage-on-call-gate-and-queue into main 2026-05-22 21:59:23 +00:00
hanghang zhang
b7e9d278cd fix(triage): resolve claw_identifier via openclaw config (HF plugin's identifier)
os.hostname() fallback is wrong in sim where container hostname (server.t2)
doesn't match the HF agent row's claw_identifier (sim-t2). Add intermediate
fallback that reads openclaw config plugins.harbor-forge.identifier — the
same value the HF plugin uses for its outbound HF calls — keeping plugin
and HF agent state aligned without a per-service-unit HF_CLAW_IDENTIFIER
env override.

Priority:
  1. HF_CLAW_IDENTIFIER env (operator override)
  2. openclaw config plugins.harbor-forge.identifier (NEW)
  3. os.hostname() last-resort

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:49:31 +01:00
hanghang zhang
c68de029e4 feat(triage): per-channel serial queue + HF on_call gate + observer skip
Three behavioral changes to inbound message handling to support the
new triage flow:

## 1. Per-channel serial queue

Replaces `void this.dispatch(...)` (fire-and-forget) with a per-channel
chain so consecutive messages on the same channel are processed strictly
in order — no concurrent model turns for the same channel. Other
channels remain independent (parallelism preserved across channels).

Implementation: `Map<channelId, Promise>` where each new task awaits
the previous. The map entry self-cleans when the chain settles AND
no newer task has overwritten it.

## 2. HF on_call gate (triage + wake=true only)

Before dispatching a triage wake to the on-duty agent, hit HF
`GET /calendar/agent/status?agent_id=...`. If the agent isn't
currently on_call, the message is pushed to a per-agent gated queue
instead of dispatched — no model turn fires.

Status check is cached for 5s to amortise across rapid triage bursts.

When a subsequent triage message arrives and the agent IS on_call by
that point, the gated queue drains FIFO (re-enqueued through the same
per-channel chain so order is kept) before the new message dispatches.

Drained queue is in-memory only; on gateway restart the underlying
Fabric messages get re-fetched via the connect-time history sweep.

## 3. Triage observer skip (wake=false)

Triage messages that arrive with wakeup=false are admin observers — by
spec they MUST NOT enter the agent's session history. Skipped entirely
(no recordInboundSession call). The next time this agent legitimately
wakes for triage, their context contains only past wakeups + their own
outgoing messages — no observer-side chatter from other agents.

For NON-triage channels the legacy "record-as-history" stays — those
keep their full channel conversation available for later wakes.

## Env

- HF_API_BASE_URL  — defaults `https://monitor.hangman-lab.top`
- HF_CLAW_IDENTIFIER — defaults to `os.hostname()`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:17:39 +01:00
h z
2ec97eceda Merge pull request 'feat(inbound): listen for backend-pushed channel.joined/left events' (#2) from feat/inbound-listen-push-events into main 2026-05-21 07:12:51 +00:00
hanghang zhang
0f1fb27cf9 feat(inbound): listen for backend-pushed channel.joined/left events
Companion to nav/Fabric.Backend.Guild#<TBD> which adds the server-side
emitToUser broadcast on channel membership changes. Before, the inbound
only learned about new channels via the 60s polling resync (worst-case
60s lag). Now the backend tells us directly so sub/unsub is realtime.

socket.on('channel.joined', evt) → join the socket.io room for evt.channelId
                                    and add to the local 'joined' set.
socket.on('channel.left',   evt) → leave + remove from 'joined'.

Both events are idempotent (`if (joined.has(id))` / `if (!joined.has(id))`)
so duplicate emits from server are safe. Polling resync still runs every
60s as a safety net for transient socket drops between emit and
reconnect, partial server failures, etc.

When backend lacks this support (older deployments), nothing breaks —
the event simply never fires and polling carries the load as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:08:33 +01:00
h z
8224975119 Merge pull request 'fix: dynamically sync inbound channel subscriptions' (#1) from fix/inbound-dynamic-channel-sync into main 2026-05-21 06:56:49 +00:00
hanghang zhang
7d90ae4f2b fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
17 changed files with 37 additions and 2217 deletions

56
dist/fabric/index.js vendored
View File

@@ -6,23 +6,16 @@
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core'; import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js'; import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js'; import { flushAllFabric } from './src/coalesce.js';
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
import { FabricInbound } from './src/inbound.js'; import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js'; import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js'; import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js'; import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js'; import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js'; import { syncFabricCommands } from './src/command-sync.js';
import { PresenceSync } from './src/presence-sync.js';
import path from 'node:path'; import path from 'node:path';
import os from 'node:os'; import os from 'node:os';
let runtimeRef = null; let runtimeRef = null;
let inbound = null; let inbound = null;
let presence = null;
// Periodic re-harvest of presence accounts so newly-connected agents
// (registered through tool-based identity flow AFTER initial start)
// get picked up. Cleared on gateway_stop.
let presenceRefreshTimer = null;
export { fabricChannelPlugin } from './src/channel.js'; export { fabricChannelPlugin } from './src/channel.js';
export default defineChannelPluginEntry({ export default defineChannelPluginEntry({
id: 'fabric', id: 'fabric',
@@ -44,29 +37,6 @@ export default defineChannelPluginEntry({
const client = new FabricClient(centerApiBase); const client = new FabricClient(centerApiBase);
const identity = new IdentityRegistry(idFile); const identity = new IdentityRegistry(idFile);
registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity); registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity);
// Cross-plugin API: globalThis.__fabric
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
// injection to DM-typed channels only. The channel-meta cache is
// populated lazily from inbound (message.created carries xType) and
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
// very first DM after a fresh gateway start hits cache from the
// previous run rather than firing the injector on the wrong type.
//
// null return = channel never seen (cache cold). Callers MUST NOT
// fall back to "assume DM" — fail closed on unknown.
{
const _G = globalThis;
_G['__fabric'] = { getChannelType };
// Flush channel-meta cache when the gateway shuts down so
// recently-recorded xType entries don't get lost.
api.on('gateway_stop', () => {
try {
flushChannelMeta();
}
catch { /* ignore */ }
});
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
}
api.on('gateway_start', () => { api.on('gateway_start', () => {
const _G = globalThis; const _G = globalThis;
if (_G._fabricInboundStarted) if (_G._fabricInboundStarted)
@@ -87,25 +57,7 @@ export default defineChannelPluginEntry({
return; return;
} }
inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts); inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts);
// start() resolves once all accounts have attempted login; per- void inbound.start();
// agent failures are logged but don't reject. Once it resolves we
// can harvest the presence accounts (those that DID log in have
// their fabricUserId + first guild endpoint populated).
void inbound.start().then(() => {
if (!inbound)
return;
presence = new PresenceSync(api.logger, client);
presence.setAccounts(inbound.getPresenceAccounts());
presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
// Re-harvest every 5 min: catches agents added via tool-based
// identity provisioning after gateway_start (recruitment flow).
// setAccounts is idempotent — duplicates collapse on agentId.
presenceRefreshTimer = setInterval(() => {
if (inbound && presence)
presence.setAccounts(inbound.getPresenceAccounts());
}, 5 * 60_000);
});
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`); api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger); void syncFabricCommands(client, cfg, accounts, api.logger);
}); });
@@ -115,12 +67,6 @@ export default defineChannelPluginEntry({
// BEFORE deliver()). gateway_stop only flushes any leftover buffer. // BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => { api.on('gateway_stop', () => {
void flushAllFabric(); void flushAllFabric();
if (presenceRefreshTimer) {
clearInterval(presenceRefreshTimer);
presenceRefreshTimer = null;
}
presence?.stop();
presence = null;
inbound?.stop(); inbound?.stop();
inbound = null; inbound = null;
}); });

View File

@@ -1,105 +0,0 @@
/**
* Channel-meta cache. Records (channelId → xType) for every fabric
* channel the gateway has seen at least one inbound message in.
*
* Populated lazily from inbound (`recordChannelType` is called for
* every `message.created` event with non-empty `xType`). Persisted to
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
* gateway restarts (so the very first DM after restart still gets the
* right xType without waiting for a fresh inbound).
*
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
* to xType==='dm' only.
*
* Failure mode: lookup misses (channel never seen / inbound dropped
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
* fall back to "assume DM" or you re-introduce the false-positive on
* group channels.
*/
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { homedir } from 'node:os';
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
let memory = new Map();
let loaded = false;
let dirty = false;
let flushTimer = null;
function load() {
if (loaded)
return;
loaded = true;
try {
if (!existsSync(CACHE_FILE))
return;
const raw = readFileSync(CACHE_FILE, 'utf8');
const parsed = JSON.parse(raw);
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
if (typeof k === 'string' && typeof v === 'string')
memory.set(k, v);
}
}
catch {
// ignore — start with empty cache on corruption
}
}
function scheduleFlush() {
if (flushTimer)
return;
// Debounce writes — many inbound messages may arrive in a burst.
// 250ms coalesces them; on gateway_stop the channel plugin can force
// a synchronous flush via flushChannelMeta().
flushTimer = setTimeout(() => {
flushTimer = null;
if (!dirty)
return;
dirty = false;
flushSync();
}, 250);
}
function flushSync() {
try {
const dir = dirname(CACHE_FILE);
if (!existsSync(dir))
mkdirSync(dir, { recursive: true });
const out = { channels: Object.fromEntries(memory) };
const tmp = CACHE_FILE + '.tmp';
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
renameSync(tmp, CACHE_FILE);
}
catch {
// swallow — cache is an optimization; loss-on-write is recoverable
}
}
/** Called by inbound on every message.created. xType empty → no-op. */
export function recordChannelType(channelId, xType) {
if (!channelId || !xType)
return;
load();
const existing = memory.get(channelId);
if (existing === xType)
return;
memory.set(channelId, xType);
dirty = true;
scheduleFlush();
}
/** Cross-plugin lookup. null when channel never seen / unknown. */
export function getChannelType(channelId) {
if (!channelId)
return null;
load();
return memory.get(channelId) ?? null;
}
/** Force-flush — called on plugin shutdown to make sure recently
* recorded entries hit disk before the gateway dies. */
export function flushChannelMeta() {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
if (dirty) {
dirty = false;
flushSync();
}
}
export const CHANNEL_META_PATH = CACHE_FILE;

View File

@@ -11,15 +11,6 @@
import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core'; import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core';
import { FabricClient } from './fabric-client.js'; import { FabricClient } from './fabric-client.js';
import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js'; import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js';
import { getChannelType } from './channel-meta.js';
export function fabricPeerRoutingForXType(xType) {
if (xType === 'dm')
return { peerKind: 'direct', chatType: 'direct' };
return { peerKind: 'group', chatType: 'group' };
}
export function fabricPeerRoutingForChannel(channelId) {
return fabricPeerRoutingForXType(getChannelType(channelId));
}
// ---- target grammar: fabric:<channelId> ---- // ---- target grammar: fabric:<channelId> ----
export function stripFabricTargetPrefix(raw) { export function stripFabricTargetPrefix(raw) {
let s = (raw ?? '').trim(); let s = (raw ?? '').trim();
@@ -47,18 +38,13 @@ export function resolveFabricOutboundSessionRoute(params) {
const id = stripFabricTargetPrefix(params.target); const id = stripFabricTargetPrefix(params.target);
if (!id) if (!id)
return null; return null;
// Consult the channel-meta cache populated by inbound — DM channels
// need peer.kind='direct' so the outbound session key matches the
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
// no regression on cold cache).
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
return buildChannelOutboundSessionRoute({ return buildChannelOutboundSessionRoute({
cfg: params.cfg, cfg: params.cfg,
agentId: params.agentId, agentId: params.agentId,
channel: 'fabric', channel: 'fabric',
accountId: params.accountId, accountId: params.accountId,
peer: { kind: peerKind, id }, peer: { kind: 'group', id },
chatType, chatType: 'group',
from: `fabric:channel:${id}`, from: `fabric:channel:${id}`,
to: `fabric:${id}`, to: `fabric:${id}`,
}); });
@@ -117,19 +103,6 @@ export const fabricChannelPlugin = createChatChannelPlugin({
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId), resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId),
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg), defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg),
isConfigured: (account) => Boolean(account.fabricApiKey), isConfigured: (account) => Boolean(account.fabricApiKey),
// openclaw's channelManager.getRuntimeSnapshot() — called every minute
// by the channel-health-monitor — defaults `configured: true` when the
// plugin doesn't expose describeAccount (see applyDescribedAccountFields
// in server-channels). Without this, fabric's synthetic 'default'
// account (returned by listFabricAccountIds when channels.fabric.accounts
// is empty — the prod shape) gets snapshot {enabled:true, configured:true,
// running:false} → isManagedAccount=true → not-running → restart loop
// every ~10 min, logging `[fabric:default] health-monitor: restarting`.
// Mirror isConfigured here so the snapshot truthfully reports false for
// any account without a fabricApiKey.
describeAccount: (account) => ({
configured: Boolean(account.fabricApiKey),
}),
}, },
// Minimal setup adapter: Fabric is configured directly under // Minimal setup adapter: Fabric is configured directly under
// channels.fabric.* (no interactive wizard). applyAccountConfig is the // channels.fabric.* (no interactive wizard). applyAccountConfig is the

View File

@@ -63,11 +63,6 @@ export class FabricClient {
createChannel(guildEndpoint, guildToken, body) { createChannel(guildEndpoint, guildToken, body) {
return this.post(`${guildEndpoint}/api/channels`, body, guildToken); return this.post(`${guildEndpoint}/api/channels`, body, guildToken);
} }
// PATCH /api/channels/:id — backend currently only patches `purpose`.
// Caller must be a member of the channel (or any user if public).
setChannelPurpose(guildEndpoint, guildToken, channelId, purpose) {
return this.req('PATCH', `${guildEndpoint}/api/channels/${channelId}`, guildToken, { purpose });
}
closeChannel(guildEndpoint, guildToken, channelId) { closeChannel(guildEndpoint, guildToken, channelId) {
return this.post(`${guildEndpoint}/api/channels/${channelId}/close`, {}, guildToken); return this.post(`${guildEndpoint}/api/channels/${channelId}/close`, {}, guildToken);
} }
@@ -110,31 +105,4 @@ export class FabricClient {
removeCanvas(endpoint, token, channelId) { removeCanvas(endpoint, token, channelId) {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token); return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
} }
// ---- channel discovery + message read (used by the agent-facing
// fabric-channel-list / fabric-message-history tools) ----
/**
* List channels in a guild visible to the calling user. Backend
* filters to public + channels the user is a member of.
*/
listChannels(guildEndpoint, guildToken, guildNodeId) {
return this.req('GET', `${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`, guildToken);
}
/**
* Page through a channel's message history by `seq`.
*
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
* tail. Page metadata in the response describes what to ask next.
*/
listMessages(guildEndpoint, guildToken, channelId, opts = {}) {
const qs = new URLSearchParams();
if (opts.seqFrom !== undefined)
qs.set('seq_from', String(opts.seqFrom));
if (opts.seqTo !== undefined)
qs.set('seq_to', String(opts.seqTo));
if (opts.limit !== undefined)
qs.set('limit', String(opts.limit));
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
return this.req('GET', url, guildToken);
}
} }

View File

@@ -4,8 +4,6 @@ import { join } from 'node:path';
import { io } from 'socket.io-client'; import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch'; import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js'; import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js'; import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
export class FabricInbound { export class FabricInbound {
core; core;
@@ -30,125 +28,6 @@ export class FabricInbound {
// Re-login per agent on a short TTL to keep a fresh token. // Re-login per agent on a short TTL to keep a fresh token.
tokenCache = new Map(); tokenCache = new Map();
static TOKEN_TTL_MS = 8 * 60 * 1000; static TOKEN_TTL_MS = 8 * 60 * 1000;
// Per-channel serial work queue. Every inbound socket message for a
// channel awaits the previous task for that same channel, so model
// turns never interleave. Map key = channelId; value is the tail of
// the chain (an in-flight promise the next task awaits).
//
// Why per-channel and not per-agent: a single agent may sit in
// several triage / general channels; we want each channel to flow at
// its own speed but the SAME channel's traffic to be strictly serial.
// For dm and discuss the queue also serialises but those traditionally
// had at-most-one-in-flight anyway via the turn engine.
channelChains = new Map();
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
// agent/status round-trip off the hot path for back-to-back triage
// messages. Short TTL because status flips are rare-but-meaningful.
agentStatusCache = new Map();
static AGENT_STATUS_TTL_MS = 5_000;
// Triage messages that arrived while the on-duty agent wasn't on_call
// — sit here until either (a) the agent becomes on_call and the next
// triage arrival drains them, or (b) the gateway restarts (lost; ok
// because the underlying Fabric messages are persisted and re-fetched
// on agent reconnect's history sweep).
pendingTriageGated = [];
// Schedule `task` to run after every previous task on the same
// channel has completed. Returns the promise so callers can await
// their own result if they need to; the chain itself is fire-and-
// forget from the socket.on handler.
enqueueChannelTask(channelId, task) {
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
const next = prev.then(task).catch((err) => {
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
});
this.channelChains.set(channelId, next);
// Best-effort cleanup so the Map doesn't grow without bound for
// long-running gateways: drop the entry when the chain settles, but
// only if it's still the latest reference (newer enqueue may have
// overwritten it in the meantime).
void next.finally(() => {
if (this.channelChains.get(channelId) === next) {
this.channelChains.delete(channelId);
}
});
return next;
}
// Hit HF backend to check whether `agentId` is currently on_call.
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
// on_call" — triage stays gated rather than risking a confused wake.
async checkAgentOnCall(agentId) {
const cached = this.agentStatusCache.get(agentId);
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
return cached.onCall;
}
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
// CLAW_IDENTIFIER resolution priority:
// 1. HF_CLAW_IDENTIFIER env (operator override)
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
// plugin itself uses — keeps the two in sync without an extra
// env per service unit)
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
// container hostname is `server.t2` but HF agent row has
// `claw_identifier=sim-t2`; matching is mandatory for the HF
// backend's _require_agent() check)
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
if (!claw) {
try {
// openclaw config shape (verified in sim):
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
const cfg = this.cfg;
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
claw = fromCfg.trim();
}
}
catch {
/* fall through to hostname */
}
}
if (!claw) {
claw = (await import('os')).hostname();
}
let onCall = false;
try {
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
const res = await fetch(url, {
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
});
if (res.ok) {
const data = (await res.json());
onCall = (data.status ?? '').toLowerCase() === 'on_call';
}
}
catch (err) {
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
}
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
return onCall;
}
// FIFO drain of all triage-gated messages for `agentId` (called when
// we just learned they're on_call). Each drained message is dispatched
// through its own channel chain so per-channel serial order is kept.
async drainGatedFor(agentId) {
const keep = [];
const drain = [];
for (const item of this.pendingTriageGated) {
if (item.agentId === agentId)
drain.push(item);
else
keep.push(item);
}
if (drain.length === 0)
return;
this.pendingTriageGated = keep;
for (const item of drain) {
this.log.info(`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`);
// Re-enqueue via the per-channel chain so ordering is preserved.
this.enqueueChannelTask(item.channelId, async () => {
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
});
}
}
// Return a fresh guild access token for the agent, re-authenticating with // Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back // the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails. // to the connect-time session token if re-login fails.
@@ -211,76 +90,15 @@ export class FabricInbound {
s.disconnect(); s.disconnect();
this.sockets = []; this.sockets = [];
} }
/**
* Per-account metadata harvested during `start()` — used by
* PresenceSync to know where to push each agent's HF status.
*
* `fabricUserId` is filled from `session.user.id` after agent-login.
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
* guild presence push is a future concern; for sim/prod-v1 each agent
* is in one guild).
*
* Returns ONLY agents that successfully connected — failed-login
* agents have no fabricUserId yet and are excluded.
*/
getPresenceAccounts() {
const out = [];
for (const entry of this.identity.list()) {
if (!entry.fabricUserId)
continue;
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
if (!presenceGuild)
continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuild.endpoint,
guildNodeId: presenceGuild.nodeId,
fabricApiKey: entry.fabricApiKey,
});
}
return out;
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target). Stores both
// endpoint and nodeId — presence-sync needs both: endpoint to build
// the URL, nodeId to pick the matching guildAccessToken from a fresh
// agent-login response.
firstGuildByAgent = new Map();
async connectAgent(agentId, session) { async connectAgent(agentId, session) {
const selfUserId = session.user.id; const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild)
this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
}
for (const g of session.guilds) { for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) if (!tok)
continue; continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, { const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'], transports: ['websocket'],
auth: (cb) => { auth: { token: tok },
// Best-effort fresh token; on transient failure fall back to the
// last known good one. tokenCache also keeps HTTP calls (attachment
// download / reply post) from 401'ing in the same window.
this.freshGuildToken(agentId, g.nodeId, session)
.then((fresh) => cb({ token: fresh ?? tok }))
.catch(() => cb({ token: tok }));
},
autoConnect: false, autoConnect: false,
}); });
// Tracked socket.io rooms for this (agent, guild). The initial fetch // Tracked socket.io rooms for this (agent, guild). The initial fetch
@@ -364,13 +182,6 @@ export class FabricInbound {
const channelId = m.channelId ?? ''; const channelId = m.channelId ?? '';
if (!channelId) if (!channelId)
return; return;
// Record xType into the channel-meta cache before self-author
// / dedup gates — channel type doesn't depend on who sent the
// message, and recording it on observer-only triage messages
// is still useful (the next consumer asking
// __fabric.getChannelType wants the answer regardless of
// whether THIS message was delivered to an agent).
recordChannelType(channelId, m.xType);
if (m.authorUserId && m.authorUserId === selfUserId) if (m.authorUserId && m.authorUserId === selfUserId)
return; return;
const key = `${agentId}:${m.messageId}`; const key = `${agentId}:${m.messageId}`;
@@ -379,31 +190,7 @@ export class FabricInbound {
this.seen.add(key); this.seen.add(key);
if (this.seen.size > 5000) if (this.seen.size > 5000)
this.seen.clear(); this.seen.clear();
// Per-channel serial queue. Prevents concurrent model turns for void this.dispatch(agentId, g, channelId, m, session);
// the same channel — important for triage where a second wake
// arriving mid-reply would interleave with the in-flight one.
this.enqueueChannelTask(channelId, async () => {
// Triage on_call gate: if the on-duty agent isn't currently
// on_call per HF, don't dispatch yet — just sit on the
// per-channel queue. Subsequent triage messages will recheck;
// when the agent becomes on_call, the next arrival drains.
//
// Also handles: triage + wake=true must verify status before
// committing to a model turn. Non-triage and triage observer
// (wake=false) skip the gate.
if (m.xType === 'triage' && m.wakeup === true) {
const onCall = await this.checkAgentOnCall(agentId);
if (!onCall) {
this.log.info(`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`);
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
return;
}
// Drain any previously-gated messages (FIFO) before this one,
// now that we know the agent is on_call.
await this.drainGatedFor(agentId);
}
await this.dispatch(agentId, g, channelId, m, session);
});
}); });
socket.connect(); socket.connect();
this.sockets.push(socket); this.sockets.push(socket);
@@ -455,19 +242,11 @@ export class FabricInbound {
const core = this.core; const core = this.core;
const cfg = this.cfg; const cfg = this.cfg;
try { try {
// Route by xType. DM channels need peer.kind='direct' so openclaw
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
// and ctx.ChatType='direct') rather than as a multi-party group.
// Without this, the agent's user-prompt metadata says
// 'is_group_chat: true' on a DM and downstream prompt logic
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
const route = core.channel.routing.resolveAgentRoute({ const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg, cfg: this.cfg,
channel: 'fabric', channel: 'fabric',
accountId: agentId, accountId: agentId,
peer: { kind: peerKind, id: channelId }, peer: { kind: 'group', id: channelId },
}); });
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId, agentId: route.agentId,
@@ -481,7 +260,7 @@ export class FabricInbound {
To: `fabric:${channelId}`, To: `fabric:${channelId}`,
SessionKey: route.sessionKey, SessionKey: route.sessionKey,
AccountId: route.accountId ?? agentId, AccountId: route.accountId ?? agentId,
ChatType: chatType, ChatType: 'group',
ConversationLabel: `fabric:${guild.nodeId}`, ConversationLabel: `fabric:${guild.nodeId}`,
SenderId: m.authorUserId ?? 'fabric', SenderId: m.authorUserId ?? 'fabric',
Provider: 'fabric', Provider: 'fabric',
@@ -502,17 +281,6 @@ export class FabricInbound {
// any message that isn't the agent's own (already filtered above) is // any message that isn't the agent's own (already filtered above) is
// always delivered to the model. // always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) { if (m.xType !== 'dm' && m.wakeup !== true) {
// Triage exception: non-wake messages (admin observer) MUST NOT
// enter the agent's session at all. The next time the agent
// wakes for a triage message, their context should contain only
// their own past wakeups + their own outgoing messages — never
// the observer-only chatter from other agents. For non-triage
// channels keep the legacy "record-as-history" so a later wake
// sees the full channel conversation.
if (m.xType === 'triage') {
this.log.info(`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`);
return;
}
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx); const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({ await core.channel.session.recordInboundSession({
storePath, storePath,

View File

@@ -1,139 +0,0 @@
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
export class PresenceSync {
logger;
client;
timer = null;
lastStatus = new Map(); // by agentId
accounts = new Map();
tokenCache = new Map(); // by agentId
// Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
inflight = false;
constructor(logger, client) {
this.logger = logger;
this.client = client;
}
setAccounts(accounts) {
this.accounts.clear();
for (const a of accounts)
this.accounts.set(a.agentId, a);
}
start(intervalMs = 30_000) {
if (this.timer)
return;
this.timer = setInterval(() => {
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
}, intervalMs);
// run once immediately so initial state lands fast
void this.tick();
}
stop() {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
async ensureGuildToken(acct) {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now)
return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
}
catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
async tick() {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight)
return;
this.inflight = true;
try {
await this.tickInner();
}
finally {
this.inflight = false;
}
}
async tickInner() {
const bridge = globalThis['__hfAgentStatus'];
if (!bridge || typeof bridge.get !== 'function')
return; // HF plugin not loaded — skip
for (const [agentId, acct] of this.accounts) {
let status;
try {
status = await bridge.get(agentId);
}
catch {
continue;
}
if (!status)
continue;
if (this.lastStatus.get(agentId) === status)
continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken)
continue;
try {
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
// x-api-key and got 401 "missing bearer token" forever. The /api
// prefix is required because the guild backend sets a global
// 'api' prefix in main.ts setGlobalPrefix('api').
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
authorization: `Bearer ${guildToken}`,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
if (res.ok) {
this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId}${status}`);
}
else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401)
this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
}
catch (err) {
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
}
}
}
}

View File

@@ -25,9 +25,7 @@ export function registerFabricTools(api, client, identity) {
// or via static config (channels.fabric.accounts.<agentId>). // or via static config (channels.fabric.accounts.<agentId>).
const makeCreate = (kind) => api.registerTool((ctx) => ({ const makeCreate = (kind) => api.registerTool((ctx) => ({
name: `create-${kind}-channel`, name: `create-${kind}-channel`,
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}). ` + description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
'Optionally pass `purpose` to describe what this channel is for — ' +
'agents browse channels by purpose via fabric-channel-list.',
parameters: { parameters: {
type: 'object', type: 'object',
additionalProperties: false, additionalProperties: false,
@@ -39,16 +37,9 @@ export function registerFabricTools(api, client, identity) {
memberUserIds: { type: 'array', items: { type: 'string' } }, memberUserIds: { type: 'array', items: { type: 'string' } },
onDuty: { type: 'string', description: 'required for triage-like flows (unused for these kinds)' }, onDuty: { type: 'string', description: 'required for triage-like flows (unused for these kinds)' },
listeners: { type: 'array', items: { type: 'string' } }, listeners: { type: 'array', items: { type: 'string' } },
purpose: {
type: 'string',
description: "Free-form description of what this channel is for. Optional but " +
'strongly recommended so other agents can find this channel by ' +
'intent (via fabric-channel-list). Can be edited later with ' +
'fabric-channel-set-purpose.',
}, },
}, },
}, execute: async (p) => {
execute: async (_id, p) => {
const agentId = ctx.agentId; const agentId = ctx.agentId;
if (!agentId) if (!agentId)
return { ok: false, error: 'no agent context' }; return { ok: false, error: 'no agent context' };
@@ -59,7 +50,6 @@ export function registerFabricTools(api, client, identity) {
xType: X_BY_KIND[kind], xType: X_BY_KIND[kind],
isPublic: p.isPublic ?? false, isPublic: p.isPublic ?? false,
memberUserIds: p.memberUserIds ?? [], memberUserIds: p.memberUserIds ?? [],
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
}); });
return { ok: true, channelId: ch.id }; return { ok: true, channelId: ch.id };
}, },
@@ -84,7 +74,7 @@ export function registerFabricTools(api, client, identity) {
callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' }, callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' },
}, },
}, },
execute: async (_id, p) => { execute: async (p) => {
const agentId = ctx.agentId; const agentId = ctx.agentId;
if (!agentId) if (!agentId)
return { ok: false, error: 'no agent context' }; return { ok: false, error: 'no agent context' };
@@ -128,7 +118,7 @@ export function registerFabricTools(api, client, identity) {
}, },
}, },
}, },
execute: async (_id, p) => { execute: async (p) => {
const agentId = ctx.agentId; const agentId = ctx.agentId;
if (!agentId) if (!agentId)
return { ok: false, error: 'no agent context' }; return { ok: false, error: 'no agent context' };
@@ -188,7 +178,7 @@ export function registerFabricTools(api, client, identity) {
channelId: { type: 'string' }, channelId: { type: 'string' },
}, },
}, },
execute: async (_id, p) => { execute: async (p) => {
const agentId = ctx.agentId; const agentId = ctx.agentId;
if (!agentId) if (!agentId)
return { ok: false, error: 'no agent context' }; return { ok: false, error: 'no agent context' };
@@ -212,252 +202,4 @@ export function registerFabricTools(api, client, identity) {
} }
}, },
})); }));
// -----------------------------------------------------------------
// fabric-send-message: post a message into a specific channel.
//
// Unlike a normal channel reply (which goes back to whatever channel
// woke the agent), this lets the agent proactively initiate text into
// any channel they are a member of — e.g. ARD broadcasting daily
// workload to #agents-room, or triage agent following up on an
// already-routed task by commenting in #updates.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-send-message',
description: 'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = (await client.postMessage(guild.endpoint, token, p.channelId, p.content, session.user.id));
return { ok: true, messageId: res.messageId, seq: res.seq };
},
}));
// -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the
// agent is a member of. Returns id / name / xType per channel so the
// agent can pick a channelId for fabric-send-message etc.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-channel-list',
description: 'List channels visible to the calling agent in a guild. Optional ' +
'nameFilter does a case-insensitive substring match client-side. ' +
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId'],
properties: {
guildNodeId: { type: 'string' },
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
xType: {
type: 'string',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
description: 'optional filter by x_type',
},
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const needle = (p.nameFilter ?? '').toLowerCase();
const filtered = all.filter((c) => {
if (!p.includeClosed && c.closed)
return false;
if (p.xType && c.xType !== p.xType)
return false;
if (needle && !c.name.toLowerCase().includes(needle))
return false;
return true;
});
return {
ok: true,
count: filtered.length,
channels: filtered.map((c) => ({
id: c.id,
name: c.name,
xType: c.xType,
isPublic: c.isPublic,
closed: c.closed,
lastSeq: c.lastSeq,
purpose: c.purpose ?? null,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-guild-list: enumerate guilds the calling agent belongs to.
// Each row carries `purpose` — free-form description of what the
// guild is for (admin-set). Use this as the first step when a
// workflow says "find the right guild for X" — pick by purpose,
// then fabric-channel-list to find the right channel inside it.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-guild-list',
description: 'List guilds the calling agent is a member of. Returns ' +
'{nodeId, name, purpose, status} per row. ' +
"`purpose` is a free-form description of what each guild is for — " +
'pick the guild whose purpose matches your intent. Use this tool ' +
'BEFORE fabric-channel-list when a workflow asks you to pick the ' +
'right guild by intent (e.g. "find a guild whose purpose mentions ' +
'debate broadcasts" → then list its announce-type channels).',
parameters: {
type: 'object',
additionalProperties: false,
properties: {
nameFilter: {
type: 'string',
description: 'optional case-insensitive substring match on guild name',
},
purposeFilter: {
type: 'string',
description: 'optional case-insensitive substring match on guild purpose ' +
'(e.g. "debate", "announcements")',
},
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const entry = identity.findByAgentId(agentId);
if (!entry)
return { ok: false, error: `agent ${agentId} not registered` };
const session = await client.agentLogin(entry.fabricApiKey);
const nameNeedle = (p.nameFilter ?? '').toLowerCase();
const purposeNeedle = (p.purposeFilter ?? '').toLowerCase();
const guilds = session.guilds.filter((g) => {
if (nameNeedle && !g.name.toLowerCase().includes(nameNeedle))
return false;
if (purposeNeedle) {
const purp = (g.purpose ?? '').toLowerCase();
if (!purp.includes(purposeNeedle))
return false;
}
return true;
});
return {
ok: true,
count: guilds.length,
guilds: guilds.map((g) => ({
nodeId: g.nodeId,
name: g.name,
status: g.status,
purpose: g.purpose ?? null,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-channel-set-purpose: set/update a channel's free-form
// purpose description. Caller must be a channel member (or the
// channel must be public). Use this to backfill purpose on existing
// channels, or to refine it after a channel's role evolves.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-channel-set-purpose',
description: "Set or update a channel's free-form purpose description. " +
'Channel membership required (or the channel must be public). ' +
'Pass empty string to clear. Use this to make a channel ' +
'discoverable to other agents via fabric-channel-list.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'purpose'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
purpose: {
type: 'string',
description: "What this channel is for. Pass '' (empty string) to clear.",
},
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = await client.setChannelPurpose(guild.endpoint, token, p.channelId, p.purpose);
return { ok: true, channel: res };
},
}));
// -----------------------------------------------------------------
// fabric-message-history: read a channel's recent message history by
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
// the last `limit` messages (limit defaults to 20, max 200).
//
// Use cases: catch-up on a channel that was muted while the agent was
// gated; verify a previous message went through; lookup recent
// duplicates before opening a new task in triage.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-message-history',
description: "Read a channel's recent message history. Omit seqFrom/seqTo to " +
'tail (last `limit` messages, default 20, max 200). Backend ' +
'requires the calling agent to be a channel participant.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const limit = p.limit ?? 20;
// Tail mode: discover channel head via channel listing, then ask
// for [head-limit+1, head]. Avoids needing the agent to know seq.
let seqFrom = p.seqFrom;
let seqTo = p.seqTo;
if (seqFrom === undefined && seqTo === undefined) {
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const ch = channels.find((c) => c.id === p.channelId);
const head = ch?.lastSeq ?? 0;
seqFrom = Math.max(1, head - limit + 1);
seqTo = head;
}
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
seqFrom,
seqTo,
limit,
});
return {
ok: true,
page: res.page,
messages: res.items.map((m) => ({
messageId: m.messageId,
seq: m.seq,
authorUserId: m.authorUserId,
content: m.content,
createdAt: m.createdAt,
isDeleted: m.isDeleted,
})),
};
},
}));
} }

View File

@@ -7,26 +7,17 @@ import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core'; import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js'; import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js'; import { flushAllFabric } from './src/coalesce.js';
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
import { FabricInbound } from './src/inbound.js'; import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js'; import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js'; import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js'; import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js'; import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js'; import { syncFabricCommands } from './src/command-sync.js';
import { PresenceSync } from './src/presence-sync.js';
import { SubDiscussionStore } from './src/sub-discussion-store.js';
import { registerSubDiscussionHook } from './src/sub-discussion-hook.js';
import path from 'node:path'; import path from 'node:path';
import os from 'node:os'; import os from 'node:os';
let runtimeRef: unknown = null; let runtimeRef: unknown = null;
let inbound: FabricInbound | null = null; let inbound: FabricInbound | null = null;
let presence: PresenceSync | null = null;
// Periodic re-harvest of presence accounts so newly-connected agents
// (registered through tool-based identity flow AFTER initial start)
// get picked up. Cleared on gateway_stop.
let presenceRefreshTimer: ReturnType<typeof setInterval> | null = null;
export { fabricChannelPlugin } from './src/channel.js'; export { fabricChannelPlugin } from './src/channel.js';
@@ -50,76 +41,20 @@ export default defineChannelPluginEntry({
on: (ev: string, fn: (...args: unknown[]) => unknown) => void; on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
registerTool: (d: unknown) => void; registerTool: (d: unknown) => void;
}; };
const cfg = (api.config ?? {}) as { const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
channels?: { fabric?: { centerApiBase?: string; commandsSyncKey?: string } };
};
const centerApiBase = cfg.channels?.fabric?.centerApiBase ?? 'http://localhost:7001/api'; const centerApiBase = cfg.channels?.fabric?.centerApiBase ?? 'http://localhost:7001/api';
const idFile = const idFile =
api.pluginConfig?.identityFilePath ?? api.pluginConfig?.identityFilePath ??
path.join(os.homedir(), '.openclaw', 'fabric-identity.json'); path.join(os.homedir(), '.openclaw', 'fabric-identity.json');
const subDiscussionFile = path.join(
os.homedir(),
'.openclaw',
'fabric-sub-discussion.json',
);
// tools operate against a default Center; per-account keys come from config // tools operate against a default Center; per-account keys come from config
const client = new FabricClient(centerApiBase); const client = new FabricClient(centerApiBase);
const identity = new IdentityRegistry(idFile); const identity = new IdentityRegistry(idFile);
const subDiscussion = new SubDiscussionStore(subDiscussionFile);
registerFabricTools( registerFabricTools(
{ registerTool: (d) => api.registerTool(d), logger: api.logger }, { registerTool: (d) => api.registerTool(d), logger: api.logger },
client, client,
identity, identity,
subDiscussion,
cfg,
); );
// Per-(agent, channel) prompt injection for sub-discussion channels.
// Runs as a sibling to PrismFacet's before_prompt_build hook (and
// ClawPrompts' fabric-chat-injector); openclaw composes
// appendSystemContext from all registered handlers.
registerSubDiscussionHook(
{ on: api.on, logger: api.logger },
subDiscussion,
identity,
);
// Cross-plugin API: globalThis.__fabric
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
// injection to DM-typed channels only. The channel-meta cache is
// populated lazily from inbound (message.created carries xType) and
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
// very first DM after a fresh gateway start hits cache from the
// previous run rather than firing the injector on the wrong type.
//
// null return = channel never seen (cache cold). Callers MUST NOT
// fall back to "assume DM" — fail closed on unknown.
{
const _G = globalThis as Record<string, unknown>;
_G['__fabric'] = {
getChannelType,
// Dynamic-subscription bridges: tools (notably `fabric-register`)
// call these to add/remove an account's inbound socket without
// a gateway restart. Both delegate to the live FabricInbound
// instance via the module-level `inbound` closure variable; the
// closures stay valid across gateway_start / gateway_stop
// because we re-assign the variable, not the property.
addAccount: async (entry: { agentId: string; fabricApiKey: string }) => {
if (!inbound) throw new Error('fabric inbound not ready yet (gateway not started?)');
await inbound.addAccount(entry);
},
removeAccount: (agentId: string) => {
if (!inbound) return;
inbound.removeAccount(agentId);
},
};
// Flush channel-meta cache when the gateway shuts down so
// recently-recorded xType entries don't get lost.
api.on('gateway_stop', () => {
try { flushChannelMeta(); } catch { /* ignore */ }
});
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)');
}
api.on('gateway_start', () => { api.on('gateway_start', () => {
const _G = globalThis as Record<string, unknown>; const _G = globalThis as Record<string, unknown>;
@@ -147,24 +82,7 @@ export default defineChannelPluginEntry({
api.logger, api.logger,
accounts, accounts,
); );
// start() resolves once all accounts have attempted login; per- void inbound.start();
// agent failures are logged but don't reject. Once it resolves we
// can harvest the presence accounts (those that DID log in have
// their fabricUserId + first guild endpoint populated).
void inbound.start().then(() => {
if (!inbound) return;
presence = new PresenceSync(api.logger, client);
presence.setAccounts(inbound.getPresenceAccounts());
presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
// Re-harvest every 5 min: catches agents added via tool-based
// identity provisioning after gateway_start (recruitment flow).
// setAccounts is idempotent — duplicates collapse on agentId.
presenceRefreshTimer = setInterval(() => {
if (inbound && presence) presence.setAccounts(inbound.getPresenceAccounts());
}, 5 * 60_000);
});
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`); api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger); void syncFabricCommands(client, cfg, accounts, api.logger);
}); });
@@ -175,9 +93,6 @@ export default defineChannelPluginEntry({
// BEFORE deliver()). gateway_stop only flushes any leftover buffer. // BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => { api.on('gateway_stop', () => {
void flushAllFabric(); void flushAllFabric();
if (presenceRefreshTimer) { clearInterval(presenceRefreshTimer); presenceRefreshTimer = null; }
presence?.stop();
presence = null;
inbound?.stop(); inbound?.stop();
inbound = null; inbound = null;
}); });

View File

@@ -14,17 +14,12 @@
"create-work-channel", "create-work-channel",
"create-report-channel", "create-report-channel",
"create-discussion-channel", "create-discussion-channel",
"create-sub-discussion",
"discussion-complete", "discussion-complete",
"close-sub-discussion",
"fabric-canvas", "fabric-canvas",
"fabric-channel", "fabric-channel",
"fabric-send-message", "fabric-send-message",
"fabric-send-sys-msg",
"fabric-channel-list", "fabric-channel-list",
"fabric-message-history", "fabric-message-history"
"fabric-guild-list",
"fabric-channel-set-purpose"
] ]
}, },
"configSchema": { "configSchema": {

View File

@@ -1,108 +0,0 @@
/**
* Channel-meta cache. Records (channelId → xType) for every fabric
* channel the gateway has seen at least one inbound message in.
*
* Populated lazily from inbound (`recordChannelType` is called for
* every `message.created` event with non-empty `xType`). Persisted to
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
* gateway restarts (so the very first DM after restart still gets the
* right xType without waiting for a fresh inbound).
*
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
* to xType==='dm' only.
*
* Failure mode: lookup misses (channel never seen / inbound dropped
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
* fall back to "assume DM" or you re-introduce the false-positive on
* group channels.
*/
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { homedir } from 'node:os';
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
interface ChannelMetaFile {
// channelId → xType ('dm' | 'triage' | 'group' | etc.)
channels: Record<string, string>;
}
let memory = new Map<string, string>();
let loaded = false;
let dirty = false;
let flushTimer: ReturnType<typeof setTimeout> | null = null;
function load(): void {
if (loaded) return;
loaded = true;
try {
if (!existsSync(CACHE_FILE)) return;
const raw = readFileSync(CACHE_FILE, 'utf8');
const parsed = JSON.parse(raw) as ChannelMetaFile;
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
if (typeof k === 'string' && typeof v === 'string') memory.set(k, v);
}
} catch {
// ignore — start with empty cache on corruption
}
}
function scheduleFlush(): void {
if (flushTimer) return;
// Debounce writes — many inbound messages may arrive in a burst.
// 250ms coalesces them; on gateway_stop the channel plugin can force
// a synchronous flush via flushChannelMeta().
flushTimer = setTimeout(() => {
flushTimer = null;
if (!dirty) return;
dirty = false;
flushSync();
}, 250);
}
function flushSync(): void {
try {
const dir = dirname(CACHE_FILE);
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
const out: ChannelMetaFile = { channels: Object.fromEntries(memory) };
const tmp = CACHE_FILE + '.tmp';
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
renameSync(tmp, CACHE_FILE);
} catch {
// swallow — cache is an optimization; loss-on-write is recoverable
}
}
/** Called by inbound on every message.created. xType empty → no-op. */
export function recordChannelType(channelId: string, xType: string | undefined): void {
if (!channelId || !xType) return;
load();
const existing = memory.get(channelId);
if (existing === xType) return;
memory.set(channelId, xType);
dirty = true;
scheduleFlush();
}
/** Cross-plugin lookup. null when channel never seen / unknown. */
export function getChannelType(channelId: string): string | null {
if (!channelId) return null;
load();
return memory.get(channelId) ?? null;
}
/** Force-flush — called on plugin shutdown to make sure recently
* recorded entries hit disk before the gateway dies. */
export function flushChannelMeta(): void {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
if (dirty) {
dirty = false;
flushSync();
}
}
export const CHANNEL_META_PATH = CACHE_FILE;

View File

@@ -21,39 +21,6 @@ import {
resolveDefaultFabricAccountId, resolveDefaultFabricAccountId,
type ResolvedFabricAccount, type ResolvedFabricAccount,
} from './accounts.js'; } from './accounts.js';
import { getChannelType } from './channel-meta.js';
/**
* Map a Fabric channel xType to an openclaw routing peer.kind / ChatType.
*
* Fabric distinguishes channels by xType ('dm' | 'triage' | 'group' |
* 'broadcast' | 'announce' | ...). Openclaw's session router only knows
* 'direct' | 'group' | 'channel'. We collapse:
* - 'dm' → 'direct' (1:1 conversation; agent always speaks)
* - rest → 'group' (multi-party; turn-engine gates speech)
*
* Sessions are keyed by peer.kind, so inbound and outbound MUST agree —
* otherwise the agent's outbound message lands in a different session
* than the inbound that triggered it and conversation state splits.
*
* Outbound has no live xType (the agent target is just a channelId), so
* it consults the channel-meta cache populated by inbound. Cache miss
* (channel never observed) falls back to 'group' — same as the pre-fix
* behavior, no regression on cold cache. The proactive-DM-first-message
* edge case (agent DMs a channel before any inbound) still lands as
* 'group' on that one outbound; the next inbound + outbound pair will
* agree on 'direct'.
*/
export type FabricPeerRouting = { peerKind: 'direct' | 'group'; chatType: 'direct' | 'group' };
export function fabricPeerRoutingForXType(xType: string | null | undefined): FabricPeerRouting {
if (xType === 'dm') return { peerKind: 'direct', chatType: 'direct' };
return { peerKind: 'group', chatType: 'group' };
}
export function fabricPeerRoutingForChannel(channelId: string): FabricPeerRouting {
return fabricPeerRoutingForXType(getChannelType(channelId));
}
type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown }; type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown };
@@ -78,18 +45,13 @@ export function looksLikeFabricTargetId(raw: string): boolean {
export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) { export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
const id = stripFabricTargetPrefix(params.target); const id = stripFabricTargetPrefix(params.target);
if (!id) return null; if (!id) return null;
// Consult the channel-meta cache populated by inbound — DM channels
// need peer.kind='direct' so the outbound session key matches the
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
// no regression on cold cache).
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
return buildChannelOutboundSessionRoute({ return buildChannelOutboundSessionRoute({
cfg: params.cfg, cfg: params.cfg,
agentId: params.agentId, agentId: params.agentId,
channel: 'fabric', channel: 'fabric',
accountId: params.accountId, accountId: params.accountId,
peer: { kind: peerKind, id }, peer: { kind: 'group', id },
chatType, chatType: 'group',
from: `fabric:channel:${id}`, from: `fabric:channel:${id}`,
to: `fabric:${id}`, to: `fabric:${id}`,
}); });
@@ -153,20 +115,6 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg as never, accountId), resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg as never, accountId),
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg as never), defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg as never),
isConfigured: (account: ResolvedFabricAccount) => Boolean(account.fabricApiKey), isConfigured: (account: ResolvedFabricAccount) => Boolean(account.fabricApiKey),
// openclaw's channelManager.getRuntimeSnapshot() — called every minute
// by the channel-health-monitor — defaults `configured: true` when the
// plugin doesn't expose describeAccount (see applyDescribedAccountFields
// in server-channels). Without this, fabric's synthetic 'default'
// account (returned by listFabricAccountIds when channels.fabric.accounts
// is empty — the prod shape) gets snapshot {enabled:true, configured:true,
// running:false} → isManagedAccount=true → not-running → restart loop
// every ~10 min, logging `[fabric:default] health-monitor: restarting`.
// Mirror isConfigured here so the snapshot truthfully reports false for
// any account without a fabricApiKey.
describeAccount: (account: ResolvedFabricAccount) => ({
accountId: account.accountId,
configured: Boolean(account.fabricApiKey),
}),
}, },
// Minimal setup adapter: Fabric is configured directly under // Minimal setup adapter: Fabric is configured directly under
// channels.fabric.* (no interactive wizard). applyAccountConfig is the // channels.fabric.* (no interactive wizard). applyAccountConfig is the

View File

@@ -6,15 +6,7 @@ export type FabricSession = {
accessToken: string; accessToken: string;
refreshToken: string; refreshToken: string;
user: { id: string; email: string; name: string }; user: { id: string; email: string; name: string };
guilds: Array<{ guilds: Array<{ nodeId: string; name: string; endpoint: string; status: string }>;
nodeId: string;
name: string;
endpoint: string;
status: string;
// free-form description of this guild's role; admin-set on Center.
// null when the admin hasn't filled it in yet.
purpose?: string | null;
}>;
guildAccessTokens: Array<{ guildNodeId: string; token: string }>; guildAccessTokens: Array<{ guildNodeId: string; token: string }>;
}; };
@@ -108,30 +100,11 @@ export class FabricClient {
memberUserIds?: string[]; memberUserIds?: string[];
onDuty?: string; onDuty?: string;
listeners?: string[]; listeners?: string[];
// free-form purpose; optional. Existing agents can also set/update
// it later via setChannelPurpose().
purpose?: string;
}, },
): Promise<{ id: string }> { ): Promise<{ id: string }> {
return this.post(`${guildEndpoint}/api/channels`, body, guildToken); return this.post(`${guildEndpoint}/api/channels`, body, guildToken);
} }
// PATCH /api/channels/:id — backend currently only patches `purpose`.
// Caller must be a member of the channel (or any user if public).
setChannelPurpose(
guildEndpoint: string,
guildToken: string,
channelId: string,
purpose: string,
): Promise<{ id: string; name: string; xType: string; purpose: string | null }> {
return this.req(
'PATCH',
`${guildEndpoint}/api/channels/${channelId}`,
guildToken,
{ purpose },
);
}
closeChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> { closeChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
return this.post(`${guildEndpoint}/api/channels/${channelId}/close`, {}, guildToken); return this.post(`${guildEndpoint}/api/channels/${channelId}/close`, {}, guildToken);
} }
@@ -239,7 +212,6 @@ export class FabricClient {
closed: boolean; closed: boolean;
lastSeq: number; lastSeq: number;
createdAt: string; createdAt: string;
purpose?: string | null;
}>> { }>> {
return this.req( return this.req(
'GET', 'GET',

View File

@@ -6,8 +6,6 @@ import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-
import type { FabricClient, FabricSession } from './fabric-client.js'; import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js'; import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js'; import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js'; import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled // COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
@@ -51,38 +49,8 @@ type FabricMessage = {
xType?: string; xType?: string;
}; };
// Walk cfg.bindings for the entry that ties `agentId` to a fabric account.
// Returns the binding's match.accountId (the slot label routing keys on);
// returns undefined when the agent has no explicit fabric binding so the
// caller can fall back to agentId without changing pre-existing semantics
// for agents whose binding accountId == agent_id anyway.
function findFabricBindingAccountId(cfg: unknown, agentId: string): string | undefined {
const bindings = (cfg as { bindings?: Array<{
agentId?: string;
match?: { channel?: string; accountId?: string };
}> })?.bindings;
if (!Array.isArray(bindings)) return undefined;
for (const b of bindings) {
if (
b?.agentId === agentId &&
b?.match?.channel === 'fabric' &&
typeof b?.match?.accountId === 'string' &&
b.match.accountId.length > 0
) {
return b.match.accountId;
}
}
return undefined;
}
export class FabricInbound { export class FabricInbound {
private sockets: Socket[] = []; private sockets: Socket[] = [];
// Per-agent socket + timer tracking. Enables `removeAccount(agentId)`
// to tear down ONE agent without restarting the whole inbound. New
// sockets get appended on `connectAgent`; both maps are emptied by
// `stop()`.
private socketsByAgent = new Map<string, Socket[]>();
private timersByAgent = new Map<string, NodeJS.Timeout[]>();
private seen = new Set<string>(); private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild). // Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken // Without this, the agent's socket.io subscriptions are a snapshot taken
@@ -293,153 +261,16 @@ export class FabricInbound {
this.channelSyncTimers = []; this.channelSyncTimers = [];
for (const s of this.sockets) s.disconnect(); for (const s of this.sockets) s.disconnect();
this.sockets = []; this.sockets = [];
this.socketsByAgent.clear();
this.timersByAgent.clear();
} }
/**
* Bring up ONE new account at runtime (no gateway restart).
*
* Mirrors what `start()` does per entry: login to Center, upsert the
* identity registry, open the socket(s). Idempotent: re-calling with
* the same agentId tears down the previous socket(s) first so the
* fresh apikey replaces the stale one (recruitment onboard rotates
* the agent from the shared `interviewee` placeholder to a real
* per-agent apikey — the old `interviewee` socket must drop before
* the new one comes up or the agent ends up subscribed to both users
* at once).
*
* Used by the `fabric-register` openclaw tool to make recruitment
* end-to-end without a gateway restart between `new-agent` and the
* interview's sub-discussion dispatch.
*/
async addAccount(entry: { agentId: string; fabricApiKey: string }): Promise<void> {
if (this.socketsByAgent.has(entry.agentId)) {
this.removeAccount(entry.agentId);
}
const session = await this.client.agentLogin(entry.fabricApiKey);
this.identity.upsert({
agentId: entry.agentId,
fabricApiKey: entry.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
await this.connectAgent(entry.agentId, session);
this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`);
}
/**
* Tear down ONE account's sockets + timers without touching others.
* Caller is responsible for any identity-registry cleanup; this only
* drops the live socket subscription so the agent stops receiving
* Fabric pushes.
*/
removeAccount(agentId: string): void {
const sockets = this.socketsByAgent.get(agentId);
if (sockets) {
for (const s of sockets) {
try { s.disconnect(); } catch { /* socket already dead */ }
// Also remove from the flat list so `stop()` doesn't double-close.
const idx = this.sockets.indexOf(s);
if (idx !== -1) this.sockets.splice(idx, 1);
}
this.socketsByAgent.delete(agentId);
}
const timers = this.timersByAgent.get(agentId);
if (timers) {
for (const t of timers) {
clearInterval(t);
const idx = this.channelSyncTimers.indexOf(t);
if (idx !== -1) this.channelSyncTimers.splice(idx, 1);
}
this.timersByAgent.delete(agentId);
}
this.firstGuildByAgent.delete(agentId);
this.tokenCache.delete(agentId);
this.agentStatusCache.delete(agentId);
this.log.info(`fabric: agent ${agentId} dynamically removed`);
}
/**
* Per-account metadata harvested during `start()` — used by
* PresenceSync to know where to push each agent's HF status.
*
* `fabricUserId` is filled from `session.user.id` after agent-login.
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
* guild presence push is a future concern; for sim/prod-v1 each agent
* is in one guild).
*
* Returns ONLY agents that successfully connected — failed-login
* agents have no fabricUserId yet and are excluded.
*/
getPresenceAccounts(): Array<{
agentId: string;
fabricUserId: string;
guildBaseUrl: string;
guildNodeId: string;
fabricApiKey: string;
}> {
const out: Array<{
agentId: string;
fabricUserId: string;
guildBaseUrl: string;
guildNodeId: string;
fabricApiKey: string;
}> = [];
for (const entry of this.identity.list()) {
if (!entry.fabricUserId) continue;
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
if (!presenceGuild) continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuild.endpoint,
guildNodeId: presenceGuild.nodeId,
fabricApiKey: entry.fabricApiKey,
});
}
return out;
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target). Stores both
// endpoint and nodeId — presence-sync needs both: endpoint to build
// the URL, nodeId to pick the matching guildAccessToken from a fresh
// agent-login response.
private firstGuildByAgent = new Map<string, { endpoint: string; nodeId: string }>();
private async connectAgent(agentId: string, session: FabricSession): Promise<void> { private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
const selfUserId = session.user.id; const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild) this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
}
for (const g of session.guilds) { for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) continue; if (!tok) continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, { const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'], transports: ['websocket'],
auth: (cb) => { auth: { token: tok },
// Best-effort fresh token; on transient failure fall back to the
// last known good one. tokenCache also keeps HTTP calls (attachment
// download / reply post) from 401'ing in the same window.
this.freshGuildToken(agentId, g.nodeId, session)
.then((fresh) => cb({ token: fresh ?? tok }))
.catch(() => cb({ token: tok }));
},
autoConnect: false, autoConnect: false,
}); });
// Tracked socket.io rooms for this (agent, guild). The initial fetch // Tracked socket.io rooms for this (agent, guild). The initial fetch
@@ -523,19 +354,9 @@ export class FabricInbound {
FabricInbound.CHANNEL_SYNC_INTERVAL_MS, FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
); );
this.channelSyncTimers.push(syncTimer); this.channelSyncTimers.push(syncTimer);
const agentTimers = this.timersByAgent.get(agentId) ?? [];
agentTimers.push(syncTimer);
this.timersByAgent.set(agentId, agentTimers);
socket.on('message.created', (m: FabricMessage) => { socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? ''; const channelId = m.channelId ?? '';
if (!channelId) return; if (!channelId) return;
// Record xType into the channel-meta cache before self-author
// / dedup gates — channel type doesn't depend on who sent the
// message, and recording it on observer-only triage messages
// is still useful (the next consumer asking
// __fabric.getChannelType wants the answer regardless of
// whether THIS message was delivered to an agent).
recordChannelType(channelId, m.xType);
if (m.authorUserId && m.authorUserId === selfUserId) return; if (m.authorUserId && m.authorUserId === selfUserId) return;
const key = `${agentId}:${m.messageId}`; const key = `${agentId}:${m.messageId}`;
if (this.seen.has(key)) return; if (this.seen.has(key)) return;
@@ -571,11 +392,6 @@ export class FabricInbound {
}); });
socket.connect(); socket.connect();
this.sockets.push(socket); this.sockets.push(socket);
// Track per-agent so addAccount/removeAccount can teardown
// independently without disturbing other agents.
const agentSockets = this.socketsByAgent.get(agentId) ?? [];
agentSockets.push(socket);
this.socketsByAgent.set(agentId, agentSockets);
} }
} }
@@ -636,31 +452,11 @@ export class FabricInbound {
const core = this.core as Core & Record<string, unknown>; const core = this.core as Core & Record<string, unknown>;
const cfg = this.cfg as { session?: { store?: unknown } }; const cfg = this.cfg as { session?: { store?: unknown } };
try { try {
// Route by xType. DM channels need peer.kind='direct' so openclaw
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
// and ctx.ChatType='direct') rather than as a multi-party group.
// Without this, the agent's user-prompt metadata says
// 'is_group_chat: true' on a DM and downstream prompt logic
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
// resolveAgentRoute needs the *binding* accountId (the channel-side
// slot name) — not the openclaw agentId. For most agents the binding
// is `{agentId: X, match: {channel: fabric, accountId: X}}` so the
// two coincide; but for shared-placeholder cases (e.g. the recruitment
// `interviewee` slot bound to multiple agents over its lifetime) the
// binding accountId is the slot label ("interviewee", "Neon", …) not
// the agent_id. Passing agentId there returned bindings=0 and silently
// fell back to `main`, hijacking sub-discussion turns. Look up the
// agent's fabric binding accountId here; fall back to agentId when no
// explicit binding exists (preserves prior behavior for agents with
// no fabric binding declared).
const bindingAccountId = findFabricBindingAccountId(this.cfg, agentId) ?? agentId;
const route = core.channel.routing.resolveAgentRoute({ const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg, cfg: this.cfg,
channel: 'fabric', channel: 'fabric',
accountId: bindingAccountId, accountId: agentId,
peer: { kind: peerKind, id: channelId }, peer: { kind: 'group', id: channelId },
}); });
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId, agentId: route.agentId,
@@ -675,7 +471,7 @@ export class FabricInbound {
To: `fabric:${channelId}`, To: `fabric:${channelId}`,
SessionKey: route.sessionKey, SessionKey: route.sessionKey,
AccountId: route.accountId ?? agentId, AccountId: route.accountId ?? agentId,
ChatType: chatType, ChatType: 'group',
ConversationLabel: `fabric:${guild.nodeId}`, ConversationLabel: `fabric:${guild.nodeId}`,
SenderId: m.authorUserId ?? 'fabric', SenderId: m.authorUserId ?? 'fabric',
Provider: 'fabric', Provider: 'fabric',

View File

@@ -1,177 +0,0 @@
/**
* presence-sync — read each connected agent's HF status (via the
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
* `PUT /api/agents/:userId/presence` so the backend can apply
* busy-discard on `announce`-type channel deliveries.
*
* Push model: we only PUT when an agent's status actually changes
* (since the last push). The HF-side accessor has its own TTL cache
* to absorb the every-30s polling.
*
* Auth: the endpoint sits behind ApiKeyGuard (global APP_GUARD per
* app.module.js) which expects `Authorization: Bearer <guild-token>`
* — NOT the agent's fabricApiKey directly. So before each PUT we do
* a fresh agent-login (or reuse a cached token if still within its
* 15-min JWT TTL) and pull the guildAccessToken matching the target
* guild. Status changes are rare enough that login overhead is fine.
*
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
* is a no-op — Fabric backend defaults presence to 'unknown' which is
* treated as not-busy. Announce-channel delivery still works; busy
* filtering simply doesn't kick in.
*/
import type { FabricClient } from './fabric-client.js';
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
type Logger = { info: (m: string) => void; warn: (m: string) => void };
export interface PresenceSyncAccount {
agentId: string;
fabricUserId: string; // the agent's Fabric Center user id (UUID)
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
guildNodeId: string; // which guildAccessTokens[].guildNodeId to pick
fabricApiKey: string; // existing per-account key (used for agent-login)
}
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
interface CachedToken {
token: string;
expiresAt: number; // epoch ms
}
export class PresenceSync {
private timer: ReturnType<typeof setInterval> | null = null;
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
private readonly accounts = new Map<string, PresenceSyncAccount>();
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
// Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
private inflight = false;
constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
setAccounts(accounts: PresenceSyncAccount[]): void {
this.accounts.clear();
for (const a of accounts) this.accounts.set(a.agentId, a);
}
start(intervalMs = 30_000): void {
if (this.timer) return;
this.timer = setInterval(() => {
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
}, intervalMs);
// run once immediately so initial state lands fast
void this.tick();
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
private async ensureGuildToken(acct: PresenceSyncAccount): Promise<string | null> {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now) return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
} catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(
`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`,
);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
private async tick(): Promise<void> {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight) return;
this.inflight = true;
try {
await this.tickInner();
} finally {
this.inflight = false;
}
}
private async tickInner(): Promise<void> {
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
for (const [agentId, acct] of this.accounts) {
let status: HfStatus | undefined;
try {
status = await bridge.get(agentId);
} catch {
continue;
}
if (!status) continue;
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken) continue;
try {
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
// x-api-key and got 401 "missing bearer token" forever. The /api
// prefix is required because the guild backend sets a global
// 'api' prefix in main.ts setGlobalPrefix('api').
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
authorization: `Bearer ${guildToken}`,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
if (res.ok) {
this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId}${status}`);
} else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401) this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
} catch (err) {
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
}
}
}
}

View File

@@ -1,76 +0,0 @@
import type { IdentityRegistry } from './identity.js';
import type { SubDiscussionStore } from './sub-discussion-store.js';
// Plugin-local before_prompt_build hook that injects per-(agent, channel)
// guides for sub-discussion channels created via the `create-sub-discussion`
// tool. Mirrors the pattern used by ClawPrompts' fabric-chat-injector
// (channelId-aware injection) but with content dynamically supplied at
// channel-creation time instead of read from static files via PrismFacet's
// router/rule registry.
//
// Match logic per turn:
// ctx.channelId → store.find() → sub-discussion entry
// ctx.agentId → identity.findByAgentId().fabricUserId
// ─ matches entry.hostUserId → inject hostGuide
// ─ matches entry.guestUserIds → inject guestGuide
// ─ neither → no injection
//
// Fail-closed on unknown agentId/channelId — we never inject "the wrong"
// guide, only the right one or nothing.
const _G = globalThis as Record<string, unknown>;
const DEDUP_KEY = '_fabricSubDiscussionHookDedup';
type PromptCtx = {
agentId?: string;
channelId?: string;
messageProvider?: string;
};
export function registerSubDiscussionHook(
api: {
on: (hook: string, handler: (...args: unknown[]) => unknown) => void;
logger: { info: (m: string) => void; warn: (m: string) => void };
},
store: SubDiscussionStore,
identity: IdentityRegistry,
): void {
if (!(_G[DEDUP_KEY] instanceof WeakSet)) _G[DEDUP_KEY] = new WeakSet<object>();
const dedup = _G[DEDUP_KEY] as WeakSet<object>;
api.on('before_prompt_build', async (...args: unknown[]) => {
const event = args[0];
const ctx = (args[1] ?? {}) as PromptCtx;
// The hook fires both for fabric-driven turns (channelId set) and
// for other triggers (HF wake, exec-event, etc.) — drop those.
if (typeof event === 'object' && event !== null) {
if (dedup.has(event)) return undefined;
dedup.add(event);
}
const agentId = (ctx.agentId ?? '').trim();
const channelId = (ctx.channelId ?? '').trim();
if (!agentId || !channelId) return undefined;
const provider = (ctx.messageProvider ?? '').toLowerCase();
if (provider && provider !== 'fabric') return undefined;
const entry = store.find(channelId);
if (!entry) return undefined;
const ident = identity.findByAgentId(agentId);
const myUserId = (ident?.fabricUserId ?? '').trim();
if (!myUserId) {
// identity registry caches fabricUserId after the first agentLogin
// in inbound.ts. If it's missing here, the agent likely hasn't
// completed login yet — skip rather than guess.
return undefined;
}
if (myUserId === entry.hostUserId) {
return { appendSystemContext: entry.hostGuide };
}
if (entry.guestUserIds.includes(myUserId)) {
return { appendSystemContext: entry.guestGuide };
}
return undefined;
});
}

View File

@@ -1,76 +0,0 @@
import { readFileSync, writeFileSync, existsSync, mkdirSync } from 'node:fs';
import { dirname } from 'node:path';
// Records per-(sub-discussion-channel) state created by the
// `create-sub-discussion` tool and consumed by:
// 1. `before_prompt_build` hook — looks up by (agentId, channelId) to
// inject the host or guest guide as appendSystemContext.
// 2. `close-sub-discussion` tool — looks up by sub channelId to find
// the parent channel to post the callback to and to validate that
// the caller is the original host.
//
// One sub-discussion = one channel. Lifetime: from create-sub-discussion
// return until close-sub-discussion (or gateway-stop / disk corruption).
// We persist to a small JSON file so a gateway restart mid-interview
// doesn't strand both host and guest with no guide.
export type SubDiscussionEntry = {
subChannelId: string;
hostAgentId: string; // the openclaw agentId that called create-sub-discussion
hostUserId: string; // the host's Fabric Center userId (session.user.id)
guestUserIds: string[]; // Fabric Center userIds invited as guests
hostGuide: string; // appended to host's session system prompt while in this channel
guestGuide: string; // appended to each guest's session system prompt while in this channel
callbackGuildNodeId: string; // where to post the callback when close is called
callbackChannelId: string; // parent channel to post callback to (system msg)
createdAt: string; // ISO timestamp
};
type StoreFile = { entries: SubDiscussionEntry[] };
export class SubDiscussionStore {
private byChannelId = new Map<string, SubDiscussionEntry>();
constructor(private readonly filePath: string) {
this.load();
}
private load(): void {
if (!existsSync(this.filePath)) return;
try {
const data = JSON.parse(readFileSync(this.filePath, 'utf8')) as StoreFile;
for (const e of data.entries ?? []) {
if (e?.subChannelId) this.byChannelId.set(e.subChannelId, e);
}
} catch {
// Corrupt file → start empty; first mutation rewrites cleanly.
}
}
private persist(): void {
mkdirSync(dirname(this.filePath), { recursive: true });
const data: StoreFile = { entries: [...this.byChannelId.values()] };
writeFileSync(this.filePath, JSON.stringify(data, null, 2));
}
list(): SubDiscussionEntry[] {
return [...this.byChannelId.values()];
}
find(subChannelId: string): SubDiscussionEntry | undefined {
return this.byChannelId.get(subChannelId);
}
add(entry: SubDiscussionEntry): void {
this.byChannelId.set(entry.subChannelId, entry);
this.persist();
}
remove(subChannelId: string): SubDiscussionEntry | undefined {
const e = this.byChannelId.get(subChannelId);
if (!e) return undefined;
this.byChannelId.delete(subChannelId);
this.persist();
return e;
}
}

View File

@@ -1,7 +1,5 @@
import type { FabricClient } from './fabric-client.js'; import type { FabricClient } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js'; import type { IdentityRegistry } from './identity.js';
import type { SubDiscussionStore } from './sub-discussion-store.js';
import { resolveCommandsSyncKey } from './accounts.js';
// OpenClaw tool registration api (loose typing — concrete shape from // OpenClaw tool registration api (loose typing — concrete shape from
// openclaw/plugin-sdk/core at host SDK version). // openclaw/plugin-sdk/core at host SDK version).
@@ -12,9 +10,6 @@ type ToolApi = {
type Ctx = { agentId?: string }; type Ctx = { agentId?: string };
// Loose config shape — just the fields we read here.
type ToolsCfg = { channels?: { fabric?: { commandsSyncKey?: string } }; [k: string]: unknown };
const X_BY_KIND: Record<string, string> = { const X_BY_KIND: Record<string, string> = {
chat: 'general', chat: 'general',
work: 'work', work: 'work',
@@ -22,34 +17,19 @@ const X_BY_KIND: Record<string, string> = {
discussion: 'discuss', discussion: 'discuss',
}; };
// Delay between create-sub-discussion's channel create and greeting post.
// Backend pushes channel.joined to invitee sockets on create; that push
// has to traverse socket.io rooms before the guest plugin can sub the
// channel:<id> room. If the greeting is posted before that, the guest's
// turn-activation wakeup misses (the socket isn't in the room yet).
// 500ms is empirically slack enough on local sim + production t3, and
// short enough not to feel laggy from the host's tool-result POV. Bump
// via FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS env if needed.
const GREETING_DELAY_MS = (() => {
const v = Number.parseInt(process.env.FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS ?? '', 10);
return Number.isFinite(v) && v >= 0 ? v : 500;
})();
export function registerFabricTools( export function registerFabricTools(
api: ToolApi, api: ToolApi,
client: FabricClient, client: FabricClient,
identity: IdentityRegistry, identity: IdentityRegistry,
store: SubDiscussionStore,
cfg: ToolsCfg,
): void { ): void {
// Resolve the calling agent's Fabric session + a guild's token/endpoint. // Resolve the calling agent's Fabric session + a guild's token/endpoint.
const ctxGuild = async (agentId: string, guildNodeId: string) => { const ctxGuild = async (agentId: string, guildNodeId: string) => {
const entry = identity.findByAgentId(agentId); const entry = identity.findByAgentId(agentId);
if (!entry) if (!entry)
throw new Error( throw new Error(
`agent ${agentId} not registered — call the openclaw \`fabric-register\` ` + `agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
`tool (apiKey: <fak_…>, agentId: ${agentId}); the dynamic-subscription ` + `~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
`path brings the socket up immediately, no gateway restart needed`, `channels.fabric.accounts.${agentId}); then restart the gateway`,
); );
const session = await client.agentLogin(entry.fabricApiKey); const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId); const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
@@ -58,82 +38,15 @@ export function registerFabricTools(
return { session, guild, token }; return { session, guild, token };
}; };
// Bind an agent's Fabric API key — validates the key against Center, // NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
// upserts ~/.openclaw/fabric-identity.json, AND brings up the inbound // It's a one-time step done out-of-band via the installed script
// socket immediately via the live FabricInbound instance (no gateway // ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
// restart). The standalone binary `~/.openclaw/bin/fabric-register` // or via static config (channels.fabric.accounts.<agentId>).
// still exists for one-time bootstrap before the gateway runs, but
// recruitment's `register-agent` script should prefer this tool path
// so the new agent's socket is live before `interviewer` fires.
api.registerTool((ctx: Ctx) => ({
name: 'fabric-register',
description:
'Bind an agent to a Fabric Center API key. Validates the key, writes ' +
'the entry to ~/.openclaw/fabric-identity.json, and starts a live ' +
'inbound socket immediately so the agent receives Fabric pushes ' +
'without a gateway restart. Caller defaults to the current agent; ' +
'pass `agentId` to bind on behalf of another agent (recruitment use).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['apiKey'],
properties: {
apiKey: { type: 'string', description: 'Fabric Center API key (`fak_…`)' },
agentId: {
type: 'string',
description:
'Agent to register. Defaults to the calling agent (ctx.agentId). ' +
'Recruitment onboarding may override this when wiring a freshly ' +
'created agent before that agent has a session of its own.',
},
},
},
execute: async (_id: string, p: { apiKey: string; agentId?: string }) => {
const agentId = p.agentId ?? ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context (pass agentId)' };
if (!p.apiKey || typeof p.apiKey !== 'string') {
return { ok: false, error: 'apiKey required' };
}
// Delegate to FabricInbound.addAccount via the cross-plugin bridge.
// The bridge is installed in index.ts when inbound spins up; if it's
// not present yet, the gateway is still starting and the caller should
// retry (rare path — only hit during the gateway_start window).
const fabricApi = (globalThis as Record<string, unknown>)['__fabric'] as
| { addAccount?: (entry: { agentId: string; fabricApiKey: string }) => Promise<void> }
| undefined;
if (!fabricApi?.addAccount) {
return {
ok: false,
error:
'fabric inbound not ready (gateway still starting?). Fall back to ' +
'~/.openclaw/bin/fabric-register or retry after a few seconds.',
};
}
try {
await fabricApi.addAccount({ agentId, fabricApiKey: p.apiKey });
} catch (err) {
return {
ok: false,
error: `fabric-register failed: ${String(err)}`,
};
}
const entry = identity.findByAgentId(agentId);
return {
ok: true,
agentId,
fabricUserId: entry?.fabricUserId,
displayName: entry?.displayName,
};
},
}));
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') => const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
api.registerTool((ctx: Ctx) => ({ api.registerTool((ctx: Ctx) => ({
name: `create-${kind}-channel`, name: `create-${kind}-channel`,
description: description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
`Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}). ` +
'Optionally pass `purpose` to describe what this channel is for — ' +
'agents browse channels by purpose via fabric-channel-list.',
parameters: { parameters: {
type: 'object', type: 'object',
additionalProperties: false, additionalProperties: false,
@@ -145,22 +58,13 @@ export function registerFabricTools(
memberUserIds: { type: 'array', items: { type: 'string' } }, memberUserIds: { type: 'array', items: { type: 'string' } },
onDuty: { type: 'string', description: 'required for triage-like flows (unused for these kinds)' }, onDuty: { type: 'string', description: 'required for triage-like flows (unused for these kinds)' },
listeners: { type: 'array', items: { type: 'string' } }, listeners: { type: 'array', items: { type: 'string' } },
purpose: {
type: 'string',
description:
"Free-form description of what this channel is for. Optional but " +
'strongly recommended so other agents can find this channel by ' +
'intent (via fabric-channel-list). Can be edited later with ' +
'fabric-channel-set-purpose.',
}, },
}, },
}, execute: async (p: {
execute: async (_id: string, p: {
guildNodeId: string; guildNodeId: string;
name: string; name: string;
isPublic?: boolean; isPublic?: boolean;
memberUserIds?: string[]; memberUserIds?: string[];
purpose?: string;
}) => { }) => {
const agentId = ctx.agentId; const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' }; if (!agentId) return { ok: false, error: 'no agent context' };
@@ -171,7 +75,6 @@ export function registerFabricTools(
xType: X_BY_KIND[kind], xType: X_BY_KIND[kind],
isPublic: p.isPublic ?? false, isPublic: p.isPublic ?? false,
memberUserIds: p.memberUserIds ?? [], memberUserIds: p.memberUserIds ?? [],
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
}); });
return { ok: true, channelId: ch.id }; return { ok: true, channelId: ch.id };
}, },
@@ -198,7 +101,7 @@ export function registerFabricTools(
callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' }, callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' },
}, },
}, },
execute: async (_id: string, p: { execute: async (p: {
guildNodeId: string; guildNodeId: string;
channelId: string; channelId: string;
summary: string; summary: string;
@@ -218,245 +121,6 @@ export function registerFabricTools(
}, },
})); }));
// ───────────────────────────────────────────────────────────────────
// create-sub-discussion: open a discuss-type sub-channel hanging off
// the caller's current channel. Designed for host-driven multi-turn
// exchanges (interview, brainstorm, narrow Q&A) where the guests are
// either fresh agents without workflow capability (recruitment
// interviewee) or peers that just need a short scoped chat without
// entering their own subflow.
//
// What it does on top of plain create-discussion-channel:
// 1. Persists a store entry indexed by the new sub channelId, carrying:
// host agent + userId, guest userIds, host/guest guide texts,
// callback (parent) channel info.
// 2. Auto-posts `greetingMsg` using the host's own Fabric account so
// turn rotation's activation rule (first author → newOrder[0],
// currentSpeaker → newOrder[1], wakeup → newOrder[1]) puts the
// first guest on the spot immediately — no race where host posts
// before guest's socket subs the channel room (we wait
// GREETING_DELAY_MS for backend's channel.joined push to land).
// 3. The accompanying before_prompt_build hook (sub-discussion-hook
// registered from index.ts) then injects `hostGuideMsg` into the
// host's session prompt and `guestGuideMsg` into each guest's
// session prompt whenever a turn in this channel fires — so the
// two roles see different instructions, no shared guide file.
// ───────────────────────────────────────────────────────────────────
api.registerTool((ctx: Ctx) => ({
name: 'create-sub-discussion',
description:
'Open a host-driven sub-discussion channel (x_type=discuss) hanging off your current channel, ' +
'with role-specific system-prompt guides for host and guests. Use this for interviews / scoped ' +
'Q&A where you stay in control of when the conversation ends. Returns the sub channelId; ' +
'reach it via fabric-send-message in the rotating turn order. Close with close-sub-discussion ' +
'to write a callback back into the parent channel.',
parameters: {
type: 'object',
additionalProperties: false,
required: [
'guildNodeId',
'currentChannelId',
'channelName',
'greetingMsg',
'hostGuideMsg',
'guestGuideMsg',
'guests',
],
properties: {
guildNodeId: { type: 'string', description: 'Fabric guild node id (same guild for parent + sub).' },
currentChannelId: {
type: 'string',
description: 'Channel id you are currently in (parent). Used as the callback target on close.',
},
channelName: { type: 'string', description: 'Display name for the new sub-discussion channel.' },
greetingMsg: {
type: 'string',
description:
'First message posted by YOU (the host) in the sub channel. Triggers turn rotation so ' +
"the first guest's session wakes immediately with both your greeting in history and the " +
'guest guide injected as system prompt.',
},
hostGuideMsg: {
type: 'string',
description:
"Appended to YOUR session's system prompt whenever a turn fires in this sub channel. " +
'Use it to remind yourself of the procedure (what to ask, when to call close-sub-discussion).',
},
guestGuideMsg: {
type: 'string',
description:
"Appended to EACH GUEST's session system prompt for turns in this sub channel. Use it to " +
'orient guests with no prior workflow context (e.g. a fresh interviewee). Keep it short; ' +
'long guides bloat every turn.',
},
guests: {
type: 'array',
items: { type: 'string' },
minItems: 1,
description:
'Fabric Center userIds invited as guests. Resolve via fabric-channel-list members or the ' +
'<name>@<role>.hangman-lab.top email convention.',
},
purpose: { type: 'string', description: 'Optional channel.purpose for discoverability.' },
},
},
execute: async (
_id: string,
p: {
guildNodeId: string;
currentChannelId: string;
channelName: string;
greetingMsg: string;
hostGuideMsg: string;
guestGuideMsg: string;
guests: string[];
purpose?: string;
},
) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
if (!Array.isArray(p.guests) || p.guests.length === 0) {
return { ok: false, error: 'guests must be a non-empty array of Fabric userIds' };
}
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ch = await client.createChannel(guild.endpoint, token, {
guildId: p.guildNodeId,
name: p.channelName,
xType: 'discuss',
isPublic: false,
memberUserIds: p.guests,
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
});
store.add({
subChannelId: ch.id,
hostAgentId: agentId,
hostUserId: session.user.id,
guestUserIds: [...p.guests],
hostGuide: p.hostGuideMsg,
guestGuide: p.guestGuideMsg,
callbackGuildNodeId: p.guildNodeId,
callbackChannelId: p.currentChannelId,
createdAt: new Date().toISOString(),
});
// Let backend's channel.joined push reach guest sockets before our
// greeting fires — otherwise the wakeup emitted by turn-activation
// races a not-yet-subscribed socket.io room.
if (GREETING_DELAY_MS > 0) {
await new Promise((r) => setTimeout(r, GREETING_DELAY_MS));
}
try {
await client.postMessage(guild.endpoint, token, ch.id, p.greetingMsg, session.user.id);
} catch (err) {
api.logger.warn(
`fabric: create-sub-discussion greeting post failed channel=${ch.id} err=${String(err)}`,
);
}
return { ok: true, subChannelId: ch.id };
},
}));
// ───────────────────────────────────────────────────────────────────
// close-sub-discussion: post a system-authored callback into the
// parent channel + close the sub-discussion channel. Only the original
// host can call this. Uses the Guild's x-fabric-system-key path (shared
// secret = commandsSyncKey) so the callback lands as a guild/system
// author, not the host's personal account — and can wake the host on
// the parent channel to continue whatever workflow opened the sub.
// ───────────────────────────────────────────────────────────────────
api.registerTool((ctx: Ctx) => ({
name: 'close-sub-discussion',
description:
'Close a sub-discussion channel you opened (host-only) and write a callback to the parent ' +
'channel as a system message. Pass `wakeupHost: false` to land the callback silently in ' +
'history without waking yourself.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['subChannelId', 'callbackMsg'],
properties: {
subChannelId: { type: 'string', description: 'The sub-discussion channelId returned by create-sub-discussion.' },
callbackMsg: {
type: 'string',
description:
'Content to post into the parent channel as a system-authored message. Typical content: ' +
'the conclusion / extracted data from the sub-discussion, so the next turn on the parent ' +
'channel can act on it.',
},
wakeupHost: {
type: 'boolean',
description:
'Whether to wake YOU (the host) on the parent channel. Default true — for recruitment ' +
'interview flow where the next workflow step needs to run immediately. Pass false for ' +
'fire-and-forget logging.',
},
},
},
execute: async (
_id: string,
p: { subChannelId: string; callbackMsg: string; wakeupHost?: boolean },
) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const entry = store.find(p.subChannelId);
if (!entry) {
return { ok: false, error: `sub-discussion not found: ${p.subChannelId}` };
}
if (entry.hostAgentId !== agentId) {
return {
ok: false,
error: `only the host (${entry.hostAgentId}) may close this sub-discussion`,
};
}
const systemKey = resolveCommandsSyncKey(cfg);
if (!systemKey) {
return {
ok: false,
error:
'channels.fabric.commandsSyncKey is not configured — close-sub-discussion needs it for ' +
'the x-fabric-system-key callback. Configure via openclaw config.',
};
}
const { guild, token } = await ctxGuild(agentId, entry.callbackGuildNodeId);
const wakeup = p.wakeupHost !== false;
// 1) Post callback into parent channel via the system-key path.
const url = `${guild.endpoint}/api/channels/${encodeURIComponent(entry.callbackChannelId)}/messages`;
const res = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-fabric-system-key': systemKey,
},
body: JSON.stringify({
content: p.callbackMsg,
wakeupUserId: wakeup ? entry.hostUserId : null,
}),
}).catch((err) => {
throw new Error(`callback POST failed: ${String(err)}`);
});
if (!res.ok) {
const text = await res.text().catch(() => '');
return {
ok: false,
error: `callback POST ${url} -> ${res.status} ${text}`,
};
}
// 2) Close the sub channel using the host's own bearer (the host is
// a member of the channel — channel.close auth is per-member).
try {
await client.closeChannel(guild.endpoint, token, entry.subChannelId);
} catch (err) {
api.logger.warn(
`fabric: close-sub-discussion: sub channel close failed channel=${entry.subChannelId} err=${String(err)}`,
);
}
// 3) Drop the store entry so the prompt-injection hook stops firing
// for this channel (the sub is closed; any straggler turns would
// just hit the closed-channel write reject downstream).
store.remove(entry.subChannelId);
return { ok: true, closed: true };
},
}));
// fabric-canvas: share / update / read / close the channel's single // fabric-canvas: share / update / read / close the channel's single
// pinned canvas document (one tool, four actions). update/close are // pinned canvas document (one tool, four actions). update/close are
// sharer-only server-side (the guild returns 403 otherwise). // sharer-only server-side (the guild returns 403 otherwise).
@@ -487,7 +151,7 @@ export function registerFabricTools(
}, },
}, },
}, },
execute: async (_id: string, p: { execute: async (p: {
action: 'read' | 'share' | 'update' | 'close'; action: 'read' | 'share' | 'update' | 'close';
guildNodeId: string; guildNodeId: string;
channelId: string; channelId: string;
@@ -552,7 +216,7 @@ export function registerFabricTools(
channelId: { type: 'string' }, channelId: { type: 'string' },
}, },
}, },
execute: async (_id: string, p: { execute: async (p: {
action: 'members' | 'join' | 'leave'; action: 'members' | 'join' | 'leave';
guildNodeId: string; guildNodeId: string;
channelId: string; channelId: string;
@@ -604,7 +268,7 @@ export function registerFabricTools(
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' }, content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
}, },
}, },
execute: async (_id: string, p: { guildNodeId: string; channelId: string; content: string }) => { execute: async (p: { guildNodeId: string; channelId: string; content: string }) => {
const agentId = ctx.agentId; const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' }; if (!agentId) return { ok: false, error: 'no agent context' };
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId); const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
@@ -619,89 +283,6 @@ export function registerFabricTools(
}, },
})); }));
// ───────────────────────────────────────────────────────────────────
// fabric-send-sys-msg: post a system-authored message (author =
// sentinel UUID 0000…, not the calling agent) using the Guild's
// x-fabric-system-key path. Use for cross-agent broadcasts where you
// don't want the message tied to one agent's identity — Dialectic
// topic announcements / lifecycle events, host-system advisories,
// etc. Caller doesn't need to be a member of the channel (the
// backend isSystem branch skips assertParticipant), but must be a
// member of the guild (their session resolves the guild endpoint).
//
// Shared secret: reads channels.fabric.commandsSyncKey (same value
// as the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY env). Empty
// config → tool returns ok:false with a clear error, no fall-through
// to regular agent posting.
// ───────────────────────────────────────────────────────────────────
api.registerTool((ctx: Ctx) => ({
name: 'fabric-send-sys-msg',
description:
'Send a SYSTEM-AUTHORED message into a Fabric channel (author = guild sentinel, not you). ' +
'Use for cross-agent broadcasts that should not be attributed to a single agent — ' +
'Dialectic announce-channel topic broadcasts, lifecycle events, system advisories. ' +
'Optionally precise-wake one recipient via wakeupUserId; otherwise the message lands ' +
'silently in history (no wake).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
wakeupUserId: {
type: 'string',
description:
"Optional: a single Fabric userId to wake with this message (everyone else in the " +
'channel sees it but with wakeup=false). Omit for fully silent broadcast.',
},
},
},
execute: async (
_id: string,
p: { guildNodeId: string; channelId: string; content: string; wakeupUserId?: string },
) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const systemKey = resolveCommandsSyncKey(cfg);
if (!systemKey) {
return {
ok: false,
error:
'channels.fabric.commandsSyncKey is not configured — fabric-send-sys-msg needs it for ' +
'the x-fabric-system-key header. Configure via openclaw config.',
};
}
const { guild } = await ctxGuild(agentId, p.guildNodeId);
const url = `${guild.endpoint}/api/channels/${encodeURIComponent(p.channelId)}/messages`;
const wakeup = typeof p.wakeupUserId === 'string' && p.wakeupUserId.trim()
? p.wakeupUserId.trim()
: null;
const res = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-fabric-system-key': systemKey,
},
body: JSON.stringify({ content: p.content, wakeupUserId: wakeup }),
});
if (!res.ok) {
const text = await res.text().catch(() => '');
return { ok: false, error: `POST ${url} -> ${res.status} ${text}` };
}
const json = (await res.json().catch(() => null)) as
| { messageId?: string; seq?: number; authorUserId?: string }
| null;
return {
ok: true,
messageId: json?.messageId,
seq: json?.seq,
authorUserId: json?.authorUserId,
};
},
}));
// ----------------------------------------------------------------- // -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see // fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the // in a given guild. Backend filters to public channels + channels the
@@ -729,7 +310,7 @@ export function registerFabricTools(
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' }, includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
}, },
}, },
execute: async (_id: string, p: { execute: async (p: {
guildNodeId: string; guildNodeId: string;
nameFilter?: string; nameFilter?: string;
xType?: string; xType?: string;
@@ -756,114 +337,11 @@ export function registerFabricTools(
isPublic: c.isPublic, isPublic: c.isPublic,
closed: c.closed, closed: c.closed,
lastSeq: c.lastSeq, lastSeq: c.lastSeq,
purpose: c.purpose ?? null,
})), })),
}; };
}, },
})); }));
// -----------------------------------------------------------------
// fabric-guild-list: enumerate guilds the calling agent belongs to.
// Each row carries `purpose` — free-form description of what the
// guild is for (admin-set). Use this as the first step when a
// workflow says "find the right guild for X" — pick by purpose,
// then fabric-channel-list to find the right channel inside it.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-guild-list',
description:
'List guilds the calling agent is a member of. Returns ' +
'{nodeId, name, purpose, status} per row. ' +
"`purpose` is a free-form description of what each guild is for — " +
'pick the guild whose purpose matches your intent. Use this tool ' +
'BEFORE fabric-channel-list when a workflow asks you to pick the ' +
'right guild by intent (e.g. "find a guild whose purpose mentions ' +
'debate broadcasts" → then list its announce-type channels).',
parameters: {
type: 'object',
additionalProperties: false,
properties: {
nameFilter: {
type: 'string',
description: 'optional case-insensitive substring match on guild name',
},
purposeFilter: {
type: 'string',
description:
'optional case-insensitive substring match on guild purpose ' +
'(e.g. "debate", "announcements")',
},
},
},
execute: async (_id: string, p: { nameFilter?: string; purposeFilter?: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const entry = identity.findByAgentId(agentId);
if (!entry) return { ok: false, error: `agent ${agentId} not registered` };
const session = await client.agentLogin(entry.fabricApiKey);
const nameNeedle = (p.nameFilter ?? '').toLowerCase();
const purposeNeedle = (p.purposeFilter ?? '').toLowerCase();
const guilds = session.guilds.filter((g) => {
if (nameNeedle && !g.name.toLowerCase().includes(nameNeedle)) return false;
if (purposeNeedle) {
const purp = (g.purpose ?? '').toLowerCase();
if (!purp.includes(purposeNeedle)) return false;
}
return true;
});
return {
ok: true,
count: guilds.length,
guilds: guilds.map((g) => ({
nodeId: g.nodeId,
name: g.name,
status: g.status,
purpose: g.purpose ?? null,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-channel-set-purpose: set/update a channel's free-form
// purpose description. Caller must be a channel member (or the
// channel must be public). Use this to backfill purpose on existing
// channels, or to refine it after a channel's role evolves.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel-set-purpose',
description:
"Set or update a channel's free-form purpose description. " +
'Channel membership required (or the channel must be public). ' +
'Pass empty string to clear. Use this to make a channel ' +
'discoverable to other agents via fabric-channel-list.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'purpose'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
purpose: {
type: 'string',
description: "What this channel is for. Pass '' (empty string) to clear.",
},
},
},
execute: async (_id: string, p: { guildNodeId: string; channelId: string; purpose: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = await client.setChannelPurpose(
guild.endpoint,
token,
p.channelId,
p.purpose,
);
return { ok: true, channel: res };
},
}));
// ----------------------------------------------------------------- // -----------------------------------------------------------------
// fabric-message-history: read a channel's recent message history by // fabric-message-history: read a channel's recent message history by
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns // `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
@@ -891,7 +369,7 @@ export function registerFabricTools(
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' }, limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
}, },
}, },
execute: async (_id: string, p: { execute: async (p: {
guildNodeId: string; guildNodeId: string;
channelId: string; channelId: string;
seqFrom?: number; seqFrom?: number;