23 Commits

Author SHA1 Message Date
7dc70522d1 fix(inbound): refresh socket.io auth on (re)connect via callback
Backend issues short-lived guildAccessToken (TTL=900s). The previous
`auth: { token: tok }` shape captured the JWT once in connectAgent's
closure: after socket.io's auto-reconnect the backend kept getting the
same expired JWT and silently rejected the handshake at the application
layer (RealtimeGateway logs 'socket rejected: <id>'). The client's
'connect' event still fired (TCP succeeded) so the plugin happily ran
the channel-resync, emitted join_channel into the void, and logged
'joined N channel(s)' while the backend was actually broadcasting
message.created to a room with zero subscribers. End-user symptom:
DMs/group messages to agents silently dropped 15 min after gateway
start, with no error anywhere on the agent side.

Switch to the callback form, which socket.io re-evaluates on every
(re)connect — same call site we already use for the HTTP path via
freshGuildToken/tokenCache.

Verified in sim (commit 2acb084 + this patch):
1. Connect new DM channel + post msg -> dispatch + reply ✓
2. `docker restart fabric-backend-guild` to force socket disconnect
3. Plugin reconnects automatically and logs
   'fabric: agent recruiter joined 12 channel(s) on sim-guild-1' ✓
   (without the fix this reconnect was silently rejected; sim used to
    log 'WARN socket rejected: <id>' on the guild backend)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 13:50:24 +01:00
h z
2acb084ee4 fix(presence-sync): tick mutex (#8) 2026-05-26 02:06:21 +00:00
9419d270e5 fix(presence-sync): tick mutex so setInterval overlap can't spawn parallel ticks
The presence-sync tick iterates accounts serially with await on each
agent-login + PUT round-trip — a single tick can easily run 20+s when
there are several accounts. setInterval(intervalMs) does NOT wait for
the previous tick to finish, so on a busy gateway the next tick fires
on top of a still-running one and two parallel iterations each PUT
the same agentId within ~10 ms. That tipped the guild backend's
first-time-insert race (separate fix in nav/Fabric.Backend.Guild) into
500s on prod (caught in t2 gateway 2026-05-25 23:23:35Z; 6 of 6 agents
showed paired log lines 4-10 ms apart for the same agent → idle).

Fix: a simple `inflight` boolean. tick() returns immediately if
already running; the next interval beat catches up. lastStatus !==
bridge.get gating already means status changes catch the next tick
anyway, so skipping a beat costs nothing the next beat won't fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 02:25:08 +01:00
h z
79b29db26c fix(presence-sync): /api prefix + Bearer guildAccessToken (#7) 2026-05-25 23:17:45 +00:00
a87de27cff fix(presence-sync): use /api prefix + Bearer guildAccessToken (not x-api-key)
Two layered bugs in the presence-sync loop, both causing every PUT to
fail forever in prod:

1. **Missing /api prefix.** URL was `${guildBaseUrl}/agents/<id>/presence`
   but the guild backend sets a global prefix 'api' in main.ts
   `setGlobalPrefix('api')`. Every other REST call in this plugin
   (channel.ts channels list, fabric-client.ts postMessage, canvas)
   already prepends /api/ — only presence-sync missed it. Returned 404
   "Cannot PUT /agents/...".

2. **Wrong auth scheme.** Plugin sent `x-api-key: <fabricApiKey>`, but
   the endpoint sits behind the global APP_GUARD = ApiKeyGuard, which
   actually expects `Authorization: Bearer <guildAccessToken>` (despite
   its name — confusing naming on the backend side). With /api added,
   error became 401 "missing bearer token". Confirmed by `docker exec
   fabric-backend-guild grep APP_GUARD /app/dist/app.module.js` and
   manual curl: Bearer guild token → 200 OK.

**Fix**

- presence-sync.ts: do agent-login on demand to obtain a fresh
  guildAccessToken, cache it per-agent for 13 min (under the 15-min
  JWT TTL), use it as Bearer for the PUT. 401 response invalidates
  the cache so the next tick re-logs-in. Pushes are gated on status
  changes (rare), so the login overhead is negligible.

- inbound.ts: firstGuildEndpointByAgent → firstGuildByAgent storing
  both endpoint and nodeId (presence-sync needs nodeId to pick the
  right token out of guildAccessTokens[]).

- index.ts: pass FabricClient to PresenceSync constructor.

**Verified in sim**

After restart, gateway log shows `fabric: presence-sync recruiter →
idle` (200 OK), zero failed PUTs, where previously it would log a 404
every ~5s per agent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 23:54:38 +01:00
h z
dabaa6e1f2 fix(inbound): route fabric DM channels as peer.kind=direct / ChatType=direct (#6) 2026-05-25 14:03:20 +00:00
b8e0e424fa fix(inbound): route fabric DM channels as peer.kind='direct' / ChatType='direct'
Inbound was hardcoding `peer: { kind: 'group' }` and `ChatType: 'group'`
for every fabric channel regardless of xType. As a result:

- sessionKey for a DM was `agent:<id>:fabric:group:<chan>` instead of
  `agent:<id>:fabric:direct:<chan>`
- ctx.ChatType='group' caused user-prompt metadata to render
  `is_group_chat: true` on a DM
- openclaw's `isDirectMessage()` check (ChatType==='direct') returned
  false, so DM-specific prompt and turn behavior never engaged

Caught by recruiter test in session 40c51de2: the model's thinking trace
acknowledged "fabric DM channel" (from the ClawPrompts chat-injector
hook) but the surrounding user-prompt metadata contradicted it with
`is_group_chat: true`, and the model reasoned its way out of running
`workflow_start`.

Fix factors a small helper `fabricPeerRoutingForXType` (and a cache-
backed `fabricPeerRoutingForChannel` for outbound) in channel.ts that
maps:
  - 'dm'  → { peerKind: 'direct', chatType: 'direct' }
  - rest  → { peerKind: 'group',  chatType: 'group' }   (no change)

Inbound uses m.xType directly (live, authoritative). Outbound has no
xType in its call signature, so it consults the channel-meta cache
populated by inbound (same `getChannelType` already exposed via
__fabric). Cache miss falls back to 'group' — the pre-fix default, no
regression. The proactive-DM-without-prior-inbound edge case still
routes that one outbound as 'group'; the next round agrees on 'direct'.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 14:26:42 +01:00
h z
81a10f2a1f Merge #5 feat(channel-meta): __fabric.getChannelType 2026-05-25 10:38:22 +00:00
c5429129d9 feat(channel-meta): expose globalThis.__fabric.getChannelType for narrow gating
Inbound `message.created` already carries `xType` (dm / triage / group /
broadcast / etc.) — record it in a per-channel cache so other plugins
can answer "is this channel a DM?" without poking the Center API.

New module src/channel-meta.ts:
  - in-memory Map<channelId, xType>
  - lazily loaded from ~/.openclaw/fabric-channel-meta.json on first
    access (so first-ever DM after a fresh gateway start still hits
    cache from the previous run)
  - debounced 250ms flush on dirty; force-flush on gateway_stop
  - recordChannelType(channelId, xType): called from inbound
  - getChannelType(channelId): null if unknown — caller MUST treat null
    as "don't know", NOT as "assume DM" (would re-introduce the false-
    positive on group channels we're trying to eliminate)

Wiring:
  - inbound.ts socket.on('message.created'): records xType BEFORE the
    self-author / dedup gates (channel type is observer-agnostic)
  - index.ts: installs globalThis.__fabric = { getChannelType } on
    registerFull(); flushes on gateway_stop

Consumer: ClawPrompts' fabric-chat-injector will start gating its prompt
injection on getChannelType(channelId) === 'dm' (companion PR on
ClawPrompts). Removes the phase-1 "any fabric channel" false-positive.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 11:28:36 +01:00
c330571bcb refactor(tools): drop serviceEndpoint from fabric-guild-list response
Pairs with Fabric.Backend.Center@9e1909a which removed the
serviceEndpoint column. The agent-driven recruitment model no longer
needs a service-to-service URL distinction — agents post recruitment
broadcasts directly via fabric-send-message using their normal
session, no backend dials Fabric anymore.

Removed:
  - FabricSession.guilds.serviceEndpoint type field
  - fabric-guild-list response mapping for serviceEndpoint
  - Tool description text that taught agents to plumb it into
    dialectic_propose_topic.announce_guild_base_url (that param is
    also gone — see Dialectic.OpenclawPlugin@3ce5439).

Kept:
  - guild.purpose field (intent-based discovery, still wanted)
  - guild.endpoint field (clients dial it directly; unchanged)
2026-05-23 23:51:05 +01:00
8963f3ca00 feat(tools): fabric-guild-list returns serviceEndpoint for announce targets
FabricSession.guilds type + fabric-guild-list response now carry the
new serviceEndpoint field (added to GuildNode in Fabric.Backend.Center).
The tool description teaches agents the distinction: 'endpoint' is the
client-facing URL (which they themselves use to call the guild from
plugin context), 'serviceEndpoint' is what to plumb into
dialectic_propose_topic's announce_guild_base_url so the dialectic
backend can dial it from inside the deployment.

Fixes bug #2 from the first e2e debate run: agent-supplied
'http://server.t3:7002' wasn't backend-reachable; agent now
supplies 'http://fabric-backend-guild:7002' via serviceEndpoint
and broadcasts actually land.
2026-05-23 22:21:03 +01:00
0e36457d8f fix(tools): execute receives (callId, args), not (args) — pre-existing bug
OpenClaw plugin-sdk's registerTool execute signature is:
  execute: async (_id: string, params) => { ... }

Fabric tools were calling it as `(p) => { ... }`, so `p` held the
call id (a string) and the real args were silently dropped onto the
floor. Every tool that read a required field from `p` failed with
the field surfacing as undefined.

fabric-guild-list (just added) appeared to work because all its
properties are optional — `p.nameFilter` and `p.purposeFilter`
both being undefined produced empty filter needles, which let the
unfiltered guild list through. The real bug surfaced the moment
fabric-channel-list (required: guildNodeId) was invoked: the
ctxGuild helper saw `undefined` and reported `agent not a member
of guild undefined`.

Compare dialectic plugin's tools.ts which has always used the
correct `async (_id: string, params) => {...}` shape and worked
end-to-end. Aligning the fabric signature to match.

Verified end-to-end on sim:
  - fabric-guild-list returns 1 guild with the purpose set via the
    new `cli node set-purpose`
  - fabric-channel-list returns 3 channels including a now-populated
    `purpose` field on each row
  - fabric-channel-set-purpose successfully patches a channel and
    the subsequent fabric-channel-list shows the new purpose
2026-05-23 19:35:38 +01:00
5ff464a055 feat(plugin): fabric-guild-list + fabric-channel-set-purpose tools + purpose on existing tools
Adds two agent-facing tools that close the discoverability loop:

  - fabric-guild-list — enumerates guilds the agent belongs to with
    name + purpose + status (no api calls beyond the existing agentLogin
    response). Optional nameFilter/purposeFilter for narrowing.
  - fabric-channel-set-purpose — PATCH /api/channels/:id { purpose }
    so agents can backfill or update an existing channel's purpose.

Extends existing tools:
  - fabric-channel-list now returns purpose on each row.
  - create-{chat,work,report,discussion}-channel accept optional purpose.

FabricClient + FabricSession type changes carry the new field through.
Manifest contracts.tools updated (jiti loader needs both manifest entry
and onStartup activation to register).

Lets workflows that previously needed hardcoded channel ids instead say
'find a guild whose purpose mentions debate, then a channel of x_type
announce whose purpose covers public debate broadcasts.'
2026-05-23 19:22:10 +01:00
6fe06f55dd feat(plugin): wire PresenceSync into gateway_start lifecycle (Phase 1.5)
Completes the Phase 1 hand-off chain — HF status now actually reaches
Fabric.Backend.Guild and busy-discard on announce channels becomes
operational end-to-end.

inbound.ts:
- Add getPresenceAccounts() — returns per-agent {agentId, fabricUserId,
  guildBaseUrl, fabricApiKey} for every agent that successfully logged
  in. fabricUserId comes from session.user.id cached on the identity
  registry; guildBaseUrl from session.guilds[0].endpoint captured in a
  new private firstGuildEndpointByAgent map during connectAgent().
- Multi-guild presence is deferred; the first guild per agent is the
  push target. For sim/prod-v1 each agent is in one guild so this is a
  no-op simplification.

index.ts gateway_start:
- After inbound.start() resolves, instantiate PresenceSync, call
  setAccounts(inbound.getPresenceAccounts()), start().
- 5-min refresh timer re-harvests accounts (catches agents added via
  tool-based identity registration AFTER initial start — e.g.
  recruitment flow). setAccounts is idempotent.
- gateway_stop now clears the refresh timer and stops PresenceSync
  before stopping inbound.

End-to-end check (still need sim verification):
  HF plugin scheduler heartbeat -> globalThis.__hfAgentStatus
   -> PresenceSync tick (30s) -> PUT /agents/:uid/presence
   -> agent_presences row -> computeDelivery for xType=announce
   -> busy recipients skipped, idle recipients get observer delivery.

Type-check: only pre-existing openclaw/* runtime-resolved-by-jiti
errors remain; new presence wiring compiles clean.

See DIALECTIC-V2-DESIGN.md section 10 Phase 1 (deferred items now
landed).
2026-05-23 11:37:08 +01:00
a15dc880af feat(plugin): add presence-sync module (Phase 1 partial wire)
Drops the PresenceSync class file under src/. Reads each agents HF
status from globalThis.__hfAgentStatus (exposed by
HarborForge.OpenclawPlugin) every 30s and PUTs deltas to
Fabric.Backend.Guild PUT /agents/:userId/presence so the backend can
do busy-discard on announce channel deliveries.

Implementation:
- Diffs against in-memory lastStatus map per agentId; PUT only on
  change. No-op when __hfAgentStatus is undefined (HF plugin not
  loaded) — degrades gracefully, backend defaults presence to
  unknown which means no busy filtering.
- Per-account context: {agentId, fabricUserId, guildBaseUrl,
  fabricApiKey}. Uses x-api-key header so it goes through the
  existing ApiKeyGuard path on the backend.

NOT YET WIRED into index.ts gateway_start lifecycle. To finish
wiring, the registerFull block needs to:
  1. After FabricInbound.start() resolves, harvest each agents
     fabric user id (introspected by Center during session login —
     available on FabricSession.user.id).
  2. Build PresenceSyncAccount[] from those + the existing accounts
     list (which already has agentId + fabricApiKey + guildBaseUrl).
  3. presence = new PresenceSync(api.logger); presence.setAccounts(...);
     presence.start();
  4. presence.stop() on gateway_stop.

Reason for splitting: wiring needs the FabricInbound public API to
expose per-account session metadata, which is a small but separate
refactor. Module ships standalone now so the dependency direction is
clear and the wire-up patch is small.

See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md section 7 (resolved
push-model design).
2026-05-23 11:32:24 +01:00
5dcbd99c28 Merge pull request 'feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history' (#4) from feat/fabric-send-and-discover-tools into main 2026-05-22 22:11:06 +00:00
cd36d1b9e2 feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history
Plugin previously had no way for an agent to send text into a specific
channel proactively — outbound went only through the channel-reply
path (responds to the channel that woke the agent). discussion-complete
internally called client.postMessage but only for the close-time
summary, no general-purpose surface.

Three new tools (+ declare existing fabric-canvas / fabric-channel that
were registered but missing from contracts.tools so agents couldn't
see them per the openclaw plugin contract):

  * fabric-send-message {guildNodeId, channelId, content}
      → {ok, messageId, seq}
    Author = calling agent. Use for ARD broadcasts, follow-ups in a
    different channel, etc.

  * fabric-channel-list {guildNodeId, nameFilter?, xType?, includeClosed?}
      → {ok, count, channels[]}
    Backend filters to public + member channels; nameFilter is client-
    side case-insensitive substring; xType / includeClosed apply post-
    fetch. Returns id/name/xType/lastSeq so callers can pipe into the
    other tools.

  * fabric-message-history {guildNodeId, channelId, seqFrom?, seqTo?, limit?}
      → {ok, page, messages[]}
    Tail-by-default: omit seqFrom/seqTo and the tool fetches the
    channel head from listChannels then asks for [head-limit+1, head].
    Limit default 20, max 200. Backend rejects non-participants.

Plus 3 supporting client methods (listChannels, listMessages — both
GET via existing req helper).

contracts.tools updated to declare these 5 (3 new + 2 previously-
silent ones). Verified earlier in sim restart logs: openclaw warned
'plugin tool is undeclared (fabric): fabric-canvas / fabric-channel'
so agents couldn't use them despite registerTool firing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 23:11:01 +01:00
9c910f082b Merge pull request 'feat(triage): per-channel serial queue + HF on_call gate + observer skip' (#3) from feat/triage-on-call-gate-and-queue into main 2026-05-22 21:59:23 +00:00
c5fd091f5a fix(triage): resolve claw_identifier via openclaw config (HF plugin's identifier)
os.hostname() fallback is wrong in sim where container hostname (server.t2)
doesn't match the HF agent row's claw_identifier (sim-t2). Add intermediate
fallback that reads openclaw config plugins.harbor-forge.identifier — the
same value the HF plugin uses for its outbound HF calls — keeping plugin
and HF agent state aligned without a per-service-unit HF_CLAW_IDENTIFIER
env override.

Priority:
  1. HF_CLAW_IDENTIFIER env (operator override)
  2. openclaw config plugins.harbor-forge.identifier (NEW)
  3. os.hostname() last-resort

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:49:31 +01:00
c5a33c33ec feat(triage): per-channel serial queue + HF on_call gate + observer skip
Three behavioral changes to inbound message handling to support the
new triage flow:

## 1. Per-channel serial queue

Replaces `void this.dispatch(...)` (fire-and-forget) with a per-channel
chain so consecutive messages on the same channel are processed strictly
in order — no concurrent model turns for the same channel. Other
channels remain independent (parallelism preserved across channels).

Implementation: `Map<channelId, Promise>` where each new task awaits
the previous. The map entry self-cleans when the chain settles AND
no newer task has overwritten it.

## 2. HF on_call gate (triage + wake=true only)

Before dispatching a triage wake to the on-duty agent, hit HF
`GET /calendar/agent/status?agent_id=...`. If the agent isn't
currently on_call, the message is pushed to a per-agent gated queue
instead of dispatched — no model turn fires.

Status check is cached for 5s to amortise across rapid triage bursts.

When a subsequent triage message arrives and the agent IS on_call by
that point, the gated queue drains FIFO (re-enqueued through the same
per-channel chain so order is kept) before the new message dispatches.

Drained queue is in-memory only; on gateway restart the underlying
Fabric messages get re-fetched via the connect-time history sweep.

## 3. Triage observer skip (wake=false)

Triage messages that arrive with wakeup=false are admin observers — by
spec they MUST NOT enter the agent's session history. Skipped entirely
(no recordInboundSession call). The next time this agent legitimately
wakes for triage, their context contains only past wakeups + their own
outgoing messages — no observer-side chatter from other agents.

For NON-triage channels the legacy "record-as-history" stays — those
keep their full channel conversation available for later wakes.

## Env

- HF_API_BASE_URL  — defaults `https://monitor.hangman-lab.top`
- HF_CLAW_IDENTIFIER — defaults to `os.hostname()`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:17:39 +01:00
28f5083679 Merge pull request 'feat(inbound): listen for backend-pushed channel.joined/left events' (#2) from feat/inbound-listen-push-events into main 2026-05-21 07:12:51 +00:00
a060ff98a2 feat(inbound): listen for backend-pushed channel.joined/left events
Companion to nav/Fabric.Backend.Guild#<TBD> which adds the server-side
emitToUser broadcast on channel membership changes. Before, the inbound
only learned about new channels via the 60s polling resync (worst-case
60s lag). Now the backend tells us directly so sub/unsub is realtime.

socket.on('channel.joined', evt) → join the socket.io room for evt.channelId
                                    and add to the local 'joined' set.
socket.on('channel.left',   evt) → leave + remove from 'joined'.

Both events are idempotent (`if (joined.has(id))` / `if (!joined.has(id))`)
so duplicate emits from server are safe. Polling resync still runs every
60s as a safety net for transient socket drops between emit and
reconnect, partial server failures, etc.

When backend lacks this support (older deployments), nothing breaks —
the event simply never fires and polling carries the load as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:08:33 +01:00
b9a5456d57 Merge pull request 'fix: dynamically sync inbound channel subscriptions' (#1) from fix/inbound-dynamic-channel-sync into main 2026-05-21 06:56:49 +00:00
15 changed files with 1921 additions and 26 deletions

56
dist/fabric/index.js vendored
View File

@@ -6,16 +6,23 @@
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import { PresenceSync } from './src/presence-sync.js';
import path from 'node:path';
import os from 'node:os';
let runtimeRef = null;
let inbound = null;
let presence = null;
// Periodic re-harvest of presence accounts so newly-connected agents
// (registered through tool-based identity flow AFTER initial start)
// get picked up. Cleared on gateway_stop.
let presenceRefreshTimer = null;
export { fabricChannelPlugin } from './src/channel.js';
export default defineChannelPluginEntry({
id: 'fabric',
@@ -37,6 +44,29 @@ export default defineChannelPluginEntry({
const client = new FabricClient(centerApiBase);
const identity = new IdentityRegistry(idFile);
registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity);
// Cross-plugin API: globalThis.__fabric
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
// injection to DM-typed channels only. The channel-meta cache is
// populated lazily from inbound (message.created carries xType) and
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
// very first DM after a fresh gateway start hits cache from the
// previous run rather than firing the injector on the wrong type.
//
// null return = channel never seen (cache cold). Callers MUST NOT
// fall back to "assume DM" — fail closed on unknown.
{
const _G = globalThis;
_G['__fabric'] = { getChannelType };
// Flush channel-meta cache when the gateway shuts down so
// recently-recorded xType entries don't get lost.
api.on('gateway_stop', () => {
try {
flushChannelMeta();
}
catch { /* ignore */ }
});
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
}
api.on('gateway_start', () => {
const _G = globalThis;
if (_G._fabricInboundStarted)
@@ -57,7 +87,25 @@ export default defineChannelPluginEntry({
return;
}
inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts);
void inbound.start();
// start() resolves once all accounts have attempted login; per-
// agent failures are logged but don't reject. Once it resolves we
// can harvest the presence accounts (those that DID log in have
// their fabricUserId + first guild endpoint populated).
void inbound.start().then(() => {
if (!inbound)
return;
presence = new PresenceSync(api.logger, client);
presence.setAccounts(inbound.getPresenceAccounts());
presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
// Re-harvest every 5 min: catches agents added via tool-based
// identity provisioning after gateway_start (recruitment flow).
// setAccounts is idempotent — duplicates collapse on agentId.
presenceRefreshTimer = setInterval(() => {
if (inbound && presence)
presence.setAccounts(inbound.getPresenceAccounts());
}, 5 * 60_000);
});
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
@@ -67,6 +115,12 @@ export default defineChannelPluginEntry({
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
if (presenceRefreshTimer) {
clearInterval(presenceRefreshTimer);
presenceRefreshTimer = null;
}
presence?.stop();
presence = null;
inbound?.stop();
inbound = null;
});

105
dist/fabric/src/channel-meta.js vendored Normal file
View File

@@ -0,0 +1,105 @@
/**
* Channel-meta cache. Records (channelId → xType) for every fabric
* channel the gateway has seen at least one inbound message in.
*
* Populated lazily from inbound (`recordChannelType` is called for
* every `message.created` event with non-empty `xType`). Persisted to
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
* gateway restarts (so the very first DM after restart still gets the
* right xType without waiting for a fresh inbound).
*
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
* to xType==='dm' only.
*
* Failure mode: lookup misses (channel never seen / inbound dropped
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
* fall back to "assume DM" or you re-introduce the false-positive on
* group channels.
*/
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { homedir } from 'node:os';
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
let memory = new Map();
let loaded = false;
let dirty = false;
let flushTimer = null;
function load() {
if (loaded)
return;
loaded = true;
try {
if (!existsSync(CACHE_FILE))
return;
const raw = readFileSync(CACHE_FILE, 'utf8');
const parsed = JSON.parse(raw);
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
if (typeof k === 'string' && typeof v === 'string')
memory.set(k, v);
}
}
catch {
// ignore — start with empty cache on corruption
}
}
function scheduleFlush() {
if (flushTimer)
return;
// Debounce writes — many inbound messages may arrive in a burst.
// 250ms coalesces them; on gateway_stop the channel plugin can force
// a synchronous flush via flushChannelMeta().
flushTimer = setTimeout(() => {
flushTimer = null;
if (!dirty)
return;
dirty = false;
flushSync();
}, 250);
}
function flushSync() {
try {
const dir = dirname(CACHE_FILE);
if (!existsSync(dir))
mkdirSync(dir, { recursive: true });
const out = { channels: Object.fromEntries(memory) };
const tmp = CACHE_FILE + '.tmp';
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
renameSync(tmp, CACHE_FILE);
}
catch {
// swallow — cache is an optimization; loss-on-write is recoverable
}
}
/** Called by inbound on every message.created. xType empty → no-op. */
export function recordChannelType(channelId, xType) {
if (!channelId || !xType)
return;
load();
const existing = memory.get(channelId);
if (existing === xType)
return;
memory.set(channelId, xType);
dirty = true;
scheduleFlush();
}
/** Cross-plugin lookup. null when channel never seen / unknown. */
export function getChannelType(channelId) {
if (!channelId)
return null;
load();
return memory.get(channelId) ?? null;
}
/** Force-flush — called on plugin shutdown to make sure recently
* recorded entries hit disk before the gateway dies. */
export function flushChannelMeta() {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
if (dirty) {
dirty = false;
flushSync();
}
}
export const CHANNEL_META_PATH = CACHE_FILE;

View File

@@ -11,6 +11,15 @@
import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core';
import { FabricClient } from './fabric-client.js';
import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js';
import { getChannelType } from './channel-meta.js';
export function fabricPeerRoutingForXType(xType) {
if (xType === 'dm')
return { peerKind: 'direct', chatType: 'direct' };
return { peerKind: 'group', chatType: 'group' };
}
export function fabricPeerRoutingForChannel(channelId) {
return fabricPeerRoutingForXType(getChannelType(channelId));
}
// ---- target grammar: fabric:<channelId> ----
export function stripFabricTargetPrefix(raw) {
let s = (raw ?? '').trim();
@@ -38,13 +47,18 @@ export function resolveFabricOutboundSessionRoute(params) {
const id = stripFabricTargetPrefix(params.target);
if (!id)
return null;
// Consult the channel-meta cache populated by inbound — DM channels
// need peer.kind='direct' so the outbound session key matches the
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
// no regression on cold cache).
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
return buildChannelOutboundSessionRoute({
cfg: params.cfg,
agentId: params.agentId,
channel: 'fabric',
accountId: params.accountId,
peer: { kind: 'group', id },
chatType: 'group',
peer: { kind: peerKind, id },
chatType,
from: `fabric:channel:${id}`,
to: `fabric:${id}`,
});

View File

@@ -63,6 +63,11 @@ export class FabricClient {
createChannel(guildEndpoint, guildToken, body) {
return this.post(`${guildEndpoint}/api/channels`, body, guildToken);
}
// PATCH /api/channels/:id — backend currently only patches `purpose`.
// Caller must be a member of the channel (or any user if public).
setChannelPurpose(guildEndpoint, guildToken, channelId, purpose) {
return this.req('PATCH', `${guildEndpoint}/api/channels/${channelId}`, guildToken, { purpose });
}
closeChannel(guildEndpoint, guildToken, channelId) {
return this.post(`${guildEndpoint}/api/channels/${channelId}/close`, {}, guildToken);
}
@@ -105,4 +110,31 @@ export class FabricClient {
removeCanvas(endpoint, token, channelId) {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
// ---- channel discovery + message read (used by the agent-facing
// fabric-channel-list / fabric-message-history tools) ----
/**
* List channels in a guild visible to the calling user. Backend
* filters to public + channels the user is a member of.
*/
listChannels(guildEndpoint, guildToken, guildNodeId) {
return this.req('GET', `${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`, guildToken);
}
/**
* Page through a channel's message history by `seq`.
*
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
* tail. Page metadata in the response describes what to ask next.
*/
listMessages(guildEndpoint, guildToken, channelId, opts = {}) {
const qs = new URLSearchParams();
if (opts.seqFrom !== undefined)
qs.set('seq_from', String(opts.seqFrom));
if (opts.seqTo !== undefined)
qs.set('seq_to', String(opts.seqTo));
if (opts.limit !== undefined)
qs.set('limit', String(opts.limit));
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
return this.req('GET', url, guildToken);
}
}

View File

@@ -4,6 +4,8 @@ import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
export class FabricInbound {
core;
@@ -28,6 +30,125 @@ export class FabricInbound {
// Re-login per agent on a short TTL to keep a fresh token.
tokenCache = new Map();
static TOKEN_TTL_MS = 8 * 60 * 1000;
// Per-channel serial work queue. Every inbound socket message for a
// channel awaits the previous task for that same channel, so model
// turns never interleave. Map key = channelId; value is the tail of
// the chain (an in-flight promise the next task awaits).
//
// Why per-channel and not per-agent: a single agent may sit in
// several triage / general channels; we want each channel to flow at
// its own speed but the SAME channel's traffic to be strictly serial.
// For dm and discuss the queue also serialises but those traditionally
// had at-most-one-in-flight anyway via the turn engine.
channelChains = new Map();
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
// agent/status round-trip off the hot path for back-to-back triage
// messages. Short TTL because status flips are rare-but-meaningful.
agentStatusCache = new Map();
static AGENT_STATUS_TTL_MS = 5_000;
// Triage messages that arrived while the on-duty agent wasn't on_call
// — sit here until either (a) the agent becomes on_call and the next
// triage arrival drains them, or (b) the gateway restarts (lost; ok
// because the underlying Fabric messages are persisted and re-fetched
// on agent reconnect's history sweep).
pendingTriageGated = [];
// Schedule `task` to run after every previous task on the same
// channel has completed. Returns the promise so callers can await
// their own result if they need to; the chain itself is fire-and-
// forget from the socket.on handler.
enqueueChannelTask(channelId, task) {
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
const next = prev.then(task).catch((err) => {
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
});
this.channelChains.set(channelId, next);
// Best-effort cleanup so the Map doesn't grow without bound for
// long-running gateways: drop the entry when the chain settles, but
// only if it's still the latest reference (newer enqueue may have
// overwritten it in the meantime).
void next.finally(() => {
if (this.channelChains.get(channelId) === next) {
this.channelChains.delete(channelId);
}
});
return next;
}
// Hit HF backend to check whether `agentId` is currently on_call.
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
// on_call" — triage stays gated rather than risking a confused wake.
async checkAgentOnCall(agentId) {
const cached = this.agentStatusCache.get(agentId);
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
return cached.onCall;
}
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
// CLAW_IDENTIFIER resolution priority:
// 1. HF_CLAW_IDENTIFIER env (operator override)
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
// plugin itself uses — keeps the two in sync without an extra
// env per service unit)
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
// container hostname is `server.t2` but HF agent row has
// `claw_identifier=sim-t2`; matching is mandatory for the HF
// backend's _require_agent() check)
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
if (!claw) {
try {
// openclaw config shape (verified in sim):
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
const cfg = this.cfg;
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
claw = fromCfg.trim();
}
}
catch {
/* fall through to hostname */
}
}
if (!claw) {
claw = (await import('os')).hostname();
}
let onCall = false;
try {
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
const res = await fetch(url, {
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
});
if (res.ok) {
const data = (await res.json());
onCall = (data.status ?? '').toLowerCase() === 'on_call';
}
}
catch (err) {
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
}
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
return onCall;
}
// FIFO drain of all triage-gated messages for `agentId` (called when
// we just learned they're on_call). Each drained message is dispatched
// through its own channel chain so per-channel serial order is kept.
async drainGatedFor(agentId) {
const keep = [];
const drain = [];
for (const item of this.pendingTriageGated) {
if (item.agentId === agentId)
drain.push(item);
else
keep.push(item);
}
if (drain.length === 0)
return;
this.pendingTriageGated = keep;
for (const item of drain) {
this.log.info(`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`);
// Re-enqueue via the per-channel chain so ordering is preserved.
this.enqueueChannelTask(item.channelId, async () => {
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
});
}
}
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
@@ -90,15 +211,76 @@ export class FabricInbound {
s.disconnect();
this.sockets = [];
}
/**
* Per-account metadata harvested during `start()` — used by
* PresenceSync to know where to push each agent's HF status.
*
* `fabricUserId` is filled from `session.user.id` after agent-login.
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
* guild presence push is a future concern; for sim/prod-v1 each agent
* is in one guild).
*
* Returns ONLY agents that successfully connected — failed-login
* agents have no fabricUserId yet and are excluded.
*/
getPresenceAccounts() {
const out = [];
for (const entry of this.identity.list()) {
if (!entry.fabricUserId)
continue;
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
if (!presenceGuild)
continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuild.endpoint,
guildNodeId: presenceGuild.nodeId,
fabricApiKey: entry.fabricApiKey,
});
}
return out;
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target). Stores both
// endpoint and nodeId — presence-sync needs both: endpoint to build
// the URL, nodeId to pick the matching guildAccessToken from a fresh
// agent-login response.
firstGuildByAgent = new Map();
async connectAgent(agentId, session) {
const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild)
this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
}
for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok)
continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'],
auth: { token: tok },
auth: (cb) => {
// Best-effort fresh token; on transient failure fall back to the
// last known good one. tokenCache also keeps HTTP calls (attachment
// download / reply post) from 401'ing in the same window.
this.freshGuildToken(agentId, g.nodeId, session)
.then((fresh) => cb({ token: fresh ?? tok }))
.catch(() => cb({ token: tok }));
},
autoConnect: false,
});
// Tracked socket.io rooms for this (agent, guild). The initial fetch
@@ -154,12 +336,41 @@ export class FabricInbound {
joined.clear();
void syncChannels('initial');
});
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt) => {
const id = evt?.channelId;
if (!id || joined.has(id))
return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt) => {
const id = evt?.channelId;
if (!id || !joined.has(id))
return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => {
const channelId = m.channelId ?? '';
if (!channelId)
return;
// Record xType into the channel-meta cache before self-author
// / dedup gates — channel type doesn't depend on who sent the
// message, and recording it on observer-only triage messages
// is still useful (the next consumer asking
// __fabric.getChannelType wants the answer regardless of
// whether THIS message was delivered to an agent).
recordChannelType(channelId, m.xType);
if (m.authorUserId && m.authorUserId === selfUserId)
return;
const key = `${agentId}:${m.messageId}`;
@@ -168,7 +379,31 @@ export class FabricInbound {
this.seen.add(key);
if (this.seen.size > 5000)
this.seen.clear();
void this.dispatch(agentId, g, channelId, m, session);
// Per-channel serial queue. Prevents concurrent model turns for
// the same channel — important for triage where a second wake
// arriving mid-reply would interleave with the in-flight one.
this.enqueueChannelTask(channelId, async () => {
// Triage on_call gate: if the on-duty agent isn't currently
// on_call per HF, don't dispatch yet — just sit on the
// per-channel queue. Subsequent triage messages will recheck;
// when the agent becomes on_call, the next arrival drains.
//
// Also handles: triage + wake=true must verify status before
// committing to a model turn. Non-triage and triage observer
// (wake=false) skip the gate.
if (m.xType === 'triage' && m.wakeup === true) {
const onCall = await this.checkAgentOnCall(agentId);
if (!onCall) {
this.log.info(`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`);
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
return;
}
// Drain any previously-gated messages (FIFO) before this one,
// now that we know the agent is on_call.
await this.drainGatedFor(agentId);
}
await this.dispatch(agentId, g, channelId, m, session);
});
});
socket.connect();
this.sockets.push(socket);
@@ -220,11 +455,19 @@ export class FabricInbound {
const core = this.core;
const cfg = this.cfg;
try {
// Route by xType. DM channels need peer.kind='direct' so openclaw
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
// and ctx.ChatType='direct') rather than as a multi-party group.
// Without this, the agent's user-prompt metadata says
// 'is_group_chat: true' on a DM and downstream prompt logic
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg,
channel: 'fabric',
accountId: agentId,
peer: { kind: 'group', id: channelId },
peer: { kind: peerKind, id: channelId },
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
@@ -238,7 +481,7 @@ export class FabricInbound {
To: `fabric:${channelId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId ?? agentId,
ChatType: 'group',
ChatType: chatType,
ConversationLabel: `fabric:${guild.nodeId}`,
SenderId: m.authorUserId ?? 'fabric',
Provider: 'fabric',
@@ -259,6 +502,17 @@ export class FabricInbound {
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
// Triage exception: non-wake messages (admin observer) MUST NOT
// enter the agent's session at all. The next time the agent
// wakes for a triage message, their context should contain only
// their own past wakeups + their own outgoing messages — never
// the observer-only chatter from other agents. For non-triage
// channels keep the legacy "record-as-history" so a later wake
// sees the full channel conversation.
if (m.xType === 'triage') {
this.log.info(`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`);
return;
}
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,

139
dist/fabric/src/presence-sync.js vendored Normal file
View File

@@ -0,0 +1,139 @@
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
export class PresenceSync {
logger;
client;
timer = null;
lastStatus = new Map(); // by agentId
accounts = new Map();
tokenCache = new Map(); // by agentId
// Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
inflight = false;
constructor(logger, client) {
this.logger = logger;
this.client = client;
}
setAccounts(accounts) {
this.accounts.clear();
for (const a of accounts)
this.accounts.set(a.agentId, a);
}
start(intervalMs = 30_000) {
if (this.timer)
return;
this.timer = setInterval(() => {
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
}, intervalMs);
// run once immediately so initial state lands fast
void this.tick();
}
stop() {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
async ensureGuildToken(acct) {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now)
return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
}
catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
async tick() {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight)
return;
this.inflight = true;
try {
await this.tickInner();
}
finally {
this.inflight = false;
}
}
async tickInner() {
const bridge = globalThis['__hfAgentStatus'];
if (!bridge || typeof bridge.get !== 'function')
return; // HF plugin not loaded — skip
for (const [agentId, acct] of this.accounts) {
let status;
try {
status = await bridge.get(agentId);
}
catch {
continue;
}
if (!status)
continue;
if (this.lastStatus.get(agentId) === status)
continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken)
continue;
try {
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
// x-api-key and got 401 "missing bearer token" forever. The /api
// prefix is required because the guild backend sets a global
// 'api' prefix in main.ts setGlobalPrefix('api').
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
authorization: `Bearer ${guildToken}`,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
if (res.ok) {
this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId}${status}`);
}
else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401)
this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
}
catch (err) {
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
}
}
}
}

View File

@@ -25,7 +25,9 @@ export function registerFabricTools(api, client, identity) {
// or via static config (channels.fabric.accounts.<agentId>).
const makeCreate = (kind) => api.registerTool((ctx) => ({
name: `create-${kind}-channel`,
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}). ` +
'Optionally pass `purpose` to describe what this channel is for — ' +
'agents browse channels by purpose via fabric-channel-list.',
parameters: {
type: 'object',
additionalProperties: false,
@@ -37,9 +39,16 @@ export function registerFabricTools(api, client, identity) {
memberUserIds: { type: 'array', items: { type: 'string' } },
onDuty: { type: 'string', description: 'required for triage-like flows (unused for these kinds)' },
listeners: { type: 'array', items: { type: 'string' } },
purpose: {
type: 'string',
description: "Free-form description of what this channel is for. Optional but " +
'strongly recommended so other agents can find this channel by ' +
'intent (via fabric-channel-list). Can be edited later with ' +
'fabric-channel-set-purpose.',
},
},
execute: async (p) => {
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
@@ -50,6 +59,7 @@ export function registerFabricTools(api, client, identity) {
xType: X_BY_KIND[kind],
isPublic: p.isPublic ?? false,
memberUserIds: p.memberUserIds ?? [],
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
});
return { ok: true, channelId: ch.id };
},
@@ -74,7 +84,7 @@ export function registerFabricTools(api, client, identity) {
callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' },
},
},
execute: async (p) => {
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
@@ -118,7 +128,7 @@ export function registerFabricTools(api, client, identity) {
},
},
},
execute: async (p) => {
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
@@ -178,7 +188,7 @@ export function registerFabricTools(api, client, identity) {
channelId: { type: 'string' },
},
},
execute: async (p) => {
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
@@ -202,4 +212,252 @@ export function registerFabricTools(api, client, identity) {
}
},
}));
// -----------------------------------------------------------------
// fabric-send-message: post a message into a specific channel.
//
// Unlike a normal channel reply (which goes back to whatever channel
// woke the agent), this lets the agent proactively initiate text into
// any channel they are a member of — e.g. ARD broadcasting daily
// workload to #agents-room, or triage agent following up on an
// already-routed task by commenting in #updates.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-send-message',
description: 'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = (await client.postMessage(guild.endpoint, token, p.channelId, p.content, session.user.id));
return { ok: true, messageId: res.messageId, seq: res.seq };
},
}));
// -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the
// agent is a member of. Returns id / name / xType per channel so the
// agent can pick a channelId for fabric-send-message etc.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-channel-list',
description: 'List channels visible to the calling agent in a guild. Optional ' +
'nameFilter does a case-insensitive substring match client-side. ' +
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId'],
properties: {
guildNodeId: { type: 'string' },
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
xType: {
type: 'string',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
description: 'optional filter by x_type',
},
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const needle = (p.nameFilter ?? '').toLowerCase();
const filtered = all.filter((c) => {
if (!p.includeClosed && c.closed)
return false;
if (p.xType && c.xType !== p.xType)
return false;
if (needle && !c.name.toLowerCase().includes(needle))
return false;
return true;
});
return {
ok: true,
count: filtered.length,
channels: filtered.map((c) => ({
id: c.id,
name: c.name,
xType: c.xType,
isPublic: c.isPublic,
closed: c.closed,
lastSeq: c.lastSeq,
purpose: c.purpose ?? null,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-guild-list: enumerate guilds the calling agent belongs to.
// Each row carries `purpose` — free-form description of what the
// guild is for (admin-set). Use this as the first step when a
// workflow says "find the right guild for X" — pick by purpose,
// then fabric-channel-list to find the right channel inside it.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-guild-list',
description: 'List guilds the calling agent is a member of. Returns ' +
'{nodeId, name, purpose, status} per row. ' +
"`purpose` is a free-form description of what each guild is for — " +
'pick the guild whose purpose matches your intent. Use this tool ' +
'BEFORE fabric-channel-list when a workflow asks you to pick the ' +
'right guild by intent (e.g. "find a guild whose purpose mentions ' +
'debate broadcasts" → then list its announce-type channels).',
parameters: {
type: 'object',
additionalProperties: false,
properties: {
nameFilter: {
type: 'string',
description: 'optional case-insensitive substring match on guild name',
},
purposeFilter: {
type: 'string',
description: 'optional case-insensitive substring match on guild purpose ' +
'(e.g. "debate", "announcements")',
},
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const entry = identity.findByAgentId(agentId);
if (!entry)
return { ok: false, error: `agent ${agentId} not registered` };
const session = await client.agentLogin(entry.fabricApiKey);
const nameNeedle = (p.nameFilter ?? '').toLowerCase();
const purposeNeedle = (p.purposeFilter ?? '').toLowerCase();
const guilds = session.guilds.filter((g) => {
if (nameNeedle && !g.name.toLowerCase().includes(nameNeedle))
return false;
if (purposeNeedle) {
const purp = (g.purpose ?? '').toLowerCase();
if (!purp.includes(purposeNeedle))
return false;
}
return true;
});
return {
ok: true,
count: guilds.length,
guilds: guilds.map((g) => ({
nodeId: g.nodeId,
name: g.name,
status: g.status,
purpose: g.purpose ?? null,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-channel-set-purpose: set/update a channel's free-form
// purpose description. Caller must be a channel member (or the
// channel must be public). Use this to backfill purpose on existing
// channels, or to refine it after a channel's role evolves.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-channel-set-purpose',
description: "Set or update a channel's free-form purpose description. " +
'Channel membership required (or the channel must be public). ' +
'Pass empty string to clear. Use this to make a channel ' +
'discoverable to other agents via fabric-channel-list.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'purpose'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
purpose: {
type: 'string',
description: "What this channel is for. Pass '' (empty string) to clear.",
},
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = await client.setChannelPurpose(guild.endpoint, token, p.channelId, p.purpose);
return { ok: true, channel: res };
},
}));
// -----------------------------------------------------------------
// fabric-message-history: read a channel's recent message history by
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
// the last `limit` messages (limit defaults to 20, max 200).
//
// Use cases: catch-up on a channel that was muted while the agent was
// gated; verify a previous message went through; lookup recent
// duplicates before opening a new task in triage.
// -----------------------------------------------------------------
api.registerTool((ctx) => ({
name: 'fabric-message-history',
description: "Read a channel's recent message history. Omit seqFrom/seqTo to " +
'tail (last `limit` messages, default 20, max 200). Backend ' +
'requires the calling agent to be a channel participant.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
},
},
execute: async (_id, p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const limit = p.limit ?? 20;
// Tail mode: discover channel head via channel listing, then ask
// for [head-limit+1, head]. Avoids needing the agent to know seq.
let seqFrom = p.seqFrom;
let seqTo = p.seqTo;
if (seqFrom === undefined && seqTo === undefined) {
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const ch = channels.find((c) => c.id === p.channelId);
const head = ch?.lastSeq ?? 0;
seqFrom = Math.max(1, head - limit + 1);
seqTo = head;
}
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
seqFrom,
seqTo,
limit,
});
return {
ok: true,
page: res.page,
messages: res.items.map((m) => ({
messageId: m.messageId,
seq: m.seq,
authorUserId: m.authorUserId,
content: m.content,
createdAt: m.createdAt,
isDeleted: m.isDeleted,
})),
};
},
}));
}

View File

@@ -7,17 +7,24 @@ import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import { PresenceSync } from './src/presence-sync.js';
import path from 'node:path';
import os from 'node:os';
let runtimeRef: unknown = null;
let inbound: FabricInbound | null = null;
let presence: PresenceSync | null = null;
// Periodic re-harvest of presence accounts so newly-connected agents
// (registered through tool-based identity flow AFTER initial start)
// get picked up. Cleared on gateway_stop.
let presenceRefreshTimer: ReturnType<typeof setInterval> | null = null;
export { fabricChannelPlugin } from './src/channel.js';
@@ -56,6 +63,27 @@ export default defineChannelPluginEntry({
identity,
);
// Cross-plugin API: globalThis.__fabric
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
// injection to DM-typed channels only. The channel-meta cache is
// populated lazily from inbound (message.created carries xType) and
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
// very first DM after a fresh gateway start hits cache from the
// previous run rather than firing the injector on the wrong type.
//
// null return = channel never seen (cache cold). Callers MUST NOT
// fall back to "assume DM" — fail closed on unknown.
{
const _G = globalThis as Record<string, unknown>;
_G['__fabric'] = { getChannelType };
// Flush channel-meta cache when the gateway shuts down so
// recently-recorded xType entries don't get lost.
api.on('gateway_stop', () => {
try { flushChannelMeta(); } catch { /* ignore */ }
});
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
}
api.on('gateway_start', () => {
const _G = globalThis as Record<string, unknown>;
if (_G._fabricInboundStarted) return;
@@ -82,7 +110,24 @@ export default defineChannelPluginEntry({
api.logger,
accounts,
);
void inbound.start();
// start() resolves once all accounts have attempted login; per-
// agent failures are logged but don't reject. Once it resolves we
// can harvest the presence accounts (those that DID log in have
// their fabricUserId + first guild endpoint populated).
void inbound.start().then(() => {
if (!inbound) return;
presence = new PresenceSync(api.logger, client);
presence.setAccounts(inbound.getPresenceAccounts());
presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
// Re-harvest every 5 min: catches agents added via tool-based
// identity provisioning after gateway_start (recruitment flow).
// setAccounts is idempotent — duplicates collapse on agentId.
presenceRefreshTimer = setInterval(() => {
if (inbound && presence) presence.setAccounts(inbound.getPresenceAccounts());
}, 5 * 60_000);
});
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
@@ -93,6 +138,9 @@ export default defineChannelPluginEntry({
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
if (presenceRefreshTimer) { clearInterval(presenceRefreshTimer); presenceRefreshTimer = null; }
presence?.stop();
presence = null;
inbound?.stop();
inbound = null;
});

View File

@@ -14,7 +14,14 @@
"create-work-channel",
"create-report-channel",
"create-discussion-channel",
"discussion-complete"
"discussion-complete",
"fabric-canvas",
"fabric-channel",
"fabric-send-message",
"fabric-channel-list",
"fabric-message-history",
"fabric-guild-list",
"fabric-channel-set-purpose"
]
},
"configSchema": {

108
src/channel-meta.ts Normal file
View File

@@ -0,0 +1,108 @@
/**
* Channel-meta cache. Records (channelId → xType) for every fabric
* channel the gateway has seen at least one inbound message in.
*
* Populated lazily from inbound (`recordChannelType` is called for
* every `message.created` event with non-empty `xType`). Persisted to
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
* gateway restarts (so the very first DM after restart still gets the
* right xType without waiting for a fresh inbound).
*
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
* to xType==='dm' only.
*
* Failure mode: lookup misses (channel never seen / inbound dropped
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
* fall back to "assume DM" or you re-introduce the false-positive on
* group channels.
*/
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { homedir } from 'node:os';
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
interface ChannelMetaFile {
// channelId → xType ('dm' | 'triage' | 'group' | etc.)
channels: Record<string, string>;
}
let memory = new Map<string, string>();
let loaded = false;
let dirty = false;
let flushTimer: ReturnType<typeof setTimeout> | null = null;
function load(): void {
if (loaded) return;
loaded = true;
try {
if (!existsSync(CACHE_FILE)) return;
const raw = readFileSync(CACHE_FILE, 'utf8');
const parsed = JSON.parse(raw) as ChannelMetaFile;
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
if (typeof k === 'string' && typeof v === 'string') memory.set(k, v);
}
} catch {
// ignore — start with empty cache on corruption
}
}
function scheduleFlush(): void {
if (flushTimer) return;
// Debounce writes — many inbound messages may arrive in a burst.
// 250ms coalesces them; on gateway_stop the channel plugin can force
// a synchronous flush via flushChannelMeta().
flushTimer = setTimeout(() => {
flushTimer = null;
if (!dirty) return;
dirty = false;
flushSync();
}, 250);
}
function flushSync(): void {
try {
const dir = dirname(CACHE_FILE);
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
const out: ChannelMetaFile = { channels: Object.fromEntries(memory) };
const tmp = CACHE_FILE + '.tmp';
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
renameSync(tmp, CACHE_FILE);
} catch {
// swallow — cache is an optimization; loss-on-write is recoverable
}
}
/** Called by inbound on every message.created. xType empty → no-op. */
export function recordChannelType(channelId: string, xType: string | undefined): void {
if (!channelId || !xType) return;
load();
const existing = memory.get(channelId);
if (existing === xType) return;
memory.set(channelId, xType);
dirty = true;
scheduleFlush();
}
/** Cross-plugin lookup. null when channel never seen / unknown. */
export function getChannelType(channelId: string): string | null {
if (!channelId) return null;
load();
return memory.get(channelId) ?? null;
}
/** Force-flush — called on plugin shutdown to make sure recently
* recorded entries hit disk before the gateway dies. */
export function flushChannelMeta(): void {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
if (dirty) {
dirty = false;
flushSync();
}
}
export const CHANNEL_META_PATH = CACHE_FILE;

View File

@@ -21,6 +21,39 @@ import {
resolveDefaultFabricAccountId,
type ResolvedFabricAccount,
} from './accounts.js';
import { getChannelType } from './channel-meta.js';
/**
* Map a Fabric channel xType to an openclaw routing peer.kind / ChatType.
*
* Fabric distinguishes channels by xType ('dm' | 'triage' | 'group' |
* 'broadcast' | 'announce' | ...). Openclaw's session router only knows
* 'direct' | 'group' | 'channel'. We collapse:
* - 'dm' → 'direct' (1:1 conversation; agent always speaks)
* - rest → 'group' (multi-party; turn-engine gates speech)
*
* Sessions are keyed by peer.kind, so inbound and outbound MUST agree —
* otherwise the agent's outbound message lands in a different session
* than the inbound that triggered it and conversation state splits.
*
* Outbound has no live xType (the agent target is just a channelId), so
* it consults the channel-meta cache populated by inbound. Cache miss
* (channel never observed) falls back to 'group' — same as the pre-fix
* behavior, no regression on cold cache. The proactive-DM-first-message
* edge case (agent DMs a channel before any inbound) still lands as
* 'group' on that one outbound; the next inbound + outbound pair will
* agree on 'direct'.
*/
export type FabricPeerRouting = { peerKind: 'direct' | 'group'; chatType: 'direct' | 'group' };
export function fabricPeerRoutingForXType(xType: string | null | undefined): FabricPeerRouting {
if (xType === 'dm') return { peerKind: 'direct', chatType: 'direct' };
return { peerKind: 'group', chatType: 'group' };
}
export function fabricPeerRoutingForChannel(channelId: string): FabricPeerRouting {
return fabricPeerRoutingForXType(getChannelType(channelId));
}
type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown };
@@ -45,13 +78,18 @@ export function looksLikeFabricTargetId(raw: string): boolean {
export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
const id = stripFabricTargetPrefix(params.target);
if (!id) return null;
// Consult the channel-meta cache populated by inbound — DM channels
// need peer.kind='direct' so the outbound session key matches the
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
// no regression on cold cache).
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
return buildChannelOutboundSessionRoute({
cfg: params.cfg,
agentId: params.agentId,
channel: 'fabric',
accountId: params.accountId,
peer: { kind: 'group', id },
chatType: 'group',
peer: { kind: peerKind, id },
chatType,
from: `fabric:channel:${id}`,
to: `fabric:${id}`,
});

View File

@@ -6,7 +6,15 @@ export type FabricSession = {
accessToken: string;
refreshToken: string;
user: { id: string; email: string; name: string };
guilds: Array<{ nodeId: string; name: string; endpoint: string; status: string }>;
guilds: Array<{
nodeId: string;
name: string;
endpoint: string;
status: string;
// free-form description of this guild's role; admin-set on Center.
// null when the admin hasn't filled it in yet.
purpose?: string | null;
}>;
guildAccessTokens: Array<{ guildNodeId: string; token: string }>;
};
@@ -100,11 +108,30 @@ export class FabricClient {
memberUserIds?: string[];
onDuty?: string;
listeners?: string[];
// free-form purpose; optional. Existing agents can also set/update
// it later via setChannelPurpose().
purpose?: string;
},
): Promise<{ id: string }> {
return this.post(`${guildEndpoint}/api/channels`, body, guildToken);
}
// PATCH /api/channels/:id — backend currently only patches `purpose`.
// Caller must be a member of the channel (or any user if public).
setChannelPurpose(
guildEndpoint: string,
guildToken: string,
channelId: string,
purpose: string,
): Promise<{ id: string; name: string; xType: string; purpose: string | null }> {
return this.req(
'PATCH',
`${guildEndpoint}/api/channels/${channelId}`,
guildToken,
{ purpose },
);
}
closeChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
return this.post(`${guildEndpoint}/api/channels/${channelId}/close`, {}, guildToken);
}
@@ -190,6 +217,77 @@ export class FabricClient {
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
// ---- channel discovery + message read (used by the agent-facing
// fabric-channel-list / fabric-message-history tools) ----
/**
* List channels in a guild visible to the calling user. Backend
* filters to public + channels the user is a member of.
*/
listChannels(
guildEndpoint: string,
guildToken: string,
guildNodeId: string,
): Promise<Array<{
id: string;
guildId: string;
name: string;
xType: string;
kind: string;
isPublic: boolean;
closed: boolean;
lastSeq: number;
createdAt: string;
purpose?: string | null;
}>> {
return this.req(
'GET',
`${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`,
guildToken,
);
}
/**
* Page through a channel's message history by `seq`.
*
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
* tail. Page metadata in the response describes what to ask next.
*/
listMessages(
guildEndpoint: string,
guildToken: string,
channelId: string,
opts: { seqFrom?: number; seqTo?: number; limit?: number } = {},
): Promise<{
items: Array<{
messageId: string;
seq: number;
content: string;
authorUserId: string;
createdAt: string;
editedAt: string | null;
deletedAt: string | null;
isDeleted: boolean;
}>;
page: {
seqFrom: number;
seqTo: number;
limit: number;
returned: number;
hasMore: boolean;
nextExpectedSeq: number;
highestCommittedSeq: number;
};
}> {
const qs = new URLSearchParams();
if (opts.seqFrom !== undefined) qs.set('seq_from', String(opts.seqFrom));
if (opts.seqTo !== undefined) qs.set('seq_to', String(opts.seqTo));
if (opts.limit !== undefined) qs.set('limit', String(opts.limit));
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
return this.req('GET', url, guildToken);
}
}
export type CanvasFormat = 'md' | 'html' | 'text';

View File

@@ -6,6 +6,8 @@ import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-
import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
@@ -67,6 +69,136 @@ export class FabricInbound {
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
// Per-channel serial work queue. Every inbound socket message for a
// channel awaits the previous task for that same channel, so model
// turns never interleave. Map key = channelId; value is the tail of
// the chain (an in-flight promise the next task awaits).
//
// Why per-channel and not per-agent: a single agent may sit in
// several triage / general channels; we want each channel to flow at
// its own speed but the SAME channel's traffic to be strictly serial.
// For dm and discuss the queue also serialises but those traditionally
// had at-most-one-in-flight anyway via the turn engine.
private channelChains = new Map<string, Promise<void>>();
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
// agent/status round-trip off the hot path for back-to-back triage
// messages. Short TTL because status flips are rare-but-meaningful.
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
private static readonly AGENT_STATUS_TTL_MS = 5_000;
// Triage messages that arrived while the on-duty agent wasn't on_call
// — sit here until either (a) the agent becomes on_call and the next
// triage arrival drains them, or (b) the gateway restarts (lost; ok
// because the underlying Fabric messages are persisted and re-fetched
// on agent reconnect's history sweep).
private pendingTriageGated: Array<{
agentId: string;
g: { nodeId: string; endpoint: string };
channelId: string;
m: FabricMessage;
session: FabricSession;
}> = [];
// Schedule `task` to run after every previous task on the same
// channel has completed. Returns the promise so callers can await
// their own result if they need to; the chain itself is fire-and-
// forget from the socket.on handler.
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
const next = prev.then(task).catch((err) => {
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
});
this.channelChains.set(channelId, next);
// Best-effort cleanup so the Map doesn't grow without bound for
// long-running gateways: drop the entry when the chain settles, but
// only if it's still the latest reference (newer enqueue may have
// overwritten it in the meantime).
void next.finally(() => {
if (this.channelChains.get(channelId) === next) {
this.channelChains.delete(channelId);
}
});
return next;
}
// Hit HF backend to check whether `agentId` is currently on_call.
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
// on_call" — triage stays gated rather than risking a confused wake.
private async checkAgentOnCall(agentId: string): Promise<boolean> {
const cached = this.agentStatusCache.get(agentId);
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
return cached.onCall;
}
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
// CLAW_IDENTIFIER resolution priority:
// 1. HF_CLAW_IDENTIFIER env (operator override)
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
// plugin itself uses — keeps the two in sync without an extra
// env per service unit)
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
// container hostname is `server.t2` but HF agent row has
// `claw_identifier=sim-t2`; matching is mandatory for the HF
// backend's _require_agent() check)
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
if (!claw) {
try {
// openclaw config shape (verified in sim):
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
const cfg = this.cfg as {
plugins?: { entries?: Record<string, { config?: { identifier?: string } }> };
};
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
claw = fromCfg.trim();
}
} catch {
/* fall through to hostname */
}
}
if (!claw) {
claw = (await import('os')).hostname();
}
let onCall = false;
try {
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
const res = await fetch(url, {
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
});
if (res.ok) {
const data = (await res.json()) as { status?: string };
onCall = (data.status ?? '').toLowerCase() === 'on_call';
}
} catch (err) {
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
}
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
return onCall;
}
// FIFO drain of all triage-gated messages for `agentId` (called when
// we just learned they're on_call). Each drained message is dispatched
// through its own channel chain so per-channel serial order is kept.
private async drainGatedFor(agentId: string): Promise<void> {
const keep: typeof this.pendingTriageGated = [];
const drain: typeof this.pendingTriageGated = [];
for (const item of this.pendingTriageGated) {
if (item.agentId === agentId) drain.push(item);
else keep.push(item);
}
if (drain.length === 0) return;
this.pendingTriageGated = keep;
for (const item of drain) {
this.log.info(
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
);
// Re-enqueue via the per-channel chain so ordering is preserved.
this.enqueueChannelTask(item.channelId, async () => {
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
});
}
}
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
@@ -133,14 +265,86 @@ export class FabricInbound {
this.sockets = [];
}
/**
* Per-account metadata harvested during `start()` — used by
* PresenceSync to know where to push each agent's HF status.
*
* `fabricUserId` is filled from `session.user.id` after agent-login.
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
* guild presence push is a future concern; for sim/prod-v1 each agent
* is in one guild).
*
* Returns ONLY agents that successfully connected — failed-login
* agents have no fabricUserId yet and are excluded.
*/
getPresenceAccounts(): Array<{
agentId: string;
fabricUserId: string;
guildBaseUrl: string;
guildNodeId: string;
fabricApiKey: string;
}> {
const out: Array<{
agentId: string;
fabricUserId: string;
guildBaseUrl: string;
guildNodeId: string;
fabricApiKey: string;
}> = [];
for (const entry of this.identity.list()) {
if (!entry.fabricUserId) continue;
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
if (!presenceGuild) continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuild.endpoint,
guildNodeId: presenceGuild.nodeId,
fabricApiKey: entry.fabricApiKey,
});
}
return out;
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target). Stores both
// endpoint and nodeId — presence-sync needs both: endpoint to build
// the URL, nodeId to pick the matching guildAccessToken from a fresh
// agent-login response.
private firstGuildByAgent = new Map<string, { endpoint: string; nodeId: string }>();
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild) this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
}
for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'],
auth: { token: tok },
auth: (cb) => {
// Best-effort fresh token; on transient failure fall back to the
// last known good one. tokenCache also keeps HTTP calls (attachment
// download / reply post) from 401'ing in the same window.
this.freshGuildToken(agentId, g.nodeId, session)
.then((fresh) => cb({ token: fresh ?? tok }))
.catch(() => cb({ token: tok }));
},
autoConnect: false,
});
// Tracked socket.io rooms for this (agent, guild). The initial fetch
@@ -199,6 +403,26 @@ export class FabricInbound {
joined.clear();
void syncChannels('initial');
});
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || joined.has(id)) return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || !joined.has(id)) return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(
() => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
@@ -207,12 +431,45 @@ export class FabricInbound {
socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? '';
if (!channelId) return;
// Record xType into the channel-meta cache before self-author
// / dedup gates — channel type doesn't depend on who sent the
// message, and recording it on observer-only triage messages
// is still useful (the next consumer asking
// __fabric.getChannelType wants the answer regardless of
// whether THIS message was delivered to an agent).
recordChannelType(channelId, m.xType);
if (m.authorUserId && m.authorUserId === selfUserId) return;
const key = `${agentId}:${m.messageId}`;
if (this.seen.has(key)) return;
this.seen.add(key);
if (this.seen.size > 5000) this.seen.clear();
void this.dispatch(agentId, g, channelId, m, session);
// Per-channel serial queue. Prevents concurrent model turns for
// the same channel — important for triage where a second wake
// arriving mid-reply would interleave with the in-flight one.
this.enqueueChannelTask(channelId, async () => {
// Triage on_call gate: if the on-duty agent isn't currently
// on_call per HF, don't dispatch yet — just sit on the
// per-channel queue. Subsequent triage messages will recheck;
// when the agent becomes on_call, the next arrival drains.
//
// Also handles: triage + wake=true must verify status before
// committing to a model turn. Non-triage and triage observer
// (wake=false) skip the gate.
if (m.xType === 'triage' && m.wakeup === true) {
const onCall = await this.checkAgentOnCall(agentId);
if (!onCall) {
this.log.info(
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
);
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
return;
}
// Drain any previously-gated messages (FIFO) before this one,
// now that we know the agent is on_call.
await this.drainGatedFor(agentId);
}
await this.dispatch(agentId, g, channelId, m, session);
});
});
socket.connect();
this.sockets.push(socket);
@@ -276,11 +533,19 @@ export class FabricInbound {
const core = this.core as Core & Record<string, unknown>;
const cfg = this.cfg as { session?: { store?: unknown } };
try {
// Route by xType. DM channels need peer.kind='direct' so openclaw
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
// and ctx.ChatType='direct') rather than as a multi-party group.
// Without this, the agent's user-prompt metadata says
// 'is_group_chat: true' on a DM and downstream prompt logic
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg,
channel: 'fabric',
accountId: agentId,
peer: { kind: 'group', id: channelId },
peer: { kind: peerKind, id: channelId },
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
@@ -295,7 +560,7 @@ export class FabricInbound {
To: `fabric:${channelId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId ?? agentId,
ChatType: 'group',
ChatType: chatType,
ConversationLabel: `fabric:${guild.nodeId}`,
SenderId: m.authorUserId ?? 'fabric',
Provider: 'fabric',
@@ -317,6 +582,19 @@ export class FabricInbound {
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
// Triage exception: non-wake messages (admin observer) MUST NOT
// enter the agent's session at all. The next time the agent
// wakes for a triage message, their context should contain only
// their own past wakeups + their own outgoing messages — never
// the observer-only chatter from other agents. For non-triage
// channels keep the legacy "record-as-history" so a later wake
// sees the full channel conversation.
if (m.xType === 'triage') {
this.log.info(
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
);
return;
}
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,

177
src/presence-sync.ts Normal file
View File

@@ -0,0 +1,177 @@
/**
* presence-sync — read each connected agent's HF status (via the
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
* `PUT /api/agents/:userId/presence` so the backend can apply
* busy-discard on `announce`-type channel deliveries.
*
* Push model: we only PUT when an agent's status actually changes
* (since the last push). The HF-side accessor has its own TTL cache
* to absorb the every-30s polling.
*
* Auth: the endpoint sits behind ApiKeyGuard (global APP_GUARD per
* app.module.js) which expects `Authorization: Bearer <guild-token>`
* — NOT the agent's fabricApiKey directly. So before each PUT we do
* a fresh agent-login (or reuse a cached token if still within its
* 15-min JWT TTL) and pull the guildAccessToken matching the target
* guild. Status changes are rare enough that login overhead is fine.
*
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
* is a no-op — Fabric backend defaults presence to 'unknown' which is
* treated as not-busy. Announce-channel delivery still works; busy
* filtering simply doesn't kick in.
*/
import type { FabricClient } from './fabric-client.js';
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
type Logger = { info: (m: string) => void; warn: (m: string) => void };
export interface PresenceSyncAccount {
agentId: string;
fabricUserId: string; // the agent's Fabric Center user id (UUID)
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
guildNodeId: string; // which guildAccessTokens[].guildNodeId to pick
fabricApiKey: string; // existing per-account key (used for agent-login)
}
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
interface CachedToken {
token: string;
expiresAt: number; // epoch ms
}
export class PresenceSync {
private timer: ReturnType<typeof setInterval> | null = null;
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
private readonly accounts = new Map<string, PresenceSyncAccount>();
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
// Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
private inflight = false;
constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
setAccounts(accounts: PresenceSyncAccount[]): void {
this.accounts.clear();
for (const a of accounts) this.accounts.set(a.agentId, a);
}
start(intervalMs = 30_000): void {
if (this.timer) return;
this.timer = setInterval(() => {
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
}, intervalMs);
// run once immediately so initial state lands fast
void this.tick();
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
private async ensureGuildToken(acct: PresenceSyncAccount): Promise<string | null> {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now) return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
} catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(
`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`,
);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
private async tick(): Promise<void> {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight) return;
this.inflight = true;
try {
await this.tickInner();
} finally {
this.inflight = false;
}
}
private async tickInner(): Promise<void> {
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
for (const [agentId, acct] of this.accounts) {
let status: HfStatus | undefined;
try {
status = await bridge.get(agentId);
} catch {
continue;
}
if (!status) continue;
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken) continue;
try {
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
// x-api-key and got 401 "missing bearer token" forever. The /api
// prefix is required because the guild backend sets a global
// 'api' prefix in main.ts setGlobalPrefix('api').
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
authorization: `Bearer ${guildToken}`,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
if (res.ok) {
this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId}${status}`);
} else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401) this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
} catch (err) {
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
}
}
}
}

View File

@@ -46,7 +46,10 @@ export function registerFabricTools(
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
api.registerTool((ctx: Ctx) => ({
name: `create-${kind}-channel`,
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
description:
`Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}). ` +
'Optionally pass `purpose` to describe what this channel is for — ' +
'agents browse channels by purpose via fabric-channel-list.',
parameters: {
type: 'object',
additionalProperties: false,
@@ -58,13 +61,22 @@ export function registerFabricTools(
memberUserIds: { type: 'array', items: { type: 'string' } },
onDuty: { type: 'string', description: 'required for triage-like flows (unused for these kinds)' },
listeners: { type: 'array', items: { type: 'string' } },
purpose: {
type: 'string',
description:
"Free-form description of what this channel is for. Optional but " +
'strongly recommended so other agents can find this channel by ' +
'intent (via fabric-channel-list). Can be edited later with ' +
'fabric-channel-set-purpose.',
},
},
execute: async (p: {
},
execute: async (_id: string, p: {
guildNodeId: string;
name: string;
isPublic?: boolean;
memberUserIds?: string[];
purpose?: string;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
@@ -75,6 +87,7 @@ export function registerFabricTools(
xType: X_BY_KIND[kind],
isPublic: p.isPublic ?? false,
memberUserIds: p.memberUserIds ?? [],
...(p.purpose !== undefined ? { purpose: p.purpose } : {}),
});
return { ok: true, channelId: ch.id };
},
@@ -101,7 +114,7 @@ export function registerFabricTools(
callbackChannelId: { type: 'string', description: 'optional channel to also post the summary to' },
},
},
execute: async (p: {
execute: async (_id: string, p: {
guildNodeId: string;
channelId: string;
summary: string;
@@ -151,7 +164,7 @@ export function registerFabricTools(
},
},
},
execute: async (p: {
execute: async (_id: string, p: {
action: 'read' | 'share' | 'update' | 'close';
guildNodeId: string;
channelId: string;
@@ -216,7 +229,7 @@ export function registerFabricTools(
channelId: { type: 'string' },
},
},
execute: async (p: {
execute: async (_id: string, p: {
action: 'members' | 'join' | 'leave';
guildNodeId: string;
channelId: string;
@@ -243,4 +256,276 @@ export function registerFabricTools(
}
},
}));
// -----------------------------------------------------------------
// fabric-send-message: post a message into a specific channel.
//
// Unlike a normal channel reply (which goes back to whatever channel
// woke the agent), this lets the agent proactively initiate text into
// any channel they are a member of — e.g. ARD broadcasting daily
// workload to #agents-room, or triage agent following up on an
// already-routed task by commenting in #updates.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-send-message',
description:
'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
},
},
execute: async (_id: string, p: { guildNodeId: string; channelId: string; content: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = (await client.postMessage(
guild.endpoint,
token,
p.channelId,
p.content,
session.user.id,
)) as { messageId?: string; seq?: number };
return { ok: true, messageId: res.messageId, seq: res.seq };
},
}));
// -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the
// agent is a member of. Returns id / name / xType per channel so the
// agent can pick a channelId for fabric-send-message etc.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel-list',
description:
'List channels visible to the calling agent in a guild. Optional ' +
'nameFilter does a case-insensitive substring match client-side. ' +
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId'],
properties: {
guildNodeId: { type: 'string' },
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
xType: {
type: 'string',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
description: 'optional filter by x_type',
},
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
},
},
execute: async (_id: string, p: {
guildNodeId: string;
nameFilter?: string;
xType?: string;
includeClosed?: boolean;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const needle = (p.nameFilter ?? '').toLowerCase();
const filtered = all.filter((c) => {
if (!p.includeClosed && c.closed) return false;
if (p.xType && c.xType !== p.xType) return false;
if (needle && !c.name.toLowerCase().includes(needle)) return false;
return true;
});
return {
ok: true,
count: filtered.length,
channels: filtered.map((c) => ({
id: c.id,
name: c.name,
xType: c.xType,
isPublic: c.isPublic,
closed: c.closed,
lastSeq: c.lastSeq,
purpose: c.purpose ?? null,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-guild-list: enumerate guilds the calling agent belongs to.
// Each row carries `purpose` — free-form description of what the
// guild is for (admin-set). Use this as the first step when a
// workflow says "find the right guild for X" — pick by purpose,
// then fabric-channel-list to find the right channel inside it.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-guild-list',
description:
'List guilds the calling agent is a member of. Returns ' +
'{nodeId, name, purpose, status} per row. ' +
"`purpose` is a free-form description of what each guild is for — " +
'pick the guild whose purpose matches your intent. Use this tool ' +
'BEFORE fabric-channel-list when a workflow asks you to pick the ' +
'right guild by intent (e.g. "find a guild whose purpose mentions ' +
'debate broadcasts" → then list its announce-type channels).',
parameters: {
type: 'object',
additionalProperties: false,
properties: {
nameFilter: {
type: 'string',
description: 'optional case-insensitive substring match on guild name',
},
purposeFilter: {
type: 'string',
description:
'optional case-insensitive substring match on guild purpose ' +
'(e.g. "debate", "announcements")',
},
},
},
execute: async (_id: string, p: { nameFilter?: string; purposeFilter?: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const entry = identity.findByAgentId(agentId);
if (!entry) return { ok: false, error: `agent ${agentId} not registered` };
const session = await client.agentLogin(entry.fabricApiKey);
const nameNeedle = (p.nameFilter ?? '').toLowerCase();
const purposeNeedle = (p.purposeFilter ?? '').toLowerCase();
const guilds = session.guilds.filter((g) => {
if (nameNeedle && !g.name.toLowerCase().includes(nameNeedle)) return false;
if (purposeNeedle) {
const purp = (g.purpose ?? '').toLowerCase();
if (!purp.includes(purposeNeedle)) return false;
}
return true;
});
return {
ok: true,
count: guilds.length,
guilds: guilds.map((g) => ({
nodeId: g.nodeId,
name: g.name,
status: g.status,
purpose: g.purpose ?? null,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-channel-set-purpose: set/update a channel's free-form
// purpose description. Caller must be a channel member (or the
// channel must be public). Use this to backfill purpose on existing
// channels, or to refine it after a channel's role evolves.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel-set-purpose',
description:
"Set or update a channel's free-form purpose description. " +
'Channel membership required (or the channel must be public). ' +
'Pass empty string to clear. Use this to make a channel ' +
'discoverable to other agents via fabric-channel-list.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'purpose'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
purpose: {
type: 'string',
description: "What this channel is for. Pass '' (empty string) to clear.",
},
},
},
execute: async (_id: string, p: { guildNodeId: string; channelId: string; purpose: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = await client.setChannelPurpose(
guild.endpoint,
token,
p.channelId,
p.purpose,
);
return { ok: true, channel: res };
},
}));
// -----------------------------------------------------------------
// fabric-message-history: read a channel's recent message history by
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
// the last `limit` messages (limit defaults to 20, max 200).
//
// Use cases: catch-up on a channel that was muted while the agent was
// gated; verify a previous message went through; lookup recent
// duplicates before opening a new task in triage.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-message-history',
description:
"Read a channel's recent message history. Omit seqFrom/seqTo to " +
'tail (last `limit` messages, default 20, max 200). Backend ' +
'requires the calling agent to be a channel participant.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
},
},
execute: async (_id: string, p: {
guildNodeId: string;
channelId: string;
seqFrom?: number;
seqTo?: number;
limit?: number;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const limit = p.limit ?? 20;
// Tail mode: discover channel head via channel listing, then ask
// for [head-limit+1, head]. Avoids needing the agent to know seq.
let seqFrom = p.seqFrom;
let seqTo = p.seqTo;
if (seqFrom === undefined && seqTo === undefined) {
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const ch = channels.find((c) => c.id === p.channelId);
const head = ch?.lastSeq ?? 0;
seqFrom = Math.max(1, head - limit + 1);
seqTo = head;
}
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
seqFrom,
seqTo,
limit,
});
return {
ok: true,
page: res.page,
messages: res.items.map((m) => ({
messageId: m.messageId,
seq: m.seq,
authorUserId: m.authorUserId,
content: m.content,
createdAt: m.createdAt,
isDeleted: m.isDeleted,
})),
};
},
}));
}