Compare commits
24 Commits
ab126825ef
...
fix/presen
| Author | SHA1 | Date | |
|---|---|---|---|
| 9419d270e5 | |||
| 79b29db26c | |||
| a87de27cff | |||
| dabaa6e1f2 | |||
| b8e0e424fa | |||
| 81a10f2a1f | |||
| c5429129d9 | |||
| c330571bcb | |||
| 8963f3ca00 | |||
| 0e36457d8f | |||
| 5ff464a055 | |||
| 6fe06f55dd | |||
| a15dc880af | |||
| 5dcbd99c28 | |||
| cd36d1b9e2 | |||
| 9c910f082b | |||
| c5fd091f5a | |||
| c5a33c33ec | |||
| 28f5083679 | |||
| a060ff98a2 | |||
| b9a5456d57 | |||
| d1d5ad10ca | |||
| 92945b777d | |||
| 8774cfd7cc |
@@ -72,6 +72,12 @@ Two ways, both write the same identity registry the transport reads:
|
||||
`FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY` (Guild C-2). Read it from the
|
||||
guild with `docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js`.
|
||||
Sourced from config only — never from the environment.
|
||||
- `channels.fabric.coalesce` — default `true`. OpenClaw splits one agent
|
||||
turn whose blocks are `text → thinking/tool → text` into multiple
|
||||
`deliver()` calls; this buffers them and posts ONE Fabric message at the
|
||||
deterministic turn boundary (right after the inbound reply dispatch
|
||||
resolves — no hooks, no timers, no idle guessing). `false` = raw
|
||||
per-segment posting.
|
||||
- `channels.fabric.accounts.<agentId>` = `{ fabricApiKey, enabled }`
|
||||
(**agent = account**; the account id is the OpenClaw agentId)
|
||||
- plugin `identityFilePath` — default `~/.openclaw/fabric-identity.json`
|
||||
|
||||
62
dist/fabric/index.js
vendored
62
dist/fabric/index.js
vendored
@@ -5,16 +5,24 @@
|
||||
// the OpenClawPluginApi for runtime startup (transport + tools).
|
||||
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
|
||||
import { fabricChannelPlugin } from './src/channel.js';
|
||||
import { flushAllFabric } from './src/coalesce.js';
|
||||
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
|
||||
import { FabricInbound } from './src/inbound.js';
|
||||
import { listEnabledFabricAccounts } from './src/accounts.js';
|
||||
import { registerFabricTools } from './src/tools.js';
|
||||
import { FabricClient } from './src/fabric-client.js';
|
||||
import { IdentityRegistry } from './src/identity.js';
|
||||
import { syncFabricCommands } from './src/command-sync.js';
|
||||
import { PresenceSync } from './src/presence-sync.js';
|
||||
import path from 'node:path';
|
||||
import os from 'node:os';
|
||||
let runtimeRef = null;
|
||||
let inbound = null;
|
||||
let presence = null;
|
||||
// Periodic re-harvest of presence accounts so newly-connected agents
|
||||
// (registered through tool-based identity flow AFTER initial start)
|
||||
// get picked up. Cleared on gateway_stop.
|
||||
let presenceRefreshTimer = null;
|
||||
export { fabricChannelPlugin } from './src/channel.js';
|
||||
export default defineChannelPluginEntry({
|
||||
id: 'fabric',
|
||||
@@ -36,6 +44,29 @@ export default defineChannelPluginEntry({
|
||||
const client = new FabricClient(centerApiBase);
|
||||
const identity = new IdentityRegistry(idFile);
|
||||
registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity);
|
||||
// Cross-plugin API: globalThis.__fabric
|
||||
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
|
||||
// injection to DM-typed channels only. The channel-meta cache is
|
||||
// populated lazily from inbound (message.created carries xType) and
|
||||
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
|
||||
// very first DM after a fresh gateway start hits cache from the
|
||||
// previous run rather than firing the injector on the wrong type.
|
||||
//
|
||||
// null return = channel never seen (cache cold). Callers MUST NOT
|
||||
// fall back to "assume DM" — fail closed on unknown.
|
||||
{
|
||||
const _G = globalThis;
|
||||
_G['__fabric'] = { getChannelType };
|
||||
// Flush channel-meta cache when the gateway shuts down so
|
||||
// recently-recorded xType entries don't get lost.
|
||||
api.on('gateway_stop', () => {
|
||||
try {
|
||||
flushChannelMeta();
|
||||
}
|
||||
catch { /* ignore */ }
|
||||
});
|
||||
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
|
||||
}
|
||||
api.on('gateway_start', () => {
|
||||
const _G = globalThis;
|
||||
if (_G._fabricInboundStarted)
|
||||
@@ -56,11 +87,40 @@ export default defineChannelPluginEntry({
|
||||
return;
|
||||
}
|
||||
inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts);
|
||||
void inbound.start();
|
||||
// start() resolves once all accounts have attempted login; per-
|
||||
// agent failures are logged but don't reject. Once it resolves we
|
||||
// can harvest the presence accounts (those that DID log in have
|
||||
// their fabricUserId + first guild endpoint populated).
|
||||
void inbound.start().then(() => {
|
||||
if (!inbound)
|
||||
return;
|
||||
presence = new PresenceSync(api.logger, client);
|
||||
presence.setAccounts(inbound.getPresenceAccounts());
|
||||
presence.start();
|
||||
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
||||
// Re-harvest every 5 min: catches agents added via tool-based
|
||||
// identity provisioning after gateway_start (recruitment flow).
|
||||
// setAccounts is idempotent — duplicates collapse on agentId.
|
||||
presenceRefreshTimer = setInterval(() => {
|
||||
if (inbound && presence)
|
||||
presence.setAccounts(inbound.getPresenceAccounts());
|
||||
}, 5 * 60_000);
|
||||
});
|
||||
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
|
||||
void syncFabricCommands(client, cfg, accounts, api.logger);
|
||||
});
|
||||
// Note: the per-turn coalesce flush happens deterministically in
|
||||
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
|
||||
// is the real "all deliveries done" boundary; the agent_end hook fires
|
||||
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
|
||||
api.on('gateway_stop', () => {
|
||||
void flushAllFabric();
|
||||
if (presenceRefreshTimer) {
|
||||
clearInterval(presenceRefreshTimer);
|
||||
presenceRefreshTimer = null;
|
||||
}
|
||||
presence?.stop();
|
||||
presence = null;
|
||||
inbound?.stop();
|
||||
inbound = null;
|
||||
});
|
||||
|
||||
5
dist/fabric/src/accounts.js
vendored
5
dist/fabric/src/accounts.js
vendored
@@ -13,6 +13,11 @@ function section(cfg) {
|
||||
export function resolveCommandsSyncKey(cfg) {
|
||||
return (section(cfg).commandsSyncKey ?? '').trim();
|
||||
}
|
||||
// Whether to coalesce a split agent turn into one Fabric message
|
||||
// (channel-level). Default true.
|
||||
export function resolveCoalesce(cfg) {
|
||||
return (cfg.channels?.fabric ?? {}).coalesce !== false;
|
||||
}
|
||||
export function listFabricAccountIds(cfg) {
|
||||
const accts = section(cfg).accounts ?? {};
|
||||
const ids = Object.keys(accts);
|
||||
|
||||
105
dist/fabric/src/channel-meta.js
vendored
Normal file
105
dist/fabric/src/channel-meta.js
vendored
Normal file
@@ -0,0 +1,105 @@
|
||||
/**
|
||||
* Channel-meta cache. Records (channelId → xType) for every fabric
|
||||
* channel the gateway has seen at least one inbound message in.
|
||||
*
|
||||
* Populated lazily from inbound (`recordChannelType` is called for
|
||||
* every `message.created` event with non-empty `xType`). Persisted to
|
||||
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
|
||||
* gateway restarts (so the very first DM after restart still gets the
|
||||
* right xType without waiting for a fresh inbound).
|
||||
*
|
||||
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
|
||||
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
|
||||
* to xType==='dm' only.
|
||||
*
|
||||
* Failure mode: lookup misses (channel never seen / inbound dropped
|
||||
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
|
||||
* fall back to "assume DM" or you re-introduce the false-positive on
|
||||
* group channels.
|
||||
*/
|
||||
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
|
||||
import { dirname, join } from 'node:path';
|
||||
import { homedir } from 'node:os';
|
||||
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
|
||||
let memory = new Map();
|
||||
let loaded = false;
|
||||
let dirty = false;
|
||||
let flushTimer = null;
|
||||
function load() {
|
||||
if (loaded)
|
||||
return;
|
||||
loaded = true;
|
||||
try {
|
||||
if (!existsSync(CACHE_FILE))
|
||||
return;
|
||||
const raw = readFileSync(CACHE_FILE, 'utf8');
|
||||
const parsed = JSON.parse(raw);
|
||||
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
|
||||
if (typeof k === 'string' && typeof v === 'string')
|
||||
memory.set(k, v);
|
||||
}
|
||||
}
|
||||
catch {
|
||||
// ignore — start with empty cache on corruption
|
||||
}
|
||||
}
|
||||
function scheduleFlush() {
|
||||
if (flushTimer)
|
||||
return;
|
||||
// Debounce writes — many inbound messages may arrive in a burst.
|
||||
// 250ms coalesces them; on gateway_stop the channel plugin can force
|
||||
// a synchronous flush via flushChannelMeta().
|
||||
flushTimer = setTimeout(() => {
|
||||
flushTimer = null;
|
||||
if (!dirty)
|
||||
return;
|
||||
dirty = false;
|
||||
flushSync();
|
||||
}, 250);
|
||||
}
|
||||
function flushSync() {
|
||||
try {
|
||||
const dir = dirname(CACHE_FILE);
|
||||
if (!existsSync(dir))
|
||||
mkdirSync(dir, { recursive: true });
|
||||
const out = { channels: Object.fromEntries(memory) };
|
||||
const tmp = CACHE_FILE + '.tmp';
|
||||
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
|
||||
renameSync(tmp, CACHE_FILE);
|
||||
}
|
||||
catch {
|
||||
// swallow — cache is an optimization; loss-on-write is recoverable
|
||||
}
|
||||
}
|
||||
/** Called by inbound on every message.created. xType empty → no-op. */
|
||||
export function recordChannelType(channelId, xType) {
|
||||
if (!channelId || !xType)
|
||||
return;
|
||||
load();
|
||||
const existing = memory.get(channelId);
|
||||
if (existing === xType)
|
||||
return;
|
||||
memory.set(channelId, xType);
|
||||
dirty = true;
|
||||
scheduleFlush();
|
||||
}
|
||||
/** Cross-plugin lookup. null when channel never seen / unknown. */
|
||||
export function getChannelType(channelId) {
|
||||
if (!channelId)
|
||||
return null;
|
||||
load();
|
||||
return memory.get(channelId) ?? null;
|
||||
}
|
||||
/** Force-flush — called on plugin shutdown to make sure recently
|
||||
* recorded entries hit disk before the gateway dies. */
|
||||
export function flushChannelMeta() {
|
||||
if (flushTimer) {
|
||||
clearTimeout(flushTimer);
|
||||
flushTimer = null;
|
||||
}
|
||||
if (dirty) {
|
||||
dirty = false;
|
||||
flushSync();
|
||||
}
|
||||
}
|
||||
export const CHANNEL_META_PATH = CACHE_FILE;
|
||||
23
dist/fabric/src/channel.js
vendored
23
dist/fabric/src/channel.js
vendored
@@ -11,6 +11,15 @@
|
||||
import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core';
|
||||
import { FabricClient } from './fabric-client.js';
|
||||
import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js';
|
||||
import { getChannelType } from './channel-meta.js';
|
||||
export function fabricPeerRoutingForXType(xType) {
|
||||
if (xType === 'dm')
|
||||
return { peerKind: 'direct', chatType: 'direct' };
|
||||
return { peerKind: 'group', chatType: 'group' };
|
||||
}
|
||||
export function fabricPeerRoutingForChannel(channelId) {
|
||||
return fabricPeerRoutingForXType(getChannelType(channelId));
|
||||
}
|
||||
// ---- target grammar: fabric:<channelId> ----
|
||||
export function stripFabricTargetPrefix(raw) {
|
||||
let s = (raw ?? '').trim();
|
||||
@@ -38,13 +47,18 @@ export function resolveFabricOutboundSessionRoute(params) {
|
||||
const id = stripFabricTargetPrefix(params.target);
|
||||
if (!id)
|
||||
return null;
|
||||
// Consult the channel-meta cache populated by inbound — DM channels
|
||||
// need peer.kind='direct' so the outbound session key matches the
|
||||
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
|
||||
// no regression on cold cache).
|
||||
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
|
||||
return buildChannelOutboundSessionRoute({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: 'fabric',
|
||||
accountId: params.accountId,
|
||||
peer: { kind: 'group', id },
|
||||
chatType: 'group',
|
||||
peer: { kind: peerKind, id },
|
||||
chatType,
|
||||
from: `fabric:channel:${id}`,
|
||||
to: `fabric:${id}`,
|
||||
});
|
||||
@@ -137,7 +151,10 @@ export const fabricChannelPlugin = createChatChannelPlugin({
|
||||
attachedResults: {
|
||||
channel: 'fabric',
|
||||
sendText: async (ctx) => {
|
||||
// openclaw passes config under cfg or config depending on path
|
||||
// openclaw passes config under cfg or config depending on path.
|
||||
// Note: inbound agent replies go through inbound.ts `deliver`
|
||||
// (where turn coalescing happens). This path is for any direct
|
||||
// outbound sends and posts immediately.
|
||||
const cfg = (ctx.cfg ?? ctx.config ?? {});
|
||||
try {
|
||||
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);
|
||||
|
||||
75
dist/fabric/src/coalesce.js
vendored
Normal file
75
dist/fabric/src/coalesce.js
vendored
Normal file
@@ -0,0 +1,75 @@
|
||||
// Deterministic turn coalescer.
|
||||
//
|
||||
// OpenClaw calls the Fabric `deliver` callback once per assistant text
|
||||
// segment; a thinking/tool block between two text blocks is a delivery
|
||||
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
|
||||
// multiple deliver() calls. There is no turn id on the delivery, so we
|
||||
// BUFFER segments by Fabric channelId and post the merged message when the
|
||||
// turn truly ends. The flush is driven by inbound.ts right after
|
||||
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
|
||||
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
|
||||
// agent_end hook, which fires before deliver()). `coalesce=false` posts
|
||||
// each segment immediately.
|
||||
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
|
||||
export function normChannelId(x) {
|
||||
const s = String(x ?? '');
|
||||
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
|
||||
}
|
||||
const pendingByChannel = new Map();
|
||||
async function flushChannel(channelId, reason) {
|
||||
const p = pendingByChannel.get(channelId);
|
||||
if (!p)
|
||||
return;
|
||||
pendingByChannel.delete(channelId);
|
||||
clearTimeout(p.safety);
|
||||
const text = p.parts.join('\n\n').trim();
|
||||
if (!text)
|
||||
return;
|
||||
try {
|
||||
await p.post(text);
|
||||
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
|
||||
}
|
||||
catch (e) {
|
||||
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
|
||||
}
|
||||
}
|
||||
// Buffer one delivered segment (or send immediately when coalesce=false).
|
||||
// `post` performs the real Fabric postMessage with the caller's already
|
||||
// resolved guild/token; on flush it is called once with the merged text.
|
||||
export async function enqueueDelivery(params) {
|
||||
const cid = normChannelId(params.channelId);
|
||||
const text = (params.text ?? '').trim();
|
||||
if (!text)
|
||||
return;
|
||||
if (!params.coalesce) {
|
||||
await params.post(text);
|
||||
return;
|
||||
}
|
||||
const existing = pendingByChannel.get(cid);
|
||||
if (existing) {
|
||||
existing.parts.push(text);
|
||||
existing.post = params.post; // freshest guild/token closure
|
||||
existing.log = params.log;
|
||||
}
|
||||
else {
|
||||
pendingByChannel.set(cid, {
|
||||
parts: [text],
|
||||
post: params.post,
|
||||
log: params.log,
|
||||
safety: setTimeout(() => void flushChannel(cid, 'safety-timeout'), SAFETY_FLUSH_MS),
|
||||
});
|
||||
}
|
||||
}
|
||||
// Called by the agent_end hook with the hook ctx's channelId (bare or
|
||||
// fabric:-prefixed). Deterministic per-turn boundary.
|
||||
export async function flushFabricForChannel(rawChannelId) {
|
||||
const cid = normChannelId(rawChannelId);
|
||||
if (cid)
|
||||
await flushChannel(cid, 'dispatch-end');
|
||||
}
|
||||
// gateway_stop: flush anything still buffered.
|
||||
export async function flushAllFabric() {
|
||||
for (const cid of [...pendingByChannel.keys()]) {
|
||||
await flushChannel(cid, 'gateway_stop');
|
||||
}
|
||||
}
|
||||
32
dist/fabric/src/fabric-client.js
vendored
32
dist/fabric/src/fabric-client.js
vendored
@@ -63,6 +63,11 @@ export class FabricClient {
|
||||
createChannel(guildEndpoint, guildToken, body) {
|
||||
return this.post(`${guildEndpoint}/api/channels`, body, guildToken);
|
||||
}
|
||||
// PATCH /api/channels/:id — backend currently only patches `purpose`.
|
||||
// Caller must be a member of the channel (or any user if public).
|
||||
setChannelPurpose(guildEndpoint, guildToken, channelId, purpose) {
|
||||
return this.req('PATCH', `${guildEndpoint}/api/channels/${channelId}`, guildToken, { purpose });
|
||||
}
|
||||
closeChannel(guildEndpoint, guildToken, channelId) {
|
||||
return this.post(`${guildEndpoint}/api/channels/${channelId}/close`, {}, guildToken);
|
||||
}
|
||||
@@ -105,4 +110,31 @@ export class FabricClient {
|
||||
removeCanvas(endpoint, token, channelId) {
|
||||
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
|
||||
}
|
||||
// ---- channel discovery + message read (used by the agent-facing
|
||||
// fabric-channel-list / fabric-message-history tools) ----
|
||||
/**
|
||||
* List channels in a guild visible to the calling user. Backend
|
||||
* filters to public + channels the user is a member of.
|
||||
*/
|
||||
listChannels(guildEndpoint, guildToken, guildNodeId) {
|
||||
return this.req('GET', `${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`, guildToken);
|
||||
}
|
||||
/**
|
||||
* Page through a channel's message history by `seq`.
|
||||
*
|
||||
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
|
||||
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
|
||||
* tail. Page metadata in the response describes what to ask next.
|
||||
*/
|
||||
listMessages(guildEndpoint, guildToken, channelId, opts = {}) {
|
||||
const qs = new URLSearchParams();
|
||||
if (opts.seqFrom !== undefined)
|
||||
qs.set('seq_from', String(opts.seqFrom));
|
||||
if (opts.seqTo !== undefined)
|
||||
qs.set('seq_to', String(opts.seqTo));
|
||||
if (opts.limit !== undefined)
|
||||
qs.set('limit', String(opts.limit));
|
||||
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
|
||||
return this.req('GET', url, guildToken);
|
||||
}
|
||||
}
|
||||
|
||||
339
dist/fabric/src/inbound.js
vendored
339
dist/fabric/src/inbound.js
vendored
@@ -3,6 +3,10 @@ import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { io } from 'socket.io-client';
|
||||
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
||||
import { resolveCoalesce } from './accounts.js';
|
||||
import { fabricPeerRoutingForXType } from './channel.js';
|
||||
import { recordChannelType } from './channel-meta.js';
|
||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||
export class FabricInbound {
|
||||
core;
|
||||
cfg;
|
||||
@@ -12,12 +16,139 @@ export class FabricInbound {
|
||||
accounts;
|
||||
sockets = [];
|
||||
seen = new Set();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
// at connect time — any channel the agent joins later (e.g. a fresh DM
|
||||
// created by another user) is unreachable until the gateway restarts.
|
||||
channelSyncTimers = [];
|
||||
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
|
||||
// poll. 60s keeps the lag bounded without hammering the backend.
|
||||
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
|
||||
// Guild access tokens are short-lived (~15 min). The socket survives via
|
||||
// socket.io reconnect, but the token captured at connect time goes stale,
|
||||
// so HTTP calls (attachment download, posting the reply) start 401ing.
|
||||
// Re-login per agent on a short TTL to keep a fresh token.
|
||||
tokenCache = new Map();
|
||||
static TOKEN_TTL_MS = 8 * 60 * 1000;
|
||||
// Per-channel serial work queue. Every inbound socket message for a
|
||||
// channel awaits the previous task for that same channel, so model
|
||||
// turns never interleave. Map key = channelId; value is the tail of
|
||||
// the chain (an in-flight promise the next task awaits).
|
||||
//
|
||||
// Why per-channel and not per-agent: a single agent may sit in
|
||||
// several triage / general channels; we want each channel to flow at
|
||||
// its own speed but the SAME channel's traffic to be strictly serial.
|
||||
// For dm and discuss the queue also serialises but those traditionally
|
||||
// had at-most-one-in-flight anyway via the turn engine.
|
||||
channelChains = new Map();
|
||||
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
|
||||
// agent/status round-trip off the hot path for back-to-back triage
|
||||
// messages. Short TTL because status flips are rare-but-meaningful.
|
||||
agentStatusCache = new Map();
|
||||
static AGENT_STATUS_TTL_MS = 5_000;
|
||||
// Triage messages that arrived while the on-duty agent wasn't on_call
|
||||
// — sit here until either (a) the agent becomes on_call and the next
|
||||
// triage arrival drains them, or (b) the gateway restarts (lost; ok
|
||||
// because the underlying Fabric messages are persisted and re-fetched
|
||||
// on agent reconnect's history sweep).
|
||||
pendingTriageGated = [];
|
||||
// Schedule `task` to run after every previous task on the same
|
||||
// channel has completed. Returns the promise so callers can await
|
||||
// their own result if they need to; the chain itself is fire-and-
|
||||
// forget from the socket.on handler.
|
||||
enqueueChannelTask(channelId, task) {
|
||||
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
|
||||
const next = prev.then(task).catch((err) => {
|
||||
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
|
||||
});
|
||||
this.channelChains.set(channelId, next);
|
||||
// Best-effort cleanup so the Map doesn't grow without bound for
|
||||
// long-running gateways: drop the entry when the chain settles, but
|
||||
// only if it's still the latest reference (newer enqueue may have
|
||||
// overwritten it in the meantime).
|
||||
void next.finally(() => {
|
||||
if (this.channelChains.get(channelId) === next) {
|
||||
this.channelChains.delete(channelId);
|
||||
}
|
||||
});
|
||||
return next;
|
||||
}
|
||||
// Hit HF backend to check whether `agentId` is currently on_call.
|
||||
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
|
||||
// on_call" — triage stays gated rather than risking a confused wake.
|
||||
async checkAgentOnCall(agentId) {
|
||||
const cached = this.agentStatusCache.get(agentId);
|
||||
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
|
||||
return cached.onCall;
|
||||
}
|
||||
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
|
||||
// CLAW_IDENTIFIER resolution priority:
|
||||
// 1. HF_CLAW_IDENTIFIER env (operator override)
|
||||
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
|
||||
// plugin itself uses — keeps the two in sync without an extra
|
||||
// env per service unit)
|
||||
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
|
||||
// container hostname is `server.t2` but HF agent row has
|
||||
// `claw_identifier=sim-t2`; matching is mandatory for the HF
|
||||
// backend's _require_agent() check)
|
||||
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
|
||||
if (!claw) {
|
||||
try {
|
||||
// openclaw config shape (verified in sim):
|
||||
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
|
||||
const cfg = this.cfg;
|
||||
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
|
||||
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
|
||||
claw = fromCfg.trim();
|
||||
}
|
||||
}
|
||||
catch {
|
||||
/* fall through to hostname */
|
||||
}
|
||||
}
|
||||
if (!claw) {
|
||||
claw = (await import('os')).hostname();
|
||||
}
|
||||
let onCall = false;
|
||||
try {
|
||||
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
|
||||
const res = await fetch(url, {
|
||||
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
|
||||
});
|
||||
if (res.ok) {
|
||||
const data = (await res.json());
|
||||
onCall = (data.status ?? '').toLowerCase() === 'on_call';
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
|
||||
}
|
||||
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
|
||||
return onCall;
|
||||
}
|
||||
// FIFO drain of all triage-gated messages for `agentId` (called when
|
||||
// we just learned they're on_call). Each drained message is dispatched
|
||||
// through its own channel chain so per-channel serial order is kept.
|
||||
async drainGatedFor(agentId) {
|
||||
const keep = [];
|
||||
const drain = [];
|
||||
for (const item of this.pendingTriageGated) {
|
||||
if (item.agentId === agentId)
|
||||
drain.push(item);
|
||||
else
|
||||
keep.push(item);
|
||||
}
|
||||
if (drain.length === 0)
|
||||
return;
|
||||
this.pendingTriageGated = keep;
|
||||
for (const item of drain) {
|
||||
this.log.info(`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`);
|
||||
// Re-enqueue via the per-channel chain so ordering is preserved.
|
||||
this.enqueueChannelTask(item.channelId, async () => {
|
||||
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
|
||||
});
|
||||
}
|
||||
}
|
||||
// Return a fresh guild access token for the agent, re-authenticating with
|
||||
// the agent's Fabric API key when the cached session is stale. Falls back
|
||||
// to the connect-time session token if re-login fails.
|
||||
@@ -73,12 +204,59 @@ export class FabricInbound {
|
||||
}
|
||||
}
|
||||
stop() {
|
||||
for (const t of this.channelSyncTimers)
|
||||
clearInterval(t);
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets)
|
||||
s.disconnect();
|
||||
this.sockets = [];
|
||||
}
|
||||
/**
|
||||
* Per-account metadata harvested during `start()` — used by
|
||||
* PresenceSync to know where to push each agent's HF status.
|
||||
*
|
||||
* `fabricUserId` is filled from `session.user.id` after agent-login.
|
||||
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
|
||||
* guild presence push is a future concern; for sim/prod-v1 each agent
|
||||
* is in one guild).
|
||||
*
|
||||
* Returns ONLY agents that successfully connected — failed-login
|
||||
* agents have no fabricUserId yet and are excluded.
|
||||
*/
|
||||
getPresenceAccounts() {
|
||||
const out = [];
|
||||
for (const entry of this.identity.list()) {
|
||||
if (!entry.fabricUserId)
|
||||
continue;
|
||||
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
|
||||
if (!presenceGuild)
|
||||
continue;
|
||||
out.push({
|
||||
agentId: entry.agentId,
|
||||
fabricUserId: entry.fabricUserId,
|
||||
guildBaseUrl: presenceGuild.endpoint,
|
||||
guildNodeId: presenceGuild.nodeId,
|
||||
fabricApiKey: entry.fabricApiKey,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
||||
// guild per agent (used as the presence-push target). Stores both
|
||||
// endpoint and nodeId — presence-sync needs both: endpoint to build
|
||||
// the URL, nodeId to pick the matching guildAccessToken from a fresh
|
||||
// agent-login response.
|
||||
firstGuildByAgent = new Map();
|
||||
async connectAgent(agentId, session) {
|
||||
const selfUserId = session.user.id;
|
||||
// First-guild capture for presence-sync push target. session.guilds is
|
||||
// already in priority order from Center; we take the first one with a
|
||||
// valid endpoint and stop. Multi-guild presence is a future concern.
|
||||
if (!this.firstGuildByAgent.has(agentId)) {
|
||||
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
||||
if (firstGuild)
|
||||
this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
|
||||
}
|
||||
for (const g of session.guilds) {
|
||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||
if (!tok)
|
||||
@@ -88,23 +266,94 @@ export class FabricInbound {
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
const joinAll = async () => {
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
// on `connect` seeds it; the periodic resync diffs against it so we
|
||||
// only emit `join_channel` for genuinely new channels (and
|
||||
// `leave_channel` for ones the agent is no longer in).
|
||||
const joined = new Set();
|
||||
const syncChannels = async (kind) => {
|
||||
let freshTok;
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
|
||||
const channels = res.ok ? (await res.json()) : [];
|
||||
for (const c of channels)
|
||||
socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
|
||||
}
|
||||
catch {
|
||||
/* best effort */
|
||||
freshTok = tok;
|
||||
}
|
||||
const authTok = freshTok ?? tok;
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
|
||||
if (!res.ok)
|
||||
return;
|
||||
const channels = (await res.json());
|
||||
const current = new Set(channels.map((c) => c.id));
|
||||
let added = 0;
|
||||
let removed = 0;
|
||||
for (const id of current) {
|
||||
if (!joined.has(id)) {
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
for (const id of [...joined]) {
|
||||
if (!current.has(id)) {
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (kind === 'initial') {
|
||||
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
|
||||
}
|
||||
else if (added > 0 || removed > 0) {
|
||||
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
|
||||
}
|
||||
}
|
||||
catch {
|
||||
/* best effort — next tick will retry */
|
||||
}
|
||||
};
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('connect', () => {
|
||||
// On every (re)connect the server forgets prior subscriptions, so
|
||||
// reset our local view and seed from a fresh fetch.
|
||||
joined.clear();
|
||||
void syncChannels('initial');
|
||||
});
|
||||
// Push-based membership events from the backend (companion to
|
||||
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
|
||||
// server tells us this user was added to / removed from a
|
||||
// channel, we sub/unsub the socket.io room immediately — no
|
||||
// 60s wait for the polling resync. Polling remains as a safety
|
||||
// net for missed events.
|
||||
socket.on('channel.joined', (evt) => {
|
||||
const id = evt?.channelId;
|
||||
if (!id || joined.has(id))
|
||||
return;
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||
});
|
||||
socket.on('channel.left', (evt) => {
|
||||
const id = evt?.channelId;
|
||||
if (!id || !joined.has(id))
|
||||
return;
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||
});
|
||||
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
socket.on('message.created', (m) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId)
|
||||
return;
|
||||
// Record xType into the channel-meta cache before self-author
|
||||
// / dedup gates — channel type doesn't depend on who sent the
|
||||
// message, and recording it on observer-only triage messages
|
||||
// is still useful (the next consumer asking
|
||||
// __fabric.getChannelType wants the answer regardless of
|
||||
// whether THIS message was delivered to an agent).
|
||||
recordChannelType(channelId, m.xType);
|
||||
if (m.authorUserId && m.authorUserId === selfUserId)
|
||||
return;
|
||||
const key = `${agentId}:${m.messageId}`;
|
||||
@@ -113,7 +362,31 @@ export class FabricInbound {
|
||||
this.seen.add(key);
|
||||
if (this.seen.size > 5000)
|
||||
this.seen.clear();
|
||||
void this.dispatch(agentId, g, channelId, m, session);
|
||||
// Per-channel serial queue. Prevents concurrent model turns for
|
||||
// the same channel — important for triage where a second wake
|
||||
// arriving mid-reply would interleave with the in-flight one.
|
||||
this.enqueueChannelTask(channelId, async () => {
|
||||
// Triage on_call gate: if the on-duty agent isn't currently
|
||||
// on_call per HF, don't dispatch yet — just sit on the
|
||||
// per-channel queue. Subsequent triage messages will recheck;
|
||||
// when the agent becomes on_call, the next arrival drains.
|
||||
//
|
||||
// Also handles: triage + wake=true must verify status before
|
||||
// committing to a model turn. Non-triage and triage observer
|
||||
// (wake=false) skip the gate.
|
||||
if (m.xType === 'triage' && m.wakeup === true) {
|
||||
const onCall = await this.checkAgentOnCall(agentId);
|
||||
if (!onCall) {
|
||||
this.log.info(`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`);
|
||||
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
|
||||
return;
|
||||
}
|
||||
// Drain any previously-gated messages (FIFO) before this one,
|
||||
// now that we know the agent is on_call.
|
||||
await this.drainGatedFor(agentId);
|
||||
}
|
||||
await this.dispatch(agentId, g, channelId, m, session);
|
||||
});
|
||||
});
|
||||
socket.connect();
|
||||
this.sockets.push(socket);
|
||||
@@ -165,11 +438,19 @@ export class FabricInbound {
|
||||
const core = this.core;
|
||||
const cfg = this.cfg;
|
||||
try {
|
||||
// Route by xType. DM channels need peer.kind='direct' so openclaw
|
||||
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
|
||||
// and ctx.ChatType='direct') rather than as a multi-party group.
|
||||
// Without this, the agent's user-prompt metadata says
|
||||
// 'is_group_chat: true' on a DM and downstream prompt logic
|
||||
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
|
||||
// misclassifies the turn.
|
||||
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
|
||||
const route = core.channel.routing.resolveAgentRoute({
|
||||
cfg: this.cfg,
|
||||
channel: 'fabric',
|
||||
accountId: agentId,
|
||||
peer: { kind: 'group', id: channelId },
|
||||
peer: { kind: peerKind, id: channelId },
|
||||
});
|
||||
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
||||
agentId: route.agentId,
|
||||
@@ -183,7 +464,7 @@ export class FabricInbound {
|
||||
To: `fabric:${channelId}`,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId ?? agentId,
|
||||
ChatType: 'group',
|
||||
ChatType: chatType,
|
||||
ConversationLabel: `fabric:${guild.nodeId}`,
|
||||
SenderId: m.authorUserId ?? 'fabric',
|
||||
Provider: 'fabric',
|
||||
@@ -199,7 +480,22 @@ export class FabricInbound {
|
||||
// the woken speaker emits a normal message or /no-reply). We still
|
||||
// record the message into the agent's session so it has the full
|
||||
// channel conversation as context whenever it IS later woken.
|
||||
if (m.wakeup !== true) {
|
||||
//
|
||||
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
|
||||
// any message that isn't the agent's own (already filtered above) is
|
||||
// always delivered to the model.
|
||||
if (m.xType !== 'dm' && m.wakeup !== true) {
|
||||
// Triage exception: non-wake messages (admin observer) MUST NOT
|
||||
// enter the agent's session at all. The next time the agent
|
||||
// wakes for a triage message, their context should contain only
|
||||
// their own past wakeups + their own outgoing messages — never
|
||||
// the observer-only chatter from other agents. For non-triage
|
||||
// channels keep the legacy "record-as-history" so a later wake
|
||||
// sees the full channel conversation.
|
||||
if (m.xType === 'triage') {
|
||||
this.log.info(`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`);
|
||||
return;
|
||||
}
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
||||
await core.channel.session.recordInboundSession({
|
||||
storePath,
|
||||
@@ -244,8 +540,16 @@ export class FabricInbound {
|
||||
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
|
||||
if (!text || !gt)
|
||||
return;
|
||||
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
|
||||
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
|
||||
// Buffer segments; the merged message is posted right after
|
||||
// dispatch returns (the deterministic turn boundary, see the
|
||||
// finally below). Disable per channel: channels.fabric.coalesce.
|
||||
await enqueueDelivery({
|
||||
channelId,
|
||||
text,
|
||||
coalesce: resolveCoalesce(this.cfg),
|
||||
post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id),
|
||||
log: (m) => this.log.info(m),
|
||||
});
|
||||
},
|
||||
onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
|
||||
onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
|
||||
@@ -270,5 +574,12 @@ export class FabricInbound {
|
||||
catch (err) {
|
||||
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
|
||||
}
|
||||
finally {
|
||||
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
|
||||
// resolves AFTER every deliver() call of this turn has run, so the
|
||||
// buffer now holds all segments — flush them as ONE Fabric message.
|
||||
// No hooks, no timers, no idle guessing.
|
||||
await flushFabricForChannel(channelId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
139
dist/fabric/src/presence-sync.js
vendored
Normal file
139
dist/fabric/src/presence-sync.js
vendored
Normal file
@@ -0,0 +1,139 @@
|
||||
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
|
||||
// safely inside the window even if a tick runs late.
|
||||
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
|
||||
export class PresenceSync {
|
||||
logger;
|
||||
client;
|
||||
timer = null;
|
||||
lastStatus = new Map(); // by agentId
|
||||
accounts = new Map();
|
||||
tokenCache = new Map(); // by agentId
|
||||
// Mutex flag: a tick iterates accounts serially with `await` on each
|
||||
// agent-login + PUT round-trip, so a single tick can easily run 20+s
|
||||
// when there are many accounts. setInterval(intervalMs) does NOT wait
|
||||
// for the previous tick to finish — without this guard the next tick
|
||||
// fires on top of a still-running one and two parallel iterations
|
||||
// PUT the same agentId within milliseconds. That tipped the backend's
|
||||
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
|
||||
// 500s on prod. Guarded ticks just skip a beat instead.
|
||||
inflight = false;
|
||||
constructor(logger, client) {
|
||||
this.logger = logger;
|
||||
this.client = client;
|
||||
}
|
||||
setAccounts(accounts) {
|
||||
this.accounts.clear();
|
||||
for (const a of accounts)
|
||||
this.accounts.set(a.agentId, a);
|
||||
}
|
||||
start(intervalMs = 30_000) {
|
||||
if (this.timer)
|
||||
return;
|
||||
this.timer = setInterval(() => {
|
||||
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
|
||||
}, intervalMs);
|
||||
// run once immediately so initial state lands fast
|
||||
void this.tick();
|
||||
}
|
||||
stop() {
|
||||
if (this.timer) {
|
||||
clearInterval(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Fetch a fresh guildAccessToken for `acct`, caching it under the
|
||||
* agentId until just before its JWT expiry. Returns null on login
|
||||
* failure or if the session has no matching guild — caller logs +
|
||||
* skips the PUT.
|
||||
*/
|
||||
async ensureGuildToken(acct) {
|
||||
const now = Date.now();
|
||||
const cached = this.tokenCache.get(acct.agentId);
|
||||
if (cached && cached.expiresAt > now)
|
||||
return cached.token;
|
||||
let session;
|
||||
try {
|
||||
session = await this.client.agentLogin(acct.fabricApiKey);
|
||||
}
|
||||
catch (err) {
|
||||
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
|
||||
return null;
|
||||
}
|
||||
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
|
||||
if (!entry?.token) {
|
||||
this.logger.warn(`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`);
|
||||
return null;
|
||||
}
|
||||
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
|
||||
return entry.token;
|
||||
}
|
||||
async tick() {
|
||||
// Mutex: see the `inflight` field declaration for the why. Drop
|
||||
// overlapping ticks rather than letting them run concurrently —
|
||||
// status is gated by `lastStatus !== bridge.get`, so skipping a
|
||||
// beat costs nothing the next beat won't catch.
|
||||
if (this.inflight)
|
||||
return;
|
||||
this.inflight = true;
|
||||
try {
|
||||
await this.tickInner();
|
||||
}
|
||||
finally {
|
||||
this.inflight = false;
|
||||
}
|
||||
}
|
||||
async tickInner() {
|
||||
const bridge = globalThis['__hfAgentStatus'];
|
||||
if (!bridge || typeof bridge.get !== 'function')
|
||||
return; // HF plugin not loaded — skip
|
||||
for (const [agentId, acct] of this.accounts) {
|
||||
let status;
|
||||
try {
|
||||
status = await bridge.get(agentId);
|
||||
}
|
||||
catch {
|
||||
continue;
|
||||
}
|
||||
if (!status)
|
||||
continue;
|
||||
if (this.lastStatus.get(agentId) === status)
|
||||
continue; // no change → no PUT
|
||||
const guildToken = await this.ensureGuildToken(acct);
|
||||
if (!guildToken)
|
||||
continue;
|
||||
try {
|
||||
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
|
||||
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
|
||||
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
|
||||
// x-api-key and got 401 "missing bearer token" forever. The /api
|
||||
// prefix is required because the guild backend sets a global
|
||||
// 'api' prefix in main.ts setGlobalPrefix('api').
|
||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||
const res = await fetch(url, {
|
||||
method: 'PUT',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
authorization: `Bearer ${guildToken}`,
|
||||
},
|
||||
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
||||
});
|
||||
if (res.ok) {
|
||||
this.lastStatus.set(agentId, status);
|
||||
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
||||
}
|
||||
else {
|
||||
// 401 here usually means the cached token went stale unexpectedly
|
||||
// (server-side rotation or clock skew) — drop the cache so the
|
||||
// next tick re-logs-in.
|
||||
if (res.status === 401)
|
||||
this.tokenCache.delete(agentId);
|
||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
||||
}
|
||||
}
|
||||
catch (err) {
|
||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
268
dist/fabric/src/tools.js
vendored
268
dist/fabric/src/tools.js
vendored
@@ -25,7 +25,9 @@ export function registerFabricTools(api, client, identity) {
|
||||
// or via static config (channels.fabric.accounts.<agentId>).
|
||||
const makeCreate = (kind) => api.registerTool((ctx) => ({
|
||||
name: `create-${kind}-channel`,
|
||||
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
|
||||
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}). ` +
|
||||
'Optionally pass `purpose` to describe what this channel is for — ' +
|
||||
'agents browse channels by purpose via fabric-channel-list.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
@@ -37,9 +39,16 @@ export function registerFabricTools(api, client, identity) {
|
||||
memberUserIds: { type: 'array', items: { type: 'string' } },
|
||||
onDuty: { type: 'string', description: 'required for triage-like flows (unused for these kinds)' },
|
||||
listeners: { type: 'array', items: { type: 'string' } },
|
||||
purpose: {
|
||||
type: 'string',
|
||||
description: "Free-form description of what this channel is for. Optional but " +
|
||||
'strongly recommended so other agents can find this channel by ' +
|
||||
'intent (via fabric-channel-list). Can be edited later with ' +
|
||||
'fabric-channel-set-purpose.',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (p) => {
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
@@ -50,6 +59,7 @@ export function registerFabricTools(api, client, identity) {
|
||||
xType: X_BY_KIND[kind],
|
||||
isPublic: p.isPublic ?? false,
|
||||
memberUserIds: p.memberUserIds ?? [],
|
||||
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
|
||||
});
|
||||
return { ok: true, channelId: ch.id };
|
||||
},
|
||||
@@ -74,7 +84,7 @@ export function registerFabricTools(api, client, identity) {
|
||||
callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' },
|
||||
},
|
||||
},
|
||||
execute: async (p) => {
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
@@ -118,7 +128,7 @@ export function registerFabricTools(api, client, identity) {
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (p) => {
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
@@ -178,7 +188,7 @@ export function registerFabricTools(api, client, identity) {
|
||||
channelId: { type: 'string' },
|
||||
},
|
||||
},
|
||||
execute: async (p) => {
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
@@ -202,4 +212,252 @@ export function registerFabricTools(api, client, identity) {
|
||||
}
|
||||
},
|
||||
}));
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-send-message: post a message into a specific channel.
|
||||
//
|
||||
// Unlike a normal channel reply (which goes back to whatever channel
|
||||
// woke the agent), this lets the agent proactively initiate text into
|
||||
// any channel they are a member of — e.g. ARD broadcasting daily
|
||||
// workload to #agents-room, or triage agent following up on an
|
||||
// already-routed task by commenting in #updates.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx) => ({
|
||||
name: 'fabric-send-message',
|
||||
description: 'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
|
||||
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId', 'channelId', 'content'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
|
||||
},
|
||||
},
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const res = (await client.postMessage(guild.endpoint, token, p.channelId, p.content, session.user.id));
|
||||
return { ok: true, messageId: res.messageId, seq: res.seq };
|
||||
},
|
||||
}));
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-channel-list: enumerate channels the calling agent can see
|
||||
// in a given guild. Backend filters to public channels + channels the
|
||||
// agent is a member of. Returns id / name / xType per channel so the
|
||||
// agent can pick a channelId for fabric-send-message etc.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx) => ({
|
||||
name: 'fabric-channel-list',
|
||||
description: 'List channels visible to the calling agent in a guild. Optional ' +
|
||||
'nameFilter does a case-insensitive substring match client-side. ' +
|
||||
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
|
||||
xType: {
|
||||
type: 'string',
|
||||
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
|
||||
description: 'optional filter by x_type',
|
||||
},
|
||||
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
|
||||
},
|
||||
},
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
|
||||
const needle = (p.nameFilter ?? '').toLowerCase();
|
||||
const filtered = all.filter((c) => {
|
||||
if (!p.includeClosed && c.closed)
|
||||
return false;
|
||||
if (p.xType && c.xType !== p.xType)
|
||||
return false;
|
||||
if (needle && !c.name.toLowerCase().includes(needle))
|
||||
return false;
|
||||
return true;
|
||||
});
|
||||
return {
|
||||
ok: true,
|
||||
count: filtered.length,
|
||||
channels: filtered.map((c) => ({
|
||||
id: c.id,
|
||||
name: c.name,
|
||||
xType: c.xType,
|
||||
isPublic: c.isPublic,
|
||||
closed: c.closed,
|
||||
lastSeq: c.lastSeq,
|
||||
purpose: c.purpose ?? null,
|
||||
})),
|
||||
};
|
||||
},
|
||||
}));
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-guild-list: enumerate guilds the calling agent belongs to.
|
||||
// Each row carries `purpose` — free-form description of what the
|
||||
// guild is for (admin-set). Use this as the first step when a
|
||||
// workflow says "find the right guild for X" — pick by purpose,
|
||||
// then fabric-channel-list to find the right channel inside it.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx) => ({
|
||||
name: 'fabric-guild-list',
|
||||
description: 'List guilds the calling agent is a member of. Returns ' +
|
||||
'{nodeId, name, purpose, status} per row. ' +
|
||||
"`purpose` is a free-form description of what each guild is for — " +
|
||||
'pick the guild whose purpose matches your intent. Use this tool ' +
|
||||
'BEFORE fabric-channel-list when a workflow asks you to pick the ' +
|
||||
'right guild by intent (e.g. "find a guild whose purpose mentions ' +
|
||||
'debate broadcasts" → then list its announce-type channels).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
properties: {
|
||||
nameFilter: {
|
||||
type: 'string',
|
||||
description: 'optional case-insensitive substring match on guild name',
|
||||
},
|
||||
purposeFilter: {
|
||||
type: 'string',
|
||||
description: 'optional case-insensitive substring match on guild purpose ' +
|
||||
'(e.g. "debate", "announcements")',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
const entry = identity.findByAgentId(agentId);
|
||||
if (!entry)
|
||||
return { ok: false, error: `agent ${agentId} not registered` };
|
||||
const session = await client.agentLogin(entry.fabricApiKey);
|
||||
const nameNeedle = (p.nameFilter ?? '').toLowerCase();
|
||||
const purposeNeedle = (p.purposeFilter ?? '').toLowerCase();
|
||||
const guilds = session.guilds.filter((g) => {
|
||||
if (nameNeedle && !g.name.toLowerCase().includes(nameNeedle))
|
||||
return false;
|
||||
if (purposeNeedle) {
|
||||
const purp = (g.purpose ?? '').toLowerCase();
|
||||
if (!purp.includes(purposeNeedle))
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return {
|
||||
ok: true,
|
||||
count: guilds.length,
|
||||
guilds: guilds.map((g) => ({
|
||||
nodeId: g.nodeId,
|
||||
name: g.name,
|
||||
status: g.status,
|
||||
purpose: g.purpose ?? null,
|
||||
})),
|
||||
};
|
||||
},
|
||||
}));
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-channel-set-purpose: set/update a channel's free-form
|
||||
// purpose description. Caller must be a channel member (or the
|
||||
// channel must be public). Use this to backfill purpose on existing
|
||||
// channels, or to refine it after a channel's role evolves.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx) => ({
|
||||
name: 'fabric-channel-set-purpose',
|
||||
description: "Set or update a channel's free-form purpose description. " +
|
||||
'Channel membership required (or the channel must be public). ' +
|
||||
'Pass empty string to clear. Use this to make a channel ' +
|
||||
'discoverable to other agents via fabric-channel-list.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId', 'channelId', 'purpose'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
purpose: {
|
||||
type: 'string',
|
||||
description: "What this channel is for. Pass '' (empty string) to clear.",
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const res = await client.setChannelPurpose(guild.endpoint, token, p.channelId, p.purpose);
|
||||
return { ok: true, channel: res };
|
||||
},
|
||||
}));
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-message-history: read a channel's recent message history by
|
||||
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
|
||||
// the last `limit` messages (limit defaults to 20, max 200).
|
||||
//
|
||||
// Use cases: catch-up on a channel that was muted while the agent was
|
||||
// gated; verify a previous message went through; lookup recent
|
||||
// duplicates before opening a new task in triage.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx) => ({
|
||||
name: 'fabric-message-history',
|
||||
description: "Read a channel's recent message history. Omit seqFrom/seqTo to " +
|
||||
'tail (last `limit` messages, default 20, max 200). Backend ' +
|
||||
'requires the calling agent to be a channel participant.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId', 'channelId'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
|
||||
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
|
||||
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
|
||||
},
|
||||
},
|
||||
execute: async (_id, p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const limit = p.limit ?? 20;
|
||||
// Tail mode: discover channel head via channel listing, then ask
|
||||
// for [head-limit+1, head]. Avoids needing the agent to know seq.
|
||||
let seqFrom = p.seqFrom;
|
||||
let seqTo = p.seqTo;
|
||||
if (seqFrom === undefined && seqTo === undefined) {
|
||||
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
|
||||
const ch = channels.find((c) => c.id === p.channelId);
|
||||
const head = ch?.lastSeq ?? 0;
|
||||
seqFrom = Math.max(1, head - limit + 1);
|
||||
seqTo = head;
|
||||
}
|
||||
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
|
||||
seqFrom,
|
||||
seqTo,
|
||||
limit,
|
||||
});
|
||||
return {
|
||||
ok: true,
|
||||
page: res.page,
|
||||
messages: res.items.map((m) => ({
|
||||
messageId: m.messageId,
|
||||
seq: m.seq,
|
||||
authorUserId: m.authorUserId,
|
||||
content: m.content,
|
||||
createdAt: m.createdAt,
|
||||
isDeleted: m.isDeleted,
|
||||
})),
|
||||
};
|
||||
},
|
||||
}));
|
||||
}
|
||||
|
||||
58
index.ts
58
index.ts
@@ -6,17 +6,25 @@
|
||||
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
|
||||
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
|
||||
import { fabricChannelPlugin } from './src/channel.js';
|
||||
import { flushAllFabric } from './src/coalesce.js';
|
||||
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
|
||||
import { FabricInbound } from './src/inbound.js';
|
||||
import { listEnabledFabricAccounts } from './src/accounts.js';
|
||||
import { registerFabricTools } from './src/tools.js';
|
||||
import { FabricClient } from './src/fabric-client.js';
|
||||
import { IdentityRegistry } from './src/identity.js';
|
||||
import { syncFabricCommands } from './src/command-sync.js';
|
||||
import { PresenceSync } from './src/presence-sync.js';
|
||||
import path from 'node:path';
|
||||
import os from 'node:os';
|
||||
|
||||
let runtimeRef: unknown = null;
|
||||
let inbound: FabricInbound | null = null;
|
||||
let presence: PresenceSync | null = null;
|
||||
// Periodic re-harvest of presence accounts so newly-connected agents
|
||||
// (registered through tool-based identity flow AFTER initial start)
|
||||
// get picked up. Cleared on gateway_stop.
|
||||
let presenceRefreshTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
export { fabricChannelPlugin } from './src/channel.js';
|
||||
|
||||
@@ -37,7 +45,7 @@ export default defineChannelPluginEntry({
|
||||
config?: unknown;
|
||||
pluginConfig?: { identityFilePath?: string };
|
||||
logger: { info: (m: string) => void; warn: (m: string) => void };
|
||||
on: (ev: string, fn: () => void) => void;
|
||||
on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
|
||||
registerTool: (d: unknown) => void;
|
||||
};
|
||||
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
|
||||
@@ -55,6 +63,27 @@ export default defineChannelPluginEntry({
|
||||
identity,
|
||||
);
|
||||
|
||||
// Cross-plugin API: globalThis.__fabric
|
||||
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
|
||||
// injection to DM-typed channels only. The channel-meta cache is
|
||||
// populated lazily from inbound (message.created carries xType) and
|
||||
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
|
||||
// very first DM after a fresh gateway start hits cache from the
|
||||
// previous run rather than firing the injector on the wrong type.
|
||||
//
|
||||
// null return = channel never seen (cache cold). Callers MUST NOT
|
||||
// fall back to "assume DM" — fail closed on unknown.
|
||||
{
|
||||
const _G = globalThis as Record<string, unknown>;
|
||||
_G['__fabric'] = { getChannelType };
|
||||
// Flush channel-meta cache when the gateway shuts down so
|
||||
// recently-recorded xType entries don't get lost.
|
||||
api.on('gateway_stop', () => {
|
||||
try { flushChannelMeta(); } catch { /* ignore */ }
|
||||
});
|
||||
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
|
||||
}
|
||||
|
||||
api.on('gateway_start', () => {
|
||||
const _G = globalThis as Record<string, unknown>;
|
||||
if (_G._fabricInboundStarted) return;
|
||||
@@ -81,12 +110,37 @@ export default defineChannelPluginEntry({
|
||||
api.logger,
|
||||
accounts,
|
||||
);
|
||||
void inbound.start();
|
||||
// start() resolves once all accounts have attempted login; per-
|
||||
// agent failures are logged but don't reject. Once it resolves we
|
||||
// can harvest the presence accounts (those that DID log in have
|
||||
// their fabricUserId + first guild endpoint populated).
|
||||
void inbound.start().then(() => {
|
||||
if (!inbound) return;
|
||||
presence = new PresenceSync(api.logger, client);
|
||||
presence.setAccounts(inbound.getPresenceAccounts());
|
||||
presence.start();
|
||||
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
||||
|
||||
// Re-harvest every 5 min: catches agents added via tool-based
|
||||
// identity provisioning after gateway_start (recruitment flow).
|
||||
// setAccounts is idempotent — duplicates collapse on agentId.
|
||||
presenceRefreshTimer = setInterval(() => {
|
||||
if (inbound && presence) presence.setAccounts(inbound.getPresenceAccounts());
|
||||
}, 5 * 60_000);
|
||||
});
|
||||
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
|
||||
void syncFabricCommands(client, cfg, accounts, api.logger);
|
||||
});
|
||||
|
||||
// Note: the per-turn coalesce flush happens deterministically in
|
||||
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
|
||||
// is the real "all deliveries done" boundary; the agent_end hook fires
|
||||
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
|
||||
api.on('gateway_stop', () => {
|
||||
void flushAllFabric();
|
||||
if (presenceRefreshTimer) { clearInterval(presenceRefreshTimer); presenceRefreshTimer = null; }
|
||||
presence?.stop();
|
||||
presence = null;
|
||||
inbound?.stop();
|
||||
inbound = null;
|
||||
});
|
||||
|
||||
@@ -14,7 +14,14 @@
|
||||
"create-work-channel",
|
||||
"create-report-channel",
|
||||
"create-discussion-channel",
|
||||
"discussion-complete"
|
||||
"discussion-complete",
|
||||
"fabric-canvas",
|
||||
"fabric-channel",
|
||||
"fabric-send-message",
|
||||
"fabric-channel-list",
|
||||
"fabric-message-history",
|
||||
"fabric-guild-list",
|
||||
"fabric-channel-set-purpose"
|
||||
]
|
||||
},
|
||||
"configSchema": {
|
||||
@@ -44,6 +51,10 @@
|
||||
"minLength": 1,
|
||||
"description": "Shared secret that must equal the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY. Required to register the slash-command catalog (Guild C-2). Read it from the guild via: docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js"
|
||||
},
|
||||
"coalesce": {
|
||||
"type": "boolean",
|
||||
"description": "Merge a split agent turn (text → thinking/tool → text) into ONE Fabric message. Flushed deterministically on the agent_end hook. Default true; false = raw per-segment posting."
|
||||
},
|
||||
"dmSecurity": { "type": "string" },
|
||||
"dmPolicy": { "type": "string" },
|
||||
"enabled": { "type": "boolean" },
|
||||
|
||||
@@ -18,6 +18,11 @@ export type FabricChannelConfig = {
|
||||
// (Guild C-2). Required by the channel config schema; sourced from config
|
||||
// only — never from the environment.
|
||||
commandsSyncKey?: string;
|
||||
// Coalesce an agent turn that OpenClaw split into multiple deliveries
|
||||
// (text → thinking/tool → text => N sendText calls) into ONE Fabric
|
||||
// message. The flush boundary is the deterministic `agent_end` hook (not
|
||||
// a timer). Default true; set false for raw per-segment posting.
|
||||
coalesce?: boolean;
|
||||
accounts?: Record<string, FabricAccountConfig>;
|
||||
defaultAccount?: string;
|
||||
} & FabricAccountConfig;
|
||||
@@ -46,6 +51,12 @@ export function resolveCommandsSyncKey(cfg: Cfg): string {
|
||||
return (section(cfg).commandsSyncKey ?? '').trim();
|
||||
}
|
||||
|
||||
// Whether to coalesce a split agent turn into one Fabric message
|
||||
// (channel-level). Default true.
|
||||
export function resolveCoalesce(cfg: Cfg): boolean {
|
||||
return (cfg.channels?.fabric ?? {}).coalesce !== false;
|
||||
}
|
||||
|
||||
export function listFabricAccountIds(cfg: Cfg): string[] {
|
||||
const accts = section(cfg).accounts ?? {};
|
||||
const ids = Object.keys(accts);
|
||||
|
||||
108
src/channel-meta.ts
Normal file
108
src/channel-meta.ts
Normal file
@@ -0,0 +1,108 @@
|
||||
/**
|
||||
* Channel-meta cache. Records (channelId → xType) for every fabric
|
||||
* channel the gateway has seen at least one inbound message in.
|
||||
*
|
||||
* Populated lazily from inbound (`recordChannelType` is called for
|
||||
* every `message.created` event with non-empty `xType`). Persisted to
|
||||
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
|
||||
* gateway restarts (so the very first DM after restart still gets the
|
||||
* right xType without waiting for a fresh inbound).
|
||||
*
|
||||
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
|
||||
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
|
||||
* to xType==='dm' only.
|
||||
*
|
||||
* Failure mode: lookup misses (channel never seen / inbound dropped
|
||||
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
|
||||
* fall back to "assume DM" or you re-introduce the false-positive on
|
||||
* group channels.
|
||||
*/
|
||||
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
|
||||
import { dirname, join } from 'node:path';
|
||||
import { homedir } from 'node:os';
|
||||
|
||||
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
|
||||
|
||||
interface ChannelMetaFile {
|
||||
// channelId → xType ('dm' | 'triage' | 'group' | etc.)
|
||||
channels: Record<string, string>;
|
||||
}
|
||||
|
||||
let memory = new Map<string, string>();
|
||||
let loaded = false;
|
||||
let dirty = false;
|
||||
let flushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
|
||||
function load(): void {
|
||||
if (loaded) return;
|
||||
loaded = true;
|
||||
try {
|
||||
if (!existsSync(CACHE_FILE)) return;
|
||||
const raw = readFileSync(CACHE_FILE, 'utf8');
|
||||
const parsed = JSON.parse(raw) as ChannelMetaFile;
|
||||
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
|
||||
if (typeof k === 'string' && typeof v === 'string') memory.set(k, v);
|
||||
}
|
||||
} catch {
|
||||
// ignore — start with empty cache on corruption
|
||||
}
|
||||
}
|
||||
|
||||
function scheduleFlush(): void {
|
||||
if (flushTimer) return;
|
||||
// Debounce writes — many inbound messages may arrive in a burst.
|
||||
// 250ms coalesces them; on gateway_stop the channel plugin can force
|
||||
// a synchronous flush via flushChannelMeta().
|
||||
flushTimer = setTimeout(() => {
|
||||
flushTimer = null;
|
||||
if (!dirty) return;
|
||||
dirty = false;
|
||||
flushSync();
|
||||
}, 250);
|
||||
}
|
||||
|
||||
function flushSync(): void {
|
||||
try {
|
||||
const dir = dirname(CACHE_FILE);
|
||||
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
|
||||
const out: ChannelMetaFile = { channels: Object.fromEntries(memory) };
|
||||
const tmp = CACHE_FILE + '.tmp';
|
||||
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
|
||||
renameSync(tmp, CACHE_FILE);
|
||||
} catch {
|
||||
// swallow — cache is an optimization; loss-on-write is recoverable
|
||||
}
|
||||
}
|
||||
|
||||
/** Called by inbound on every message.created. xType empty → no-op. */
|
||||
export function recordChannelType(channelId: string, xType: string | undefined): void {
|
||||
if (!channelId || !xType) return;
|
||||
load();
|
||||
const existing = memory.get(channelId);
|
||||
if (existing === xType) return;
|
||||
memory.set(channelId, xType);
|
||||
dirty = true;
|
||||
scheduleFlush();
|
||||
}
|
||||
|
||||
/** Cross-plugin lookup. null when channel never seen / unknown. */
|
||||
export function getChannelType(channelId: string): string | null {
|
||||
if (!channelId) return null;
|
||||
load();
|
||||
return memory.get(channelId) ?? null;
|
||||
}
|
||||
|
||||
/** Force-flush — called on plugin shutdown to make sure recently
|
||||
* recorded entries hit disk before the gateway dies. */
|
||||
export function flushChannelMeta(): void {
|
||||
if (flushTimer) {
|
||||
clearTimeout(flushTimer);
|
||||
flushTimer = null;
|
||||
}
|
||||
if (dirty) {
|
||||
dirty = false;
|
||||
flushSync();
|
||||
}
|
||||
}
|
||||
|
||||
export const CHANNEL_META_PATH = CACHE_FILE;
|
||||
@@ -21,6 +21,39 @@ import {
|
||||
resolveDefaultFabricAccountId,
|
||||
type ResolvedFabricAccount,
|
||||
} from './accounts.js';
|
||||
import { getChannelType } from './channel-meta.js';
|
||||
|
||||
/**
|
||||
* Map a Fabric channel xType to an openclaw routing peer.kind / ChatType.
|
||||
*
|
||||
* Fabric distinguishes channels by xType ('dm' | 'triage' | 'group' |
|
||||
* 'broadcast' | 'announce' | ...). Openclaw's session router only knows
|
||||
* 'direct' | 'group' | 'channel'. We collapse:
|
||||
* - 'dm' → 'direct' (1:1 conversation; agent always speaks)
|
||||
* - rest → 'group' (multi-party; turn-engine gates speech)
|
||||
*
|
||||
* Sessions are keyed by peer.kind, so inbound and outbound MUST agree —
|
||||
* otherwise the agent's outbound message lands in a different session
|
||||
* than the inbound that triggered it and conversation state splits.
|
||||
*
|
||||
* Outbound has no live xType (the agent target is just a channelId), so
|
||||
* it consults the channel-meta cache populated by inbound. Cache miss
|
||||
* (channel never observed) falls back to 'group' — same as the pre-fix
|
||||
* behavior, no regression on cold cache. The proactive-DM-first-message
|
||||
* edge case (agent DMs a channel before any inbound) still lands as
|
||||
* 'group' on that one outbound; the next inbound + outbound pair will
|
||||
* agree on 'direct'.
|
||||
*/
|
||||
export type FabricPeerRouting = { peerKind: 'direct' | 'group'; chatType: 'direct' | 'group' };
|
||||
|
||||
export function fabricPeerRoutingForXType(xType: string | null | undefined): FabricPeerRouting {
|
||||
if (xType === 'dm') return { peerKind: 'direct', chatType: 'direct' };
|
||||
return { peerKind: 'group', chatType: 'group' };
|
||||
}
|
||||
|
||||
export function fabricPeerRoutingForChannel(channelId: string): FabricPeerRouting {
|
||||
return fabricPeerRoutingForXType(getChannelType(channelId));
|
||||
}
|
||||
|
||||
type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown };
|
||||
|
||||
@@ -45,13 +78,18 @@ export function looksLikeFabricTargetId(raw: string): boolean {
|
||||
export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
|
||||
const id = stripFabricTargetPrefix(params.target);
|
||||
if (!id) return null;
|
||||
// Consult the channel-meta cache populated by inbound — DM channels
|
||||
// need peer.kind='direct' so the outbound session key matches the
|
||||
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
|
||||
// no regression on cold cache).
|
||||
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
|
||||
return buildChannelOutboundSessionRoute({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: 'fabric',
|
||||
accountId: params.accountId,
|
||||
peer: { kind: 'group', id },
|
||||
chatType: 'group',
|
||||
peer: { kind: peerKind, id },
|
||||
chatType,
|
||||
from: `fabric:channel:${id}`,
|
||||
to: `fabric:${id}`,
|
||||
});
|
||||
@@ -159,7 +197,10 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
|
||||
cfg?: unknown;
|
||||
config?: unknown;
|
||||
}) => {
|
||||
// openclaw passes config under cfg or config depending on path
|
||||
// openclaw passes config under cfg or config depending on path.
|
||||
// Note: inbound agent replies go through inbound.ts `deliver`
|
||||
// (where turn coalescing happens). This path is for any direct
|
||||
// outbound sends and posts immediately.
|
||||
const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg;
|
||||
try {
|
||||
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);
|
||||
|
||||
93
src/coalesce.ts
Normal file
93
src/coalesce.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
// Deterministic turn coalescer.
|
||||
//
|
||||
// OpenClaw calls the Fabric `deliver` callback once per assistant text
|
||||
// segment; a thinking/tool block between two text blocks is a delivery
|
||||
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
|
||||
// multiple deliver() calls. There is no turn id on the delivery, so we
|
||||
// BUFFER segments by Fabric channelId and post the merged message when the
|
||||
// turn truly ends. The flush is driven by inbound.ts right after
|
||||
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
|
||||
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
|
||||
// agent_end hook, which fires before deliver()). `coalesce=false` posts
|
||||
// each segment immediately.
|
||||
|
||||
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
|
||||
|
||||
export function normChannelId(x: string | null | undefined): string {
|
||||
const s = String(x ?? '');
|
||||
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
|
||||
}
|
||||
|
||||
type Pending = {
|
||||
parts: string[];
|
||||
post: (text: string) => Promise<void>;
|
||||
log?: (m: string) => void;
|
||||
safety: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
const pendingByChannel = new Map<string, Pending>();
|
||||
|
||||
async function flushChannel(channelId: string, reason: string): Promise<void> {
|
||||
const p = pendingByChannel.get(channelId);
|
||||
if (!p) return;
|
||||
pendingByChannel.delete(channelId);
|
||||
clearTimeout(p.safety);
|
||||
const text = p.parts.join('\n\n').trim();
|
||||
if (!text) return;
|
||||
try {
|
||||
await p.post(text);
|
||||
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
|
||||
} catch (e) {
|
||||
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Buffer one delivered segment (or send immediately when coalesce=false).
|
||||
// `post` performs the real Fabric postMessage with the caller's already
|
||||
// resolved guild/token; on flush it is called once with the merged text.
|
||||
export async function enqueueDelivery(params: {
|
||||
channelId: string;
|
||||
text: string;
|
||||
coalesce: boolean;
|
||||
post: (text: string) => Promise<void>;
|
||||
log?: (m: string) => void;
|
||||
}): Promise<void> {
|
||||
const cid = normChannelId(params.channelId);
|
||||
const text = (params.text ?? '').trim();
|
||||
if (!text) return;
|
||||
if (!params.coalesce) {
|
||||
await params.post(text);
|
||||
return;
|
||||
}
|
||||
const existing = pendingByChannel.get(cid);
|
||||
if (existing) {
|
||||
existing.parts.push(text);
|
||||
existing.post = params.post; // freshest guild/token closure
|
||||
existing.log = params.log;
|
||||
} else {
|
||||
pendingByChannel.set(cid, {
|
||||
parts: [text],
|
||||
post: params.post,
|
||||
log: params.log,
|
||||
safety: setTimeout(
|
||||
() => void flushChannel(cid, 'safety-timeout'),
|
||||
SAFETY_FLUSH_MS,
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Called by the agent_end hook with the hook ctx's channelId (bare or
|
||||
// fabric:-prefixed). Deterministic per-turn boundary.
|
||||
export async function flushFabricForChannel(
|
||||
rawChannelId: string | null | undefined,
|
||||
): Promise<void> {
|
||||
const cid = normChannelId(rawChannelId);
|
||||
if (cid) await flushChannel(cid, 'dispatch-end');
|
||||
}
|
||||
|
||||
// gateway_stop: flush anything still buffered.
|
||||
export async function flushAllFabric(): Promise<void> {
|
||||
for (const cid of [...pendingByChannel.keys()]) {
|
||||
await flushChannel(cid, 'gateway_stop');
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,15 @@ export type FabricSession = {
|
||||
accessToken: string;
|
||||
refreshToken: string;
|
||||
user: { id: string; email: string; name: string };
|
||||
guilds: Array<{ nodeId: string; name: string; endpoint: string; status: string }>;
|
||||
guilds: Array<{
|
||||
nodeId: string;
|
||||
name: string;
|
||||
endpoint: string;
|
||||
status: string;
|
||||
// free-form description of this guild's role; admin-set on Center.
|
||||
// null when the admin hasn't filled it in yet.
|
||||
purpose?: string | null;
|
||||
}>;
|
||||
guildAccessTokens: Array<{ guildNodeId: string; token: string }>;
|
||||
};
|
||||
|
||||
@@ -100,11 +108,30 @@ export class FabricClient {
|
||||
memberUserIds?: string[];
|
||||
onDuty?: string;
|
||||
listeners?: string[];
|
||||
// free-form purpose; optional. Existing agents can also set/update
|
||||
// it later via setChannelPurpose().
|
||||
purpose?: string;
|
||||
},
|
||||
): Promise<{ id: string }> {
|
||||
return this.post(`${guildEndpoint}/api/channels`, body, guildToken);
|
||||
}
|
||||
|
||||
// PATCH /api/channels/:id — backend currently only patches `purpose`.
|
||||
// Caller must be a member of the channel (or any user if public).
|
||||
setChannelPurpose(
|
||||
guildEndpoint: string,
|
||||
guildToken: string,
|
||||
channelId: string,
|
||||
purpose: string,
|
||||
): Promise<{ id: string; name: string; xType: string; purpose: string | null }> {
|
||||
return this.req(
|
||||
'PATCH',
|
||||
`${guildEndpoint}/api/channels/${channelId}`,
|
||||
guildToken,
|
||||
{ purpose },
|
||||
);
|
||||
}
|
||||
|
||||
closeChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
|
||||
return this.post(`${guildEndpoint}/api/channels/${channelId}/close`, {}, guildToken);
|
||||
}
|
||||
@@ -190,6 +217,77 @@ export class FabricClient {
|
||||
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
|
||||
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
|
||||
}
|
||||
|
||||
// ---- channel discovery + message read (used by the agent-facing
|
||||
// fabric-channel-list / fabric-message-history tools) ----
|
||||
|
||||
/**
|
||||
* List channels in a guild visible to the calling user. Backend
|
||||
* filters to public + channels the user is a member of.
|
||||
*/
|
||||
listChannels(
|
||||
guildEndpoint: string,
|
||||
guildToken: string,
|
||||
guildNodeId: string,
|
||||
): Promise<Array<{
|
||||
id: string;
|
||||
guildId: string;
|
||||
name: string;
|
||||
xType: string;
|
||||
kind: string;
|
||||
isPublic: boolean;
|
||||
closed: boolean;
|
||||
lastSeq: number;
|
||||
createdAt: string;
|
||||
purpose?: string | null;
|
||||
}>> {
|
||||
return this.req(
|
||||
'GET',
|
||||
`${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`,
|
||||
guildToken,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Page through a channel's message history by `seq`.
|
||||
*
|
||||
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
|
||||
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
|
||||
* tail. Page metadata in the response describes what to ask next.
|
||||
*/
|
||||
listMessages(
|
||||
guildEndpoint: string,
|
||||
guildToken: string,
|
||||
channelId: string,
|
||||
opts: { seqFrom?: number; seqTo?: number; limit?: number } = {},
|
||||
): Promise<{
|
||||
items: Array<{
|
||||
messageId: string;
|
||||
seq: number;
|
||||
content: string;
|
||||
authorUserId: string;
|
||||
createdAt: string;
|
||||
editedAt: string | null;
|
||||
deletedAt: string | null;
|
||||
isDeleted: boolean;
|
||||
}>;
|
||||
page: {
|
||||
seqFrom: number;
|
||||
seqTo: number;
|
||||
limit: number;
|
||||
returned: number;
|
||||
hasMore: boolean;
|
||||
nextExpectedSeq: number;
|
||||
highestCommittedSeq: number;
|
||||
};
|
||||
}> {
|
||||
const qs = new URLSearchParams();
|
||||
if (opts.seqFrom !== undefined) qs.set('seq_from', String(opts.seqFrom));
|
||||
if (opts.seqTo !== undefined) qs.set('seq_to', String(opts.seqTo));
|
||||
if (opts.limit !== undefined) qs.set('limit', String(opts.limit));
|
||||
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
|
||||
return this.req('GET', url, guildToken);
|
||||
}
|
||||
}
|
||||
|
||||
export type CanvasFormat = 'md' | 'html' | 'text';
|
||||
|
||||
368
src/inbound.ts
368
src/inbound.ts
@@ -5,6 +5,10 @@ import { io, type Socket } from 'socket.io-client';
|
||||
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
||||
import type { FabricClient, FabricSession } from './fabric-client.js';
|
||||
import type { IdentityRegistry } from './identity.js';
|
||||
import { resolveCoalesce } from './accounts.js';
|
||||
import { fabricPeerRoutingForXType } from './channel.js';
|
||||
import { recordChannelType } from './channel-meta.js';
|
||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||
|
||||
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
|
||||
// channels (nextcloud-talk) drive the kernel:
|
||||
@@ -42,11 +46,22 @@ type FabricMessage = {
|
||||
channelId?: string;
|
||||
attachments?: FabricAttachment[];
|
||||
wakeup?: boolean;
|
||||
// x-type of the channel (sent on message.created). 'dm' bypasses the
|
||||
// wakeup gate: any message that isn't the agent's own is delivered.
|
||||
xType?: string;
|
||||
};
|
||||
|
||||
export class FabricInbound {
|
||||
private sockets: Socket[] = [];
|
||||
private seen = new Set<string>();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
// at connect time — any channel the agent joins later (e.g. a fresh DM
|
||||
// created by another user) is unreachable until the gateway restarts.
|
||||
private channelSyncTimers: NodeJS.Timeout[] = [];
|
||||
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
|
||||
// poll. 60s keeps the lag bounded without hammering the backend.
|
||||
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
|
||||
// Guild access tokens are short-lived (~15 min). The socket survives via
|
||||
// socket.io reconnect, but the token captured at connect time goes stale,
|
||||
// so HTTP calls (attachment download, posting the reply) start 401ing.
|
||||
@@ -54,6 +69,136 @@ export class FabricInbound {
|
||||
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
|
||||
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
|
||||
|
||||
// Per-channel serial work queue. Every inbound socket message for a
|
||||
// channel awaits the previous task for that same channel, so model
|
||||
// turns never interleave. Map key = channelId; value is the tail of
|
||||
// the chain (an in-flight promise the next task awaits).
|
||||
//
|
||||
// Why per-channel and not per-agent: a single agent may sit in
|
||||
// several triage / general channels; we want each channel to flow at
|
||||
// its own speed but the SAME channel's traffic to be strictly serial.
|
||||
// For dm and discuss the queue also serialises but those traditionally
|
||||
// had at-most-one-in-flight anyway via the turn engine.
|
||||
private channelChains = new Map<string, Promise<void>>();
|
||||
|
||||
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
|
||||
// agent/status round-trip off the hot path for back-to-back triage
|
||||
// messages. Short TTL because status flips are rare-but-meaningful.
|
||||
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
|
||||
private static readonly AGENT_STATUS_TTL_MS = 5_000;
|
||||
|
||||
// Triage messages that arrived while the on-duty agent wasn't on_call
|
||||
// — sit here until either (a) the agent becomes on_call and the next
|
||||
// triage arrival drains them, or (b) the gateway restarts (lost; ok
|
||||
// because the underlying Fabric messages are persisted and re-fetched
|
||||
// on agent reconnect's history sweep).
|
||||
private pendingTriageGated: Array<{
|
||||
agentId: string;
|
||||
g: { nodeId: string; endpoint: string };
|
||||
channelId: string;
|
||||
m: FabricMessage;
|
||||
session: FabricSession;
|
||||
}> = [];
|
||||
|
||||
// Schedule `task` to run after every previous task on the same
|
||||
// channel has completed. Returns the promise so callers can await
|
||||
// their own result if they need to; the chain itself is fire-and-
|
||||
// forget from the socket.on handler.
|
||||
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
|
||||
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
|
||||
const next = prev.then(task).catch((err) => {
|
||||
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
|
||||
});
|
||||
this.channelChains.set(channelId, next);
|
||||
// Best-effort cleanup so the Map doesn't grow without bound for
|
||||
// long-running gateways: drop the entry when the chain settles, but
|
||||
// only if it's still the latest reference (newer enqueue may have
|
||||
// overwritten it in the meantime).
|
||||
void next.finally(() => {
|
||||
if (this.channelChains.get(channelId) === next) {
|
||||
this.channelChains.delete(channelId);
|
||||
}
|
||||
});
|
||||
return next;
|
||||
}
|
||||
|
||||
// Hit HF backend to check whether `agentId` is currently on_call.
|
||||
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
|
||||
// on_call" — triage stays gated rather than risking a confused wake.
|
||||
private async checkAgentOnCall(agentId: string): Promise<boolean> {
|
||||
const cached = this.agentStatusCache.get(agentId);
|
||||
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
|
||||
return cached.onCall;
|
||||
}
|
||||
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
|
||||
// CLAW_IDENTIFIER resolution priority:
|
||||
// 1. HF_CLAW_IDENTIFIER env (operator override)
|
||||
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
|
||||
// plugin itself uses — keeps the two in sync without an extra
|
||||
// env per service unit)
|
||||
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
|
||||
// container hostname is `server.t2` but HF agent row has
|
||||
// `claw_identifier=sim-t2`; matching is mandatory for the HF
|
||||
// backend's _require_agent() check)
|
||||
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
|
||||
if (!claw) {
|
||||
try {
|
||||
// openclaw config shape (verified in sim):
|
||||
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
|
||||
const cfg = this.cfg as {
|
||||
plugins?: { entries?: Record<string, { config?: { identifier?: string } }> };
|
||||
};
|
||||
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
|
||||
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
|
||||
claw = fromCfg.trim();
|
||||
}
|
||||
} catch {
|
||||
/* fall through to hostname */
|
||||
}
|
||||
}
|
||||
if (!claw) {
|
||||
claw = (await import('os')).hostname();
|
||||
}
|
||||
let onCall = false;
|
||||
try {
|
||||
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
|
||||
const res = await fetch(url, {
|
||||
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
|
||||
});
|
||||
if (res.ok) {
|
||||
const data = (await res.json()) as { status?: string };
|
||||
onCall = (data.status ?? '').toLowerCase() === 'on_call';
|
||||
}
|
||||
} catch (err) {
|
||||
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
|
||||
}
|
||||
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
|
||||
return onCall;
|
||||
}
|
||||
|
||||
// FIFO drain of all triage-gated messages for `agentId` (called when
|
||||
// we just learned they're on_call). Each drained message is dispatched
|
||||
// through its own channel chain so per-channel serial order is kept.
|
||||
private async drainGatedFor(agentId: string): Promise<void> {
|
||||
const keep: typeof this.pendingTriageGated = [];
|
||||
const drain: typeof this.pendingTriageGated = [];
|
||||
for (const item of this.pendingTriageGated) {
|
||||
if (item.agentId === agentId) drain.push(item);
|
||||
else keep.push(item);
|
||||
}
|
||||
if (drain.length === 0) return;
|
||||
this.pendingTriageGated = keep;
|
||||
for (const item of drain) {
|
||||
this.log.info(
|
||||
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
|
||||
);
|
||||
// Re-enqueue via the per-channel chain so ordering is preserved.
|
||||
this.enqueueChannelTask(item.channelId, async () => {
|
||||
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Return a fresh guild access token for the agent, re-authenticating with
|
||||
// the agent's Fabric API key when the cached session is stale. Falls back
|
||||
// to the connect-time session token if re-login fails.
|
||||
@@ -114,12 +259,69 @@ export class FabricInbound {
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
for (const t of this.channelSyncTimers) clearInterval(t);
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets) s.disconnect();
|
||||
this.sockets = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-account metadata harvested during `start()` — used by
|
||||
* PresenceSync to know where to push each agent's HF status.
|
||||
*
|
||||
* `fabricUserId` is filled from `session.user.id` after agent-login.
|
||||
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
|
||||
* guild presence push is a future concern; for sim/prod-v1 each agent
|
||||
* is in one guild).
|
||||
*
|
||||
* Returns ONLY agents that successfully connected — failed-login
|
||||
* agents have no fabricUserId yet and are excluded.
|
||||
*/
|
||||
getPresenceAccounts(): Array<{
|
||||
agentId: string;
|
||||
fabricUserId: string;
|
||||
guildBaseUrl: string;
|
||||
guildNodeId: string;
|
||||
fabricApiKey: string;
|
||||
}> {
|
||||
const out: Array<{
|
||||
agentId: string;
|
||||
fabricUserId: string;
|
||||
guildBaseUrl: string;
|
||||
guildNodeId: string;
|
||||
fabricApiKey: string;
|
||||
}> = [];
|
||||
for (const entry of this.identity.list()) {
|
||||
if (!entry.fabricUserId) continue;
|
||||
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
|
||||
if (!presenceGuild) continue;
|
||||
out.push({
|
||||
agentId: entry.agentId,
|
||||
fabricUserId: entry.fabricUserId,
|
||||
guildBaseUrl: presenceGuild.endpoint,
|
||||
guildNodeId: presenceGuild.nodeId,
|
||||
fabricApiKey: entry.fabricApiKey,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
||||
// guild per agent (used as the presence-push target). Stores both
|
||||
// endpoint and nodeId — presence-sync needs both: endpoint to build
|
||||
// the URL, nodeId to pick the matching guildAccessToken from a fresh
|
||||
// agent-login response.
|
||||
private firstGuildByAgent = new Map<string, { endpoint: string; nodeId: string }>();
|
||||
|
||||
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
|
||||
const selfUserId = session.user.id;
|
||||
// First-guild capture for presence-sync push target. session.guilds is
|
||||
// already in priority order from Center; we take the first one with a
|
||||
// valid endpoint and stop. Multi-guild presence is a future concern.
|
||||
if (!this.firstGuildByAgent.has(agentId)) {
|
||||
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
||||
if (firstGuild) this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
|
||||
}
|
||||
for (const g of session.guilds) {
|
||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||
if (!tok) continue;
|
||||
@@ -128,29 +330,129 @@ export class FabricInbound {
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
const joinAll = async () => {
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
// on `connect` seeds it; the periodic resync diffs against it so we
|
||||
// only emit `join_channel` for genuinely new channels (and
|
||||
// `leave_channel` for ones the agent is no longer in).
|
||||
const joined = new Set<string>();
|
||||
const syncChannels = async (kind: 'initial' | 'resync') => {
|
||||
let freshTok: string | undefined;
|
||||
try {
|
||||
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
|
||||
} catch {
|
||||
freshTok = tok;
|
||||
}
|
||||
const authTok = freshTok ?? tok;
|
||||
try {
|
||||
const res = await fetch(
|
||||
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
|
||||
{ headers: { authorization: `Bearer ${tok}` } },
|
||||
{ headers: { authorization: `Bearer ${authTok}` } },
|
||||
);
|
||||
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
|
||||
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
if (!res.ok) return;
|
||||
const channels = (await res.json()) as Array<{ id: string }>;
|
||||
const current = new Set(channels.map((c) => c.id));
|
||||
let added = 0;
|
||||
let removed = 0;
|
||||
for (const id of current) {
|
||||
if (!joined.has(id)) {
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
for (const id of [...joined]) {
|
||||
if (!current.has(id)) {
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (kind === 'initial') {
|
||||
this.log.info(
|
||||
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
|
||||
);
|
||||
} else if (added > 0 || removed > 0) {
|
||||
this.log.info(
|
||||
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
/* best effort */
|
||||
/* best effort — next tick will retry */
|
||||
}
|
||||
};
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('connect', () => {
|
||||
// On every (re)connect the server forgets prior subscriptions, so
|
||||
// reset our local view and seed from a fresh fetch.
|
||||
joined.clear();
|
||||
void syncChannels('initial');
|
||||
});
|
||||
// Push-based membership events from the backend (companion to
|
||||
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
|
||||
// server tells us this user was added to / removed from a
|
||||
// channel, we sub/unsub the socket.io room immediately — no
|
||||
// 60s wait for the polling resync. Polling remains as a safety
|
||||
// net for missed events.
|
||||
socket.on('channel.joined', (evt: { channelId?: string }) => {
|
||||
const id = evt?.channelId;
|
||||
if (!id || joined.has(id)) return;
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||
});
|
||||
socket.on('channel.left', (evt: { channelId?: string }) => {
|
||||
const id = evt?.channelId;
|
||||
if (!id || !joined.has(id)) return;
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||
});
|
||||
const syncTimer = setInterval(
|
||||
() => void syncChannels('resync'),
|
||||
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
||||
);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
socket.on('message.created', (m: FabricMessage) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId) return;
|
||||
// Record xType into the channel-meta cache before self-author
|
||||
// / dedup gates — channel type doesn't depend on who sent the
|
||||
// message, and recording it on observer-only triage messages
|
||||
// is still useful (the next consumer asking
|
||||
// __fabric.getChannelType wants the answer regardless of
|
||||
// whether THIS message was delivered to an agent).
|
||||
recordChannelType(channelId, m.xType);
|
||||
if (m.authorUserId && m.authorUserId === selfUserId) return;
|
||||
const key = `${agentId}:${m.messageId}`;
|
||||
if (this.seen.has(key)) return;
|
||||
this.seen.add(key);
|
||||
if (this.seen.size > 5000) this.seen.clear();
|
||||
void this.dispatch(agentId, g, channelId, m, session);
|
||||
// Per-channel serial queue. Prevents concurrent model turns for
|
||||
// the same channel — important for triage where a second wake
|
||||
// arriving mid-reply would interleave with the in-flight one.
|
||||
this.enqueueChannelTask(channelId, async () => {
|
||||
// Triage on_call gate: if the on-duty agent isn't currently
|
||||
// on_call per HF, don't dispatch yet — just sit on the
|
||||
// per-channel queue. Subsequent triage messages will recheck;
|
||||
// when the agent becomes on_call, the next arrival drains.
|
||||
//
|
||||
// Also handles: triage + wake=true must verify status before
|
||||
// committing to a model turn. Non-triage and triage observer
|
||||
// (wake=false) skip the gate.
|
||||
if (m.xType === 'triage' && m.wakeup === true) {
|
||||
const onCall = await this.checkAgentOnCall(agentId);
|
||||
if (!onCall) {
|
||||
this.log.info(
|
||||
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
|
||||
);
|
||||
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
|
||||
return;
|
||||
}
|
||||
// Drain any previously-gated messages (FIFO) before this one,
|
||||
// now that we know the agent is on_call.
|
||||
await this.drainGatedFor(agentId);
|
||||
}
|
||||
await this.dispatch(agentId, g, channelId, m, session);
|
||||
});
|
||||
});
|
||||
socket.connect();
|
||||
this.sockets.push(socket);
|
||||
@@ -214,11 +516,19 @@ export class FabricInbound {
|
||||
const core = this.core as Core & Record<string, unknown>;
|
||||
const cfg = this.cfg as { session?: { store?: unknown } };
|
||||
try {
|
||||
// Route by xType. DM channels need peer.kind='direct' so openclaw
|
||||
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
|
||||
// and ctx.ChatType='direct') rather than as a multi-party group.
|
||||
// Without this, the agent's user-prompt metadata says
|
||||
// 'is_group_chat: true' on a DM and downstream prompt logic
|
||||
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
|
||||
// misclassifies the turn.
|
||||
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
|
||||
const route = core.channel.routing.resolveAgentRoute({
|
||||
cfg: this.cfg,
|
||||
channel: 'fabric',
|
||||
accountId: agentId,
|
||||
peer: { kind: 'group', id: channelId },
|
||||
peer: { kind: peerKind, id: channelId },
|
||||
});
|
||||
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
||||
agentId: route.agentId,
|
||||
@@ -233,7 +543,7 @@ export class FabricInbound {
|
||||
To: `fabric:${channelId}`,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId ?? agentId,
|
||||
ChatType: 'group',
|
||||
ChatType: chatType,
|
||||
ConversationLabel: `fabric:${guild.nodeId}`,
|
||||
SenderId: m.authorUserId ?? 'fabric',
|
||||
Provider: 'fabric',
|
||||
@@ -250,7 +560,24 @@ export class FabricInbound {
|
||||
// the woken speaker emits a normal message or /no-reply). We still
|
||||
// record the message into the agent's session so it has the full
|
||||
// channel conversation as context whenever it IS later woken.
|
||||
if (m.wakeup !== true) {
|
||||
//
|
||||
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
|
||||
// any message that isn't the agent's own (already filtered above) is
|
||||
// always delivered to the model.
|
||||
if (m.xType !== 'dm' && m.wakeup !== true) {
|
||||
// Triage exception: non-wake messages (admin observer) MUST NOT
|
||||
// enter the agent's session at all. The next time the agent
|
||||
// wakes for a triage message, their context should contain only
|
||||
// their own past wakeups + their own outgoing messages — never
|
||||
// the observer-only chatter from other agents. For non-triage
|
||||
// channels keep the legacy "record-as-history" so a later wake
|
||||
// sees the full channel conversation.
|
||||
if (m.xType === 'triage') {
|
||||
this.log.info(
|
||||
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
||||
await core.channel.session.recordInboundSession({
|
||||
storePath,
|
||||
@@ -301,8 +628,17 @@ export class FabricInbound {
|
||||
const text = (payload?.text ?? '').trim();
|
||||
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
|
||||
if (!text || !gt) return;
|
||||
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
|
||||
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
|
||||
// Buffer segments; the merged message is posted right after
|
||||
// dispatch returns (the deterministic turn boundary, see the
|
||||
// finally below). Disable per channel: channels.fabric.coalesce.
|
||||
await enqueueDelivery({
|
||||
channelId,
|
||||
text,
|
||||
coalesce: resolveCoalesce(this.cfg as never),
|
||||
post: (t) =>
|
||||
this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id) as Promise<void>,
|
||||
log: (m) => this.log.info(m),
|
||||
});
|
||||
},
|
||||
onRecordError: (err: unknown) =>
|
||||
this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
|
||||
@@ -327,6 +663,12 @@ export class FabricInbound {
|
||||
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
|
||||
} catch (err) {
|
||||
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
|
||||
} finally {
|
||||
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
|
||||
// resolves AFTER every deliver() call of this turn has run, so the
|
||||
// buffer now holds all segments — flush them as ONE Fabric message.
|
||||
// No hooks, no timers, no idle guessing.
|
||||
await flushFabricForChannel(channelId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
177
src/presence-sync.ts
Normal file
177
src/presence-sync.ts
Normal file
@@ -0,0 +1,177 @@
|
||||
/**
|
||||
* presence-sync — read each connected agent's HF status (via the
|
||||
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
|
||||
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
|
||||
* `PUT /api/agents/:userId/presence` so the backend can apply
|
||||
* busy-discard on `announce`-type channel deliveries.
|
||||
*
|
||||
* Push model: we only PUT when an agent's status actually changes
|
||||
* (since the last push). The HF-side accessor has its own TTL cache
|
||||
* to absorb the every-30s polling.
|
||||
*
|
||||
* Auth: the endpoint sits behind ApiKeyGuard (global APP_GUARD per
|
||||
* app.module.js) which expects `Authorization: Bearer <guild-token>`
|
||||
* — NOT the agent's fabricApiKey directly. So before each PUT we do
|
||||
* a fresh agent-login (or reuse a cached token if still within its
|
||||
* 15-min JWT TTL) and pull the guildAccessToken matching the target
|
||||
* guild. Status changes are rare enough that login overhead is fine.
|
||||
*
|
||||
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
|
||||
* is a no-op — Fabric backend defaults presence to 'unknown' which is
|
||||
* treated as not-busy. Announce-channel delivery still works; busy
|
||||
* filtering simply doesn't kick in.
|
||||
*/
|
||||
import type { FabricClient } from './fabric-client.js';
|
||||
|
||||
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
|
||||
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
|
||||
type Logger = { info: (m: string) => void; warn: (m: string) => void };
|
||||
|
||||
export interface PresenceSyncAccount {
|
||||
agentId: string;
|
||||
fabricUserId: string; // the agent's Fabric Center user id (UUID)
|
||||
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
|
||||
guildNodeId: string; // which guildAccessTokens[].guildNodeId to pick
|
||||
fabricApiKey: string; // existing per-account key (used for agent-login)
|
||||
}
|
||||
|
||||
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
|
||||
// safely inside the window even if a tick runs late.
|
||||
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
|
||||
|
||||
interface CachedToken {
|
||||
token: string;
|
||||
expiresAt: number; // epoch ms
|
||||
}
|
||||
|
||||
export class PresenceSync {
|
||||
private timer: ReturnType<typeof setInterval> | null = null;
|
||||
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
|
||||
private readonly accounts = new Map<string, PresenceSyncAccount>();
|
||||
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
|
||||
|
||||
// Mutex flag: a tick iterates accounts serially with `await` on each
|
||||
// agent-login + PUT round-trip, so a single tick can easily run 20+s
|
||||
// when there are many accounts. setInterval(intervalMs) does NOT wait
|
||||
// for the previous tick to finish — without this guard the next tick
|
||||
// fires on top of a still-running one and two parallel iterations
|
||||
// PUT the same agentId within milliseconds. That tipped the backend's
|
||||
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
|
||||
// 500s on prod. Guarded ticks just skip a beat instead.
|
||||
private inflight = false;
|
||||
|
||||
constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
|
||||
|
||||
setAccounts(accounts: PresenceSyncAccount[]): void {
|
||||
this.accounts.clear();
|
||||
for (const a of accounts) this.accounts.set(a.agentId, a);
|
||||
}
|
||||
|
||||
start(intervalMs = 30_000): void {
|
||||
if (this.timer) return;
|
||||
this.timer = setInterval(() => {
|
||||
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
|
||||
}, intervalMs);
|
||||
// run once immediately so initial state lands fast
|
||||
void this.tick();
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
if (this.timer) {
|
||||
clearInterval(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch a fresh guildAccessToken for `acct`, caching it under the
|
||||
* agentId until just before its JWT expiry. Returns null on login
|
||||
* failure or if the session has no matching guild — caller logs +
|
||||
* skips the PUT.
|
||||
*/
|
||||
private async ensureGuildToken(acct: PresenceSyncAccount): Promise<string | null> {
|
||||
const now = Date.now();
|
||||
const cached = this.tokenCache.get(acct.agentId);
|
||||
if (cached && cached.expiresAt > now) return cached.token;
|
||||
|
||||
let session;
|
||||
try {
|
||||
session = await this.client.agentLogin(acct.fabricApiKey);
|
||||
} catch (err) {
|
||||
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
|
||||
return null;
|
||||
}
|
||||
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
|
||||
if (!entry?.token) {
|
||||
this.logger.warn(
|
||||
`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
|
||||
return entry.token;
|
||||
}
|
||||
|
||||
private async tick(): Promise<void> {
|
||||
// Mutex: see the `inflight` field declaration for the why. Drop
|
||||
// overlapping ticks rather than letting them run concurrently —
|
||||
// status is gated by `lastStatus !== bridge.get`, so skipping a
|
||||
// beat costs nothing the next beat won't catch.
|
||||
if (this.inflight) return;
|
||||
this.inflight = true;
|
||||
try {
|
||||
await this.tickInner();
|
||||
} finally {
|
||||
this.inflight = false;
|
||||
}
|
||||
}
|
||||
|
||||
private async tickInner(): Promise<void> {
|
||||
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
|
||||
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
|
||||
|
||||
for (const [agentId, acct] of this.accounts) {
|
||||
let status: HfStatus | undefined;
|
||||
try {
|
||||
status = await bridge.get(agentId);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!status) continue;
|
||||
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
|
||||
|
||||
const guildToken = await this.ensureGuildToken(acct);
|
||||
if (!guildToken) continue;
|
||||
|
||||
try {
|
||||
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
|
||||
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
|
||||
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
|
||||
// x-api-key and got 401 "missing bearer token" forever. The /api
|
||||
// prefix is required because the guild backend sets a global
|
||||
// 'api' prefix in main.ts setGlobalPrefix('api').
|
||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||
const res = await fetch(url, {
|
||||
method: 'PUT',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
authorization: `Bearer ${guildToken}`,
|
||||
},
|
||||
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
||||
});
|
||||
if (res.ok) {
|
||||
this.lastStatus.set(agentId, status);
|
||||
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
||||
} else {
|
||||
// 401 here usually means the cached token went stale unexpectedly
|
||||
// (server-side rotation or clock skew) — drop the cache so the
|
||||
// next tick re-logs-in.
|
||||
if (res.status === 401) this.tokenCache.delete(agentId);
|
||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
295
src/tools.ts
295
src/tools.ts
@@ -46,7 +46,10 @@ export function registerFabricTools(
|
||||
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: `create-${kind}-channel`,
|
||||
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
|
||||
description:
|
||||
`Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}). ` +
|
||||
'Optionally pass `purpose` to describe what this channel is for — ' +
|
||||
'agents browse channels by purpose via fabric-channel-list.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
@@ -58,13 +61,22 @@ export function registerFabricTools(
|
||||
memberUserIds: { type: 'array', items: { type: 'string' } },
|
||||
onDuty: { type: 'string', description: 'required for triage-like flows (unused for these kinds)' },
|
||||
listeners: { type: 'array', items: { type: 'string' } },
|
||||
purpose: {
|
||||
type: 'string',
|
||||
description:
|
||||
"Free-form description of what this channel is for. Optional but " +
|
||||
'strongly recommended so other agents can find this channel by ' +
|
||||
'intent (via fabric-channel-list). Can be edited later with ' +
|
||||
'fabric-channel-set-purpose.',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (p: {
|
||||
execute: async (_id: string, p: {
|
||||
guildNodeId: string;
|
||||
name: string;
|
||||
isPublic?: boolean;
|
||||
memberUserIds?: string[];
|
||||
purpose?: string;
|
||||
}) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
@@ -75,6 +87,7 @@ export function registerFabricTools(
|
||||
xType: X_BY_KIND[kind],
|
||||
isPublic: p.isPublic ?? false,
|
||||
memberUserIds: p.memberUserIds ?? [],
|
||||
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
|
||||
});
|
||||
return { ok: true, channelId: ch.id };
|
||||
},
|
||||
@@ -101,7 +114,7 @@ export function registerFabricTools(
|
||||
callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' },
|
||||
},
|
||||
},
|
||||
execute: async (p: {
|
||||
execute: async (_id: string, p: {
|
||||
guildNodeId: string;
|
||||
channelId: string;
|
||||
summary: string;
|
||||
@@ -151,7 +164,7 @@ export function registerFabricTools(
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (p: {
|
||||
execute: async (_id: string, p: {
|
||||
action: 'read' | 'share' | 'update' | 'close';
|
||||
guildNodeId: string;
|
||||
channelId: string;
|
||||
@@ -216,7 +229,7 @@ export function registerFabricTools(
|
||||
channelId: { type: 'string' },
|
||||
},
|
||||
},
|
||||
execute: async (p: {
|
||||
execute: async (_id: string, p: {
|
||||
action: 'members' | 'join' | 'leave';
|
||||
guildNodeId: string;
|
||||
channelId: string;
|
||||
@@ -243,4 +256,276 @@ export function registerFabricTools(
|
||||
}
|
||||
},
|
||||
}));
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-send-message: post a message into a specific channel.
|
||||
//
|
||||
// Unlike a normal channel reply (which goes back to whatever channel
|
||||
// woke the agent), this lets the agent proactively initiate text into
|
||||
// any channel they are a member of — e.g. ARD broadcasting daily
|
||||
// workload to #agents-room, or triage agent following up on an
|
||||
// already-routed task by commenting in #updates.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-send-message',
|
||||
description:
|
||||
'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
|
||||
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId', 'channelId', 'content'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
|
||||
},
|
||||
},
|
||||
execute: async (_id: string, p: { guildNodeId: string; channelId: string; content: string }) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const res = (await client.postMessage(
|
||||
guild.endpoint,
|
||||
token,
|
||||
p.channelId,
|
||||
p.content,
|
||||
session.user.id,
|
||||
)) as { messageId?: string; seq?: number };
|
||||
return { ok: true, messageId: res.messageId, seq: res.seq };
|
||||
},
|
||||
}));
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-channel-list: enumerate channels the calling agent can see
|
||||
// in a given guild. Backend filters to public channels + channels the
|
||||
// agent is a member of. Returns id / name / xType per channel so the
|
||||
// agent can pick a channelId for fabric-send-message etc.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-channel-list',
|
||||
description:
|
||||
'List channels visible to the calling agent in a guild. Optional ' +
|
||||
'nameFilter does a case-insensitive substring match client-side. ' +
|
||||
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
|
||||
xType: {
|
||||
type: 'string',
|
||||
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
|
||||
description: 'optional filter by x_type',
|
||||
},
|
||||
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
|
||||
},
|
||||
},
|
||||
execute: async (_id: string, p: {
|
||||
guildNodeId: string;
|
||||
nameFilter?: string;
|
||||
xType?: string;
|
||||
includeClosed?: boolean;
|
||||
}) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
|
||||
const needle = (p.nameFilter ?? '').toLowerCase();
|
||||
const filtered = all.filter((c) => {
|
||||
if (!p.includeClosed && c.closed) return false;
|
||||
if (p.xType && c.xType !== p.xType) return false;
|
||||
if (needle && !c.name.toLowerCase().includes(needle)) return false;
|
||||
return true;
|
||||
});
|
||||
return {
|
||||
ok: true,
|
||||
count: filtered.length,
|
||||
channels: filtered.map((c) => ({
|
||||
id: c.id,
|
||||
name: c.name,
|
||||
xType: c.xType,
|
||||
isPublic: c.isPublic,
|
||||
closed: c.closed,
|
||||
lastSeq: c.lastSeq,
|
||||
purpose: c.purpose ?? null,
|
||||
})),
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-guild-list: enumerate guilds the calling agent belongs to.
|
||||
// Each row carries `purpose` — free-form description of what the
|
||||
// guild is for (admin-set). Use this as the first step when a
|
||||
// workflow says "find the right guild for X" — pick by purpose,
|
||||
// then fabric-channel-list to find the right channel inside it.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-guild-list',
|
||||
description:
|
||||
'List guilds the calling agent is a member of. Returns ' +
|
||||
'{nodeId, name, purpose, status} per row. ' +
|
||||
"`purpose` is a free-form description of what each guild is for — " +
|
||||
'pick the guild whose purpose matches your intent. Use this tool ' +
|
||||
'BEFORE fabric-channel-list when a workflow asks you to pick the ' +
|
||||
'right guild by intent (e.g. "find a guild whose purpose mentions ' +
|
||||
'debate broadcasts" → then list its announce-type channels).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
properties: {
|
||||
nameFilter: {
|
||||
type: 'string',
|
||||
description: 'optional case-insensitive substring match on guild name',
|
||||
},
|
||||
purposeFilter: {
|
||||
type: 'string',
|
||||
description:
|
||||
'optional case-insensitive substring match on guild purpose ' +
|
||||
'(e.g. "debate", "announcements")',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (_id: string, p: { nameFilter?: string; purposeFilter?: string }) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const entry = identity.findByAgentId(agentId);
|
||||
if (!entry) return { ok: false, error: `agent ${agentId} not registered` };
|
||||
const session = await client.agentLogin(entry.fabricApiKey);
|
||||
const nameNeedle = (p.nameFilter ?? '').toLowerCase();
|
||||
const purposeNeedle = (p.purposeFilter ?? '').toLowerCase();
|
||||
const guilds = session.guilds.filter((g) => {
|
||||
if (nameNeedle && !g.name.toLowerCase().includes(nameNeedle)) return false;
|
||||
if (purposeNeedle) {
|
||||
const purp = (g.purpose ?? '').toLowerCase();
|
||||
if (!purp.includes(purposeNeedle)) return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return {
|
||||
ok: true,
|
||||
count: guilds.length,
|
||||
guilds: guilds.map((g) => ({
|
||||
nodeId: g.nodeId,
|
||||
name: g.name,
|
||||
status: g.status,
|
||||
purpose: g.purpose ?? null,
|
||||
})),
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-channel-set-purpose: set/update a channel's free-form
|
||||
// purpose description. Caller must be a channel member (or the
|
||||
// channel must be public). Use this to backfill purpose on existing
|
||||
// channels, or to refine it after a channel's role evolves.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-channel-set-purpose',
|
||||
description:
|
||||
"Set or update a channel's free-form purpose description. " +
|
||||
'Channel membership required (or the channel must be public). ' +
|
||||
'Pass empty string to clear. Use this to make a channel ' +
|
||||
'discoverable to other agents via fabric-channel-list.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId', 'channelId', 'purpose'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
purpose: {
|
||||
type: 'string',
|
||||
description: "What this channel is for. Pass '' (empty string) to clear.",
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (_id: string, p: { guildNodeId: string; channelId: string; purpose: string }) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const res = await client.setChannelPurpose(
|
||||
guild.endpoint,
|
||||
token,
|
||||
p.channelId,
|
||||
p.purpose,
|
||||
);
|
||||
return { ok: true, channel: res };
|
||||
},
|
||||
}));
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-message-history: read a channel's recent message history by
|
||||
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
|
||||
// the last `limit` messages (limit defaults to 20, max 200).
|
||||
//
|
||||
// Use cases: catch-up on a channel that was muted while the agent was
|
||||
// gated; verify a previous message went through; lookup recent
|
||||
// duplicates before opening a new task in triage.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-message-history',
|
||||
description:
|
||||
"Read a channel's recent message history. Omit seqFrom/seqTo to " +
|
||||
'tail (last `limit` messages, default 20, max 200). Backend ' +
|
||||
'requires the calling agent to be a channel participant.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId', 'channelId'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
|
||||
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
|
||||
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
|
||||
},
|
||||
},
|
||||
execute: async (_id: string, p: {
|
||||
guildNodeId: string;
|
||||
channelId: string;
|
||||
seqFrom?: number;
|
||||
seqTo?: number;
|
||||
limit?: number;
|
||||
}) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const limit = p.limit ?? 20;
|
||||
|
||||
// Tail mode: discover channel head via channel listing, then ask
|
||||
// for [head-limit+1, head]. Avoids needing the agent to know seq.
|
||||
let seqFrom = p.seqFrom;
|
||||
let seqTo = p.seqTo;
|
||||
if (seqFrom === undefined && seqTo === undefined) {
|
||||
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
|
||||
const ch = channels.find((c) => c.id === p.channelId);
|
||||
const head = ch?.lastSeq ?? 0;
|
||||
seqFrom = Math.max(1, head - limit + 1);
|
||||
seqTo = head;
|
||||
}
|
||||
|
||||
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
|
||||
seqFrom,
|
||||
seqTo,
|
||||
limit,
|
||||
});
|
||||
return {
|
||||
ok: true,
|
||||
page: res.page,
|
||||
messages: res.items.map((m) => ({
|
||||
messageId: m.messageId,
|
||||
seq: m.seq,
|
||||
authorUserId: m.authorUserId,
|
||||
content: m.content,
|
||||
createdAt: m.createdAt,
|
||||
isDeleted: m.isDeleted,
|
||||
})),
|
||||
};
|
||||
},
|
||||
}));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user