Compare commits
18 Commits
feat/expos
...
fc2ab628b2
| Author | SHA1 | Date | |
|---|---|---|---|
| fc2ab628b2 | |||
| 893b93198d | |||
| 260d50196b | |||
| ea713064e1 | |||
| 180b717eda | |||
| 152b465e64 | |||
| 7f96fffca9 | |||
| b659dadb9e | |||
| 20e55849eb | |||
| d47d3467df | |||
| 7dc70522d1 | |||
| 2acb084ee4 | |||
| 9419d270e5 | |||
| 79b29db26c | |||
| a87de27cff | |||
| dabaa6e1f2 | |||
| b8e0e424fa | |||
| 81a10f2a1f |
2
dist/fabric/index.js
vendored
2
dist/fabric/index.js
vendored
@@ -94,7 +94,7 @@ export default defineChannelPluginEntry({
|
||||
void inbound.start().then(() => {
|
||||
if (!inbound)
|
||||
return;
|
||||
presence = new PresenceSync(api.logger);
|
||||
presence = new PresenceSync(api.logger, client);
|
||||
presence.setAccounts(inbound.getPresenceAccounts());
|
||||
presence.start();
|
||||
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
||||
|
||||
31
dist/fabric/src/channel.js
vendored
31
dist/fabric/src/channel.js
vendored
@@ -11,6 +11,15 @@
|
||||
import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core';
|
||||
import { FabricClient } from './fabric-client.js';
|
||||
import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js';
|
||||
import { getChannelType } from './channel-meta.js';
|
||||
export function fabricPeerRoutingForXType(xType) {
|
||||
if (xType === 'dm')
|
||||
return { peerKind: 'direct', chatType: 'direct' };
|
||||
return { peerKind: 'group', chatType: 'group' };
|
||||
}
|
||||
export function fabricPeerRoutingForChannel(channelId) {
|
||||
return fabricPeerRoutingForXType(getChannelType(channelId));
|
||||
}
|
||||
// ---- target grammar: fabric:<channelId> ----
|
||||
export function stripFabricTargetPrefix(raw) {
|
||||
let s = (raw ?? '').trim();
|
||||
@@ -38,13 +47,18 @@ export function resolveFabricOutboundSessionRoute(params) {
|
||||
const id = stripFabricTargetPrefix(params.target);
|
||||
if (!id)
|
||||
return null;
|
||||
// Consult the channel-meta cache populated by inbound — DM channels
|
||||
// need peer.kind='direct' so the outbound session key matches the
|
||||
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
|
||||
// no regression on cold cache).
|
||||
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
|
||||
return buildChannelOutboundSessionRoute({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: 'fabric',
|
||||
accountId: params.accountId,
|
||||
peer: { kind: 'group', id },
|
||||
chatType: 'group',
|
||||
peer: { kind: peerKind, id },
|
||||
chatType,
|
||||
from: `fabric:channel:${id}`,
|
||||
to: `fabric:${id}`,
|
||||
});
|
||||
@@ -103,6 +117,19 @@ export const fabricChannelPlugin = createChatChannelPlugin({
|
||||
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId),
|
||||
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg),
|
||||
isConfigured: (account) => Boolean(account.fabricApiKey),
|
||||
// openclaw's channelManager.getRuntimeSnapshot() — called every minute
|
||||
// by the channel-health-monitor — defaults `configured: true` when the
|
||||
// plugin doesn't expose describeAccount (see applyDescribedAccountFields
|
||||
// in server-channels). Without this, fabric's synthetic 'default'
|
||||
// account (returned by listFabricAccountIds when channels.fabric.accounts
|
||||
// is empty — the prod shape) gets snapshot {enabled:true, configured:true,
|
||||
// running:false} → isManagedAccount=true → not-running → restart loop
|
||||
// every ~10 min, logging `[fabric:default] health-monitor: restarting`.
|
||||
// Mirror isConfigured here so the snapshot truthfully reports false for
|
||||
// any account without a fabricApiKey.
|
||||
describeAccount: (account) => ({
|
||||
configured: Boolean(account.fabricApiKey),
|
||||
}),
|
||||
},
|
||||
// Minimal setup adapter: Fabric is configured directly under
|
||||
// channels.fabric.* (no interactive wizard). applyAccountConfig is the
|
||||
|
||||
50
dist/fabric/src/inbound.js
vendored
50
dist/fabric/src/inbound.js
vendored
@@ -4,6 +4,7 @@ import { join } from 'node:path';
|
||||
import { io } from 'socket.io-client';
|
||||
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
||||
import { resolveCoalesce } from './accounts.js';
|
||||
import { fabricPeerRoutingForXType } from './channel.js';
|
||||
import { recordChannelType } from './channel-meta.js';
|
||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||
export class FabricInbound {
|
||||
@@ -227,38 +228,59 @@ export class FabricInbound {
|
||||
for (const entry of this.identity.list()) {
|
||||
if (!entry.fabricUserId)
|
||||
continue;
|
||||
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
|
||||
if (!presenceGuildUrl)
|
||||
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
|
||||
if (!presenceGuild)
|
||||
continue;
|
||||
out.push({
|
||||
agentId: entry.agentId,
|
||||
fabricUserId: entry.fabricUserId,
|
||||
guildBaseUrl: presenceGuildUrl,
|
||||
guildBaseUrl: presenceGuild.endpoint,
|
||||
guildNodeId: presenceGuild.nodeId,
|
||||
fabricApiKey: entry.fabricApiKey,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
||||
// guild per agent (used as the presence-push target).
|
||||
firstGuildEndpointByAgent = new Map();
|
||||
// guild per agent (used as the presence-push target). Stores both
|
||||
// endpoint and nodeId — presence-sync needs both: endpoint to build
|
||||
// the URL, nodeId to pick the matching guildAccessToken from a fresh
|
||||
// agent-login response.
|
||||
firstGuildByAgent = new Map();
|
||||
async connectAgent(agentId, session) {
|
||||
const selfUserId = session.user.id;
|
||||
// First-guild capture for presence-sync push target. session.guilds is
|
||||
// already in priority order from Center; we take the first one with a
|
||||
// valid endpoint and stop. Multi-guild presence is a future concern.
|
||||
if (!this.firstGuildEndpointByAgent.has(agentId)) {
|
||||
if (!this.firstGuildByAgent.has(agentId)) {
|
||||
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
||||
if (firstGuild)
|
||||
this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
|
||||
this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
|
||||
}
|
||||
for (const g of session.guilds) {
|
||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||
if (!tok)
|
||||
continue;
|
||||
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
|
||||
// on every (re)connect. The single-shot `auth: { token: tok }` shape
|
||||
// captured the token in closure: after socket.io's silent auto-reconnect
|
||||
// the backend got the same JWT that expired ~15 min into the session
|
||||
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
|
||||
// the application layer. The client's `connect` event still fired (TCP
|
||||
// succeeded), so the plugin happily ran the channel-resync, emitted
|
||||
// `join_channel` into the void, and logged "joined N channel(s)" while
|
||||
// the backend was actually broadcasting message.created to a room with
|
||||
// zero subscribers. End user symptom: DMs to agents silently dropped.
|
||||
const socket = io(`${g.endpoint}/realtime`, {
|
||||
transports: ['websocket'],
|
||||
auth: { token: tok },
|
||||
auth: (cb) => {
|
||||
// Best-effort fresh token; on transient failure fall back to the
|
||||
// last known good one. tokenCache also keeps HTTP calls (attachment
|
||||
// download / reply post) from 401'ing in the same window.
|
||||
this.freshGuildToken(agentId, g.nodeId, session)
|
||||
.then((fresh) => cb({ token: fresh ?? tok }))
|
||||
.catch(() => cb({ token: tok }));
|
||||
},
|
||||
autoConnect: false,
|
||||
});
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
@@ -433,11 +455,19 @@ export class FabricInbound {
|
||||
const core = this.core;
|
||||
const cfg = this.cfg;
|
||||
try {
|
||||
// Route by xType. DM channels need peer.kind='direct' so openclaw
|
||||
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
|
||||
// and ctx.ChatType='direct') rather than as a multi-party group.
|
||||
// Without this, the agent's user-prompt metadata says
|
||||
// 'is_group_chat: true' on a DM and downstream prompt logic
|
||||
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
|
||||
// misclassifies the turn.
|
||||
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
|
||||
const route = core.channel.routing.resolveAgentRoute({
|
||||
cfg: this.cfg,
|
||||
channel: 'fabric',
|
||||
accountId: agentId,
|
||||
peer: { kind: 'group', id: channelId },
|
||||
peer: { kind: peerKind, id: channelId },
|
||||
});
|
||||
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
||||
agentId: route.agentId,
|
||||
@@ -451,7 +481,7 @@ export class FabricInbound {
|
||||
To: `fabric:${channelId}`,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId ?? agentId,
|
||||
ChatType: 'group',
|
||||
ChatType: chatType,
|
||||
ConversationLabel: `fabric:${guild.nodeId}`,
|
||||
SenderId: m.authorUserId ?? 'fabric',
|
||||
Provider: 'fabric',
|
||||
|
||||
93
dist/fabric/src/presence-sync.js
vendored
93
dist/fabric/src/presence-sync.js
vendored
@@ -1,26 +1,25 @@
|
||||
/**
|
||||
* presence-sync — read each connected agent's HF status (via the
|
||||
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
|
||||
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
|
||||
* `PUT /agents/:userId/presence` so the backend can apply busy-discard
|
||||
* on `announce`-type channel deliveries.
|
||||
*
|
||||
* Push model: we only PUT when an agent's status actually changes
|
||||
* (since the last push). The HF-side accessor has its own TTL cache
|
||||
* to absorb the every-30s polling.
|
||||
*
|
||||
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
|
||||
* is a no-op — Fabric backend defaults presence to 'unknown' which is
|
||||
* treated as not-busy. Announce-channel delivery still works; busy
|
||||
* filtering simply doesn't kick in.
|
||||
*/
|
||||
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
|
||||
// safely inside the window even if a tick runs late.
|
||||
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
|
||||
export class PresenceSync {
|
||||
logger;
|
||||
client;
|
||||
timer = null;
|
||||
lastStatus = new Map(); // by agentId
|
||||
accounts = new Map();
|
||||
constructor(logger) {
|
||||
tokenCache = new Map(); // by agentId
|
||||
// Mutex flag: a tick iterates accounts serially with `await` on each
|
||||
// agent-login + PUT round-trip, so a single tick can easily run 20+s
|
||||
// when there are many accounts. setInterval(intervalMs) does NOT wait
|
||||
// for the previous tick to finish — without this guard the next tick
|
||||
// fires on top of a still-running one and two parallel iterations
|
||||
// PUT the same agentId within milliseconds. That tipped the backend's
|
||||
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
|
||||
// 500s on prod. Guarded ticks just skip a beat instead.
|
||||
inflight = false;
|
||||
constructor(logger, client) {
|
||||
this.logger = logger;
|
||||
this.client = client;
|
||||
}
|
||||
setAccounts(accounts) {
|
||||
this.accounts.clear();
|
||||
@@ -42,7 +41,49 @@ export class PresenceSync {
|
||||
this.timer = null;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Fetch a fresh guildAccessToken for `acct`, caching it under the
|
||||
* agentId until just before its JWT expiry. Returns null on login
|
||||
* failure or if the session has no matching guild — caller logs +
|
||||
* skips the PUT.
|
||||
*/
|
||||
async ensureGuildToken(acct) {
|
||||
const now = Date.now();
|
||||
const cached = this.tokenCache.get(acct.agentId);
|
||||
if (cached && cached.expiresAt > now)
|
||||
return cached.token;
|
||||
let session;
|
||||
try {
|
||||
session = await this.client.agentLogin(acct.fabricApiKey);
|
||||
}
|
||||
catch (err) {
|
||||
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
|
||||
return null;
|
||||
}
|
||||
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
|
||||
if (!entry?.token) {
|
||||
this.logger.warn(`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`);
|
||||
return null;
|
||||
}
|
||||
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
|
||||
return entry.token;
|
||||
}
|
||||
async tick() {
|
||||
// Mutex: see the `inflight` field declaration for the why. Drop
|
||||
// overlapping ticks rather than letting them run concurrently —
|
||||
// status is gated by `lastStatus !== bridge.get`, so skipping a
|
||||
// beat costs nothing the next beat won't catch.
|
||||
if (this.inflight)
|
||||
return;
|
||||
this.inflight = true;
|
||||
try {
|
||||
await this.tickInner();
|
||||
}
|
||||
finally {
|
||||
this.inflight = false;
|
||||
}
|
||||
}
|
||||
async tickInner() {
|
||||
const bridge = globalThis['__hfAgentStatus'];
|
||||
if (!bridge || typeof bridge.get !== 'function')
|
||||
return; // HF plugin not loaded — skip
|
||||
@@ -58,13 +99,22 @@ export class PresenceSync {
|
||||
continue;
|
||||
if (this.lastStatus.get(agentId) === status)
|
||||
continue; // no change → no PUT
|
||||
const guildToken = await this.ensureGuildToken(acct);
|
||||
if (!guildToken)
|
||||
continue;
|
||||
try {
|
||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
|
||||
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
|
||||
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
|
||||
// x-api-key and got 401 "missing bearer token" forever. The /api
|
||||
// prefix is required because the guild backend sets a global
|
||||
// 'api' prefix in main.ts setGlobalPrefix('api').
|
||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||
const res = await fetch(url, {
|
||||
method: 'PUT',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': acct.fabricApiKey,
|
||||
authorization: `Bearer ${guildToken}`,
|
||||
},
|
||||
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
||||
});
|
||||
@@ -73,6 +123,11 @@ export class PresenceSync {
|
||||
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
||||
}
|
||||
else {
|
||||
// 401 here usually means the cached token went stale unexpectedly
|
||||
// (server-side rotation or clock skew) — drop the cache so the
|
||||
// next tick re-logs-in.
|
||||
if (res.status === 401)
|
||||
this.tokenCache.delete(agentId);
|
||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
||||
}
|
||||
}
|
||||
|
||||
45
index.ts
45
index.ts
@@ -15,6 +15,8 @@ import { FabricClient } from './src/fabric-client.js';
|
||||
import { IdentityRegistry } from './src/identity.js';
|
||||
import { syncFabricCommands } from './src/command-sync.js';
|
||||
import { PresenceSync } from './src/presence-sync.js';
|
||||
import { SubDiscussionStore } from './src/sub-discussion-store.js';
|
||||
import { registerSubDiscussionHook } from './src/sub-discussion-hook.js';
|
||||
import path from 'node:path';
|
||||
import os from 'node:os';
|
||||
|
||||
@@ -48,19 +50,38 @@ export default defineChannelPluginEntry({
|
||||
on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
|
||||
registerTool: (d: unknown) => void;
|
||||
};
|
||||
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
|
||||
const cfg = (api.config ?? {}) as {
|
||||
channels?: { fabric?: { centerApiBase?: string; commandsSyncKey?: string } };
|
||||
};
|
||||
const centerApiBase = cfg.channels?.fabric?.centerApiBase ?? 'http://localhost:7001/api';
|
||||
const idFile =
|
||||
api.pluginConfig?.identityFilePath ??
|
||||
path.join(os.homedir(), '.openclaw', 'fabric-identity.json');
|
||||
const subDiscussionFile = path.join(
|
||||
os.homedir(),
|
||||
'.openclaw',
|
||||
'fabric-sub-discussion.json',
|
||||
);
|
||||
|
||||
// tools operate against a default Center; per-account keys come from config
|
||||
const client = new FabricClient(centerApiBase);
|
||||
const identity = new IdentityRegistry(idFile);
|
||||
const subDiscussion = new SubDiscussionStore(subDiscussionFile);
|
||||
registerFabricTools(
|
||||
{ registerTool: (d) => api.registerTool(d), logger: api.logger },
|
||||
client,
|
||||
identity,
|
||||
subDiscussion,
|
||||
cfg,
|
||||
);
|
||||
// Per-(agent, channel) prompt injection for sub-discussion channels.
|
||||
// Runs as a sibling to PrismFacet's before_prompt_build hook (and
|
||||
// ClawPrompts' fabric-chat-injector); openclaw composes
|
||||
// appendSystemContext from all registered handlers.
|
||||
registerSubDiscussionHook(
|
||||
{ on: api.on, logger: api.logger },
|
||||
subDiscussion,
|
||||
identity,
|
||||
);
|
||||
|
||||
// Cross-plugin API: globalThis.__fabric
|
||||
@@ -75,13 +96,29 @@ export default defineChannelPluginEntry({
|
||||
// fall back to "assume DM" — fail closed on unknown.
|
||||
{
|
||||
const _G = globalThis as Record<string, unknown>;
|
||||
_G['__fabric'] = { getChannelType };
|
||||
_G['__fabric'] = {
|
||||
getChannelType,
|
||||
// Dynamic-subscription bridges: tools (notably `fabric-register`)
|
||||
// call these to add/remove an account's inbound socket without
|
||||
// a gateway restart. Both delegate to the live FabricInbound
|
||||
// instance via the module-level `inbound` closure variable; the
|
||||
// closures stay valid across gateway_start / gateway_stop
|
||||
// because we re-assign the variable, not the property.
|
||||
addAccount: async (entry: { agentId: string; fabricApiKey: string }) => {
|
||||
if (!inbound) throw new Error('fabric inbound not ready yet (gateway not started?)');
|
||||
await inbound.addAccount(entry);
|
||||
},
|
||||
removeAccount: (agentId: string) => {
|
||||
if (!inbound) return;
|
||||
inbound.removeAccount(agentId);
|
||||
},
|
||||
};
|
||||
// Flush channel-meta cache when the gateway shuts down so
|
||||
// recently-recorded xType entries don't get lost.
|
||||
api.on('gateway_stop', () => {
|
||||
try { flushChannelMeta(); } catch { /* ignore */ }
|
||||
});
|
||||
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
|
||||
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)');
|
||||
}
|
||||
|
||||
api.on('gateway_start', () => {
|
||||
@@ -116,7 +153,7 @@ export default defineChannelPluginEntry({
|
||||
// their fabricUserId + first guild endpoint populated).
|
||||
void inbound.start().then(() => {
|
||||
if (!inbound) return;
|
||||
presence = new PresenceSync(api.logger);
|
||||
presence = new PresenceSync(api.logger, client);
|
||||
presence.setAccounts(inbound.getPresenceAccounts());
|
||||
presence.start();
|
||||
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
||||
|
||||
@@ -14,10 +14,13 @@
|
||||
"create-work-channel",
|
||||
"create-report-channel",
|
||||
"create-discussion-channel",
|
||||
"create-sub-discussion",
|
||||
"discussion-complete",
|
||||
"close-sub-discussion",
|
||||
"fabric-canvas",
|
||||
"fabric-channel",
|
||||
"fabric-send-message",
|
||||
"fabric-send-sys-msg",
|
||||
"fabric-channel-list",
|
||||
"fabric-message-history",
|
||||
"fabric-guild-list",
|
||||
|
||||
@@ -21,6 +21,39 @@ import {
|
||||
resolveDefaultFabricAccountId,
|
||||
type ResolvedFabricAccount,
|
||||
} from './accounts.js';
|
||||
import { getChannelType } from './channel-meta.js';
|
||||
|
||||
/**
|
||||
* Map a Fabric channel xType to an openclaw routing peer.kind / ChatType.
|
||||
*
|
||||
* Fabric distinguishes channels by xType ('dm' | 'triage' | 'group' |
|
||||
* 'broadcast' | 'announce' | ...). Openclaw's session router only knows
|
||||
* 'direct' | 'group' | 'channel'. We collapse:
|
||||
* - 'dm' → 'direct' (1:1 conversation; agent always speaks)
|
||||
* - rest → 'group' (multi-party; turn-engine gates speech)
|
||||
*
|
||||
* Sessions are keyed by peer.kind, so inbound and outbound MUST agree —
|
||||
* otherwise the agent's outbound message lands in a different session
|
||||
* than the inbound that triggered it and conversation state splits.
|
||||
*
|
||||
* Outbound has no live xType (the agent target is just a channelId), so
|
||||
* it consults the channel-meta cache populated by inbound. Cache miss
|
||||
* (channel never observed) falls back to 'group' — same as the pre-fix
|
||||
* behavior, no regression on cold cache. The proactive-DM-first-message
|
||||
* edge case (agent DMs a channel before any inbound) still lands as
|
||||
* 'group' on that one outbound; the next inbound + outbound pair will
|
||||
* agree on 'direct'.
|
||||
*/
|
||||
export type FabricPeerRouting = { peerKind: 'direct' | 'group'; chatType: 'direct' | 'group' };
|
||||
|
||||
export function fabricPeerRoutingForXType(xType: string | null | undefined): FabricPeerRouting {
|
||||
if (xType === 'dm') return { peerKind: 'direct', chatType: 'direct' };
|
||||
return { peerKind: 'group', chatType: 'group' };
|
||||
}
|
||||
|
||||
export function fabricPeerRoutingForChannel(channelId: string): FabricPeerRouting {
|
||||
return fabricPeerRoutingForXType(getChannelType(channelId));
|
||||
}
|
||||
|
||||
type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown };
|
||||
|
||||
@@ -45,13 +78,18 @@ export function looksLikeFabricTargetId(raw: string): boolean {
|
||||
export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
|
||||
const id = stripFabricTargetPrefix(params.target);
|
||||
if (!id) return null;
|
||||
// Consult the channel-meta cache populated by inbound — DM channels
|
||||
// need peer.kind='direct' so the outbound session key matches the
|
||||
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
|
||||
// no regression on cold cache).
|
||||
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
|
||||
return buildChannelOutboundSessionRoute({
|
||||
cfg: params.cfg,
|
||||
agentId: params.agentId,
|
||||
channel: 'fabric',
|
||||
accountId: params.accountId,
|
||||
peer: { kind: 'group', id },
|
||||
chatType: 'group',
|
||||
peer: { kind: peerKind, id },
|
||||
chatType,
|
||||
from: `fabric:channel:${id}`,
|
||||
to: `fabric:${id}`,
|
||||
});
|
||||
@@ -115,6 +153,20 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
|
||||
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg as never, accountId),
|
||||
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg as never),
|
||||
isConfigured: (account: ResolvedFabricAccount) => Boolean(account.fabricApiKey),
|
||||
// openclaw's channelManager.getRuntimeSnapshot() — called every minute
|
||||
// by the channel-health-monitor — defaults `configured: true` when the
|
||||
// plugin doesn't expose describeAccount (see applyDescribedAccountFields
|
||||
// in server-channels). Without this, fabric's synthetic 'default'
|
||||
// account (returned by listFabricAccountIds when channels.fabric.accounts
|
||||
// is empty — the prod shape) gets snapshot {enabled:true, configured:true,
|
||||
// running:false} → isManagedAccount=true → not-running → restart loop
|
||||
// every ~10 min, logging `[fabric:default] health-monitor: restarting`.
|
||||
// Mirror isConfigured here so the snapshot truthfully reports false for
|
||||
// any account without a fabricApiKey.
|
||||
describeAccount: (account: ResolvedFabricAccount) => ({
|
||||
accountId: account.accountId,
|
||||
configured: Boolean(account.fabricApiKey),
|
||||
}),
|
||||
},
|
||||
// Minimal setup adapter: Fabric is configured directly under
|
||||
// channels.fabric.* (no interactive wizard). applyAccountConfig is the
|
||||
|
||||
176
src/inbound.ts
176
src/inbound.ts
@@ -6,6 +6,7 @@ import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-
|
||||
import type { FabricClient, FabricSession } from './fabric-client.js';
|
||||
import type { IdentityRegistry } from './identity.js';
|
||||
import { resolveCoalesce } from './accounts.js';
|
||||
import { fabricPeerRoutingForXType } from './channel.js';
|
||||
import { recordChannelType } from './channel-meta.js';
|
||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||
|
||||
@@ -50,8 +51,38 @@ type FabricMessage = {
|
||||
xType?: string;
|
||||
};
|
||||
|
||||
// Walk cfg.bindings for the entry that ties `agentId` to a fabric account.
|
||||
// Returns the binding's match.accountId (the slot label routing keys on);
|
||||
// returns undefined when the agent has no explicit fabric binding so the
|
||||
// caller can fall back to agentId without changing pre-existing semantics
|
||||
// for agents whose binding accountId == agent_id anyway.
|
||||
function findFabricBindingAccountId(cfg: unknown, agentId: string): string | undefined {
|
||||
const bindings = (cfg as { bindings?: Array<{
|
||||
agentId?: string;
|
||||
match?: { channel?: string; accountId?: string };
|
||||
}> })?.bindings;
|
||||
if (!Array.isArray(bindings)) return undefined;
|
||||
for (const b of bindings) {
|
||||
if (
|
||||
b?.agentId === agentId &&
|
||||
b?.match?.channel === 'fabric' &&
|
||||
typeof b?.match?.accountId === 'string' &&
|
||||
b.match.accountId.length > 0
|
||||
) {
|
||||
return b.match.accountId;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
export class FabricInbound {
|
||||
private sockets: Socket[] = [];
|
||||
// Per-agent socket + timer tracking. Enables `removeAccount(agentId)`
|
||||
// to tear down ONE agent without restarting the whole inbound. New
|
||||
// sockets get appended on `connectAgent`; both maps are emptied by
|
||||
// `stop()`.
|
||||
private socketsByAgent = new Map<string, Socket[]>();
|
||||
private timersByAgent = new Map<string, NodeJS.Timeout[]>();
|
||||
private seen = new Set<string>();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
@@ -262,6 +293,71 @@ export class FabricInbound {
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets) s.disconnect();
|
||||
this.sockets = [];
|
||||
this.socketsByAgent.clear();
|
||||
this.timersByAgent.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Bring up ONE new account at runtime (no gateway restart).
|
||||
*
|
||||
* Mirrors what `start()` does per entry: login to Center, upsert the
|
||||
* identity registry, open the socket(s). Idempotent: re-calling with
|
||||
* the same agentId tears down the previous socket(s) first so the
|
||||
* fresh apikey replaces the stale one (recruitment onboard rotates
|
||||
* the agent from the shared `interviewee` placeholder to a real
|
||||
* per-agent apikey — the old `interviewee` socket must drop before
|
||||
* the new one comes up or the agent ends up subscribed to both users
|
||||
* at once).
|
||||
*
|
||||
* Used by the `fabric-register` openclaw tool to make recruitment
|
||||
* end-to-end without a gateway restart between `new-agent` and the
|
||||
* interview's sub-discussion dispatch.
|
||||
*/
|
||||
async addAccount(entry: { agentId: string; fabricApiKey: string }): Promise<void> {
|
||||
if (this.socketsByAgent.has(entry.agentId)) {
|
||||
this.removeAccount(entry.agentId);
|
||||
}
|
||||
const session = await this.client.agentLogin(entry.fabricApiKey);
|
||||
this.identity.upsert({
|
||||
agentId: entry.agentId,
|
||||
fabricApiKey: entry.fabricApiKey,
|
||||
fabricUserId: session.user.id,
|
||||
displayName: session.user.name,
|
||||
});
|
||||
await this.connectAgent(entry.agentId, session);
|
||||
this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tear down ONE account's sockets + timers without touching others.
|
||||
* Caller is responsible for any identity-registry cleanup; this only
|
||||
* drops the live socket subscription so the agent stops receiving
|
||||
* Fabric pushes.
|
||||
*/
|
||||
removeAccount(agentId: string): void {
|
||||
const sockets = this.socketsByAgent.get(agentId);
|
||||
if (sockets) {
|
||||
for (const s of sockets) {
|
||||
try { s.disconnect(); } catch { /* socket already dead */ }
|
||||
// Also remove from the flat list so `stop()` doesn't double-close.
|
||||
const idx = this.sockets.indexOf(s);
|
||||
if (idx !== -1) this.sockets.splice(idx, 1);
|
||||
}
|
||||
this.socketsByAgent.delete(agentId);
|
||||
}
|
||||
const timers = this.timersByAgent.get(agentId);
|
||||
if (timers) {
|
||||
for (const t of timers) {
|
||||
clearInterval(t);
|
||||
const idx = this.channelSyncTimers.indexOf(t);
|
||||
if (idx !== -1) this.channelSyncTimers.splice(idx, 1);
|
||||
}
|
||||
this.timersByAgent.delete(agentId);
|
||||
}
|
||||
this.firstGuildByAgent.delete(agentId);
|
||||
this.tokenCache.delete(agentId);
|
||||
this.agentStatusCache.delete(agentId);
|
||||
this.log.info(`fabric: agent ${agentId} dynamically removed`);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -280,17 +376,25 @@ export class FabricInbound {
|
||||
agentId: string;
|
||||
fabricUserId: string;
|
||||
guildBaseUrl: string;
|
||||
guildNodeId: string;
|
||||
fabricApiKey: string;
|
||||
}> {
|
||||
const out: Array<{ agentId: string; fabricUserId: string; guildBaseUrl: string; fabricApiKey: string }> = [];
|
||||
const out: Array<{
|
||||
agentId: string;
|
||||
fabricUserId: string;
|
||||
guildBaseUrl: string;
|
||||
guildNodeId: string;
|
||||
fabricApiKey: string;
|
||||
}> = [];
|
||||
for (const entry of this.identity.list()) {
|
||||
if (!entry.fabricUserId) continue;
|
||||
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
|
||||
if (!presenceGuildUrl) continue;
|
||||
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
|
||||
if (!presenceGuild) continue;
|
||||
out.push({
|
||||
agentId: entry.agentId,
|
||||
fabricUserId: entry.fabricUserId,
|
||||
guildBaseUrl: presenceGuildUrl,
|
||||
guildBaseUrl: presenceGuild.endpoint,
|
||||
guildNodeId: presenceGuild.nodeId,
|
||||
fabricApiKey: entry.fabricApiKey,
|
||||
});
|
||||
}
|
||||
@@ -298,24 +402,44 @@ export class FabricInbound {
|
||||
}
|
||||
|
||||
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
||||
// guild per agent (used as the presence-push target).
|
||||
private firstGuildEndpointByAgent = new Map<string, string>();
|
||||
// guild per agent (used as the presence-push target). Stores both
|
||||
// endpoint and nodeId — presence-sync needs both: endpoint to build
|
||||
// the URL, nodeId to pick the matching guildAccessToken from a fresh
|
||||
// agent-login response.
|
||||
private firstGuildByAgent = new Map<string, { endpoint: string; nodeId: string }>();
|
||||
|
||||
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
|
||||
const selfUserId = session.user.id;
|
||||
// First-guild capture for presence-sync push target. session.guilds is
|
||||
// already in priority order from Center; we take the first one with a
|
||||
// valid endpoint and stop. Multi-guild presence is a future concern.
|
||||
if (!this.firstGuildEndpointByAgent.has(agentId)) {
|
||||
if (!this.firstGuildByAgent.has(agentId)) {
|
||||
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
||||
if (firstGuild) this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
|
||||
if (firstGuild) this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
|
||||
}
|
||||
for (const g of session.guilds) {
|
||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||
if (!tok) continue;
|
||||
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
|
||||
// on every (re)connect. The single-shot `auth: { token: tok }` shape
|
||||
// captured the token in closure: after socket.io's silent auto-reconnect
|
||||
// the backend got the same JWT that expired ~15 min into the session
|
||||
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
|
||||
// the application layer. The client's `connect` event still fired (TCP
|
||||
// succeeded), so the plugin happily ran the channel-resync, emitted
|
||||
// `join_channel` into the void, and logged "joined N channel(s)" while
|
||||
// the backend was actually broadcasting message.created to a room with
|
||||
// zero subscribers. End user symptom: DMs to agents silently dropped.
|
||||
const socket = io(`${g.endpoint}/realtime`, {
|
||||
transports: ['websocket'],
|
||||
auth: { token: tok },
|
||||
auth: (cb) => {
|
||||
// Best-effort fresh token; on transient failure fall back to the
|
||||
// last known good one. tokenCache also keeps HTTP calls (attachment
|
||||
// download / reply post) from 401'ing in the same window.
|
||||
this.freshGuildToken(agentId, g.nodeId, session)
|
||||
.then((fresh) => cb({ token: fresh ?? tok }))
|
||||
.catch(() => cb({ token: tok }));
|
||||
},
|
||||
autoConnect: false,
|
||||
});
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
@@ -399,6 +523,9 @@ export class FabricInbound {
|
||||
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
||||
);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
const agentTimers = this.timersByAgent.get(agentId) ?? [];
|
||||
agentTimers.push(syncTimer);
|
||||
this.timersByAgent.set(agentId, agentTimers);
|
||||
socket.on('message.created', (m: FabricMessage) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId) return;
|
||||
@@ -444,6 +571,11 @@ export class FabricInbound {
|
||||
});
|
||||
socket.connect();
|
||||
this.sockets.push(socket);
|
||||
// Track per-agent so addAccount/removeAccount can teardown
|
||||
// independently without disturbing other agents.
|
||||
const agentSockets = this.socketsByAgent.get(agentId) ?? [];
|
||||
agentSockets.push(socket);
|
||||
this.socketsByAgent.set(agentId, agentSockets);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -504,11 +636,31 @@ export class FabricInbound {
|
||||
const core = this.core as Core & Record<string, unknown>;
|
||||
const cfg = this.cfg as { session?: { store?: unknown } };
|
||||
try {
|
||||
// Route by xType. DM channels need peer.kind='direct' so openclaw
|
||||
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
|
||||
// and ctx.ChatType='direct') rather than as a multi-party group.
|
||||
// Without this, the agent's user-prompt metadata says
|
||||
// 'is_group_chat: true' on a DM and downstream prompt logic
|
||||
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
|
||||
// misclassifies the turn.
|
||||
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
|
||||
// resolveAgentRoute needs the *binding* accountId (the channel-side
|
||||
// slot name) — not the openclaw agentId. For most agents the binding
|
||||
// is `{agentId: X, match: {channel: fabric, accountId: X}}` so the
|
||||
// two coincide; but for shared-placeholder cases (e.g. the recruitment
|
||||
// `interviewee` slot bound to multiple agents over its lifetime) the
|
||||
// binding accountId is the slot label ("interviewee", "Neon", …) not
|
||||
// the agent_id. Passing agentId there returned bindings=0 and silently
|
||||
// fell back to `main`, hijacking sub-discussion turns. Look up the
|
||||
// agent's fabric binding accountId here; fall back to agentId when no
|
||||
// explicit binding exists (preserves prior behavior for agents with
|
||||
// no fabric binding declared).
|
||||
const bindingAccountId = findFabricBindingAccountId(this.cfg, agentId) ?? agentId;
|
||||
const route = core.channel.routing.resolveAgentRoute({
|
||||
cfg: this.cfg,
|
||||
channel: 'fabric',
|
||||
accountId: agentId,
|
||||
peer: { kind: 'group', id: channelId },
|
||||
accountId: bindingAccountId,
|
||||
peer: { kind: peerKind, id: channelId },
|
||||
});
|
||||
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
||||
agentId: route.agentId,
|
||||
@@ -523,7 +675,7 @@ export class FabricInbound {
|
||||
To: `fabric:${channelId}`,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId ?? agentId,
|
||||
ChatType: 'group',
|
||||
ChatType: chatType,
|
||||
ConversationLabel: `fabric:${guild.nodeId}`,
|
||||
SenderId: m.authorUserId ?? 'fabric',
|
||||
Provider: 'fabric',
|
||||
|
||||
@@ -2,18 +2,26 @@
|
||||
* presence-sync — read each connected agent's HF status (via the
|
||||
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
|
||||
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
|
||||
* `PUT /agents/:userId/presence` so the backend can apply busy-discard
|
||||
* on `announce`-type channel deliveries.
|
||||
* `PUT /api/agents/:userId/presence` so the backend can apply
|
||||
* busy-discard on `announce`-type channel deliveries.
|
||||
*
|
||||
* Push model: we only PUT when an agent's status actually changes
|
||||
* (since the last push). The HF-side accessor has its own TTL cache
|
||||
* to absorb the every-30s polling.
|
||||
*
|
||||
* Auth: the endpoint sits behind ApiKeyGuard (global APP_GUARD per
|
||||
* app.module.js) which expects `Authorization: Bearer <guild-token>`
|
||||
* — NOT the agent's fabricApiKey directly. So before each PUT we do
|
||||
* a fresh agent-login (or reuse a cached token if still within its
|
||||
* 15-min JWT TTL) and pull the guildAccessToken matching the target
|
||||
* guild. Status changes are rare enough that login overhead is fine.
|
||||
*
|
||||
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
|
||||
* is a no-op — Fabric backend defaults presence to 'unknown' which is
|
||||
* treated as not-busy. Announce-channel delivery still works; busy
|
||||
* filtering simply doesn't kick in.
|
||||
*/
|
||||
import type { FabricClient } from './fabric-client.js';
|
||||
|
||||
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
|
||||
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
|
||||
@@ -23,15 +31,36 @@ export interface PresenceSyncAccount {
|
||||
agentId: string;
|
||||
fabricUserId: string; // the agent's Fabric Center user id (UUID)
|
||||
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
|
||||
fabricApiKey: string; // existing per-account key
|
||||
guildNodeId: string; // which guildAccessTokens[].guildNodeId to pick
|
||||
fabricApiKey: string; // existing per-account key (used for agent-login)
|
||||
}
|
||||
|
||||
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
|
||||
// safely inside the window even if a tick runs late.
|
||||
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
|
||||
|
||||
interface CachedToken {
|
||||
token: string;
|
||||
expiresAt: number; // epoch ms
|
||||
}
|
||||
|
||||
export class PresenceSync {
|
||||
private timer: ReturnType<typeof setInterval> | null = null;
|
||||
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
|
||||
private readonly accounts = new Map<string, PresenceSyncAccount>();
|
||||
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
|
||||
|
||||
constructor(private readonly logger: Logger) {}
|
||||
// Mutex flag: a tick iterates accounts serially with `await` on each
|
||||
// agent-login + PUT round-trip, so a single tick can easily run 20+s
|
||||
// when there are many accounts. setInterval(intervalMs) does NOT wait
|
||||
// for the previous tick to finish — without this guard the next tick
|
||||
// fires on top of a still-running one and two parallel iterations
|
||||
// PUT the same agentId within milliseconds. That tipped the backend's
|
||||
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
|
||||
// 500s on prod. Guarded ticks just skip a beat instead.
|
||||
private inflight = false;
|
||||
|
||||
constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
|
||||
|
||||
setAccounts(accounts: PresenceSyncAccount[]): void {
|
||||
this.accounts.clear();
|
||||
@@ -54,7 +83,50 @@ export class PresenceSync {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch a fresh guildAccessToken for `acct`, caching it under the
|
||||
* agentId until just before its JWT expiry. Returns null on login
|
||||
* failure or if the session has no matching guild — caller logs +
|
||||
* skips the PUT.
|
||||
*/
|
||||
private async ensureGuildToken(acct: PresenceSyncAccount): Promise<string | null> {
|
||||
const now = Date.now();
|
||||
const cached = this.tokenCache.get(acct.agentId);
|
||||
if (cached && cached.expiresAt > now) return cached.token;
|
||||
|
||||
let session;
|
||||
try {
|
||||
session = await this.client.agentLogin(acct.fabricApiKey);
|
||||
} catch (err) {
|
||||
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
|
||||
return null;
|
||||
}
|
||||
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
|
||||
if (!entry?.token) {
|
||||
this.logger.warn(
|
||||
`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`,
|
||||
);
|
||||
return null;
|
||||
}
|
||||
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
|
||||
return entry.token;
|
||||
}
|
||||
|
||||
private async tick(): Promise<void> {
|
||||
// Mutex: see the `inflight` field declaration for the why. Drop
|
||||
// overlapping ticks rather than letting them run concurrently —
|
||||
// status is gated by `lastStatus !== bridge.get`, so skipping a
|
||||
// beat costs nothing the next beat won't catch.
|
||||
if (this.inflight) return;
|
||||
this.inflight = true;
|
||||
try {
|
||||
await this.tickInner();
|
||||
} finally {
|
||||
this.inflight = false;
|
||||
}
|
||||
}
|
||||
|
||||
private async tickInner(): Promise<void> {
|
||||
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
|
||||
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
|
||||
|
||||
@@ -68,13 +140,22 @@ export class PresenceSync {
|
||||
if (!status) continue;
|
||||
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
|
||||
|
||||
const guildToken = await this.ensureGuildToken(acct);
|
||||
if (!guildToken) continue;
|
||||
|
||||
try {
|
||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
|
||||
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
|
||||
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
|
||||
// x-api-key and got 401 "missing bearer token" forever. The /api
|
||||
// prefix is required because the guild backend sets a global
|
||||
// 'api' prefix in main.ts setGlobalPrefix('api').
|
||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||
const res = await fetch(url, {
|
||||
method: 'PUT',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': acct.fabricApiKey,
|
||||
authorization: `Bearer ${guildToken}`,
|
||||
},
|
||||
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
||||
});
|
||||
@@ -82,6 +163,10 @@ export class PresenceSync {
|
||||
this.lastStatus.set(agentId, status);
|
||||
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
||||
} else {
|
||||
// 401 here usually means the cached token went stale unexpectedly
|
||||
// (server-side rotation or clock skew) — drop the cache so the
|
||||
// next tick re-logs-in.
|
||||
if (res.status === 401) this.tokenCache.delete(agentId);
|
||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
||||
}
|
||||
} catch (err) {
|
||||
|
||||
76
src/sub-discussion-hook.ts
Normal file
76
src/sub-discussion-hook.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import type { IdentityRegistry } from './identity.js';
|
||||
import type { SubDiscussionStore } from './sub-discussion-store.js';
|
||||
|
||||
// Plugin-local before_prompt_build hook that injects per-(agent, channel)
|
||||
// guides for sub-discussion channels created via the `create-sub-discussion`
|
||||
// tool. Mirrors the pattern used by ClawPrompts' fabric-chat-injector
|
||||
// (channelId-aware injection) but with content dynamically supplied at
|
||||
// channel-creation time instead of read from static files via PrismFacet's
|
||||
// router/rule registry.
|
||||
//
|
||||
// Match logic per turn:
|
||||
// ctx.channelId → store.find() → sub-discussion entry
|
||||
// ctx.agentId → identity.findByAgentId().fabricUserId
|
||||
// ─ matches entry.hostUserId → inject hostGuide
|
||||
// ─ matches entry.guestUserIds → inject guestGuide
|
||||
// ─ neither → no injection
|
||||
//
|
||||
// Fail-closed on unknown agentId/channelId — we never inject "the wrong"
|
||||
// guide, only the right one or nothing.
|
||||
|
||||
const _G = globalThis as Record<string, unknown>;
|
||||
const DEDUP_KEY = '_fabricSubDiscussionHookDedup';
|
||||
|
||||
type PromptCtx = {
|
||||
agentId?: string;
|
||||
channelId?: string;
|
||||
messageProvider?: string;
|
||||
};
|
||||
|
||||
export function registerSubDiscussionHook(
|
||||
api: {
|
||||
on: (hook: string, handler: (...args: unknown[]) => unknown) => void;
|
||||
logger: { info: (m: string) => void; warn: (m: string) => void };
|
||||
},
|
||||
store: SubDiscussionStore,
|
||||
identity: IdentityRegistry,
|
||||
): void {
|
||||
if (!(_G[DEDUP_KEY] instanceof WeakSet)) _G[DEDUP_KEY] = new WeakSet<object>();
|
||||
const dedup = _G[DEDUP_KEY] as WeakSet<object>;
|
||||
|
||||
api.on('before_prompt_build', async (...args: unknown[]) => {
|
||||
const event = args[0];
|
||||
const ctx = (args[1] ?? {}) as PromptCtx;
|
||||
// The hook fires both for fabric-driven turns (channelId set) and
|
||||
// for other triggers (HF wake, exec-event, etc.) — drop those.
|
||||
if (typeof event === 'object' && event !== null) {
|
||||
if (dedup.has(event)) return undefined;
|
||||
dedup.add(event);
|
||||
}
|
||||
const agentId = (ctx.agentId ?? '').trim();
|
||||
const channelId = (ctx.channelId ?? '').trim();
|
||||
if (!agentId || !channelId) return undefined;
|
||||
const provider = (ctx.messageProvider ?? '').toLowerCase();
|
||||
if (provider && provider !== 'fabric') return undefined;
|
||||
|
||||
const entry = store.find(channelId);
|
||||
if (!entry) return undefined;
|
||||
|
||||
const ident = identity.findByAgentId(agentId);
|
||||
const myUserId = (ident?.fabricUserId ?? '').trim();
|
||||
if (!myUserId) {
|
||||
// identity registry caches fabricUserId after the first agentLogin
|
||||
// in inbound.ts. If it's missing here, the agent likely hasn't
|
||||
// completed login yet — skip rather than guess.
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (myUserId === entry.hostUserId) {
|
||||
return { appendSystemContext: entry.hostGuide };
|
||||
}
|
||||
if (entry.guestUserIds.includes(myUserId)) {
|
||||
return { appendSystemContext: entry.guestGuide };
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
}
|
||||
76
src/sub-discussion-store.ts
Normal file
76
src/sub-discussion-store.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import { readFileSync, writeFileSync, existsSync, mkdirSync } from 'node:fs';
|
||||
import { dirname } from 'node:path';
|
||||
|
||||
// Records per-(sub-discussion-channel) state created by the
|
||||
// `create-sub-discussion` tool and consumed by:
|
||||
// 1. `before_prompt_build` hook — looks up by (agentId, channelId) to
|
||||
// inject the host or guest guide as appendSystemContext.
|
||||
// 2. `close-sub-discussion` tool — looks up by sub channelId to find
|
||||
// the parent channel to post the callback to and to validate that
|
||||
// the caller is the original host.
|
||||
//
|
||||
// One sub-discussion = one channel. Lifetime: from create-sub-discussion
|
||||
// return until close-sub-discussion (or gateway-stop / disk corruption).
|
||||
// We persist to a small JSON file so a gateway restart mid-interview
|
||||
// doesn't strand both host and guest with no guide.
|
||||
|
||||
export type SubDiscussionEntry = {
|
||||
subChannelId: string;
|
||||
hostAgentId: string; // the openclaw agentId that called create-sub-discussion
|
||||
hostUserId: string; // the host's Fabric Center userId (session.user.id)
|
||||
guestUserIds: string[]; // Fabric Center userIds invited as guests
|
||||
hostGuide: string; // appended to host's session system prompt while in this channel
|
||||
guestGuide: string; // appended to each guest's session system prompt while in this channel
|
||||
callbackGuildNodeId: string; // where to post the callback when close is called
|
||||
callbackChannelId: string; // parent channel to post callback to (system msg)
|
||||
createdAt: string; // ISO timestamp
|
||||
};
|
||||
|
||||
type StoreFile = { entries: SubDiscussionEntry[] };
|
||||
|
||||
export class SubDiscussionStore {
|
||||
private byChannelId = new Map<string, SubDiscussionEntry>();
|
||||
|
||||
constructor(private readonly filePath: string) {
|
||||
this.load();
|
||||
}
|
||||
|
||||
private load(): void {
|
||||
if (!existsSync(this.filePath)) return;
|
||||
try {
|
||||
const data = JSON.parse(readFileSync(this.filePath, 'utf8')) as StoreFile;
|
||||
for (const e of data.entries ?? []) {
|
||||
if (e?.subChannelId) this.byChannelId.set(e.subChannelId, e);
|
||||
}
|
||||
} catch {
|
||||
// Corrupt file → start empty; first mutation rewrites cleanly.
|
||||
}
|
||||
}
|
||||
|
||||
private persist(): void {
|
||||
mkdirSync(dirname(this.filePath), { recursive: true });
|
||||
const data: StoreFile = { entries: [...this.byChannelId.values()] };
|
||||
writeFileSync(this.filePath, JSON.stringify(data, null, 2));
|
||||
}
|
||||
|
||||
list(): SubDiscussionEntry[] {
|
||||
return [...this.byChannelId.values()];
|
||||
}
|
||||
|
||||
find(subChannelId: string): SubDiscussionEntry | undefined {
|
||||
return this.byChannelId.get(subChannelId);
|
||||
}
|
||||
|
||||
add(entry: SubDiscussionEntry): void {
|
||||
this.byChannelId.set(entry.subChannelId, entry);
|
||||
this.persist();
|
||||
}
|
||||
|
||||
remove(subChannelId: string): SubDiscussionEntry | undefined {
|
||||
const e = this.byChannelId.get(subChannelId);
|
||||
if (!e) return undefined;
|
||||
this.byChannelId.delete(subChannelId);
|
||||
this.persist();
|
||||
return e;
|
||||
}
|
||||
}
|
||||
420
src/tools.ts
420
src/tools.ts
@@ -1,5 +1,7 @@
|
||||
import type { FabricClient } from './fabric-client.js';
|
||||
import type { IdentityRegistry } from './identity.js';
|
||||
import type { SubDiscussionStore } from './sub-discussion-store.js';
|
||||
import { resolveCommandsSyncKey } from './accounts.js';
|
||||
|
||||
// OpenClaw tool registration api (loose typing — concrete shape from
|
||||
// openclaw/plugin-sdk/core at host SDK version).
|
||||
@@ -10,6 +12,9 @@ type ToolApi = {
|
||||
|
||||
type Ctx = { agentId?: string };
|
||||
|
||||
// Loose config shape — just the fields we read here.
|
||||
type ToolsCfg = { channels?: { fabric?: { commandsSyncKey?: string } }; [k: string]: unknown };
|
||||
|
||||
const X_BY_KIND: Record<string, string> = {
|
||||
chat: 'general',
|
||||
work: 'work',
|
||||
@@ -17,19 +22,34 @@ const X_BY_KIND: Record<string, string> = {
|
||||
discussion: 'discuss',
|
||||
};
|
||||
|
||||
// Delay between create-sub-discussion's channel create and greeting post.
|
||||
// Backend pushes channel.joined to invitee sockets on create; that push
|
||||
// has to traverse socket.io rooms before the guest plugin can sub the
|
||||
// channel:<id> room. If the greeting is posted before that, the guest's
|
||||
// turn-activation wakeup misses (the socket isn't in the room yet).
|
||||
// 500ms is empirically slack enough on local sim + production t3, and
|
||||
// short enough not to feel laggy from the host's tool-result POV. Bump
|
||||
// via FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS env if needed.
|
||||
const GREETING_DELAY_MS = (() => {
|
||||
const v = Number.parseInt(process.env.FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS ?? '', 10);
|
||||
return Number.isFinite(v) && v >= 0 ? v : 500;
|
||||
})();
|
||||
|
||||
export function registerFabricTools(
|
||||
api: ToolApi,
|
||||
client: FabricClient,
|
||||
identity: IdentityRegistry,
|
||||
store: SubDiscussionStore,
|
||||
cfg: ToolsCfg,
|
||||
): void {
|
||||
// Resolve the calling agent's Fabric session + a guild's token/endpoint.
|
||||
const ctxGuild = async (agentId: string, guildNodeId: string) => {
|
||||
const entry = identity.findByAgentId(agentId);
|
||||
if (!entry)
|
||||
throw new Error(
|
||||
`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
|
||||
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
|
||||
`channels.fabric.accounts.${agentId}); then restart the gateway`,
|
||||
`agent ${agentId} not registered — call the openclaw \`fabric-register\` ` +
|
||||
`tool (apiKey: <fak_…>, agentId: ${agentId}); the dynamic-subscription ` +
|
||||
`path brings the socket up immediately, no gateway restart needed`,
|
||||
);
|
||||
const session = await client.agentLogin(entry.fabricApiKey);
|
||||
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
|
||||
@@ -38,10 +58,74 @@ export function registerFabricTools(
|
||||
return { session, guild, token };
|
||||
};
|
||||
|
||||
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
|
||||
// It's a one-time step done out-of-band via the installed script
|
||||
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
|
||||
// or via static config (channels.fabric.accounts.<agentId>).
|
||||
// Bind an agent's Fabric API key — validates the key against Center,
|
||||
// upserts ~/.openclaw/fabric-identity.json, AND brings up the inbound
|
||||
// socket immediately via the live FabricInbound instance (no gateway
|
||||
// restart). The standalone binary `~/.openclaw/bin/fabric-register`
|
||||
// still exists for one-time bootstrap before the gateway runs, but
|
||||
// recruitment's `register-agent` script should prefer this tool path
|
||||
// so the new agent's socket is live before `interviewer` fires.
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-register',
|
||||
description:
|
||||
'Bind an agent to a Fabric Center API key. Validates the key, writes ' +
|
||||
'the entry to ~/.openclaw/fabric-identity.json, and starts a live ' +
|
||||
'inbound socket immediately so the agent receives Fabric pushes ' +
|
||||
'without a gateway restart. Caller defaults to the current agent; ' +
|
||||
'pass `agentId` to bind on behalf of another agent (recruitment use).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['apiKey'],
|
||||
properties: {
|
||||
apiKey: { type: 'string', description: 'Fabric Center API key (`fak_…`)' },
|
||||
agentId: {
|
||||
type: 'string',
|
||||
description:
|
||||
'Agent to register. Defaults to the calling agent (ctx.agentId). ' +
|
||||
'Recruitment onboarding may override this when wiring a freshly ' +
|
||||
'created agent before that agent has a session of its own.',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (_id: string, p: { apiKey: string; agentId?: string }) => {
|
||||
const agentId = p.agentId ?? ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context (pass agentId)' };
|
||||
if (!p.apiKey || typeof p.apiKey !== 'string') {
|
||||
return { ok: false, error: 'apiKey required' };
|
||||
}
|
||||
// Delegate to FabricInbound.addAccount via the cross-plugin bridge.
|
||||
// The bridge is installed in index.ts when inbound spins up; if it's
|
||||
// not present yet, the gateway is still starting and the caller should
|
||||
// retry (rare path — only hit during the gateway_start window).
|
||||
const fabricApi = (globalThis as Record<string, unknown>)['__fabric'] as
|
||||
| { addAccount?: (entry: { agentId: string; fabricApiKey: string }) => Promise<void> }
|
||||
| undefined;
|
||||
if (!fabricApi?.addAccount) {
|
||||
return {
|
||||
ok: false,
|
||||
error:
|
||||
'fabric inbound not ready (gateway still starting?). Fall back to ' +
|
||||
'~/.openclaw/bin/fabric-register or retry after a few seconds.',
|
||||
};
|
||||
}
|
||||
try {
|
||||
await fabricApi.addAccount({ agentId, fabricApiKey: p.apiKey });
|
||||
} catch (err) {
|
||||
return {
|
||||
ok: false,
|
||||
error: `fabric-register failed: ${String(err)}`,
|
||||
};
|
||||
}
|
||||
const entry = identity.findByAgentId(agentId);
|
||||
return {
|
||||
ok: true,
|
||||
agentId,
|
||||
fabricUserId: entry?.fabricUserId,
|
||||
displayName: entry?.displayName,
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
@@ -134,6 +218,245 @@ export function registerFabricTools(
|
||||
},
|
||||
}));
|
||||
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
// create-sub-discussion: open a discuss-type sub-channel hanging off
|
||||
// the caller's current channel. Designed for host-driven multi-turn
|
||||
// exchanges (interview, brainstorm, narrow Q&A) where the guests are
|
||||
// either fresh agents without workflow capability (recruitment
|
||||
// interviewee) or peers that just need a short scoped chat without
|
||||
// entering their own subflow.
|
||||
//
|
||||
// What it does on top of plain create-discussion-channel:
|
||||
// 1. Persists a store entry indexed by the new sub channelId, carrying:
|
||||
// host agent + userId, guest userIds, host/guest guide texts,
|
||||
// callback (parent) channel info.
|
||||
// 2. Auto-posts `greetingMsg` using the host's own Fabric account so
|
||||
// turn rotation's activation rule (first author → newOrder[0],
|
||||
// currentSpeaker → newOrder[1], wakeup → newOrder[1]) puts the
|
||||
// first guest on the spot immediately — no race where host posts
|
||||
// before guest's socket subs the channel room (we wait
|
||||
// GREETING_DELAY_MS for backend's channel.joined push to land).
|
||||
// 3. The accompanying before_prompt_build hook (sub-discussion-hook
|
||||
// registered from index.ts) then injects `hostGuideMsg` into the
|
||||
// host's session prompt and `guestGuideMsg` into each guest's
|
||||
// session prompt whenever a turn in this channel fires — so the
|
||||
// two roles see different instructions, no shared guide file.
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'create-sub-discussion',
|
||||
description:
|
||||
'Open a host-driven sub-discussion channel (x_type=discuss) hanging off your current channel, ' +
|
||||
'with role-specific system-prompt guides for host and guests. Use this for interviews / scoped ' +
|
||||
'Q&A where you stay in control of when the conversation ends. Returns the sub channelId; ' +
|
||||
'reach it via fabric-send-message in the rotating turn order. Close with close-sub-discussion ' +
|
||||
'to write a callback back into the parent channel.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: [
|
||||
'guildNodeId',
|
||||
'currentChannelId',
|
||||
'channelName',
|
||||
'greetingMsg',
|
||||
'hostGuideMsg',
|
||||
'guestGuideMsg',
|
||||
'guests',
|
||||
],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string', description: 'Fabric guild node id (same guild for parent + sub).' },
|
||||
currentChannelId: {
|
||||
type: 'string',
|
||||
description: 'Channel id you are currently in (parent). Used as the callback target on close.',
|
||||
},
|
||||
channelName: { type: 'string', description: 'Display name for the new sub-discussion channel.' },
|
||||
greetingMsg: {
|
||||
type: 'string',
|
||||
description:
|
||||
'First message posted by YOU (the host) in the sub channel. Triggers turn rotation so ' +
|
||||
"the first guest's session wakes immediately with both your greeting in history and the " +
|
||||
'guest guide injected as system prompt.',
|
||||
},
|
||||
hostGuideMsg: {
|
||||
type: 'string',
|
||||
description:
|
||||
"Appended to YOUR session's system prompt whenever a turn fires in this sub channel. " +
|
||||
'Use it to remind yourself of the procedure (what to ask, when to call close-sub-discussion).',
|
||||
},
|
||||
guestGuideMsg: {
|
||||
type: 'string',
|
||||
description:
|
||||
"Appended to EACH GUEST's session system prompt for turns in this sub channel. Use it to " +
|
||||
'orient guests with no prior workflow context (e.g. a fresh interviewee). Keep it short; ' +
|
||||
'long guides bloat every turn.',
|
||||
},
|
||||
guests: {
|
||||
type: 'array',
|
||||
items: { type: 'string' },
|
||||
minItems: 1,
|
||||
description:
|
||||
'Fabric Center userIds invited as guests. Resolve via fabric-channel-list members or the ' +
|
||||
'<name>@<role>.hangman-lab.top email convention.',
|
||||
},
|
||||
purpose: { type: 'string', description: 'Optional channel.purpose for discoverability.' },
|
||||
},
|
||||
},
|
||||
execute: async (
|
||||
_id: string,
|
||||
p: {
|
||||
guildNodeId: string;
|
||||
currentChannelId: string;
|
||||
channelName: string;
|
||||
greetingMsg: string;
|
||||
hostGuideMsg: string;
|
||||
guestGuideMsg: string;
|
||||
guests: string[];
|
||||
purpose?: string;
|
||||
},
|
||||
) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
if (!Array.isArray(p.guests) || p.guests.length === 0) {
|
||||
return { ok: false, error: 'guests must be a non-empty array of Fabric userIds' };
|
||||
}
|
||||
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const ch = await client.createChannel(guild.endpoint, token, {
|
||||
guildId: p.guildNodeId,
|
||||
name: p.channelName,
|
||||
xType: 'discuss',
|
||||
isPublic: false,
|
||||
memberUserIds: p.guests,
|
||||
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
|
||||
});
|
||||
store.add({
|
||||
subChannelId: ch.id,
|
||||
hostAgentId: agentId,
|
||||
hostUserId: session.user.id,
|
||||
guestUserIds: [...p.guests],
|
||||
hostGuide: p.hostGuideMsg,
|
||||
guestGuide: p.guestGuideMsg,
|
||||
callbackGuildNodeId: p.guildNodeId,
|
||||
callbackChannelId: p.currentChannelId,
|
||||
createdAt: new Date().toISOString(),
|
||||
});
|
||||
// Let backend's channel.joined push reach guest sockets before our
|
||||
// greeting fires — otherwise the wakeup emitted by turn-activation
|
||||
// races a not-yet-subscribed socket.io room.
|
||||
if (GREETING_DELAY_MS > 0) {
|
||||
await new Promise((r) => setTimeout(r, GREETING_DELAY_MS));
|
||||
}
|
||||
try {
|
||||
await client.postMessage(guild.endpoint, token, ch.id, p.greetingMsg, session.user.id);
|
||||
} catch (err) {
|
||||
api.logger.warn(
|
||||
`fabric: create-sub-discussion greeting post failed channel=${ch.id} err=${String(err)}`,
|
||||
);
|
||||
}
|
||||
return { ok: true, subChannelId: ch.id };
|
||||
},
|
||||
}));
|
||||
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
// close-sub-discussion: post a system-authored callback into the
|
||||
// parent channel + close the sub-discussion channel. Only the original
|
||||
// host can call this. Uses the Guild's x-fabric-system-key path (shared
|
||||
// secret = commandsSyncKey) so the callback lands as a guild/system
|
||||
// author, not the host's personal account — and can wake the host on
|
||||
// the parent channel to continue whatever workflow opened the sub.
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'close-sub-discussion',
|
||||
description:
|
||||
'Close a sub-discussion channel you opened (host-only) and write a callback to the parent ' +
|
||||
'channel as a system message. Pass `wakeupHost: false` to land the callback silently in ' +
|
||||
'history without waking yourself.',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['subChannelId', 'callbackMsg'],
|
||||
properties: {
|
||||
subChannelId: { type: 'string', description: 'The sub-discussion channelId returned by create-sub-discussion.' },
|
||||
callbackMsg: {
|
||||
type: 'string',
|
||||
description:
|
||||
'Content to post into the parent channel as a system-authored message. Typical content: ' +
|
||||
'the conclusion / extracted data from the sub-discussion, so the next turn on the parent ' +
|
||||
'channel can act on it.',
|
||||
},
|
||||
wakeupHost: {
|
||||
type: 'boolean',
|
||||
description:
|
||||
'Whether to wake YOU (the host) on the parent channel. Default true — for recruitment ' +
|
||||
'interview flow where the next workflow step needs to run immediately. Pass false for ' +
|
||||
'fire-and-forget logging.',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (
|
||||
_id: string,
|
||||
p: { subChannelId: string; callbackMsg: string; wakeupHost?: boolean },
|
||||
) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const entry = store.find(p.subChannelId);
|
||||
if (!entry) {
|
||||
return { ok: false, error: `sub-discussion not found: ${p.subChannelId}` };
|
||||
}
|
||||
if (entry.hostAgentId !== agentId) {
|
||||
return {
|
||||
ok: false,
|
||||
error: `only the host (${entry.hostAgentId}) may close this sub-discussion`,
|
||||
};
|
||||
}
|
||||
const systemKey = resolveCommandsSyncKey(cfg);
|
||||
if (!systemKey) {
|
||||
return {
|
||||
ok: false,
|
||||
error:
|
||||
'channels.fabric.commandsSyncKey is not configured — close-sub-discussion needs it for ' +
|
||||
'the x-fabric-system-key callback. Configure via openclaw config.',
|
||||
};
|
||||
}
|
||||
const { guild, token } = await ctxGuild(agentId, entry.callbackGuildNodeId);
|
||||
const wakeup = p.wakeupHost !== false;
|
||||
// 1) Post callback into parent channel via the system-key path.
|
||||
const url = `${guild.endpoint}/api/channels/${encodeURIComponent(entry.callbackChannelId)}/messages`;
|
||||
const res = await fetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-fabric-system-key': systemKey,
|
||||
},
|
||||
body: JSON.stringify({
|
||||
content: p.callbackMsg,
|
||||
wakeupUserId: wakeup ? entry.hostUserId : null,
|
||||
}),
|
||||
}).catch((err) => {
|
||||
throw new Error(`callback POST failed: ${String(err)}`);
|
||||
});
|
||||
if (!res.ok) {
|
||||
const text = await res.text().catch(() => '');
|
||||
return {
|
||||
ok: false,
|
||||
error: `callback POST ${url} -> ${res.status} ${text}`,
|
||||
};
|
||||
}
|
||||
// 2) Close the sub channel using the host's own bearer (the host is
|
||||
// a member of the channel — channel.close auth is per-member).
|
||||
try {
|
||||
await client.closeChannel(guild.endpoint, token, entry.subChannelId);
|
||||
} catch (err) {
|
||||
api.logger.warn(
|
||||
`fabric: close-sub-discussion: sub channel close failed channel=${entry.subChannelId} err=${String(err)}`,
|
||||
);
|
||||
}
|
||||
// 3) Drop the store entry so the prompt-injection hook stops firing
|
||||
// for this channel (the sub is closed; any straggler turns would
|
||||
// just hit the closed-channel write reject downstream).
|
||||
store.remove(entry.subChannelId);
|
||||
return { ok: true, closed: true };
|
||||
},
|
||||
}));
|
||||
|
||||
// fabric-canvas: share / update / read / close the channel's single
|
||||
// pinned canvas document (one tool, four actions). update/close are
|
||||
// sharer-only server-side (the guild returns 403 otherwise).
|
||||
@@ -296,6 +619,89 @@ export function registerFabricTools(
|
||||
},
|
||||
}));
|
||||
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
// fabric-send-sys-msg: post a system-authored message (author =
|
||||
// sentinel UUID 0000…, not the calling agent) using the Guild's
|
||||
// x-fabric-system-key path. Use for cross-agent broadcasts where you
|
||||
// don't want the message tied to one agent's identity — Dialectic
|
||||
// topic announcements / lifecycle events, host-system advisories,
|
||||
// etc. Caller doesn't need to be a member of the channel (the
|
||||
// backend isSystem branch skips assertParticipant), but must be a
|
||||
// member of the guild (their session resolves the guild endpoint).
|
||||
//
|
||||
// Shared secret: reads channels.fabric.commandsSyncKey (same value
|
||||
// as the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY env). Empty
|
||||
// config → tool returns ok:false with a clear error, no fall-through
|
||||
// to regular agent posting.
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-send-sys-msg',
|
||||
description:
|
||||
'Send a SYSTEM-AUTHORED message into a Fabric channel (author = guild sentinel, not you). ' +
|
||||
'Use for cross-agent broadcasts that should not be attributed to a single agent — ' +
|
||||
'Dialectic announce-channel topic broadcasts, lifecycle events, system advisories. ' +
|
||||
'Optionally precise-wake one recipient via wakeupUserId; otherwise the message lands ' +
|
||||
'silently in history (no wake).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['guildNodeId', 'channelId', 'content'],
|
||||
properties: {
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
|
||||
wakeupUserId: {
|
||||
type: 'string',
|
||||
description:
|
||||
"Optional: a single Fabric userId to wake with this message (everyone else in the " +
|
||||
'channel sees it but with wakeup=false). Omit for fully silent broadcast.',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (
|
||||
_id: string,
|
||||
p: { guildNodeId: string; channelId: string; content: string; wakeupUserId?: string },
|
||||
) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const systemKey = resolveCommandsSyncKey(cfg);
|
||||
if (!systemKey) {
|
||||
return {
|
||||
ok: false,
|
||||
error:
|
||||
'channels.fabric.commandsSyncKey is not configured — fabric-send-sys-msg needs it for ' +
|
||||
'the x-fabric-system-key header. Configure via openclaw config.',
|
||||
};
|
||||
}
|
||||
const { guild } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const url = `${guild.endpoint}/api/channels/${encodeURIComponent(p.channelId)}/messages`;
|
||||
const wakeup = typeof p.wakeupUserId === 'string' && p.wakeupUserId.trim()
|
||||
? p.wakeupUserId.trim()
|
||||
: null;
|
||||
const res = await fetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-fabric-system-key': systemKey,
|
||||
},
|
||||
body: JSON.stringify({ content: p.content, wakeupUserId: wakeup }),
|
||||
});
|
||||
if (!res.ok) {
|
||||
const text = await res.text().catch(() => '');
|
||||
return { ok: false, error: `POST ${url} -> ${res.status} ${text}` };
|
||||
}
|
||||
const json = (await res.json().catch(() => null)) as
|
||||
| { messageId?: string; seq?: number; authorUserId?: string }
|
||||
| null;
|
||||
return {
|
||||
ok: true,
|
||||
messageId: json?.messageId,
|
||||
seq: json?.seq,
|
||||
authorUserId: json?.authorUserId,
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// fabric-channel-list: enumerate channels the calling agent can see
|
||||
// in a given guild. Backend filters to public channels + channels the
|
||||
|
||||
Reference in New Issue
Block a user