15 Commits

Author SHA1 Message Date
h z
fc2ab628b2 Merge pull request 'feat(fabric): dynamic-subscription via fabric-register openclaw tool' (#12) from feat/dynamic-account-subscription into main 2026-06-01 07:57:10 +00:00
893b93198d feat(fabric): dynamic-subscription via fabric-register openclaw tool
The plugin manifest declared `fabric-register` as a tool name but
tools.ts never registered it — recruitment fell through to the
standalone `/root/.openclaw/bin/fabric-register` binary, which writes
~/.openclaw/fabric-identity.json correctly but exits without notifying
the running plugin. That left fabric inbound's subscription as a
connect-time snapshot: every new agent required a gateway restart
between `new-agent` and the interview's sub-discussion or the message
had no socket to dispatch on.

Wire the full path:
  - `FabricInbound.addAccount(entry)` — login, upsert identity, open socket(s),
    track per-agent so removeAccount can teardown cleanly. Idempotent: a
    second call replaces the previous socket (used post-onboard when the
    agent rotates off the shared `interviewee` placeholder onto its own
    apikey).
  - `FabricInbound.removeAccount(agentId)` — disconnect sockets, clear
    timers + per-agent caches.
  - `__fabric.addAccount` / `removeAccount` — cross-plugin bridge so the
    `fabric-register` tool can reach the live FabricInbound instance from
    its tool handler context.
  - `fabric-register` openclaw tool — validates apiKey, calls
    `__fabric.addAccount`, returns `{ok, fabricUserId, displayName}`.
    Accepts `agentId` arg so recruitment can bind on behalf of a
    freshly-created agent before that agent has a session of its own.

Removes the "restart the gateway" advice from the ctxGuild
"agent not registered" error message — operators should now call the
tool path instead.

After this lands + the ClawSkills register-agent script flip (separate
commit), `recruitment.new-agent` -> interviewer sub-discussion runs
without a gateway restart in between.
2026-06-01 08:56:53 +01:00
h z
260d50196b Merge pull request 'fix(routing): resolveAgentRoute uses binding.accountId, not agent_id' (#11) from fix/routing-use-binding-accountid into main 2026-05-31 19:33:13 +00:00
ea713064e1 fix(routing): resolveAgentRoute uses binding.accountId, not agent_id
`socket.on('message.created', ...)` dispatched with
`accountId: agentId` (the openclaw agent id, e.g. 'analyst2') instead
of the binding's `match.accountId` (the fabric account slot label, e.g.
'interviewee'). For most agents the binding is `{agentId: X, accountId: X}`
so the two coincide and the call works by accident. For shared-placeholder
slots (the recruitment `interviewee` apikey reused across pre-onboard
agents) the slot label is `interviewee` not the agent_id, so the lookup
returns bindings=0 and openclaw core silently falls back to the `main`
agent — which then handles the sub-discussion turn under main's
workspace identity. Symptom: every sub-discussion interview reply
masquerades as the human user's IDENTITY.md text.

Walk cfg.bindings for the entry that ties this agentId to a fabric
account; use its accountId. Fall back to agentId when the agent has
no explicit fabric binding declared (preserves prior behavior for
agents wired before the binding format was uniform).

Verified on prod-t2 recruitment retest 2026-05-31:
  Before: routing log `accountId=analyst2 ... bindings=0`, main session
          ran instead of analyst2.
  After:  `accountId=interviewee ... bindings=1`, analyst2 session ran
          (0 main sessions in sub channel).
2026-05-31 20:32:41 +01:00
180b717eda feat: add fabric-send-sys-msg tool for system-authored Fabric posts
Thin agent-facing wrapper over the Guild backend's x-fabric-system-key
path (see [[reference_fabric_system_msg_api]]). Posts a message into a
specific channel as the guild sentinel author
(00000000-0000-0000-0000-000000000000), not as the calling agent.

Use case driver: Dialectic recruitment broadcasts. ClawSkills'
`analyze-intel` Step 4 currently posts via `fabric-send-message` which
attributes the message to the proposing agent; that's fine for DM
fallbacks but for announce-channel broadcasts the message should look
like a system lifecycle event, not a personal ping. Without this tool,
the only way to get a system-authored post was the close-sub-discussion
internal path — generic broadcast use cases had no door.

Tool shape mirrors fabric-send-message but:
- Reads channels.fabric.commandsSyncKey from openclaw config; empty →
  ok:false with a configuration error (no silent fallthrough to
  agent-bearer posting).
- Optional `wakeupUserId` plumbs through to the backend's
  emitMessageTargeted path: precise wake one recipient or fully silent
  broadcast (default). For announce-channel broadcasts the silent path
  is right — agents poll/discover, they shouldn't be woken on broadcast.
- Caller doesn't need to be a member of the channel (backend isSystem
  branch skips assertParticipant). Guild membership is still required
  because we resolve guild.endpoint from the agent's session.

Manifest gets the tool name so it surfaces in the agent registry.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 23:46:33 +01:00
152b465e64 fix(channel): include accountId in describeAccount snapshot
ChannelAccountSnapshot from openclaw plugin-sdk requires accountId
(everything else is optional). Returning bare { configured: boolean }
satisfies neither tsc -p tsconfig.json strict checks nor the runtime
contract — the snapshot fed into channelManager.getRuntimeSnapshot() is
keyed by accountId, so an unkeyed entry is dropped before
applyDescribedAccountFields ever sees the configured flag the original
PR #10 was trying to surface.

Pre-existing bug (introduced together with describeAccount in #10) that
was masked because tsc invoked without -p falls back to Node10 module
resolution, which doesn't read openclaw's package.json exports map, so
plugin-sdk/core resolves to nothing, so ChannelAccountSnapshot ends up
unknown, so { configured: boolean } passes. Tripped now because
install.mjs's build step uses `tsc -p tsconfig.json` (NodeNext) and
properly resolves the SDK types — exposes the missing accountId.

Trivial fix: pull accountId off the same ResolvedFabricAccount that
already carries it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 21:03:47 +01:00
7f96fffca9 feat: create-sub-discussion / close-sub-discussion + per-role guide injection
Adds a host-driven sub-discussion mechanism for short-lived multi-agent
exchanges where one participant (recruitment interviewee, scoped Q&A
target) has no workflow capability of its own — no Meridian state, no
installed skills, no ego identity, possibly just a placeholder Fabric
account. The host stays procedurally in control; the guest just answers
natural-language questions.

The fundamental shift from `create-discussion-channel` is the guide
plumbing: instead of a single shared guide file posted as the channel's
first message (Discord-era model), each role gets its OWN system-prompt
guide, injected at turn time via `before_prompt_build`. The host can
carry the full procedure; the guest sees a tiny orientation tailored to
"answer my questions, don't try to enter workflows you can't enter".

Three new pieces:

1. SubDiscussionStore (src/sub-discussion-store.ts) — in-memory store
   keyed by sub channelId, mirror-persisted to
   ~/.openclaw/fabric-sub-discussion.json so a gateway restart
   mid-interview doesn't strand both parties with no guide. Carries
   host agentId + userId, guest userIds, host guide text, guest guide
   text, callback (parent) channelId / guildNodeId, createdAt.

2. create-sub-discussion tool (src/tools.ts):
   - Creates a discuss-type Fabric channel with guests as members
     (host is creator → auto-included).
   - Persists a store entry indexed by the new channelId.
   - Sleeps FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS (default 500ms,
     env-overridable) for the backend's channel.joined push to land on
     guest sockets, then posts greetingMsg using the host's own Fabric
     account. Turn rotation's activation rule then puts the first guest
     on the spot with wakeup=true — no race where the host posts before
     the guest's socket subs the channel room.

3. close-sub-discussion tool (src/tools.ts):
   - Host-only (rejects non-host callers by agentId match against the
     store).
   - Posts callbackMsg back into the parent channel using the Guild's
     x-fabric-system-key path so the callback lands as a guild/system-
     authored message rather than the host's personal account.
   - Default wakeupHost=true precisely wakes the host on the parent
     channel so the next workflow step (e.g. recruitment onboard) fires
     without waiting for unrelated traffic.
   - Closes the sub channel and drops the store entry.

Plus a `before_prompt_build` hook (src/sub-discussion-hook.ts) the plugin
registers at startup:

  ctx.channelId  → store.find()  → entry
  identity.findByAgentId(ctx.agentId).fabricUserId
  ─ matches entry.hostUserId   → appendSystemContext: entry.hostGuide
  ─ in     entry.guestUserIds  → appendSystemContext: entry.guestGuide
  ─ neither                    → no injection

Fail-closed on unknown agentId / channelId — we never inject the wrong
guide, only the right one or nothing. Provider-gated to messageProvider
in {empty,'fabric'} so non-fabric triggers (HF wake, exec-event) don't
unintentionally pick up an injection.

Wiring in index.ts threads the new store + config into both
registerFabricTools (so close-sub-discussion can read the system key
from channels.fabric.commandsSyncKey) and registerSubDiscussionHook (so
the prompt hook can resolve agentId → fabricUserId via identity).

Manifest update: openclaw.plugin.json lists `create-sub-discussion`
and `close-sub-discussion` in contracts.tools so the registry surfaces
them to the agent.

Backend prereq: Fabric.Backend.Guild commit 340eed8 restores the
x-fabric-system-key bypass on POST /channels/:id/messages with
wakeupUserId support — close-sub-discussion is a no-op without it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 20:52:01 +01:00
h z
b659dadb9e Merge pull request 'fix(channel): add describeAccount so health-monitor sees real configured state' (#10) from fix/describe-account-stops-default-restart-loop into main 2026-05-26 15:49:21 +00:00
20e55849eb fix(channel): add describeAccount so health-monitor sees real configured
openclaw's `channelManager.getRuntimeSnapshot()` — called every minute
by the channel-health-monitor — runs accounts through
`applyDescribedAccountFields(next, plugin.config.describeAccount?.(...))`.
When the callback is missing it defaults `configured: true`. Fabric
never defined it, so every health-monitor cycle:

  snapshot = { enabled: true, configured: true, running: false }

For fabric's synthetic 'default' account (returned by
`listFabricAccountIds` when `channels.fabric.accounts` is empty —
the prod shape, where per-agent api-keys live in
`~/.openclaw/fabric-identity.json` and the channel framework never
runs `startAccount` so `running` stays false):

  isManagedAccount({enabled:true, configured:true}) === true
  -> not-running -> 'stopped' -> restart every ~10 min, logging
  '[fabric:default] health-monitor: restarting (reason: stopped)'

The restart is a no-op (fabric's `gateway.startAccount` is absent so
`startChannelInternal` returns early), but the log is loud and
operators chasing real outages keep wasting time on it.

Mirror `isConfigured` from describeAccount so the snapshot
truthfully reports configured:false for any account without a
fabricApiKey. The fabric plugin still self-manages real agents via
`gateway_start` -> `FabricInbound.start()`; the framework just no
longer thinks 'default' is something it should restart.

Verified in sim (this patch alone, no debug instrumentation):
- gateway up 8+ minutes, 0 restart events
- pre-patch sim with same config restarted at 5min mark
- evaluateChannelHealth snapshot for both 'default' and 'recruiter'
  accountId reads configured:false (instrumented with temporary
  console.log in channel-health-policy, since reverted)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 16:48:53 +01:00
h z
d47d3467df Merge pull request 'fix(inbound): refresh socket.io auth on (re)connect via callback' (#9) from fix/socket-auth-callback-refresh into main 2026-05-26 12:51:04 +00:00
7dc70522d1 fix(inbound): refresh socket.io auth on (re)connect via callback
Backend issues short-lived guildAccessToken (TTL=900s). The previous
`auth: { token: tok }` shape captured the JWT once in connectAgent's
closure: after socket.io's auto-reconnect the backend kept getting the
same expired JWT and silently rejected the handshake at the application
layer (RealtimeGateway logs 'socket rejected: <id>'). The client's
'connect' event still fired (TCP succeeded) so the plugin happily ran
the channel-resync, emitted join_channel into the void, and logged
'joined N channel(s)' while the backend was actually broadcasting
message.created to a room with zero subscribers. End-user symptom:
DMs/group messages to agents silently dropped 15 min after gateway
start, with no error anywhere on the agent side.

Switch to the callback form, which socket.io re-evaluates on every
(re)connect — same call site we already use for the HTTP path via
freshGuildToken/tokenCache.

Verified in sim (commit 2acb084 + this patch):
1. Connect new DM channel + post msg -> dispatch + reply ✓
2. `docker restart fabric-backend-guild` to force socket disconnect
3. Plugin reconnects automatically and logs
   'fabric: agent recruiter joined 12 channel(s) on sim-guild-1' ✓
   (without the fix this reconnect was silently rejected; sim used to
    log 'WARN socket rejected: <id>' on the guild backend)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 13:50:24 +01:00
h z
2acb084ee4 fix(presence-sync): tick mutex (#8) 2026-05-26 02:06:21 +00:00
9419d270e5 fix(presence-sync): tick mutex so setInterval overlap can't spawn parallel ticks
The presence-sync tick iterates accounts serially with await on each
agent-login + PUT round-trip — a single tick can easily run 20+s when
there are several accounts. setInterval(intervalMs) does NOT wait for
the previous tick to finish, so on a busy gateway the next tick fires
on top of a still-running one and two parallel iterations each PUT
the same agentId within ~10 ms. That tipped the guild backend's
first-time-insert race (separate fix in nav/Fabric.Backend.Guild) into
500s on prod (caught in t2 gateway 2026-05-25 23:23:35Z; 6 of 6 agents
showed paired log lines 4-10 ms apart for the same agent → idle).

Fix: a simple `inflight` boolean. tick() returns immediately if
already running; the next interval beat catches up. lastStatus !==
bridge.get gating already means status changes catch the next tick
anyway, so skipping a beat costs nothing the next beat won't fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 02:25:08 +01:00
h z
79b29db26c fix(presence-sync): /api prefix + Bearer guildAccessToken (#7) 2026-05-25 23:17:45 +00:00
a87de27cff fix(presence-sync): use /api prefix + Bearer guildAccessToken (not x-api-key)
Two layered bugs in the presence-sync loop, both causing every PUT to
fail forever in prod:

1. **Missing /api prefix.** URL was `${guildBaseUrl}/agents/<id>/presence`
   but the guild backend sets a global prefix 'api' in main.ts
   `setGlobalPrefix('api')`. Every other REST call in this plugin
   (channel.ts channels list, fabric-client.ts postMessage, canvas)
   already prepends /api/ — only presence-sync missed it. Returned 404
   "Cannot PUT /agents/...".

2. **Wrong auth scheme.** Plugin sent `x-api-key: <fabricApiKey>`, but
   the endpoint sits behind the global APP_GUARD = ApiKeyGuard, which
   actually expects `Authorization: Bearer <guildAccessToken>` (despite
   its name — confusing naming on the backend side). With /api added,
   error became 401 "missing bearer token". Confirmed by `docker exec
   fabric-backend-guild grep APP_GUARD /app/dist/app.module.js` and
   manual curl: Bearer guild token → 200 OK.

**Fix**

- presence-sync.ts: do agent-login on demand to obtain a fresh
  guildAccessToken, cache it per-agent for 13 min (under the 15-min
  JWT TTL), use it as Bearer for the PUT. 401 response invalidates
  the cache so the next tick re-logs-in. Pushes are gated on status
  changes (rare), so the login overhead is negligible.

- inbound.ts: firstGuildEndpointByAgent → firstGuildByAgent storing
  both endpoint and nodeId (presence-sync needs nodeId to pick the
  right token out of guildAccessTokens[]).

- index.ts: pass FabricClient to PresenceSync constructor.

**Verified in sim**

After restart, gateway log shows `fabric: presence-sync recruiter →
idle` (200 OK), zero failed PUTs, where previously it would log a 404
every ~5s per agent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 23:54:38 +01:00
12 changed files with 984 additions and 55 deletions

View File

@@ -94,7 +94,7 @@ export default defineChannelPluginEntry({
void inbound.start().then(() => { void inbound.start().then(() => {
if (!inbound) if (!inbound)
return; return;
presence = new PresenceSync(api.logger); presence = new PresenceSync(api.logger, client);
presence.setAccounts(inbound.getPresenceAccounts()); presence.setAccounts(inbound.getPresenceAccounts());
presence.start(); presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`); api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);

View File

@@ -117,6 +117,19 @@ export const fabricChannelPlugin = createChatChannelPlugin({
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId), resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId),
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg), defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg),
isConfigured: (account) => Boolean(account.fabricApiKey), isConfigured: (account) => Boolean(account.fabricApiKey),
// openclaw's channelManager.getRuntimeSnapshot() — called every minute
// by the channel-health-monitor — defaults `configured: true` when the
// plugin doesn't expose describeAccount (see applyDescribedAccountFields
// in server-channels). Without this, fabric's synthetic 'default'
// account (returned by listFabricAccountIds when channels.fabric.accounts
// is empty — the prod shape) gets snapshot {enabled:true, configured:true,
// running:false} → isManagedAccount=true → not-running → restart loop
// every ~10 min, logging `[fabric:default] health-monitor: restarting`.
// Mirror isConfigured here so the snapshot truthfully reports false for
// any account without a fabricApiKey.
describeAccount: (account) => ({
configured: Boolean(account.fabricApiKey),
}),
}, },
// Minimal setup adapter: Fabric is configured directly under // Minimal setup adapter: Fabric is configured directly under
// channels.fabric.* (no interactive wizard). applyAccountConfig is the // channels.fabric.* (no interactive wizard). applyAccountConfig is the

View File

@@ -228,38 +228,59 @@ export class FabricInbound {
for (const entry of this.identity.list()) { for (const entry of this.identity.list()) {
if (!entry.fabricUserId) if (!entry.fabricUserId)
continue; continue;
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId); const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
if (!presenceGuildUrl) if (!presenceGuild)
continue; continue;
out.push({ out.push({
agentId: entry.agentId, agentId: entry.agentId,
fabricUserId: entry.fabricUserId, fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuildUrl, guildBaseUrl: presenceGuild.endpoint,
guildNodeId: presenceGuild.nodeId,
fabricApiKey: entry.fabricApiKey, fabricApiKey: entry.fabricApiKey,
}); });
} }
return out; return out;
} }
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first // Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target). // guild per agent (used as the presence-push target). Stores both
firstGuildEndpointByAgent = new Map(); // endpoint and nodeId — presence-sync needs both: endpoint to build
// the URL, nodeId to pick the matching guildAccessToken from a fresh
// agent-login response.
firstGuildByAgent = new Map();
async connectAgent(agentId, session) { async connectAgent(agentId, session) {
const selfUserId = session.user.id; const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is // First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a // already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern. // valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildEndpointByAgent.has(agentId)) { if (!this.firstGuildByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0); const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild) if (firstGuild)
this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint); this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
} }
for (const g of session.guilds) { for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) if (!tok)
continue; continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, { const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'], transports: ['websocket'],
auth: { token: tok }, auth: (cb) => {
// Best-effort fresh token; on transient failure fall back to the
// last known good one. tokenCache also keeps HTTP calls (attachment
// download / reply post) from 401'ing in the same window.
this.freshGuildToken(agentId, g.nodeId, session)
.then((fresh) => cb({ token: fresh ?? tok }))
.catch(() => cb({ token: tok }));
},
autoConnect: false, autoConnect: false,
}); });
// Tracked socket.io rooms for this (agent, guild). The initial fetch // Tracked socket.io rooms for this (agent, guild). The initial fetch

View File

@@ -1,26 +1,25 @@
/** // Guild access JWTs expire every 900s. Refresh ~2 min early to stay
* presence-sync — read each connected agent's HF status (via the // safely inside the window even if a tick runs late.
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
* `PUT /agents/:userId/presence` so the backend can apply busy-discard
* on `announce`-type channel deliveries.
*
* Push model: we only PUT when an agent's status actually changes
* (since the last push). The HF-side accessor has its own TTL cache
* to absorb the every-30s polling.
*
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
* is a no-op — Fabric backend defaults presence to 'unknown' which is
* treated as not-busy. Announce-channel delivery still works; busy
* filtering simply doesn't kick in.
*/
export class PresenceSync { export class PresenceSync {
logger; logger;
client;
timer = null; timer = null;
lastStatus = new Map(); // by agentId lastStatus = new Map(); // by agentId
accounts = new Map(); accounts = new Map();
constructor(logger) { tokenCache = new Map(); // by agentId
// Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
inflight = false;
constructor(logger, client) {
this.logger = logger; this.logger = logger;
this.client = client;
} }
setAccounts(accounts) { setAccounts(accounts) {
this.accounts.clear(); this.accounts.clear();
@@ -42,7 +41,49 @@ export class PresenceSync {
this.timer = null; this.timer = null;
} }
} }
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
async ensureGuildToken(acct) {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now)
return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
}
catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
async tick() { async tick() {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight)
return;
this.inflight = true;
try {
await this.tickInner();
}
finally {
this.inflight = false;
}
}
async tickInner() {
const bridge = globalThis['__hfAgentStatus']; const bridge = globalThis['__hfAgentStatus'];
if (!bridge || typeof bridge.get !== 'function') if (!bridge || typeof bridge.get !== 'function')
return; // HF plugin not loaded — skip return; // HF plugin not loaded — skip
@@ -58,13 +99,22 @@ export class PresenceSync {
continue; continue;
if (this.lastStatus.get(agentId) === status) if (this.lastStatus.get(agentId) === status)
continue; // no change → no PUT continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken)
continue;
try { try {
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`; // Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
// x-api-key and got 401 "missing bearer token" forever. The /api
// prefix is required because the guild backend sets a global
// 'api' prefix in main.ts setGlobalPrefix('api').
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, { const res = await fetch(url, {
method: 'PUT', method: 'PUT',
headers: { headers: {
'content-type': 'application/json', 'content-type': 'application/json',
'x-api-key': acct.fabricApiKey, authorization: `Bearer ${guildToken}`,
}, },
body: JSON.stringify({ status, source: 'hf-plugin' }), body: JSON.stringify({ status, source: 'hf-plugin' }),
}); });
@@ -73,6 +123,11 @@ export class PresenceSync {
this.logger.info(`fabric: presence-sync ${agentId}${status}`); this.logger.info(`fabric: presence-sync ${agentId}${status}`);
} }
else { else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401)
this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`); this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
} }
} }

View File

@@ -15,6 +15,8 @@ import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js'; import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js'; import { syncFabricCommands } from './src/command-sync.js';
import { PresenceSync } from './src/presence-sync.js'; import { PresenceSync } from './src/presence-sync.js';
import { SubDiscussionStore } from './src/sub-discussion-store.js';
import { registerSubDiscussionHook } from './src/sub-discussion-hook.js';
import path from 'node:path'; import path from 'node:path';
import os from 'node:os'; import os from 'node:os';
@@ -48,19 +50,38 @@ export default defineChannelPluginEntry({
on: (ev: string, fn: (...args: unknown[]) => unknown) => void; on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
registerTool: (d: unknown) => void; registerTool: (d: unknown) => void;
}; };
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } }; const cfg = (api.config ?? {}) as {
channels?: { fabric?: { centerApiBase?: string; commandsSyncKey?: string } };
};
const centerApiBase = cfg.channels?.fabric?.centerApiBase ?? 'http://localhost:7001/api'; const centerApiBase = cfg.channels?.fabric?.centerApiBase ?? 'http://localhost:7001/api';
const idFile = const idFile =
api.pluginConfig?.identityFilePath ?? api.pluginConfig?.identityFilePath ??
path.join(os.homedir(), '.openclaw', 'fabric-identity.json'); path.join(os.homedir(), '.openclaw', 'fabric-identity.json');
const subDiscussionFile = path.join(
os.homedir(),
'.openclaw',
'fabric-sub-discussion.json',
);
// tools operate against a default Center; per-account keys come from config // tools operate against a default Center; per-account keys come from config
const client = new FabricClient(centerApiBase); const client = new FabricClient(centerApiBase);
const identity = new IdentityRegistry(idFile); const identity = new IdentityRegistry(idFile);
const subDiscussion = new SubDiscussionStore(subDiscussionFile);
registerFabricTools( registerFabricTools(
{ registerTool: (d) => api.registerTool(d), logger: api.logger }, { registerTool: (d) => api.registerTool(d), logger: api.logger },
client, client,
identity, identity,
subDiscussion,
cfg,
);
// Per-(agent, channel) prompt injection for sub-discussion channels.
// Runs as a sibling to PrismFacet's before_prompt_build hook (and
// ClawPrompts' fabric-chat-injector); openclaw composes
// appendSystemContext from all registered handlers.
registerSubDiscussionHook(
{ on: api.on, logger: api.logger },
subDiscussion,
identity,
); );
// Cross-plugin API: globalThis.__fabric // Cross-plugin API: globalThis.__fabric
@@ -75,13 +96,29 @@ export default defineChannelPluginEntry({
// fall back to "assume DM" — fail closed on unknown. // fall back to "assume DM" — fail closed on unknown.
{ {
const _G = globalThis as Record<string, unknown>; const _G = globalThis as Record<string, unknown>;
_G['__fabric'] = { getChannelType }; _G['__fabric'] = {
getChannelType,
// Dynamic-subscription bridges: tools (notably `fabric-register`)
// call these to add/remove an account's inbound socket without
// a gateway restart. Both delegate to the live FabricInbound
// instance via the module-level `inbound` closure variable; the
// closures stay valid across gateway_start / gateway_stop
// because we re-assign the variable, not the property.
addAccount: async (entry: { agentId: string; fabricApiKey: string }) => {
if (!inbound) throw new Error('fabric inbound not ready yet (gateway not started?)');
await inbound.addAccount(entry);
},
removeAccount: (agentId: string) => {
if (!inbound) return;
inbound.removeAccount(agentId);
},
};
// Flush channel-meta cache when the gateway shuts down so // Flush channel-meta cache when the gateway shuts down so
// recently-recorded xType entries don't get lost. // recently-recorded xType entries don't get lost.
api.on('gateway_stop', () => { api.on('gateway_stop', () => {
try { flushChannelMeta(); } catch { /* ignore */ } try { flushChannelMeta(); } catch { /* ignore */ }
}); });
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)'); api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)');
} }
api.on('gateway_start', () => { api.on('gateway_start', () => {
@@ -116,7 +153,7 @@ export default defineChannelPluginEntry({
// their fabricUserId + first guild endpoint populated). // their fabricUserId + first guild endpoint populated).
void inbound.start().then(() => { void inbound.start().then(() => {
if (!inbound) return; if (!inbound) return;
presence = new PresenceSync(api.logger); presence = new PresenceSync(api.logger, client);
presence.setAccounts(inbound.getPresenceAccounts()); presence.setAccounts(inbound.getPresenceAccounts());
presence.start(); presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`); api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);

View File

@@ -14,10 +14,13 @@
"create-work-channel", "create-work-channel",
"create-report-channel", "create-report-channel",
"create-discussion-channel", "create-discussion-channel",
"create-sub-discussion",
"discussion-complete", "discussion-complete",
"close-sub-discussion",
"fabric-canvas", "fabric-canvas",
"fabric-channel", "fabric-channel",
"fabric-send-message", "fabric-send-message",
"fabric-send-sys-msg",
"fabric-channel-list", "fabric-channel-list",
"fabric-message-history", "fabric-message-history",
"fabric-guild-list", "fabric-guild-list",

View File

@@ -153,6 +153,20 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg as never, accountId), resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg as never, accountId),
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg as never), defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg as never),
isConfigured: (account: ResolvedFabricAccount) => Boolean(account.fabricApiKey), isConfigured: (account: ResolvedFabricAccount) => Boolean(account.fabricApiKey),
// openclaw's channelManager.getRuntimeSnapshot() — called every minute
// by the channel-health-monitor — defaults `configured: true` when the
// plugin doesn't expose describeAccount (see applyDescribedAccountFields
// in server-channels). Without this, fabric's synthetic 'default'
// account (returned by listFabricAccountIds when channels.fabric.accounts
// is empty — the prod shape) gets snapshot {enabled:true, configured:true,
// running:false} → isManagedAccount=true → not-running → restart loop
// every ~10 min, logging `[fabric:default] health-monitor: restarting`.
// Mirror isConfigured here so the snapshot truthfully reports false for
// any account without a fabricApiKey.
describeAccount: (account: ResolvedFabricAccount) => ({
accountId: account.accountId,
configured: Boolean(account.fabricApiKey),
}),
}, },
// Minimal setup adapter: Fabric is configured directly under // Minimal setup adapter: Fabric is configured directly under
// channels.fabric.* (no interactive wizard). applyAccountConfig is the // channels.fabric.* (no interactive wizard). applyAccountConfig is the

View File

@@ -51,8 +51,38 @@ type FabricMessage = {
xType?: string; xType?: string;
}; };
// Walk cfg.bindings for the entry that ties `agentId` to a fabric account.
// Returns the binding's match.accountId (the slot label routing keys on);
// returns undefined when the agent has no explicit fabric binding so the
// caller can fall back to agentId without changing pre-existing semantics
// for agents whose binding accountId == agent_id anyway.
function findFabricBindingAccountId(cfg: unknown, agentId: string): string | undefined {
const bindings = (cfg as { bindings?: Array<{
agentId?: string;
match?: { channel?: string; accountId?: string };
}> })?.bindings;
if (!Array.isArray(bindings)) return undefined;
for (const b of bindings) {
if (
b?.agentId === agentId &&
b?.match?.channel === 'fabric' &&
typeof b?.match?.accountId === 'string' &&
b.match.accountId.length > 0
) {
return b.match.accountId;
}
}
return undefined;
}
export class FabricInbound { export class FabricInbound {
private sockets: Socket[] = []; private sockets: Socket[] = [];
// Per-agent socket + timer tracking. Enables `removeAccount(agentId)`
// to tear down ONE agent without restarting the whole inbound. New
// sockets get appended on `connectAgent`; both maps are emptied by
// `stop()`.
private socketsByAgent = new Map<string, Socket[]>();
private timersByAgent = new Map<string, NodeJS.Timeout[]>();
private seen = new Set<string>(); private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild). // Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken // Without this, the agent's socket.io subscriptions are a snapshot taken
@@ -263,6 +293,71 @@ export class FabricInbound {
this.channelSyncTimers = []; this.channelSyncTimers = [];
for (const s of this.sockets) s.disconnect(); for (const s of this.sockets) s.disconnect();
this.sockets = []; this.sockets = [];
this.socketsByAgent.clear();
this.timersByAgent.clear();
}
/**
* Bring up ONE new account at runtime (no gateway restart).
*
* Mirrors what `start()` does per entry: login to Center, upsert the
* identity registry, open the socket(s). Idempotent: re-calling with
* the same agentId tears down the previous socket(s) first so the
* fresh apikey replaces the stale one (recruitment onboard rotates
* the agent from the shared `interviewee` placeholder to a real
* per-agent apikey — the old `interviewee` socket must drop before
* the new one comes up or the agent ends up subscribed to both users
* at once).
*
* Used by the `fabric-register` openclaw tool to make recruitment
* end-to-end without a gateway restart between `new-agent` and the
* interview's sub-discussion dispatch.
*/
async addAccount(entry: { agentId: string; fabricApiKey: string }): Promise<void> {
if (this.socketsByAgent.has(entry.agentId)) {
this.removeAccount(entry.agentId);
}
const session = await this.client.agentLogin(entry.fabricApiKey);
this.identity.upsert({
agentId: entry.agentId,
fabricApiKey: entry.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
await this.connectAgent(entry.agentId, session);
this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`);
}
/**
* Tear down ONE account's sockets + timers without touching others.
* Caller is responsible for any identity-registry cleanup; this only
* drops the live socket subscription so the agent stops receiving
* Fabric pushes.
*/
removeAccount(agentId: string): void {
const sockets = this.socketsByAgent.get(agentId);
if (sockets) {
for (const s of sockets) {
try { s.disconnect(); } catch { /* socket already dead */ }
// Also remove from the flat list so `stop()` doesn't double-close.
const idx = this.sockets.indexOf(s);
if (idx !== -1) this.sockets.splice(idx, 1);
}
this.socketsByAgent.delete(agentId);
}
const timers = this.timersByAgent.get(agentId);
if (timers) {
for (const t of timers) {
clearInterval(t);
const idx = this.channelSyncTimers.indexOf(t);
if (idx !== -1) this.channelSyncTimers.splice(idx, 1);
}
this.timersByAgent.delete(agentId);
}
this.firstGuildByAgent.delete(agentId);
this.tokenCache.delete(agentId);
this.agentStatusCache.delete(agentId);
this.log.info(`fabric: agent ${agentId} dynamically removed`);
} }
/** /**
@@ -281,17 +376,25 @@ export class FabricInbound {
agentId: string; agentId: string;
fabricUserId: string; fabricUserId: string;
guildBaseUrl: string; guildBaseUrl: string;
guildNodeId: string;
fabricApiKey: string; fabricApiKey: string;
}> { }> {
const out: Array<{ agentId: string; fabricUserId: string; guildBaseUrl: string; fabricApiKey: string }> = []; const out: Array<{
agentId: string;
fabricUserId: string;
guildBaseUrl: string;
guildNodeId: string;
fabricApiKey: string;
}> = [];
for (const entry of this.identity.list()) { for (const entry of this.identity.list()) {
if (!entry.fabricUserId) continue; if (!entry.fabricUserId) continue;
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId); const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
if (!presenceGuildUrl) continue; if (!presenceGuild) continue;
out.push({ out.push({
agentId: entry.agentId, agentId: entry.agentId,
fabricUserId: entry.fabricUserId, fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuildUrl, guildBaseUrl: presenceGuild.endpoint,
guildNodeId: presenceGuild.nodeId,
fabricApiKey: entry.fabricApiKey, fabricApiKey: entry.fabricApiKey,
}); });
} }
@@ -299,24 +402,44 @@ export class FabricInbound {
} }
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first // Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target). // guild per agent (used as the presence-push target). Stores both
private firstGuildEndpointByAgent = new Map<string, string>(); // endpoint and nodeId — presence-sync needs both: endpoint to build
// the URL, nodeId to pick the matching guildAccessToken from a fresh
// agent-login response.
private firstGuildByAgent = new Map<string, { endpoint: string; nodeId: string }>();
private async connectAgent(agentId: string, session: FabricSession): Promise<void> { private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
const selfUserId = session.user.id; const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is // First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a // already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern. // valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildEndpointByAgent.has(agentId)) { if (!this.firstGuildByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0); const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild) this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint); if (firstGuild) this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
} }
for (const g of session.guilds) { for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) continue; if (!tok) continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, { const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'], transports: ['websocket'],
auth: { token: tok }, auth: (cb) => {
// Best-effort fresh token; on transient failure fall back to the
// last known good one. tokenCache also keeps HTTP calls (attachment
// download / reply post) from 401'ing in the same window.
this.freshGuildToken(agentId, g.nodeId, session)
.then((fresh) => cb({ token: fresh ?? tok }))
.catch(() => cb({ token: tok }));
},
autoConnect: false, autoConnect: false,
}); });
// Tracked socket.io rooms for this (agent, guild). The initial fetch // Tracked socket.io rooms for this (agent, guild). The initial fetch
@@ -400,6 +523,9 @@ export class FabricInbound {
FabricInbound.CHANNEL_SYNC_INTERVAL_MS, FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
); );
this.channelSyncTimers.push(syncTimer); this.channelSyncTimers.push(syncTimer);
const agentTimers = this.timersByAgent.get(agentId) ?? [];
agentTimers.push(syncTimer);
this.timersByAgent.set(agentId, agentTimers);
socket.on('message.created', (m: FabricMessage) => { socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? ''; const channelId = m.channelId ?? '';
if (!channelId) return; if (!channelId) return;
@@ -445,6 +571,11 @@ export class FabricInbound {
}); });
socket.connect(); socket.connect();
this.sockets.push(socket); this.sockets.push(socket);
// Track per-agent so addAccount/removeAccount can teardown
// independently without disturbing other agents.
const agentSockets = this.socketsByAgent.get(agentId) ?? [];
agentSockets.push(socket);
this.socketsByAgent.set(agentId, agentSockets);
} }
} }
@@ -513,10 +644,22 @@ export class FabricInbound {
// (commands-handlers `isDirectMessage` checks ChatType==='direct') // (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn. // misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType); const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
// resolveAgentRoute needs the *binding* accountId (the channel-side
// slot name) — not the openclaw agentId. For most agents the binding
// is `{agentId: X, match: {channel: fabric, accountId: X}}` so the
// two coincide; but for shared-placeholder cases (e.g. the recruitment
// `interviewee` slot bound to multiple agents over its lifetime) the
// binding accountId is the slot label ("interviewee", "Neon", …) not
// the agent_id. Passing agentId there returned bindings=0 and silently
// fell back to `main`, hijacking sub-discussion turns. Look up the
// agent's fabric binding accountId here; fall back to agentId when no
// explicit binding exists (preserves prior behavior for agents with
// no fabric binding declared).
const bindingAccountId = findFabricBindingAccountId(this.cfg, agentId) ?? agentId;
const route = core.channel.routing.resolveAgentRoute({ const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg, cfg: this.cfg,
channel: 'fabric', channel: 'fabric',
accountId: agentId, accountId: bindingAccountId,
peer: { kind: peerKind, id: channelId }, peer: { kind: peerKind, id: channelId },
}); });
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {

View File

@@ -2,18 +2,26 @@
* presence-sync — read each connected agent's HF status (via the * presence-sync — read each connected agent's HF status (via the
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by * cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild * HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
* `PUT /agents/:userId/presence` so the backend can apply busy-discard * `PUT /api/agents/:userId/presence` so the backend can apply
* on `announce`-type channel deliveries. * busy-discard on `announce`-type channel deliveries.
* *
* Push model: we only PUT when an agent's status actually changes * Push model: we only PUT when an agent's status actually changes
* (since the last push). The HF-side accessor has its own TTL cache * (since the last push). The HF-side accessor has its own TTL cache
* to absorb the every-30s polling. * to absorb the every-30s polling.
* *
* Auth: the endpoint sits behind ApiKeyGuard (global APP_GUARD per
* app.module.js) which expects `Authorization: Bearer <guild-token>`
* — NOT the agent's fabricApiKey directly. So before each PUT we do
* a fresh agent-login (or reuse a cached token if still within its
* 15-min JWT TTL) and pull the guildAccessToken matching the target
* guild. Status changes are rare enough that login overhead is fine.
*
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop * If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
* is a no-op — Fabric backend defaults presence to 'unknown' which is * is a no-op — Fabric backend defaults presence to 'unknown' which is
* treated as not-busy. Announce-channel delivery still works; busy * treated as not-busy. Announce-channel delivery still works; busy
* filtering simply doesn't kick in. * filtering simply doesn't kick in.
*/ */
import type { FabricClient } from './fabric-client.js';
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline'; type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> }; type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
@@ -23,15 +31,36 @@ export interface PresenceSyncAccount {
agentId: string; agentId: string;
fabricUserId: string; // the agent's Fabric Center user id (UUID) fabricUserId: string; // the agent's Fabric Center user id (UUID)
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id> guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
fabricApiKey: string; // existing per-account key guildNodeId: string; // which guildAccessTokens[].guildNodeId to pick
fabricApiKey: string; // existing per-account key (used for agent-login)
}
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
interface CachedToken {
token: string;
expiresAt: number; // epoch ms
} }
export class PresenceSync { export class PresenceSync {
private timer: ReturnType<typeof setInterval> | null = null; private timer: ReturnType<typeof setInterval> | null = null;
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
private readonly accounts = new Map<string, PresenceSyncAccount>(); private readonly accounts = new Map<string, PresenceSyncAccount>();
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
constructor(private readonly logger: Logger) {} // Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
private inflight = false;
constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
setAccounts(accounts: PresenceSyncAccount[]): void { setAccounts(accounts: PresenceSyncAccount[]): void {
this.accounts.clear(); this.accounts.clear();
@@ -54,7 +83,50 @@ export class PresenceSync {
} }
} }
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
private async ensureGuildToken(acct: PresenceSyncAccount): Promise<string | null> {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now) return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
} catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(
`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`,
);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
private async tick(): Promise<void> { private async tick(): Promise<void> {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight) return;
this.inflight = true;
try {
await this.tickInner();
} finally {
this.inflight = false;
}
}
private async tickInner(): Promise<void> {
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined; const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
@@ -68,13 +140,22 @@ export class PresenceSync {
if (!status) continue; if (!status) continue;
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken) continue;
try { try {
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`; // Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
// x-api-key and got 401 "missing bearer token" forever. The /api
// prefix is required because the guild backend sets a global
// 'api' prefix in main.ts setGlobalPrefix('api').
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, { const res = await fetch(url, {
method: 'PUT', method: 'PUT',
headers: { headers: {
'content-type': 'application/json', 'content-type': 'application/json',
'x-api-key': acct.fabricApiKey, authorization: `Bearer ${guildToken}`,
}, },
body: JSON.stringify({ status, source: 'hf-plugin' }), body: JSON.stringify({ status, source: 'hf-plugin' }),
}); });
@@ -82,6 +163,10 @@ export class PresenceSync {
this.lastStatus.set(agentId, status); this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId}${status}`); this.logger.info(`fabric: presence-sync ${agentId}${status}`);
} else { } else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401) this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`); this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
} }
} catch (err) { } catch (err) {

View File

@@ -0,0 +1,76 @@
import type { IdentityRegistry } from './identity.js';
import type { SubDiscussionStore } from './sub-discussion-store.js';
// Plugin-local before_prompt_build hook that injects per-(agent, channel)
// guides for sub-discussion channels created via the `create-sub-discussion`
// tool. Mirrors the pattern used by ClawPrompts' fabric-chat-injector
// (channelId-aware injection) but with content dynamically supplied at
// channel-creation time instead of read from static files via PrismFacet's
// router/rule registry.
//
// Match logic per turn:
// ctx.channelId → store.find() → sub-discussion entry
// ctx.agentId → identity.findByAgentId().fabricUserId
// ─ matches entry.hostUserId → inject hostGuide
// ─ matches entry.guestUserIds → inject guestGuide
// ─ neither → no injection
//
// Fail-closed on unknown agentId/channelId — we never inject "the wrong"
// guide, only the right one or nothing.
const _G = globalThis as Record<string, unknown>;
const DEDUP_KEY = '_fabricSubDiscussionHookDedup';
type PromptCtx = {
agentId?: string;
channelId?: string;
messageProvider?: string;
};
export function registerSubDiscussionHook(
api: {
on: (hook: string, handler: (...args: unknown[]) => unknown) => void;
logger: { info: (m: string) => void; warn: (m: string) => void };
},
store: SubDiscussionStore,
identity: IdentityRegistry,
): void {
if (!(_G[DEDUP_KEY] instanceof WeakSet)) _G[DEDUP_KEY] = new WeakSet<object>();
const dedup = _G[DEDUP_KEY] as WeakSet<object>;
api.on('before_prompt_build', async (...args: unknown[]) => {
const event = args[0];
const ctx = (args[1] ?? {}) as PromptCtx;
// The hook fires both for fabric-driven turns (channelId set) and
// for other triggers (HF wake, exec-event, etc.) — drop those.
if (typeof event === 'object' && event !== null) {
if (dedup.has(event)) return undefined;
dedup.add(event);
}
const agentId = (ctx.agentId ?? '').trim();
const channelId = (ctx.channelId ?? '').trim();
if (!agentId || !channelId) return undefined;
const provider = (ctx.messageProvider ?? '').toLowerCase();
if (provider && provider !== 'fabric') return undefined;
const entry = store.find(channelId);
if (!entry) return undefined;
const ident = identity.findByAgentId(agentId);
const myUserId = (ident?.fabricUserId ?? '').trim();
if (!myUserId) {
// identity registry caches fabricUserId after the first agentLogin
// in inbound.ts. If it's missing here, the agent likely hasn't
// completed login yet — skip rather than guess.
return undefined;
}
if (myUserId === entry.hostUserId) {
return { appendSystemContext: entry.hostGuide };
}
if (entry.guestUserIds.includes(myUserId)) {
return { appendSystemContext: entry.guestGuide };
}
return undefined;
});
}

View File

@@ -0,0 +1,76 @@
import { readFileSync, writeFileSync, existsSync, mkdirSync } from 'node:fs';
import { dirname } from 'node:path';
// Records per-(sub-discussion-channel) state created by the
// `create-sub-discussion` tool and consumed by:
// 1. `before_prompt_build` hook — looks up by (agentId, channelId) to
// inject the host or guest guide as appendSystemContext.
// 2. `close-sub-discussion` tool — looks up by sub channelId to find
// the parent channel to post the callback to and to validate that
// the caller is the original host.
//
// One sub-discussion = one channel. Lifetime: from create-sub-discussion
// return until close-sub-discussion (or gateway-stop / disk corruption).
// We persist to a small JSON file so a gateway restart mid-interview
// doesn't strand both host and guest with no guide.
export type SubDiscussionEntry = {
subChannelId: string;
hostAgentId: string; // the openclaw agentId that called create-sub-discussion
hostUserId: string; // the host's Fabric Center userId (session.user.id)
guestUserIds: string[]; // Fabric Center userIds invited as guests
hostGuide: string; // appended to host's session system prompt while in this channel
guestGuide: string; // appended to each guest's session system prompt while in this channel
callbackGuildNodeId: string; // where to post the callback when close is called
callbackChannelId: string; // parent channel to post callback to (system msg)
createdAt: string; // ISO timestamp
};
type StoreFile = { entries: SubDiscussionEntry[] };
export class SubDiscussionStore {
private byChannelId = new Map<string, SubDiscussionEntry>();
constructor(private readonly filePath: string) {
this.load();
}
private load(): void {
if (!existsSync(this.filePath)) return;
try {
const data = JSON.parse(readFileSync(this.filePath, 'utf8')) as StoreFile;
for (const e of data.entries ?? []) {
if (e?.subChannelId) this.byChannelId.set(e.subChannelId, e);
}
} catch {
// Corrupt file → start empty; first mutation rewrites cleanly.
}
}
private persist(): void {
mkdirSync(dirname(this.filePath), { recursive: true });
const data: StoreFile = { entries: [...this.byChannelId.values()] };
writeFileSync(this.filePath, JSON.stringify(data, null, 2));
}
list(): SubDiscussionEntry[] {
return [...this.byChannelId.values()];
}
find(subChannelId: string): SubDiscussionEntry | undefined {
return this.byChannelId.get(subChannelId);
}
add(entry: SubDiscussionEntry): void {
this.byChannelId.set(entry.subChannelId, entry);
this.persist();
}
remove(subChannelId: string): SubDiscussionEntry | undefined {
const e = this.byChannelId.get(subChannelId);
if (!e) return undefined;
this.byChannelId.delete(subChannelId);
this.persist();
return e;
}
}

View File

@@ -1,5 +1,7 @@
import type { FabricClient } from './fabric-client.js'; import type { FabricClient } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js'; import type { IdentityRegistry } from './identity.js';
import type { SubDiscussionStore } from './sub-discussion-store.js';
import { resolveCommandsSyncKey } from './accounts.js';
// OpenClaw tool registration api (loose typing — concrete shape from // OpenClaw tool registration api (loose typing — concrete shape from
// openclaw/plugin-sdk/core at host SDK version). // openclaw/plugin-sdk/core at host SDK version).
@@ -10,6 +12,9 @@ type ToolApi = {
type Ctx = { agentId?: string }; type Ctx = { agentId?: string };
// Loose config shape — just the fields we read here.
type ToolsCfg = { channels?: { fabric?: { commandsSyncKey?: string } }; [k: string]: unknown };
const X_BY_KIND: Record<string, string> = { const X_BY_KIND: Record<string, string> = {
chat: 'general', chat: 'general',
work: 'work', work: 'work',
@@ -17,19 +22,34 @@ const X_BY_KIND: Record<string, string> = {
discussion: 'discuss', discussion: 'discuss',
}; };
// Delay between create-sub-discussion's channel create and greeting post.
// Backend pushes channel.joined to invitee sockets on create; that push
// has to traverse socket.io rooms before the guest plugin can sub the
// channel:<id> room. If the greeting is posted before that, the guest's
// turn-activation wakeup misses (the socket isn't in the room yet).
// 500ms is empirically slack enough on local sim + production t3, and
// short enough not to feel laggy from the host's tool-result POV. Bump
// via FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS env if needed.
const GREETING_DELAY_MS = (() => {
const v = Number.parseInt(process.env.FABRIC_SUB_DISCUSSION_GREETING_DELAY_MS ?? '', 10);
return Number.isFinite(v) && v >= 0 ? v : 500;
})();
export function registerFabricTools( export function registerFabricTools(
api: ToolApi, api: ToolApi,
client: FabricClient, client: FabricClient,
identity: IdentityRegistry, identity: IdentityRegistry,
store: SubDiscussionStore,
cfg: ToolsCfg,
): void { ): void {
// Resolve the calling agent's Fabric session + a guild's token/endpoint. // Resolve the calling agent's Fabric session + a guild's token/endpoint.
const ctxGuild = async (agentId: string, guildNodeId: string) => { const ctxGuild = async (agentId: string, guildNodeId: string) => {
const entry = identity.findByAgentId(agentId); const entry = identity.findByAgentId(agentId);
if (!entry) if (!entry)
throw new Error( throw new Error(
`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` + `agent ${agentId} not registered — call the openclaw \`fabric-register\` ` +
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` + `tool (apiKey: <fak_…>, agentId: ${agentId}); the dynamic-subscription ` +
`channels.fabric.accounts.${agentId}); then restart the gateway`, `path brings the socket up immediately, no gateway restart needed`,
); );
const session = await client.agentLogin(entry.fabricApiKey); const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId); const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
@@ -38,10 +58,74 @@ export function registerFabricTools(
return { session, guild, token }; return { session, guild, token };
}; };
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool. // Bind an agent's Fabric API key — validates the key against Center,
// It's a one-time step done out-of-band via the installed script // upserts ~/.openclaw/fabric-identity.json, AND brings up the inbound
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id) // socket immediately via the live FabricInbound instance (no gateway
// or via static config (channels.fabric.accounts.<agentId>). // restart). The standalone binary `~/.openclaw/bin/fabric-register`
// still exists for one-time bootstrap before the gateway runs, but
// recruitment's `register-agent` script should prefer this tool path
// so the new agent's socket is live before `interviewer` fires.
api.registerTool((ctx: Ctx) => ({
name: 'fabric-register',
description:
'Bind an agent to a Fabric Center API key. Validates the key, writes ' +
'the entry to ~/.openclaw/fabric-identity.json, and starts a live ' +
'inbound socket immediately so the agent receives Fabric pushes ' +
'without a gateway restart. Caller defaults to the current agent; ' +
'pass `agentId` to bind on behalf of another agent (recruitment use).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['apiKey'],
properties: {
apiKey: { type: 'string', description: 'Fabric Center API key (`fak_…`)' },
agentId: {
type: 'string',
description:
'Agent to register. Defaults to the calling agent (ctx.agentId). ' +
'Recruitment onboarding may override this when wiring a freshly ' +
'created agent before that agent has a session of its own.',
},
},
},
execute: async (_id: string, p: { apiKey: string; agentId?: string }) => {
const agentId = p.agentId ?? ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context (pass agentId)' };
if (!p.apiKey || typeof p.apiKey !== 'string') {
return { ok: false, error: 'apiKey required' };
}
// Delegate to FabricInbound.addAccount via the cross-plugin bridge.
// The bridge is installed in index.ts when inbound spins up; if it's
// not present yet, the gateway is still starting and the caller should
// retry (rare path — only hit during the gateway_start window).
const fabricApi = (globalThis as Record<string, unknown>)['__fabric'] as
| { addAccount?: (entry: { agentId: string; fabricApiKey: string }) => Promise<void> }
| undefined;
if (!fabricApi?.addAccount) {
return {
ok: false,
error:
'fabric inbound not ready (gateway still starting?). Fall back to ' +
'~/.openclaw/bin/fabric-register or retry after a few seconds.',
};
}
try {
await fabricApi.addAccount({ agentId, fabricApiKey: p.apiKey });
} catch (err) {
return {
ok: false,
error: `fabric-register failed: ${String(err)}`,
};
}
const entry = identity.findByAgentId(agentId);
return {
ok: true,
agentId,
fabricUserId: entry?.fabricUserId,
displayName: entry?.displayName,
};
},
}));
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') => const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
api.registerTool((ctx: Ctx) => ({ api.registerTool((ctx: Ctx) => ({
@@ -134,6 +218,245 @@ export function registerFabricTools(
}, },
})); }));
// ───────────────────────────────────────────────────────────────────
// create-sub-discussion: open a discuss-type sub-channel hanging off
// the caller's current channel. Designed for host-driven multi-turn
// exchanges (interview, brainstorm, narrow Q&A) where the guests are
// either fresh agents without workflow capability (recruitment
// interviewee) or peers that just need a short scoped chat without
// entering their own subflow.
//
// What it does on top of plain create-discussion-channel:
// 1. Persists a store entry indexed by the new sub channelId, carrying:
// host agent + userId, guest userIds, host/guest guide texts,
// callback (parent) channel info.
// 2. Auto-posts `greetingMsg` using the host's own Fabric account so
// turn rotation's activation rule (first author → newOrder[0],
// currentSpeaker → newOrder[1], wakeup → newOrder[1]) puts the
// first guest on the spot immediately — no race where host posts
// before guest's socket subs the channel room (we wait
// GREETING_DELAY_MS for backend's channel.joined push to land).
// 3. The accompanying before_prompt_build hook (sub-discussion-hook
// registered from index.ts) then injects `hostGuideMsg` into the
// host's session prompt and `guestGuideMsg` into each guest's
// session prompt whenever a turn in this channel fires — so the
// two roles see different instructions, no shared guide file.
// ───────────────────────────────────────────────────────────────────
api.registerTool((ctx: Ctx) => ({
name: 'create-sub-discussion',
description:
'Open a host-driven sub-discussion channel (x_type=discuss) hanging off your current channel, ' +
'with role-specific system-prompt guides for host and guests. Use this for interviews / scoped ' +
'Q&A where you stay in control of when the conversation ends. Returns the sub channelId; ' +
'reach it via fabric-send-message in the rotating turn order. Close with close-sub-discussion ' +
'to write a callback back into the parent channel.',
parameters: {
type: 'object',
additionalProperties: false,
required: [
'guildNodeId',
'currentChannelId',
'channelName',
'greetingMsg',
'hostGuideMsg',
'guestGuideMsg',
'guests',
],
properties: {
guildNodeId: { type: 'string', description: 'Fabric guild node id (same guild for parent + sub).' },
currentChannelId: {
type: 'string',
description: 'Channel id you are currently in (parent). Used as the callback target on close.',
},
channelName: { type: 'string', description: 'Display name for the new sub-discussion channel.' },
greetingMsg: {
type: 'string',
description:
'First message posted by YOU (the host) in the sub channel. Triggers turn rotation so ' +
"the first guest's session wakes immediately with both your greeting in history and the " +
'guest guide injected as system prompt.',
},
hostGuideMsg: {
type: 'string',
description:
"Appended to YOUR session's system prompt whenever a turn fires in this sub channel. " +
'Use it to remind yourself of the procedure (what to ask, when to call close-sub-discussion).',
},
guestGuideMsg: {
type: 'string',
description:
"Appended to EACH GUEST's session system prompt for turns in this sub channel. Use it to " +
'orient guests with no prior workflow context (e.g. a fresh interviewee). Keep it short; ' +
'long guides bloat every turn.',
},
guests: {
type: 'array',
items: { type: 'string' },
minItems: 1,
description:
'Fabric Center userIds invited as guests. Resolve via fabric-channel-list members or the ' +
'<name>@<role>.hangman-lab.top email convention.',
},
purpose: { type: 'string', description: 'Optional channel.purpose for discoverability.' },
},
},
execute: async (
_id: string,
p: {
guildNodeId: string;
currentChannelId: string;
channelName: string;
greetingMsg: string;
hostGuideMsg: string;
guestGuideMsg: string;
guests: string[];
purpose?: string;
},
) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
if (!Array.isArray(p.guests) || p.guests.length === 0) {
return { ok: false, error: 'guests must be a non-empty array of Fabric userIds' };
}
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ch = await client.createChannel(guild.endpoint, token, {
guildId: p.guildNodeId,
name: p.channelName,
xType: 'discuss',
isPublic: false,
memberUserIds: p.guests,
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
});
store.add({
subChannelId: ch.id,
hostAgentId: agentId,
hostUserId: session.user.id,
guestUserIds: [...p.guests],
hostGuide: p.hostGuideMsg,
guestGuide: p.guestGuideMsg,
callbackGuildNodeId: p.guildNodeId,
callbackChannelId: p.currentChannelId,
createdAt: new Date().toISOString(),
});
// Let backend's channel.joined push reach guest sockets before our
// greeting fires — otherwise the wakeup emitted by turn-activation
// races a not-yet-subscribed socket.io room.
if (GREETING_DELAY_MS > 0) {
await new Promise((r) => setTimeout(r, GREETING_DELAY_MS));
}
try {
await client.postMessage(guild.endpoint, token, ch.id, p.greetingMsg, session.user.id);
} catch (err) {
api.logger.warn(
`fabric: create-sub-discussion greeting post failed channel=${ch.id} err=${String(err)}`,
);
}
return { ok: true, subChannelId: ch.id };
},
}));
// ───────────────────────────────────────────────────────────────────
// close-sub-discussion: post a system-authored callback into the
// parent channel + close the sub-discussion channel. Only the original
// host can call this. Uses the Guild's x-fabric-system-key path (shared
// secret = commandsSyncKey) so the callback lands as a guild/system
// author, not the host's personal account — and can wake the host on
// the parent channel to continue whatever workflow opened the sub.
// ───────────────────────────────────────────────────────────────────
api.registerTool((ctx: Ctx) => ({
name: 'close-sub-discussion',
description:
'Close a sub-discussion channel you opened (host-only) and write a callback to the parent ' +
'channel as a system message. Pass `wakeupHost: false` to land the callback silently in ' +
'history without waking yourself.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['subChannelId', 'callbackMsg'],
properties: {
subChannelId: { type: 'string', description: 'The sub-discussion channelId returned by create-sub-discussion.' },
callbackMsg: {
type: 'string',
description:
'Content to post into the parent channel as a system-authored message. Typical content: ' +
'the conclusion / extracted data from the sub-discussion, so the next turn on the parent ' +
'channel can act on it.',
},
wakeupHost: {
type: 'boolean',
description:
'Whether to wake YOU (the host) on the parent channel. Default true — for recruitment ' +
'interview flow where the next workflow step needs to run immediately. Pass false for ' +
'fire-and-forget logging.',
},
},
},
execute: async (
_id: string,
p: { subChannelId: string; callbackMsg: string; wakeupHost?: boolean },
) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const entry = store.find(p.subChannelId);
if (!entry) {
return { ok: false, error: `sub-discussion not found: ${p.subChannelId}` };
}
if (entry.hostAgentId !== agentId) {
return {
ok: false,
error: `only the host (${entry.hostAgentId}) may close this sub-discussion`,
};
}
const systemKey = resolveCommandsSyncKey(cfg);
if (!systemKey) {
return {
ok: false,
error:
'channels.fabric.commandsSyncKey is not configured — close-sub-discussion needs it for ' +
'the x-fabric-system-key callback. Configure via openclaw config.',
};
}
const { guild, token } = await ctxGuild(agentId, entry.callbackGuildNodeId);
const wakeup = p.wakeupHost !== false;
// 1) Post callback into parent channel via the system-key path.
const url = `${guild.endpoint}/api/channels/${encodeURIComponent(entry.callbackChannelId)}/messages`;
const res = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-fabric-system-key': systemKey,
},
body: JSON.stringify({
content: p.callbackMsg,
wakeupUserId: wakeup ? entry.hostUserId : null,
}),
}).catch((err) => {
throw new Error(`callback POST failed: ${String(err)}`);
});
if (!res.ok) {
const text = await res.text().catch(() => '');
return {
ok: false,
error: `callback POST ${url} -> ${res.status} ${text}`,
};
}
// 2) Close the sub channel using the host's own bearer (the host is
// a member of the channel — channel.close auth is per-member).
try {
await client.closeChannel(guild.endpoint, token, entry.subChannelId);
} catch (err) {
api.logger.warn(
`fabric: close-sub-discussion: sub channel close failed channel=${entry.subChannelId} err=${String(err)}`,
);
}
// 3) Drop the store entry so the prompt-injection hook stops firing
// for this channel (the sub is closed; any straggler turns would
// just hit the closed-channel write reject downstream).
store.remove(entry.subChannelId);
return { ok: true, closed: true };
},
}));
// fabric-canvas: share / update / read / close the channel's single // fabric-canvas: share / update / read / close the channel's single
// pinned canvas document (one tool, four actions). update/close are // pinned canvas document (one tool, four actions). update/close are
// sharer-only server-side (the guild returns 403 otherwise). // sharer-only server-side (the guild returns 403 otherwise).
@@ -296,6 +619,89 @@ export function registerFabricTools(
}, },
})); }));
// ───────────────────────────────────────────────────────────────────
// fabric-send-sys-msg: post a system-authored message (author =
// sentinel UUID 0000…, not the calling agent) using the Guild's
// x-fabric-system-key path. Use for cross-agent broadcasts where you
// don't want the message tied to one agent's identity — Dialectic
// topic announcements / lifecycle events, host-system advisories,
// etc. Caller doesn't need to be a member of the channel (the
// backend isSystem branch skips assertParticipant), but must be a
// member of the guild (their session resolves the guild endpoint).
//
// Shared secret: reads channels.fabric.commandsSyncKey (same value
// as the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY env). Empty
// config → tool returns ok:false with a clear error, no fall-through
// to regular agent posting.
// ───────────────────────────────────────────────────────────────────
api.registerTool((ctx: Ctx) => ({
name: 'fabric-send-sys-msg',
description:
'Send a SYSTEM-AUTHORED message into a Fabric channel (author = guild sentinel, not you). ' +
'Use for cross-agent broadcasts that should not be attributed to a single agent — ' +
'Dialectic announce-channel topic broadcasts, lifecycle events, system advisories. ' +
'Optionally precise-wake one recipient via wakeupUserId; otherwise the message lands ' +
'silently in history (no wake).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
wakeupUserId: {
type: 'string',
description:
"Optional: a single Fabric userId to wake with this message (everyone else in the " +
'channel sees it but with wakeup=false). Omit for fully silent broadcast.',
},
},
},
execute: async (
_id: string,
p: { guildNodeId: string; channelId: string; content: string; wakeupUserId?: string },
) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const systemKey = resolveCommandsSyncKey(cfg);
if (!systemKey) {
return {
ok: false,
error:
'channels.fabric.commandsSyncKey is not configured — fabric-send-sys-msg needs it for ' +
'the x-fabric-system-key header. Configure via openclaw config.',
};
}
const { guild } = await ctxGuild(agentId, p.guildNodeId);
const url = `${guild.endpoint}/api/channels/${encodeURIComponent(p.channelId)}/messages`;
const wakeup = typeof p.wakeupUserId === 'string' && p.wakeupUserId.trim()
? p.wakeupUserId.trim()
: null;
const res = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-fabric-system-key': systemKey,
},
body: JSON.stringify({ content: p.content, wakeupUserId: wakeup }),
});
if (!res.ok) {
const text = await res.text().catch(() => '');
return { ok: false, error: `POST ${url} -> ${res.status} ${text}` };
}
const json = (await res.json().catch(() => null)) as
| { messageId?: string; seq?: number; authorUserId?: string }
| null;
return {
ok: true,
messageId: json?.messageId,
seq: json?.seq,
authorUserId: json?.authorUserId,
};
},
}));
// ----------------------------------------------------------------- // -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see // fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the // in a given guild. Backend filters to public channels + channels the