25 Commits

Author SHA1 Message Date
cd36d1b9e2 feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history
Plugin previously had no way for an agent to send text into a specific
channel proactively — outbound went only through the channel-reply
path (responds to the channel that woke the agent). discussion-complete
internally called client.postMessage but only for the close-time
summary, no general-purpose surface.

Three new tools (+ declare existing fabric-canvas / fabric-channel that
were registered but missing from contracts.tools so agents couldn't
see them per the openclaw plugin contract):

  * fabric-send-message {guildNodeId, channelId, content}
      → {ok, messageId, seq}
    Author = calling agent. Use for ARD broadcasts, follow-ups in a
    different channel, etc.

  * fabric-channel-list {guildNodeId, nameFilter?, xType?, includeClosed?}
      → {ok, count, channels[]}
    Backend filters to public + member channels; nameFilter is client-
    side case-insensitive substring; xType / includeClosed apply post-
    fetch. Returns id/name/xType/lastSeq so callers can pipe into the
    other tools.

  * fabric-message-history {guildNodeId, channelId, seqFrom?, seqTo?, limit?}
      → {ok, page, messages[]}
    Tail-by-default: omit seqFrom/seqTo and the tool fetches the
    channel head from listChannels then asks for [head-limit+1, head].
    Limit default 20, max 200. Backend rejects non-participants.

Plus 3 supporting client methods (listChannels, listMessages — both
GET via existing req helper).

contracts.tools updated to declare these 5 (3 new + 2 previously-
silent ones). Verified earlier in sim restart logs: openclaw warned
'plugin tool is undeclared (fabric): fabric-canvas / fabric-channel'
so agents couldn't use them despite registerTool firing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 23:11:01 +01:00
9c910f082b Merge pull request 'feat(triage): per-channel serial queue + HF on_call gate + observer skip' (#3) from feat/triage-on-call-gate-and-queue into main 2026-05-22 21:59:23 +00:00
c5fd091f5a fix(triage): resolve claw_identifier via openclaw config (HF plugin's identifier)
os.hostname() fallback is wrong in sim where container hostname (server.t2)
doesn't match the HF agent row's claw_identifier (sim-t2). Add intermediate
fallback that reads openclaw config plugins.harbor-forge.identifier — the
same value the HF plugin uses for its outbound HF calls — keeping plugin
and HF agent state aligned without a per-service-unit HF_CLAW_IDENTIFIER
env override.

Priority:
  1. HF_CLAW_IDENTIFIER env (operator override)
  2. openclaw config plugins.harbor-forge.identifier (NEW)
  3. os.hostname() last-resort

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:49:31 +01:00
c5a33c33ec feat(triage): per-channel serial queue + HF on_call gate + observer skip
Three behavioral changes to inbound message handling to support the
new triage flow:

## 1. Per-channel serial queue

Replaces `void this.dispatch(...)` (fire-and-forget) with a per-channel
chain so consecutive messages on the same channel are processed strictly
in order — no concurrent model turns for the same channel. Other
channels remain independent (parallelism preserved across channels).

Implementation: `Map<channelId, Promise>` where each new task awaits
the previous. The map entry self-cleans when the chain settles AND
no newer task has overwritten it.

## 2. HF on_call gate (triage + wake=true only)

Before dispatching a triage wake to the on-duty agent, hit HF
`GET /calendar/agent/status?agent_id=...`. If the agent isn't
currently on_call, the message is pushed to a per-agent gated queue
instead of dispatched — no model turn fires.

Status check is cached for 5s to amortise across rapid triage bursts.

When a subsequent triage message arrives and the agent IS on_call by
that point, the gated queue drains FIFO (re-enqueued through the same
per-channel chain so order is kept) before the new message dispatches.

Drained queue is in-memory only; on gateway restart the underlying
Fabric messages get re-fetched via the connect-time history sweep.

## 3. Triage observer skip (wake=false)

Triage messages that arrive with wakeup=false are admin observers — by
spec they MUST NOT enter the agent's session history. Skipped entirely
(no recordInboundSession call). The next time this agent legitimately
wakes for triage, their context contains only past wakeups + their own
outgoing messages — no observer-side chatter from other agents.

For NON-triage channels the legacy "record-as-history" stays — those
keep their full channel conversation available for later wakes.

## Env

- HF_API_BASE_URL  — defaults `https://monitor.hangman-lab.top`
- HF_CLAW_IDENTIFIER — defaults to `os.hostname()`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:17:39 +01:00
28f5083679 Merge pull request 'feat(inbound): listen for backend-pushed channel.joined/left events' (#2) from feat/inbound-listen-push-events into main 2026-05-21 07:12:51 +00:00
a060ff98a2 feat(inbound): listen for backend-pushed channel.joined/left events
Companion to nav/Fabric.Backend.Guild#<TBD> which adds the server-side
emitToUser broadcast on channel membership changes. Before, the inbound
only learned about new channels via the 60s polling resync (worst-case
60s lag). Now the backend tells us directly so sub/unsub is realtime.

socket.on('channel.joined', evt) → join the socket.io room for evt.channelId
                                    and add to the local 'joined' set.
socket.on('channel.left',   evt) → leave + remove from 'joined'.

Both events are idempotent (`if (joined.has(id))` / `if (!joined.has(id))`)
so duplicate emits from server are safe. Polling resync still runs every
60s as a safety net for transient socket drops between emit and
reconnect, partial server failures, etc.

When backend lacks this support (older deployments), nothing breaks —
the event simply never fires and polling carries the load as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:08:33 +01:00
b9a5456d57 Merge pull request 'fix: dynamically sync inbound channel subscriptions' (#1) from fix/inbound-dynamic-channel-sync into main 2026-05-21 06:56:49 +00:00
d1d5ad10ca fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
92945b777d feat(fabric): dm channels deliver any non-self message (no wakeup gate)
inbound: FabricMessage gains xType; the wakeup gate is bypassed when
xType==='dm' (self messages are already filtered upstream), so a 1:1
dm always reaches the model regardless of wakeup metadata.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 09:18:20 +01:00
8774cfd7cc feat(fabric): coalesce a split agent turn into ONE message (deterministic)
OpenClaw delivers an agent turn whose blocks are text -> thinking/tool
-> text via multiple inbound deliver() calls (a non-text block is a
delivery boundary), so one turn became N Fabric messages.

Fix: buffer deliver() segments per channel (src/coalesce.ts) and flush
them as ONE postMessage at a deterministic boundary — the finally after
dispatchInboundReplyWithBase() resolves, which provably runs only after
every deliver() of the turn (verified: deliver,deliver -> dispatch
returned -> flush). No hooks, no timers, no idle guessing. The
agent_end hook was rejected: it fires BEFORE deliver(). gateway_stop
flushes any leftover; a long safety timeout is a leak-guard only.
channels.fabric.coalesce=false restores raw per-segment posting.

Verified on local openclaw + Fabric with a fake text/thinking/text
model: single trigger -> exactly one merged message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 22:15:46 +01:00
ab126825ef feat(security): commandsSyncKey is a required channel-config field (Guild C-2)
The slash-command sync secret now comes from
channels.fabric.commandsSyncKey (configSchema marks it required) and
is no longer read from FABRIC_COMMANDS_SYNC_KEY env. command-sync
resolves it from config and threads it into client.syncCommands;
when absent, sync is skipped with a clear warning. README updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:44:25 +01:00
bb63a57384 feat(security): send x-commands-sync-key when configured (Guild C-2)
syncCommands attaches the FABRIC_COMMANDS_SYNC_KEY header when the
operator sets it, so the guild can restrict slash-command catalog
writes to this plugin. No-op / backward compatible when unset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:47:14 +01:00
fc6edaabfd docs: slash-command catalog section
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:15:03 +01:00
c03562046d feat(plugin): sync OpenClaw slash-command catalog to Fabric
- command-sync.ts: buildFabricCommandSpecs(cfg) reads OpenClaw native
  command specs via openclaw/plugin-sdk/native-command-registry
  (listNativeCommandSpecsForConfig + findCommandByNativeName), resolves
  dynamic arg choices to a static snapshot (resolveCommandArgChoices) —
  same data Discord registers as slash commands.
- syncFabricCommands(): on gateway_start, after inbound starts, PUT the
  catalog to each connected guild (FabricClient.syncCommands ->
  PUT /api/commands; idempotent, one per guild).
- Fabric stays a TEXT-command surface (no nativeCommands capability):
  execution still flows as a /<cmd> message into OpenClaw's command
  system; this catalog only drives frontend autocomplete.

Verified: 41 specs built (args/choices incl. dynamic), synced to
test-guild1, GET /api/commands round-trips count=41.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:06:22 +01:00
fac6debfa5 feat(plugin): fabric-channel tool (members / join / leave)
One tool, three actions backed by FabricClient channelMembers (GET
/channels/:id/members -> [{userId,bypass}]), joinChannel, and new
leaveChannel (POST /channels/:id/leave).

Verified: client-level smoke against the running guild — members
initial=[tester], after join echo2 present, after leave echo2 gone.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 15:33:47 +01:00
aaabb0ddb0 feat(plugin): fabric-canvas tool; fabric-register env=AGENT_ID only
- bin/fabric-register.mjs: only AGENT_ID is read from the environment;
  --api-key is flag-only (no FABRIC_API_KEY); dropped FABRIC_CENTER_API_BASE
  / FABRIC_IDENTITY_FILE / OPENCLAW_PATH env fallbacks (flags + sensible
  defaults; --center still falls back to openclaw.json).
- New fabric-canvas tool (one tool, four actions): read / share / update /
  close the channel's single pinned canvas. Backed by FabricClient
  get/share/update/removeCanvas (GET/PUT/PATCH/DELETE; empty 2xx body ->
  null). update/close are sharer-only server-side.
- README updated.

Verified: client-level smoke against the running guild —
read(empty→null) → share(v1) → read → update(v2) → close(→null) all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 15:28:13 +01:00
26c12533fb refactor(plugin): fabric-register is a script, not a tool
Binding an agent's Fabric API key was an OpenClaw tool; make it a
self-contained Node script installed to ~/.openclaw/bin/fabric-register
instead.

- bin/fabric-register.mjs: no plugin deps; AGENT_ID env wins, else
  --agent-id required; --api-key validated via POST /auth/agent/login;
  on success upserts ~/.openclaw/fabric-identity.json (format matches
  IdentityRegistry). Flags/env for center, identity-file, openclaw-path.
- install.mjs: copy the script to ~/.openclaw/bin (chmod 0755) on
  install, remove on uninstall; Next-steps updated.
- tools.ts: drop the fabric-register tool; ctxGuild error now points to
  the script / static accounts config.
- README updated.

Verified: missing-id -> exit 2; --agent-id and AGENT_ID both bind and
write a valid identity file; bad key -> 401, no write.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 13:12:48 +01:00
9d0fa1d5c8 docs: rewrite README to match current architecture
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 12:53:25 +01:00
892db9f9be feat(plugin): record non-wakeup messages as session history (no model)
Previously a non-wakeup message returned immediately and was fully
discarded — the agent kept zero record of it, so when later woken in a
discuss/work channel it replied without the conversation context.

Now non-wakeup messages are ingested into the agent's OpenClaw session
via recordInboundSession (createIfMissing) WITHOUT dispatch: the real
model is not invoked and nothing is sent back to Fabric. This is
correct for the turn engine — only the woken speaker emits a normal
message or /no-reply; non-woken agents stay silent — while still
giving the agent full channel context whenever it IS woken.

Verified live: report-channel (all recipients wakeup=false) message
logs 'recorded (no wakeup, history only)' with 0 dispatch/deliver/
posted; wakeup path unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 10:42:36 +01:00
fc7efd0227 fix(plugin): force automatic source-reply delivery (fixes no-reply)
OpenClaw defaults group-chat replies to sourceReplyDeliveryMode
'message_tool_only', which suppresses auto-delivery of the agent's
text reply (it expects the agent to call a message tool). With
ChatType 'group', the Fabric plugin's deliver callback was therefore
NEVER invoked — the agent ran but no reply ever returned to Fabric.

Fabric already gates *when* an agent speaks via the per-recipient
wakeup flag, so once a turn is dispatched the reply must always flow
back. Pass replyOptions.sourceReplyDeliveryMode='automatic' so
OpenClaw delivers the agent's reply through  regardless of
the group default (source-reply-delivery-mode honors a truthy
requested mode).

Verified live end-to-end: human posts -> wakeup -> agent runs ->
'fabric: deliver' + 'fabric: posted reply' -> agent message appears
in the Fabric channel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 10:02:47 +01:00
d79a04b8a3 fix(plugin): refresh guild token per dispatch (fixes attachment 401)
Guild access tokens are short-lived (~15 min); the inbound socket
survives via socket.io reconnect but the token captured at connect
time goes stale, so attachment downloads (and reply posts) start
401ing on long-lived agents. Re-login with the agent's Fabric API key
on a short TTL and use the fresh token for fetch + post.

Verified live: 'fabric: fetched 1 attachment(s)' now succeeds where it
previously logged 'attachment fetch 401'.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 00:03:31 +01:00
cc655ffcc3 fix(plugin): pass only local MediaPaths (drop SSRF-blocked MediaUrls)
Live round-trip test showed openclaw's SSRF guard blocking the
localhost guild file URL passed via MediaUrls. We already download the
bytes with the agent's guild token, so MediaUrls is redundant and
noisy — provide only local MediaPaths/MediaTypes. Verified: plugin
logs 'fetched N attachment(s)' and the SSRF WARN is gone.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 21:50:34 +01:00
42228e0a23 chore(plugin): rebuild dist (file delivery)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 20:17:28 +01:00
2abd0000e6 feat(plugin): deliver uploaded files to the agent
When an inbound Fabric message has attachments, download each with the
agent's guild token to a temp dir and set MediaPaths/MediaTypes/
MediaUrls (+ singular) on the finalized inbound context so openclaw
hands the files to the model.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 20:17:17 +01:00
25473384d8 docs: require cfg.bindings entry for account->agent routing
agent=account only routes correctly when openclaw cfg.bindings has a
{agentId, match:{channel:fabric, accountId}} entry; else falls back to
the default agent. Verified: with the binding, account echo -> agent
echo, reply posted back as the agent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 18:35:49 +01:00
20 changed files with 2136 additions and 129 deletions

136
README.md
View File

@@ -1,51 +1,139 @@
# Fabric.OpenclawPlugin
An **OpenClaw channel plugin** that connects OpenClaw agents to a Fabric guild.
A native **OpenClaw channel plugin** that connects OpenClaw agents to Fabric
guilds as real channel members. Independent of OpenClaw's source — it only
uses the public plugin SDK.
## Model
- `kind: "channel"` plugin (like the bundled discord channel). OpenClaw **core
owns dispatch** (inbound → agent run) and the reply pipeline via the channel
turn kernel `runtime.channel.turn.run(...)`.
- Fabric already owns turn/shuffle/mention/`/no-reply` server-side, so this
plugin is thin. Fabric's per-recipient **`wakeup`** maps to channel-turn
**admission**:
- `wakeup === true``dispatch` (agent runs, may reply)
- otherwise → `{ kind: "drop", recordHistory: true }` (kept as context)
- **No sidecar, no fake no-reply model, no `before_model_resolve` gating.**
- `kind: "channel"` plugin (like the bundled discord channel). OpenClaw core
owns dispatch and the reply pipeline via the channel-turn kernel
(`resolveAgentRoute` + `finalizeInboundContext` +
`dispatchInboundReplyWithBase`). Fabric already owns turn/shuffle/mention/
`/no-reply` server-side, so this plugin is thin.
- Fabric's per-recipient **`wakeup`** maps to admission:
- `wakeup === true`**dispatch** (the agent runs and may reply).
- `wakeup !== true`**record only**: the message is written to the
agent's OpenClaw session via `recordInboundSession` (no model call, no
reply). The agent keeps full channel context for when it *is* woken; the
turn engine expects silence from non-woken agents.
- Replies are forced to **automatic** delivery
(`replyOptions.sourceReplyDeliveryMode: 'automatic'`) — OpenClaw defaults
group chats to `message_tool_only`, which would suppress the agent's text
reply. Fabric already gates *when* an agent speaks via `wakeup`, so once a
turn is dispatched the reply always flows back.
- One Fabric socket per agent identity. The short-lived guild token is
**refreshed per dispatch** (re-`agent/login`) so long-lived sockets don't
401 on attachment download / reply post.
## Files to agents
When an inbound message has `attachments[]`, the plugin downloads each file
(with the agent's guild token) to a temp dir and sets local
`MediaPaths`/`MediaTypes` on the inbound context so the agent receives the
files. `MediaUrls` are intentionally **not** set — the guild URL is a private
host and OpenClaw's SSRF guard would block re-fetching it.
## Auth
Each agent has a Fabric Center **API key** (mint via Center CLI:
`node dist/cli.js user apikey --email <agent-email>`). The key is exchanged for
a user session (`POST /auth/agent/login`) used to receive (socket) and post
replies. Bind a key to an agent via the `fabric-register` tool, or pre-populate
the identity file.
`node dist/cli.js user apikey --email <agent-email>`). The key is exchanged
for a user session (`POST /auth/agent/login`).
### Binding a key to an agent (one-time)
Two ways, both write the same identity registry the transport reads:
1. **Static config** — set `channels.fabric.accounts.<agentId> =
{ fabricApiKey, enabled }`. The agent never runs anything.
2. **`fabric-register` script** — installed to `~/.openclaw/bin/fabric-register`
by the installer (it is **not** an OpenClaw tool):
```bash
# agent id from $AGENT_ID (set in the agent runtime):
~/.openclaw/bin/fabric-register --api-key fak_…
# or pass it explicitly:
~/.openclaw/bin/fabric-register --agent-id <agent> --api-key fak_…
```
It validates the key against Center, then writes
`~/.openclaw/fabric-identity.json`. One-time and persistent — *not* per
login; the plugin's transport logs in and stays connected on its own.
**Only `AGENT_ID` is read from the environment** — if unset, `--agent-id`
is required. `--api-key` is flag-only (never from the env). Other flags:
`--center`, `--identity-file`, `--openclaw-path` (sensible defaults;
`--center` also falls back to `openclaw.json`). Restart the gateway
afterwards.
## Config
- `channels.fabric.centerApiBase` — e.g. `http://localhost:7001/api`
- `channels.fabric.commandsSyncKey` — **required**; must equal the guild's
`FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY` (Guild C-2). Read it from the
guild with `docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js`.
Sourced from config only — never from the environment.
- `channels.fabric.coalesce` — default `true`. OpenClaw splits one agent
turn whose blocks are `text → thinking/tool → text` into multiple
`deliver()` calls; this buffers them and posts ONE Fabric message at the
deterministic turn boundary (right after the inbound reply dispatch
resolves — no hooks, no timers, no idle guessing). `false` = raw
per-segment posting.
- `channels.fabric.accounts.<agentId>` = `{ fabricApiKey, enabled }`
(**agent = account**; the account id is the OpenClaw agentId)
- plugin `identityFilePath` — default `~/.openclaw/fabric-identity.json`
(`{ entries: [{ agentId, fabricApiKey }] }`)
### Required: route binding (account → agent)
OpenClaw routes a channel turn via `cfg.bindings`; without a Fabric binding
it falls back to the default agent. One per account:
```json
{ "agentId": "<agent>", "match": { "channel": "fabric", "accountId": "<account>" } }
```
Then `openclaw gateway restart`.
## Tools
- `fabric-register` — bind this agent's Fabric API key
(Key binding is **not** a tool — see *Binding a key to an agent* above.)
- `create-chat-channel` (general) / `create-work-channel` (work) /
`create-report-channel` (report) / `create-discussion-channel` (discuss)
- `discussion-complete` — post a summary then close the channel
(Fabric `POST /channels/:id/close`; closed → history readable, posts → 409)
- `discussion-complete` — post a summary, then close the channel
(closed → history readable; new posts → `409`)
- `fabric-canvas` — manage a channel's single pinned canvas document; one
tool, four `action`s: `read` (current canvas or null) · `share`
(create/replace; caller becomes sharer) · `update` (edit in place;
sharer-only) · `close` (remove; sharer-only). `share` needs
`title`/`format`(`md`|`html`|`text`)/`source`.
- `fabric-channel` — channel membership; one tool, three `action`s:
`members` (list the channel's member userIds) · `join` (this agent
joins) · `leave` (this agent leaves).
## Transport (Phase 1 = B1)
## Slash-command catalog
One Fabric socket per agent identity, in the plugin runtime. Firehose (B2) is
a later drop-in behind the same `dispatch()` seam.
On `gateway_start` the plugin syncs OpenClaw's native-command catalog to
each guild (`command-sync.ts`: `listNativeCommandSpecsForConfig` +
`findCommandByNativeName`, dynamic arg `choices` snapshotted via
`resolveCommandArgChoices`) → `PUT /api/commands`. This is exactly the data
Discord registers as slash commands; Fabric's frontend uses it for `/`
autocomplete. Fabric deliberately does **not** advertise the
`nativeCommands` channel capability — it stays a text-command surface, so a
`/<cmd>` message is delivered normally and OpenClaw's command system
executes it (text-command + command session + auth), reusing the standard
inbound path. Only `/no-reply` and `/force-proceed` stay
server-intercepted by the guild.
## Build
## Install / build
```bash
npm install && npm run build
node install.mjs # build + copy to ~/.openclaw/plugins/fabric + configure
```
> The plugin compiles against the host's OpenClaw SDK
> (`openclaw/plugin-sdk/*`); build inside the OpenClaw plugin environment.
`install.mjs` mirrors the PaddedCell-style installer (also `--uninstall`).
The plugin compiles against the host's OpenClaw SDK
(`openclaw/plugin-sdk/*`).
> Transport is Phase 1 (one socket per agent). A firehose variant (B2) is a
> later drop-in behind the same `dispatch()` seam.

161
bin/fabric-register.mjs Normal file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env node
/**
* fabric-register — bind an OpenClaw agent to a Fabric Center API key.
*
* One-time, self-contained (no plugin deps). Installed to
* ~/.openclaw/bin/fabric-register by the plugin installer.
*
* AGENT_ID is read from the environment if set; otherwise --agent-id is
* required. The API key is validated against Center (POST
* /auth/agent/login) and, on success, written to the plugin's identity
* file so the Fabric channel plugin can connect that agent.
*
* Usage:
* fabric-register --api-key fak_xxx # uses $AGENT_ID
* fabric-register --agent-id echo --api-key fak_xxx
*
* Only AGENT_ID is read from the environment; everything else is a flag.
*
* Flags:
* --agent-id <id> required unless the AGENT_ID env var is set
* --api-key <fak_…> required (flag only — never from the environment)
* --center <url> else openclaw.json channels.fabric.centerApiBase;
* else http://localhost:7001/api
* --identity-file <path> default ~/.openclaw/fabric-identity.json
* --openclaw-path <dir> default ~/.openclaw
* -h | --help
*/
import { readFileSync, writeFileSync, mkdirSync, existsSync } from 'node:fs';
import { dirname, join, resolve } from 'node:path';
import { homedir } from 'node:os';
function parseArgs(argv) {
const out = {};
for (let i = 0; i < argv.length; i++) {
const a = argv[i];
if (a === '-h' || a === '--help') out.help = true;
else if (a.startsWith('--')) {
const key = a.slice(2);
const v = argv[i + 1];
if (v === undefined || v.startsWith('--')) out[key] = true;
else {
out[key] = v;
i++;
}
}
}
return out;
}
const HELP = `fabric-register — bind an OpenClaw agent to a Fabric Center API key
fabric-register --api-key fak_xxx # agent id from $AGENT_ID
fabric-register --agent-id <id> --api-key fak_xxx
--agent-id <id> required unless the AGENT_ID env var is set
--api-key <fak_…> required (flag only — never read from the env)
--center <url> Center API base (else openclaw.json
channels.fabric.centerApiBase; else
http://localhost:7001/api)
--identity-file <path> default ~/.openclaw/fabric-identity.json
--openclaw-path <dir> default ~/.openclaw
-h, --help
Only AGENT_ID is taken from the environment; everything else is a flag.
`;
function fail(msg) {
console.error(`fabric-register: ${msg}`);
process.exit(2);
}
async function main() {
const a = parseArgs(process.argv.slice(2));
if (a.help) {
process.stdout.write(HELP);
return;
}
// agent id: env AGENT_ID wins; else --agent-id is required.
const agentId =
(process.env.AGENT_ID && process.env.AGENT_ID.trim()) ||
(typeof a['agent-id'] === 'string' && a['agent-id'].trim());
if (!agentId) {
fail('no agent id: set the AGENT_ID environment variable or pass --agent-id <id>');
}
// api key: flag ONLY — never from the environment.
const apiKey = typeof a['api-key'] === 'string' && a['api-key'].trim();
if (!apiKey) fail('missing --api-key <fak_…> (flag only)');
const openclawPath = resolve(
(typeof a['openclaw-path'] === 'string' && a['openclaw-path']) ||
join(homedir(), '.openclaw'),
);
// center api base: flag > openclaw.json > default
let center = (typeof a.center === 'string' && a.center) || '';
if (!center) {
try {
const cfg = JSON.parse(readFileSync(join(openclawPath, 'openclaw.json'), 'utf8'));
center = cfg?.channels?.fabric?.centerApiBase || '';
} catch {
/* fall through to default */
}
}
if (!center) center = 'http://localhost:7001/api';
center = center.replace(/\/+$/, '');
const identityFile = resolve(
(typeof a['identity-file'] === 'string' && a['identity-file']) ||
join(openclawPath, 'fabric-identity.json'),
);
// 1) validate the key against Center (also resolves the Fabric identity)
let session;
try {
const res = await fetch(`${center}/auth/agent/login`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ apiKey }),
});
if (!res.ok) {
const t = await res.text().catch(() => '');
fail(`Center rejected the key: POST ${center}/auth/agent/login -> ${res.status} ${t}`);
}
session = await res.json();
} catch (e) {
fail(`could not reach Center at ${center}: ${e?.message || e}`);
}
// 2) upsert into the identity file (merge by agentId)
let file = { entries: [] };
if (existsSync(identityFile)) {
try {
const parsed = JSON.parse(readFileSync(identityFile, 'utf8'));
if (parsed && Array.isArray(parsed.entries)) file = parsed;
} catch {
/* corrupt -> overwrite */
}
}
const entry = {
agentId,
fabricApiKey: apiKey,
fabricUserId: session?.user?.id,
displayName: session?.user?.name,
};
const i = file.entries.findIndex((e) => e && e.agentId === agentId);
if (i >= 0) file.entries[i] = { ...file.entries[i], ...entry };
else file.entries.push(entry);
mkdirSync(dirname(identityFile), { recursive: true });
writeFileSync(identityFile, JSON.stringify(file, null, 2));
console.log(
`fabric-register: bound agent "${agentId}" -> Fabric user ` +
`${session?.user?.email || session?.user?.id} (${identityFile}). ` +
`Restart the gateway to connect: openclaw gateway restart`,
);
}
main().catch((e) => fail(e?.message || String(e)));

View File

@@ -5,11 +5,13 @@
// the OpenClawPluginApi for runtime startup (transport + tools).
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import path from 'node:path';
import os from 'node:os';
let runtimeRef = null;
@@ -57,8 +59,14 @@ export default defineChannelPluginEntry({
inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts);
void inbound.start();
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
// Note: the per-turn coalesce flush happens deterministically in
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
// is the real "all deliveries done" boundary; the agent_end hook fires
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
inbound?.stop();
inbound = null;
});

View File

@@ -7,6 +7,17 @@ const DEFAULT_CENTER = 'http://localhost:7001/api';
function section(cfg) {
return cfg.channels?.fabric ?? {};
}
// The commands-sync shared secret (channel-level only). Empty string when
// unconfigured — callers decide how to handle (slash-command sync is then
// rejected by the guild).
export function resolveCommandsSyncKey(cfg) {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg) {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg) {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -137,7 +137,10 @@ export const fabricChannelPlugin = createChatChannelPlugin({
attachedResults: {
channel: 'fabric',
sendText: async (ctx) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {});
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

75
dist/fabric/src/coalesce.js vendored Normal file
View File

@@ -0,0 +1,75 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x) {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
const pendingByChannel = new Map();
async function flushChannel(channelId, reason) {
const p = pendingByChannel.get(channelId);
if (!p)
return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text)
return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
}
catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params) {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text)
return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
}
else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(() => void flushChannel(cid, 'safety-timeout'), SAFETY_FLUSH_MS),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(rawChannelId) {
const cid = normChannelId(rawChannelId);
if (cid)
await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric() {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

109
dist/fabric/src/command-sync.js vendored Normal file
View File

@@ -0,0 +1,109 @@
// Build the Fabric slash-command catalog from OpenClaw's native-command
// specs (the same source Discord uses to register slash commands) and push
// it to each connected guild. Fabric is a TEXT-command surface: a /<cmd>
// message is delivered normally and OpenClaw's command system executes it —
// this catalog only drives the frontend `/` autocomplete, so we resolve any
// dynamic arg `choices` to a static snapshot here (like Discord does at
// registration time).
import { listNativeCommandSpecsForConfig, findCommandByNativeName, resolveCommandArgChoices, } from 'openclaw/plugin-sdk/native-command-registry';
import { resolveCommandsSyncKey } from './accounts.js';
function normChoice(c) {
if (typeof c === 'string')
return { value: c, label: c };
const o = c;
return { value: String(o.value ?? ''), label: String(o.label ?? o.value ?? '') };
}
export function buildFabricCommandSpecs(cfg) {
const specs = listNativeCommandSpecsForConfig(cfg, {
provider: 'fabric',
});
return specs.map((s) => {
// ChatCommandDefinition (for argsParsing + dynamic choices provider)
const def = findCommandByNativeName(s.name, 'fabric');
const args = (s.args ?? []).map((a) => {
const raw = a.choices;
let choices = null;
if (Array.isArray(raw)) {
choices = raw.map(normChoice);
}
else if (typeof raw === 'function' && def) {
try {
const r = resolveCommandArgChoices({
command: def,
arg: a,
cfg: cfg,
provider: 'fabric',
});
choices = r.map((x) => ({ value: x.value, label: x.label }));
}
catch {
choices = null;
}
}
return {
name: String(a.name ?? ''),
description: String(a.description ?? ''),
type: String(a.type ?? 'string'),
required: !!a.required,
captureRemaining: !!a.captureRemaining,
preferAutocomplete: !!a.preferAutocomplete,
choices,
};
});
return {
name: s.name,
nativeName: s.name,
description: s.description,
acceptsArgs: !!s.acceptsArgs,
args,
argsParsing: def?.argsParsing ?? 'positional',
};
});
}
// Push the catalog to every guild the known agents belong to (idempotent;
// the catalog is OpenClaw-global, so one PUT per guild is enough).
export async function syncFabricCommands(client, cfg, accounts, log) {
// Guild C-2: the sync key comes from the channel config only (schema
// marks it required). Without it the guild rejects the catalog write.
const syncKey = resolveCommandsSyncKey(cfg);
if (!syncKey) {
log.warn('fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)');
return;
}
let specs;
try {
specs = buildFabricCommandSpecs(cfg);
}
catch (err) {
log.warn(`fabric: build command specs failed: ${String(err)}`);
return;
}
if (!specs.length)
return;
const done = new Set();
for (const a of accounts) {
let session;
try {
session = await client.agentLogin(a.fabricApiKey);
}
catch {
continue;
}
for (const g of session.guilds) {
if (done.has(g.nodeId))
continue;
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok)
continue;
try {
await client.syncCommands(g.endpoint, tok, specs, syncKey);
done.add(g.nodeId);
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
}
catch (err) {
log.warn(`fabric: command sync failed ${g.nodeId}: ${String(err)}`);
}
}
}
}

View File

@@ -21,6 +21,25 @@ export class FabricClient {
}
return (await res.json());
}
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
// (Fabric returns an empty body when a channel has no canvas).
async req(method, url, auth, body, extraHeaders) {
const res = await fetch(url, {
method,
headers: {
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
...(auth ? { authorization: `Bearer ${auth}` } : {}),
...(extraHeaders ?? {}),
},
body: body !== undefined ? JSON.stringify(body) : undefined,
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`${method} ${url} -> ${res.status} ${text}`);
}
const text = await res.text();
return (text ? JSON.parse(text) : null);
}
// Exchange an agent API key for a Fabric user session (+ guild tokens).
agentLogin(apiKey) {
return this.post(`${this.centerApiBase}/auth/agent/login`, { apiKey });
@@ -50,4 +69,40 @@ export class FabricClient {
joinChannel(guildEndpoint, guildToken, channelId) {
return this.post(`${guildEndpoint}/api/channels/${channelId}/join`, {}, guildToken);
}
leaveChannel(guildEndpoint, guildToken, channelId) {
return this.post(`${guildEndpoint}/api/channels/${channelId}/leave`, {}, guildToken);
}
// Register the OpenClaw slash-command catalog with this guild (idempotent
// full replace). The frontend GETs it for `/` autocomplete; execution
// still flows as a normal /<cmd> message into OpenClaw's command system.
syncCommands(guildEndpoint, guildToken, commands, syncKey) {
// Guild C-2: the shared key is sourced from the channel config
// (channels.fabric.commandsSyncKey) and must equal the guild's
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
return this.req('PUT', `${guildEndpoint}/api/commands`, guildToken, { commands }, syncKey ? { 'x-commands-sync-key': syncKey } : undefined);
}
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
channelMembers(guildEndpoint, guildToken, channelId) {
return this.req('GET', `${guildEndpoint}/api/channels/${channelId}/members`, guildToken);
}
// ---- channel canvas (one pinned doc per channel) ----
canvasUrl(endpoint, channelId) {
return `${endpoint}/api/channels/${channelId}/canvas`;
}
// null when the channel has no canvas
getCanvas(endpoint, token, channelId) {
return this.req('GET', this.canvasUrl(endpoint, channelId), token);
}
// share / replace (caller becomes the sharer)
shareCanvas(endpoint, token, channelId, body) {
return this.req('PUT', this.canvasUrl(endpoint, channelId), token, body);
}
// update in place (original sharer only — else the guild returns 403)
updateCanvas(endpoint, token, channelId, body) {
return this.req('PATCH', this.canvasUrl(endpoint, channelId), token, body);
}
// remove ("close") the canvas (original sharer only)
removeCanvas(endpoint, token, channelId) {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
}

View File

@@ -1,5 +1,10 @@
import { promises as fs } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
export class FabricInbound {
core;
cfg;
@@ -9,6 +14,43 @@ export class FabricInbound {
accounts;
sockets = [];
seen = new Set();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
channelSyncTimers = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
// Re-login per agent on a short TTL to keep a fresh token.
tokenCache = new Map();
static TOKEN_TTL_MS = 8 * 60 * 1000;
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
async freshGuildToken(agentId, guildNodeId, fallback) {
const pick = (s) => s.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
const now = Date.now();
const cached = this.tokenCache.get(agentId);
if (cached && now - cached.at < FabricInbound.TOKEN_TTL_MS) {
return pick(cached.session) ?? pick(fallback);
}
const apiKey = this.identity.findByAgentId(agentId)?.fabricApiKey;
if (apiKey) {
try {
const s = await this.client.agentLogin(apiKey);
this.tokenCache.set(agentId, { session: s, at: now });
return pick(s) ?? pick(fallback);
}
catch (err) {
this.log.warn(`fabric: token refresh failed agent=${agentId}: ${String(err)}`);
}
}
return pick(fallback);
}
constructor(core, // PluginRuntime
cfg, // OpenClawConfig
client, identity, log, accounts = []) {
@@ -41,6 +83,9 @@ export class FabricInbound {
}
}
stop() {
for (const t of this.channelSyncTimers)
clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets)
s.disconnect();
this.sockets = [];
@@ -56,19 +101,83 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set();
const syncChannels = async (kind) => {
let freshTok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
const channels = res.ok ? (await res.json()) : [];
for (const c of channels)
socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
}
catch {
/* best effort */
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
if (!res.ok)
return;
const channels = (await res.json());
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
}
else if (added > 0 || removed > 0) {
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
}
}
catch {
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt) => {
const id = evt?.channelId;
if (!id || joined.has(id))
return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt) => {
const id = evt?.channelId;
if (!id || !joined.has(id))
return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => {
const channelId = m.channelId ?? '';
if (!channelId)
@@ -87,13 +196,49 @@ export class FabricInbound {
this.sockets.push(socket);
}
}
async dispatch(agentId, guild, channelId, m, session) {
// wakeup === false -> drop (Fabric already decided this agent is silent)
if (m.wakeup !== true) {
this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`);
return;
// Download a message's attachments to a temp dir using the agent's guild
// token; returns local paths/types/urls for the inbound media context.
async fetchAttachments(agentId, endpoint, token, m) {
const out = { paths: [], types: [], urls: [] };
const list = m.attachments ?? [];
if (!list.length || !token)
return out;
const dir = join(tmpdir(), `fabric-media-${agentId}-${m.messageId}`.replace(/[^\w.-]/g, '_'));
try {
await fs.mkdir(dir, { recursive: true });
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
catch {
return out;
}
let i = 0;
for (const a of list) {
try {
const abs = a.url.startsWith('http') ? a.url : `${endpoint}${a.url}`;
const res = await fetch(abs, { headers: { authorization: `Bearer ${token}` } });
if (!res.ok) {
this.log.warn(`fabric: attachment fetch ${res.status} ${abs}`);
continue;
}
const buf = Buffer.from(await res.arrayBuffer());
const safe = (a.name ?? `file-${i}`).replace(/[^\w.-]/g, '_').slice(0, 120) || `file-${i}`;
const p = join(dir, `${i}-${safe}`);
await fs.writeFile(p, buf);
out.paths.push(p);
out.types.push(a.mimeType ||
res.headers.get('content-type')?.split(';')[0] ||
'application/octet-stream');
out.urls.push(abs);
i++;
}
catch (err) {
this.log.warn(`fabric: attachment fetch failed agent=${agentId}: ${String(err)}`);
}
}
if (out.paths.length)
this.log.info(`fabric: fetched ${out.paths.length} attachment(s) agent=${agentId}`);
return out;
}
async dispatch(agentId, guild, channelId, m, session) {
const core = this.core;
const cfg = this.cfg;
try {
@@ -106,7 +251,7 @@ export class FabricInbound {
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
const baseCtx = {
Body: m.content,
BodyForAgent: m.content,
RawBody: m.content,
@@ -124,8 +269,49 @@ export class FabricInbound {
Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(),
OriginatingChannel: 'fabric',
OriginatingTo: `fabric:${channelId}`,
};
// Non-wakeup: Fabric has already decided this agent is NOT the speaker
// this round. Do NOT run the model and do NOT send anything back — the
// discuss/work turn engine expects silence from non-woken agents (only
// the woken speaker emits a normal message or /no-reply). We still
// record the message into the agent's session so it has the full
// channel conversation as context whenever it IS later woken.
//
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,
sessionKey: route.sessionKey,
ctx: ctxPayload,
createIfMissing: true,
onRecordError: (err) => this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
});
this.log.info(`fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
// Fetch any uploaded files for the agent: download to a temp dir and
// hand openclaw local MediaPaths (+types) so the model receives them.
const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
const ctxPayload = core.channel.reply.finalizeInboundContext({
...baseCtx,
// Provide ONLY local paths. The guild file URL is on a private host
// (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
// passing MediaUrls is both redundant (we already downloaded the
// bytes) and noisy. Local MediaPaths is the reliable delivery.
...(media.paths.length
? {
MediaPaths: media.paths,
MediaTypes: media.types,
MediaPath: media.paths[0],
MediaType: media.types[0],
}
: {}),
});
const gt = session.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token;
await dispatchInboundReplyWithBase({
cfg: this.cfg,
channel: 'fabric',
@@ -139,18 +325,46 @@ export class FabricInbound {
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt)
return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg),
post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id),
log: (m) => this.log.info(m),
});
},
onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
// Fabric has no length limit: deliver the whole reply as ONE message.
replyOptions: { disableBlockStreaming: true },
// - disableBlockStreaming: Fabric has no length limit, deliver the
// whole reply as ONE message.
// - sourceReplyDeliveryMode 'automatic': OpenClaw defaults group
// chats to "message_tool_only", which SUPPRESSES auto-delivery of
// the agent's text reply (it expects the agent to call a message
// tool). Fabric already gates *when* an agent speaks via the
// per-recipient wakeup flag, so once a turn is dispatched the
// reply must always flow back through `deliver`. Forcing
// 'automatic' overrides the group default so the reply is
// delivered. (source-reply-delivery-mode: a truthy `requested`
// wins unless it's message_tool_only with no tool available.)
replyOptions: {
disableBlockStreaming: true,
sourceReplyDeliveryMode: 'automatic',
},
});
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
}
catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
}
finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}

View File

@@ -9,7 +9,9 @@ export function registerFabricTools(api, client, identity) {
const ctxGuild = async (agentId, guildNodeId) => {
const entry = identity.findByAgentId(agentId);
if (!entry)
throw new Error(`agent ${agentId} not registered (call fabric-register)`);
throw new Error(`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
`channels.fabric.accounts.${agentId}); then restart the gateway`);
const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
const token = session.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
@@ -17,32 +19,10 @@ export function registerFabricTools(api, client, identity) {
throw new Error(`agent not a member of guild ${guildNodeId}`);
return { session, guild, token };
};
// fabric-register: bind this agent to a Fabric API key.
api.registerTool((ctx) => ({
name: 'fabric-register',
description: "Register this agent's Fabric API key (minted via Center CLI `user apikey`).",
parameters: {
type: 'object',
additionalProperties: false,
required: ['fabricApiKey'],
properties: {
fabricApiKey: { type: 'string', description: 'Fabric Center API key (fak_…)' },
},
},
execute: async (params) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const session = await client.agentLogin(params.fabricApiKey);
identity.upsert({
agentId,
fabricApiKey: params.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
return { ok: true, user: session.user };
},
}));
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
// It's a one-time step done out-of-band via the installed script
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
// or via static config (channels.fabric.accounts.<agentId>).
const makeCreate = (kind) => api.registerTool((ctx) => ({
name: `create-${kind}-channel`,
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
@@ -109,4 +89,117 @@ export function registerFabricTools(api, client, identity) {
return { ok: true, closed: true };
},
}));
// fabric-canvas: share / update / read / close the channel's single
// pinned canvas document (one tool, four actions). update/close are
// sharer-only server-side (the guild returns 403 otherwise).
api.registerTool((ctx) => ({
name: 'fabric-canvas',
description: "Manage a channel's pinned canvas document. action: " +
"read (current canvas or null) | share (create/replace; you become " +
'the sharer) | update (edit in place; sharer only) | close (remove; ' +
'sharer only).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['read', 'share', 'update', 'close'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
title: { type: 'string', description: 'share: required; update: optional' },
format: {
type: 'string',
enum: ['md', 'html', 'text'],
description: 'share: required; update: optional',
},
source: {
type: 'string',
description: 'document body. share: required; update: optional',
},
},
},
execute: async (p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'read': {
const canvas = await client.getCanvas(ep, token, p.channelId);
return { ok: true, canvas };
}
case 'share': {
if (!p.title || !p.format || p.source === undefined) {
return { ok: false, error: 'share requires title, format, and source' };
}
const canvas = await client.shareCanvas(ep, token, p.channelId, {
title: p.title,
format: p.format,
source: p.source,
});
return { ok: true, canvas };
}
case 'update': {
const body = {};
if (p.title !== undefined)
body.title = p.title;
if (p.format !== undefined)
body.format = p.format;
if (p.source !== undefined)
body.source = p.source;
if (Object.keys(body).length === 0) {
return { ok: false, error: 'update needs at least one of title/format/source' };
}
const canvas = await client.updateCanvas(ep, token, p.channelId, body);
return { ok: true, canvas };
}
case 'close': {
await client.removeCanvas(ep, token, p.channelId);
return { ok: true, removed: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
// fabric-channel: channel membership (one tool, three actions).
api.registerTool((ctx) => ({
name: 'fabric-channel',
description: 'Channel membership. action: members (list channel member userIds) | ' +
'join (this agent joins the channel) | leave (this agent leaves).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['members', 'join', 'leave'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
},
},
execute: async (p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'members': {
const members = await client.channelMembers(ep, token, p.channelId);
return { ok: true, members };
}
case 'join': {
await client.joinChannel(ep, token, p.channelId);
return { ok: true, joined: true };
}
case 'leave': {
await client.leaveChannel(ep, token, p.channelId);
return { ok: true, left: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
}

View File

@@ -6,11 +6,13 @@
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import path from 'node:path';
import os from 'node:os';
@@ -36,7 +38,7 @@ export default defineChannelPluginEntry({
config?: unknown;
pluginConfig?: { identityFilePath?: string };
logger: { info: (m: string) => void; warn: (m: string) => void };
on: (ev: string, fn: () => void) => void;
on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
registerTool: (d: unknown) => void;
};
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
@@ -82,9 +84,15 @@ export default defineChannelPluginEntry({
);
void inbound.start();
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
// Note: the per-turn coalesce flush happens deterministically in
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
// is the real "all deliveries done" boundary; the agent_end hook fires
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
inbound?.stop();
inbound = null;
});

View File

@@ -10,7 +10,7 @@
*/
import { execSync } from 'child_process';
import { existsSync, mkdirSync, copyFileSync, readdirSync, rmSync } from 'fs';
import { existsSync, mkdirSync, copyFileSync, readdirSync, rmSync, chmodSync } from 'fs';
import { dirname, join, resolve } from 'path';
import { fileURLToPath } from 'url';
import { homedir } from 'os';
@@ -109,12 +109,30 @@ function build() {
ok('compiled -> dist/fabric');
}
function binTarget(base) {
return join(base, 'bin', 'fabric-register');
}
function installBinScript(base) {
const src = join(__dirname, 'bin', 'fabric-register.mjs');
const dst = binTarget(base);
mkdirSync(dirname(dst), { recursive: true });
copyFileSync(src, dst);
chmodSync(dst, 0o755);
ok(`fabric-register -> ${dst}`);
}
function clearInstall(base) {
const dest = join(base, 'plugins', PLUGIN_ID);
if (existsSync(dest)) {
rmSync(dest, { recursive: true, force: true });
ok(`removed ${dest}`);
}
const bin = binTarget(base);
if (existsSync(bin)) {
rmSync(bin, { force: true });
ok(`removed ${bin}`);
}
}
function cleanupConfig(base) {
const dest = join(base, 'plugins', PLUGIN_ID);
@@ -149,6 +167,7 @@ function install() {
ok(`plugin files -> ${dest}`);
exec('npm install --omit=dev', { cwd: dest, silent: !opt.verbose });
ok('runtime deps installed');
installBinScript(base);
return { base, dest };
}
@@ -200,8 +219,10 @@ function main() {
console.log('');
log('Install complete. Next:', 'blue');
log(' 1. Mint an agent key: (in Center) node dist/cli.js user apikey --email <agent-email>', 'cyan');
log(' 2. openclaw gateway restart', 'cyan');
log(' 3. As an agent, call the fabric-register tool with that key', 'cyan');
log(' 2. Bind it to an agent (one-time), either:', 'cyan');
log(' AGENT_ID=<agent> ~/.openclaw/bin/fabric-register --api-key <fak_…>', 'cyan');
log(' (or pass --agent-id <agent>; or set channels.fabric.accounts.<agent>)', 'cyan');
log(' 3. openclaw gateway restart', 'cyan');
console.log('');
} catch (e) {
log(`\nInstall failed: ${e.message}`, 'red');

View File

@@ -14,7 +14,12 @@
"create-work-channel",
"create-report-channel",
"create-discussion-channel",
"discussion-complete"
"discussion-complete",
"fabric-canvas",
"fabric-channel",
"fabric-send-message",
"fabric-channel-list",
"fabric-message-history"
]
},
"configSchema": {
@@ -39,6 +44,15 @@
"type": "string",
"description": "Fabric Center API base, e.g. http://localhost:7001/api"
},
"commandsSyncKey": {
"type": "string",
"minLength": 1,
"description": "Shared secret that must equal the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY. Required to register the slash-command catalog (Guild C-2). Read it from the guild via: docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js"
},
"coalesce": {
"type": "boolean",
"description": "Merge a split agent turn (text → thinking/tool → text) into ONE Fabric message. Flushed deterministically on the agent_end hook. Default true; false = raw per-segment posting."
},
"dmSecurity": { "type": "string" },
"dmPolicy": { "type": "string" },
"enabled": { "type": "boolean" },
@@ -59,10 +73,12 @@
}
}
}
}
},
"required": ["commandsSyncKey"]
},
"uiHints": {
"centerApiBase": { "label": "Center API base" }
"centerApiBase": { "label": "Center API base" },
"commandsSyncKey": { "label": "Commands sync key" }
}
}
}

View File

@@ -14,6 +14,15 @@ export type FabricAccountConfig = {
export type FabricChannelConfig = {
centerApiBase?: string;
// Shared secret matching the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY
// (Guild C-2). Required by the channel config schema; sourced from config
// only — never from the environment.
commandsSyncKey?: string;
// Coalesce an agent turn that OpenClaw split into multiple deliveries
// (text → thinking/tool → text => N sendText calls) into ONE Fabric
// message. The flush boundary is the deterministic `agent_end` hook (not
// a timer). Default true; set false for raw per-segment posting.
coalesce?: boolean;
accounts?: Record<string, FabricAccountConfig>;
defaultAccount?: string;
} & FabricAccountConfig;
@@ -35,6 +44,19 @@ function section(cfg: Cfg): FabricChannelConfig {
return cfg.channels?.fabric ?? {};
}
// The commands-sync shared secret (channel-level only). Empty string when
// unconfigured — callers decide how to handle (slash-command sync is then
// rejected by the guild).
export function resolveCommandsSyncKey(cfg: Cfg): string {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg: Cfg): boolean {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg: Cfg): string[] {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -159,7 +159,10 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
cfg?: unknown;
config?: unknown;
}) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg;
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

93
src/coalesce.ts Normal file
View File

@@ -0,0 +1,93 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x: string | null | undefined): string {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
type Pending = {
parts: string[];
post: (text: string) => Promise<void>;
log?: (m: string) => void;
safety: ReturnType<typeof setTimeout>;
};
const pendingByChannel = new Map<string, Pending>();
async function flushChannel(channelId: string, reason: string): Promise<void> {
const p = pendingByChannel.get(channelId);
if (!p) return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text) return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
} catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params: {
channelId: string;
text: string;
coalesce: boolean;
post: (text: string) => Promise<void>;
log?: (m: string) => void;
}): Promise<void> {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text) return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
} else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(
() => void flushChannel(cid, 'safety-timeout'),
SAFETY_FLUSH_MS,
),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(
rawChannelId: string | null | undefined,
): Promise<void> {
const cid = normChannelId(rawChannelId);
if (cid) await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric(): Promise<void> {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

149
src/command-sync.ts Normal file
View File

@@ -0,0 +1,149 @@
// Build the Fabric slash-command catalog from OpenClaw's native-command
// specs (the same source Discord uses to register slash commands) and push
// it to each connected guild. Fabric is a TEXT-command surface: a /<cmd>
// message is delivered normally and OpenClaw's command system executes it —
// this catalog only drives the frontend `/` autocomplete, so we resolve any
// dynamic arg `choices` to a static snapshot here (like Discord does at
// registration time).
import {
listNativeCommandSpecsForConfig,
findCommandByNativeName,
resolveCommandArgChoices,
} from 'openclaw/plugin-sdk/native-command-registry';
import type { FabricClient } from './fabric-client.js';
import { resolveCommandsSyncKey } from './accounts.js';
type Logger = { info: (m: string) => void; warn: (m: string) => void };
type FabricArg = {
name: string;
description: string;
type: string;
required: boolean;
captureRemaining: boolean;
preferAutocomplete: boolean;
choices: Array<{ value: string; label: string }> | null;
};
type FabricCommand = {
name: string;
nativeName: string;
description: string;
acceptsArgs: boolean;
args: FabricArg[];
argsParsing: string;
};
function normChoice(c: unknown): { value: string; label: string } {
if (typeof c === 'string') return { value: c, label: c };
const o = c as { value?: string; label?: string };
return { value: String(o.value ?? ''), label: String(o.label ?? o.value ?? '') };
}
export function buildFabricCommandSpecs(cfg: unknown): FabricCommand[] {
const specs = listNativeCommandSpecsForConfig(cfg as never, {
provider: 'fabric',
}) as Array<{
name: string;
description: string;
acceptsArgs?: boolean;
args?: Array<Record<string, unknown>>;
}>;
return specs.map((s) => {
// ChatCommandDefinition (for argsParsing + dynamic choices provider)
const def = findCommandByNativeName(s.name, 'fabric') as
| { argsParsing?: string; args?: Array<Record<string, unknown>> }
| undefined;
const args: FabricArg[] = (s.args ?? []).map((a) => {
const raw = a.choices;
let choices: Array<{ value: string; label: string }> | null = null;
if (Array.isArray(raw)) {
choices = raw.map(normChoice);
} else if (typeof raw === 'function' && def) {
try {
const r = resolveCommandArgChoices({
command: def as never,
arg: a as never,
cfg: cfg as never,
provider: 'fabric',
}) as Array<{ value: string; label: string }>;
choices = r.map((x) => ({ value: x.value, label: x.label }));
} catch {
choices = null;
}
}
return {
name: String(a.name ?? ''),
description: String(a.description ?? ''),
type: String(a.type ?? 'string'),
required: !!a.required,
captureRemaining: !!a.captureRemaining,
preferAutocomplete: !!a.preferAutocomplete,
choices,
};
});
return {
name: s.name,
nativeName: s.name,
description: s.description,
acceptsArgs: !!s.acceptsArgs,
args,
argsParsing: def?.argsParsing ?? 'positional',
};
});
}
// Push the catalog to every guild the known agents belong to (idempotent;
// the catalog is OpenClaw-global, so one PUT per guild is enough).
export async function syncFabricCommands(
client: FabricClient,
cfg: unknown,
accounts: Array<{ agentId: string; fabricApiKey: string }>,
log: Logger,
): Promise<void> {
// Guild C-2: the sync key comes from the channel config only (schema
// marks it required). Without it the guild rejects the catalog write.
const syncKey = resolveCommandsSyncKey(cfg as never);
if (!syncKey) {
log.warn(
'fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)',
);
return;
}
let specs: FabricCommand[];
try {
specs = buildFabricCommandSpecs(cfg);
} catch (err) {
log.warn(`fabric: build command specs failed: ${String(err)}`);
return;
}
if (!specs.length) return;
const done = new Set<string>();
for (const a of accounts) {
let session;
try {
session = await client.agentLogin(a.fabricApiKey);
} catch {
continue;
}
for (const g of session.guilds) {
if (done.has(g.nodeId)) continue;
const tok = session.guildAccessTokens.find(
(t) => t.guildNodeId === g.nodeId,
)?.token;
if (!tok) continue;
try {
await client.syncCommands(g.endpoint, tok, specs, syncKey);
done.add(g.nodeId);
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
} catch (err) {
log.warn(`fabric: command sync failed ${g.nodeId}: ${String(err)}`);
}
}
}
}

View File

@@ -29,6 +29,32 @@ export class FabricClient {
return (await res.json()) as T;
}
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
// (Fabric returns an empty body when a channel has no canvas).
private async req<T>(
method: string,
url: string,
auth?: string,
body?: unknown,
extraHeaders?: Record<string, string>,
): Promise<T> {
const res = await fetch(url, {
method,
headers: {
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
...(auth ? { authorization: `Bearer ${auth}` } : {}),
...(extraHeaders ?? {}),
},
body: body !== undefined ? JSON.stringify(body) : undefined,
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`${method} ${url} -> ${res.status} ${text}`);
}
const text = await res.text();
return (text ? JSON.parse(text) : null) as T;
}
// Exchange an agent API key for a Fabric user session (+ guild tokens).
agentLogin(apiKey: string): Promise<FabricSession> {
return this.post<FabricSession>(`${this.centerApiBase}/auth/agent/login`, { apiKey });
@@ -86,4 +112,165 @@ export class FabricClient {
joinChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
return this.post(`${guildEndpoint}/api/channels/${channelId}/join`, {}, guildToken);
}
leaveChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
return this.post(`${guildEndpoint}/api/channels/${channelId}/leave`, {}, guildToken);
}
// Register the OpenClaw slash-command catalog with this guild (idempotent
// full replace). The frontend GETs it for `/` autocomplete; execution
// still flows as a normal /<cmd> message into OpenClaw's command system.
syncCommands(
guildEndpoint: string,
guildToken: string,
commands: unknown[],
syncKey: string,
): Promise<unknown> {
// Guild C-2: the shared key is sourced from the channel config
// (channels.fabric.commandsSyncKey) and must equal the guild's
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
return this.req(
'PUT',
`${guildEndpoint}/api/commands`,
guildToken,
{ commands },
syncKey ? { 'x-commands-sync-key': syncKey } : undefined,
);
}
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
channelMembers(
guildEndpoint: string,
guildToken: string,
channelId: string,
): Promise<Array<{ userId: string; bypass?: boolean }>> {
return this.req(
'GET',
`${guildEndpoint}/api/channels/${channelId}/members`,
guildToken,
);
}
// ---- channel canvas (one pinned doc per channel) ----
private canvasUrl(endpoint: string, channelId: string): string {
return `${endpoint}/api/channels/${channelId}/canvas`;
}
// null when the channel has no canvas
getCanvas(
endpoint: string,
token: string,
channelId: string,
): Promise<FabricCanvas | null> {
return this.req('GET', this.canvasUrl(endpoint, channelId), token);
}
// share / replace (caller becomes the sharer)
shareCanvas(
endpoint: string,
token: string,
channelId: string,
body: CanvasInput,
): Promise<FabricCanvas> {
return this.req('PUT', this.canvasUrl(endpoint, channelId), token, body);
}
// update in place (original sharer only — else the guild returns 403)
updateCanvas(
endpoint: string,
token: string,
channelId: string,
body: Partial<CanvasInput>,
): Promise<FabricCanvas> {
return this.req('PATCH', this.canvasUrl(endpoint, channelId), token, body);
}
// remove ("close") the canvas (original sharer only)
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
// ---- channel discovery + message read (used by the agent-facing
// fabric-channel-list / fabric-message-history tools) ----
/**
* List channels in a guild visible to the calling user. Backend
* filters to public + channels the user is a member of.
*/
listChannels(
guildEndpoint: string,
guildToken: string,
guildNodeId: string,
): Promise<Array<{
id: string;
guildId: string;
name: string;
xType: string;
kind: string;
isPublic: boolean;
closed: boolean;
lastSeq: number;
createdAt: string;
}>> {
return this.req(
'GET',
`${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`,
guildToken,
);
}
/**
* Page through a channel's message history by `seq`.
*
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
* tail. Page metadata in the response describes what to ask next.
*/
listMessages(
guildEndpoint: string,
guildToken: string,
channelId: string,
opts: { seqFrom?: number; seqTo?: number; limit?: number } = {},
): Promise<{
items: Array<{
messageId: string;
seq: number;
content: string;
authorUserId: string;
createdAt: string;
editedAt: string | null;
deletedAt: string | null;
isDeleted: boolean;
}>;
page: {
seqFrom: number;
seqTo: number;
limit: number;
returned: number;
hasMore: boolean;
nextExpectedSeq: number;
highestCommittedSeq: number;
};
}> {
const qs = new URLSearchParams();
if (opts.seqFrom !== undefined) qs.set('seq_from', String(opts.seqFrom));
if (opts.seqTo !== undefined) qs.set('seq_to', String(opts.seqTo));
if (opts.limit !== undefined) qs.set('limit', String(opts.limit));
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
return this.req('GET', url, guildToken);
}
}
export type CanvasFormat = 'md' | 'html' | 'text';
export type CanvasInput = { title: string; format: CanvasFormat; source: string };
export type FabricCanvas = {
channelId: string;
sharerUserId: string;
title: string;
format: CanvasFormat;
source: string;
version: number;
createdAt: string;
updatedAt: string;
};

View File

@@ -1,7 +1,12 @@
import { promises as fs } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { io, type Socket } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
// channels (nextcloud-talk) drive the kernel:
@@ -13,13 +18,23 @@ import type { IdentityRegistry } from './identity.js';
type Core = {
channel: {
routing: { resolveAgentRoute: (p: unknown) => { agentId: string; sessionKey: string; accountId?: string } };
session: { resolveStorePath: (store: unknown, o: { agentId: string }) => string };
session: {
resolveStorePath: (store: unknown, o: { agentId: string }) => string;
recordInboundSession: (p: {
storePath: string;
sessionKey: string;
ctx: unknown;
createIfMissing?: boolean;
onRecordError: (e: unknown) => void;
}) => Promise<unknown>;
};
reply: { finalizeInboundContext: (p: Record<string, unknown>) => unknown };
};
};
type Logger = { info: (m: string) => void; warn: (m: string) => void; error?: (m: string) => void };
type FabricAttachment = { url: string; name?: string; mimeType?: string };
type FabricMessage = {
messageId: string;
seq: number;
@@ -27,12 +42,188 @@ type FabricMessage = {
authorUserId?: string;
createdAt?: string;
channelId?: string;
attachments?: FabricAttachment[];
wakeup?: boolean;
// x-type of the channel (sent on message.created). 'dm' bypasses the
// wakeup gate: any message that isn't the agent's own is delivered.
xType?: string;
};
export class FabricInbound {
private sockets: Socket[] = [];
private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
private channelSyncTimers: NodeJS.Timeout[] = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
// Re-login per agent on a short TTL to keep a fresh token.
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
// Per-channel serial work queue. Every inbound socket message for a
// channel awaits the previous task for that same channel, so model
// turns never interleave. Map key = channelId; value is the tail of
// the chain (an in-flight promise the next task awaits).
//
// Why per-channel and not per-agent: a single agent may sit in
// several triage / general channels; we want each channel to flow at
// its own speed but the SAME channel's traffic to be strictly serial.
// For dm and discuss the queue also serialises but those traditionally
// had at-most-one-in-flight anyway via the turn engine.
private channelChains = new Map<string, Promise<void>>();
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
// agent/status round-trip off the hot path for back-to-back triage
// messages. Short TTL because status flips are rare-but-meaningful.
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
private static readonly AGENT_STATUS_TTL_MS = 5_000;
// Triage messages that arrived while the on-duty agent wasn't on_call
// — sit here until either (a) the agent becomes on_call and the next
// triage arrival drains them, or (b) the gateway restarts (lost; ok
// because the underlying Fabric messages are persisted and re-fetched
// on agent reconnect's history sweep).
private pendingTriageGated: Array<{
agentId: string;
g: { nodeId: string; endpoint: string };
channelId: string;
m: FabricMessage;
session: FabricSession;
}> = [];
// Schedule `task` to run after every previous task on the same
// channel has completed. Returns the promise so callers can await
// their own result if they need to; the chain itself is fire-and-
// forget from the socket.on handler.
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
const next = prev.then(task).catch((err) => {
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
});
this.channelChains.set(channelId, next);
// Best-effort cleanup so the Map doesn't grow without bound for
// long-running gateways: drop the entry when the chain settles, but
// only if it's still the latest reference (newer enqueue may have
// overwritten it in the meantime).
void next.finally(() => {
if (this.channelChains.get(channelId) === next) {
this.channelChains.delete(channelId);
}
});
return next;
}
// Hit HF backend to check whether `agentId` is currently on_call.
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
// on_call" — triage stays gated rather than risking a confused wake.
private async checkAgentOnCall(agentId: string): Promise<boolean> {
const cached = this.agentStatusCache.get(agentId);
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
return cached.onCall;
}
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
// CLAW_IDENTIFIER resolution priority:
// 1. HF_CLAW_IDENTIFIER env (operator override)
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
// plugin itself uses — keeps the two in sync without an extra
// env per service unit)
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
// container hostname is `server.t2` but HF agent row has
// `claw_identifier=sim-t2`; matching is mandatory for the HF
// backend's _require_agent() check)
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
if (!claw) {
try {
// openclaw config shape (verified in sim):
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
const cfg = this.cfg as {
plugins?: { entries?: Record<string, { config?: { identifier?: string } }> };
};
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
claw = fromCfg.trim();
}
} catch {
/* fall through to hostname */
}
}
if (!claw) {
claw = (await import('os')).hostname();
}
let onCall = false;
try {
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
const res = await fetch(url, {
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
});
if (res.ok) {
const data = (await res.json()) as { status?: string };
onCall = (data.status ?? '').toLowerCase() === 'on_call';
}
} catch (err) {
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
}
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
return onCall;
}
// FIFO drain of all triage-gated messages for `agentId` (called when
// we just learned they're on_call). Each drained message is dispatched
// through its own channel chain so per-channel serial order is kept.
private async drainGatedFor(agentId: string): Promise<void> {
const keep: typeof this.pendingTriageGated = [];
const drain: typeof this.pendingTriageGated = [];
for (const item of this.pendingTriageGated) {
if (item.agentId === agentId) drain.push(item);
else keep.push(item);
}
if (drain.length === 0) return;
this.pendingTriageGated = keep;
for (const item of drain) {
this.log.info(
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
);
// Re-enqueue via the per-channel chain so ordering is preserved.
this.enqueueChannelTask(item.channelId, async () => {
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
});
}
}
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
private async freshGuildToken(
agentId: string,
guildNodeId: string,
fallback: FabricSession,
): Promise<string | undefined> {
const pick = (s: FabricSession) =>
s.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
const now = Date.now();
const cached = this.tokenCache.get(agentId);
if (cached && now - cached.at < FabricInbound.TOKEN_TTL_MS) {
return pick(cached.session) ?? pick(fallback);
}
const apiKey = this.identity.findByAgentId(agentId)?.fabricApiKey;
if (apiKey) {
try {
const s = await this.client.agentLogin(apiKey);
this.tokenCache.set(agentId, { session: s, at: now });
return pick(s) ?? pick(fallback);
} catch (err) {
this.log.warn(`fabric: token refresh failed agent=${agentId}: ${String(err)}`);
}
}
return pick(fallback);
}
constructor(
private readonly core: unknown, // PluginRuntime
@@ -66,6 +257,8 @@ export class FabricInbound {
}
stop(): void {
for (const t of this.channelSyncTimers) clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets) s.disconnect();
this.sockets = [];
}
@@ -80,20 +273,87 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set<string>();
const syncChannels = async (kind: 'initial' | 'resync') => {
let freshTok: string | undefined;
try {
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
} catch {
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
{ headers: { authorization: `Bearer ${tok}` } },
{ headers: { authorization: `Bearer ${authTok}` } },
);
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
if (!res.ok) return;
const channels = (await res.json()) as Array<{ id: string }>;
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
);
} else if (added > 0 || removed > 0) {
this.log.info(
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
);
}
} catch {
/* best effort */
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || joined.has(id)) return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || !joined.has(id)) return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(
() => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? '';
if (!channelId) return;
@@ -102,13 +362,86 @@ export class FabricInbound {
if (this.seen.has(key)) return;
this.seen.add(key);
if (this.seen.size > 5000) this.seen.clear();
void this.dispatch(agentId, g, channelId, m, session);
// Per-channel serial queue. Prevents concurrent model turns for
// the same channel — important for triage where a second wake
// arriving mid-reply would interleave with the in-flight one.
this.enqueueChannelTask(channelId, async () => {
// Triage on_call gate: if the on-duty agent isn't currently
// on_call per HF, don't dispatch yet — just sit on the
// per-channel queue. Subsequent triage messages will recheck;
// when the agent becomes on_call, the next arrival drains.
//
// Also handles: triage + wake=true must verify status before
// committing to a model turn. Non-triage and triage observer
// (wake=false) skip the gate.
if (m.xType === 'triage' && m.wakeup === true) {
const onCall = await this.checkAgentOnCall(agentId);
if (!onCall) {
this.log.info(
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
);
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
return;
}
// Drain any previously-gated messages (FIFO) before this one,
// now that we know the agent is on_call.
await this.drainGatedFor(agentId);
}
await this.dispatch(agentId, g, channelId, m, session);
});
});
socket.connect();
this.sockets.push(socket);
}
}
// Download a message's attachments to a temp dir using the agent's guild
// token; returns local paths/types/urls for the inbound media context.
private async fetchAttachments(
agentId: string,
endpoint: string,
token: string | undefined,
m: FabricMessage,
): Promise<{ paths: string[]; types: string[]; urls: string[] }> {
const out = { paths: [] as string[], types: [] as string[], urls: [] as string[] };
const list = m.attachments ?? [];
if (!list.length || !token) return out;
const dir = join(tmpdir(), `fabric-media-${agentId}-${m.messageId}`.replace(/[^\w.-]/g, '_'));
try {
await fs.mkdir(dir, { recursive: true });
} catch {
return out;
}
let i = 0;
for (const a of list) {
try {
const abs = a.url.startsWith('http') ? a.url : `${endpoint}${a.url}`;
const res = await fetch(abs, { headers: { authorization: `Bearer ${token}` } });
if (!res.ok) {
this.log.warn(`fabric: attachment fetch ${res.status} ${abs}`);
continue;
}
const buf = Buffer.from(await res.arrayBuffer());
const safe = (a.name ?? `file-${i}`).replace(/[^\w.-]/g, '_').slice(0, 120) || `file-${i}`;
const p = join(dir, `${i}-${safe}`);
await fs.writeFile(p, buf);
out.paths.push(p);
out.types.push(
a.mimeType ||
res.headers.get('content-type')?.split(';')[0] ||
'application/octet-stream',
);
out.urls.push(abs);
i++;
} catch (err) {
this.log.warn(`fabric: attachment fetch failed agent=${agentId}: ${String(err)}`);
}
}
if (out.paths.length)
this.log.info(`fabric: fetched ${out.paths.length} attachment(s) agent=${agentId}`);
return out;
}
private async dispatch(
agentId: string,
guild: { nodeId: string; endpoint: string },
@@ -116,13 +449,6 @@ export class FabricInbound {
m: FabricMessage,
session: FabricSession,
): Promise<void> {
// wakeup === false -> drop (Fabric already decided this agent is silent)
if (m.wakeup !== true) {
this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const core = this.core as Core & Record<string, unknown>;
const cfg = this.cfg as { session?: { store?: unknown } };
try {
@@ -135,7 +461,8 @@ export class FabricInbound {
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
const baseCtx: Record<string, unknown> = {
Body: m.content,
BodyForAgent: m.content,
RawBody: m.content,
@@ -153,9 +480,69 @@ export class FabricInbound {
Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(),
OriginatingChannel: 'fabric',
OriginatingTo: `fabric:${channelId}`,
});
};
const gt = session.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token;
// Non-wakeup: Fabric has already decided this agent is NOT the speaker
// this round. Do NOT run the model and do NOT send anything back — the
// discuss/work turn engine expects silence from non-woken agents (only
// the woken speaker emits a normal message or /no-reply). We still
// record the message into the agent's session so it has the full
// channel conversation as context whenever it IS later woken.
//
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
// Triage exception: non-wake messages (admin observer) MUST NOT
// enter the agent's session at all. The next time the agent
// wakes for a triage message, their context should contain only
// their own past wakeups + their own outgoing messages — never
// the observer-only chatter from other agents. For non-triage
// channels keep the legacy "record-as-history" so a later wake
// sees the full channel conversation.
if (m.xType === 'triage') {
this.log.info(
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
);
return;
}
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,
sessionKey: route.sessionKey,
ctx: ctxPayload,
createIfMissing: true,
onRecordError: (err: unknown) =>
this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
});
this.log.info(
`fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`,
);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
// Fetch any uploaded files for the agent: download to a temp dir and
// hand openclaw local MediaPaths (+types) so the model receives them.
const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
const ctxPayload = core.channel.reply.finalizeInboundContext({
...baseCtx,
// Provide ONLY local paths. The guild file URL is on a private host
// (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
// passing MediaUrls is both redundant (we already downloaded the
// bytes) and noisy. Local MediaPaths is the reliable delivery.
...(media.paths.length
? {
MediaPaths: media.paths,
MediaTypes: media.types,
MediaPath: media.paths[0],
MediaType: media.types[0],
}
: {}),
});
await dispatchInboundReplyWithBase({
cfg: this.cfg as never,
@@ -169,19 +556,47 @@ export class FabricInbound {
const text = (payload?.text ?? '').trim();
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt) return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg as never),
post: (t) =>
this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id) as Promise<void>,
log: (m) => this.log.info(m),
});
},
onRecordError: (err: unknown) =>
this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
onDispatchError: (err: unknown, info: { kind: string }) =>
this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
// Fabric has no length limit: deliver the whole reply as ONE message.
replyOptions: { disableBlockStreaming: true } as never,
// - disableBlockStreaming: Fabric has no length limit, deliver the
// whole reply as ONE message.
// - sourceReplyDeliveryMode 'automatic': OpenClaw defaults group
// chats to "message_tool_only", which SUPPRESSES auto-delivery of
// the agent's text reply (it expects the agent to call a message
// tool). Fabric already gates *when* an agent speaks via the
// per-recipient wakeup flag, so once a turn is dispatched the
// reply must always flow back through `deliver`. Forcing
// 'automatic' overrides the group default so the reply is
// delivered. (source-reply-delivery-mode: a truthy `requested`
// wins unless it's message_tool_only with no tool available.)
replyOptions: {
disableBlockStreaming: true,
sourceReplyDeliveryMode: 'automatic',
} as never,
});
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
} catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
} finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}

View File

@@ -25,7 +25,12 @@ export function registerFabricTools(
// Resolve the calling agent's Fabric session + a guild's token/endpoint.
const ctxGuild = async (agentId: string, guildNodeId: string) => {
const entry = identity.findByAgentId(agentId);
if (!entry) throw new Error(`agent ${agentId} not registered (call fabric-register)`);
if (!entry)
throw new Error(
`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
`channels.fabric.accounts.${agentId}); then restart the gateway`,
);
const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
const token = session.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
@@ -33,31 +38,10 @@ export function registerFabricTools(
return { session, guild, token };
};
// fabric-register: bind this agent to a Fabric API key.
api.registerTool((ctx: Ctx) => ({
name: 'fabric-register',
description: "Register this agent's Fabric API key (minted via Center CLI `user apikey`).",
parameters: {
type: 'object',
additionalProperties: false,
required: ['fabricApiKey'],
properties: {
fabricApiKey: { type: 'string', description: 'Fabric Center API key (fak_…)' },
},
},
execute: async (params: { fabricApiKey: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const session = await client.agentLogin(params.fabricApiKey);
identity.upsert({
agentId,
fabricApiKey: params.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
return { ok: true, user: session.user };
},
}));
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
// It's a one-time step done out-of-band via the installed script
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
// or via static config (channels.fabric.accounts.<agentId>).
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
api.registerTool((ctx: Ctx) => ({
@@ -136,4 +120,296 @@ export function registerFabricTools(
return { ok: true, closed: true };
},
}));
// fabric-canvas: share / update / read / close the channel's single
// pinned canvas document (one tool, four actions). update/close are
// sharer-only server-side (the guild returns 403 otherwise).
api.registerTool((ctx: Ctx) => ({
name: 'fabric-canvas',
description:
"Manage a channel's pinned canvas document. action: " +
"read (current canvas or null) | share (create/replace; you become " +
'the sharer) | update (edit in place; sharer only) | close (remove; ' +
'sharer only).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['read', 'share', 'update', 'close'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
title: { type: 'string', description: 'share: required; update: optional' },
format: {
type: 'string',
enum: ['md', 'html', 'text'],
description: 'share: required; update: optional',
},
source: {
type: 'string',
description: 'document body. share: required; update: optional',
},
},
},
execute: async (p: {
action: 'read' | 'share' | 'update' | 'close';
guildNodeId: string;
channelId: string;
title?: string;
format?: 'md' | 'html' | 'text';
source?: string;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'read': {
const canvas = await client.getCanvas(ep, token, p.channelId);
return { ok: true, canvas };
}
case 'share': {
if (!p.title || !p.format || p.source === undefined) {
return { ok: false, error: 'share requires title, format, and source' };
}
const canvas = await client.shareCanvas(ep, token, p.channelId, {
title: p.title,
format: p.format,
source: p.source,
});
return { ok: true, canvas };
}
case 'update': {
const body: Partial<{ title: string; format: 'md' | 'html' | 'text'; source: string }> = {};
if (p.title !== undefined) body.title = p.title;
if (p.format !== undefined) body.format = p.format;
if (p.source !== undefined) body.source = p.source;
if (Object.keys(body).length === 0) {
return { ok: false, error: 'update needs at least one of title/format/source' };
}
const canvas = await client.updateCanvas(ep, token, p.channelId, body);
return { ok: true, canvas };
}
case 'close': {
await client.removeCanvas(ep, token, p.channelId);
return { ok: true, removed: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
// fabric-channel: channel membership (one tool, three actions).
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel',
description:
'Channel membership. action: members (list channel member userIds) | ' +
'join (this agent joins the channel) | leave (this agent leaves).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['members', 'join', 'leave'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
},
},
execute: async (p: {
action: 'members' | 'join' | 'leave';
guildNodeId: string;
channelId: string;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'members': {
const members = await client.channelMembers(ep, token, p.channelId);
return { ok: true, members };
}
case 'join': {
await client.joinChannel(ep, token, p.channelId);
return { ok: true, joined: true };
}
case 'leave': {
await client.leaveChannel(ep, token, p.channelId);
return { ok: true, left: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
// -----------------------------------------------------------------
// fabric-send-message: post a message into a specific channel.
//
// Unlike a normal channel reply (which goes back to whatever channel
// woke the agent), this lets the agent proactively initiate text into
// any channel they are a member of — e.g. ARD broadcasting daily
// workload to #agents-room, or triage agent following up on an
// already-routed task by commenting in #updates.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-send-message',
description:
'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
},
},
execute: async (p: { guildNodeId: string; channelId: string; content: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = (await client.postMessage(
guild.endpoint,
token,
p.channelId,
p.content,
session.user.id,
)) as { messageId?: string; seq?: number };
return { ok: true, messageId: res.messageId, seq: res.seq };
},
}));
// -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the
// agent is a member of. Returns id / name / xType per channel so the
// agent can pick a channelId for fabric-send-message etc.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel-list',
description:
'List channels visible to the calling agent in a guild. Optional ' +
'nameFilter does a case-insensitive substring match client-side. ' +
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId'],
properties: {
guildNodeId: { type: 'string' },
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
xType: {
type: 'string',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
description: 'optional filter by x_type',
},
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
},
},
execute: async (p: {
guildNodeId: string;
nameFilter?: string;
xType?: string;
includeClosed?: boolean;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const needle = (p.nameFilter ?? '').toLowerCase();
const filtered = all.filter((c) => {
if (!p.includeClosed && c.closed) return false;
if (p.xType && c.xType !== p.xType) return false;
if (needle && !c.name.toLowerCase().includes(needle)) return false;
return true;
});
return {
ok: true,
count: filtered.length,
channels: filtered.map((c) => ({
id: c.id,
name: c.name,
xType: c.xType,
isPublic: c.isPublic,
closed: c.closed,
lastSeq: c.lastSeq,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-message-history: read a channel's recent message history by
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
// the last `limit` messages (limit defaults to 20, max 200).
//
// Use cases: catch-up on a channel that was muted while the agent was
// gated; verify a previous message went through; lookup recent
// duplicates before opening a new task in triage.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-message-history',
description:
"Read a channel's recent message history. Omit seqFrom/seqTo to " +
'tail (last `limit` messages, default 20, max 200). Backend ' +
'requires the calling agent to be a channel participant.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
},
},
execute: async (p: {
guildNodeId: string;
channelId: string;
seqFrom?: number;
seqTo?: number;
limit?: number;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const limit = p.limit ?? 20;
// Tail mode: discover channel head via channel listing, then ask
// for [head-limit+1, head]. Avoids needing the agent to know seq.
let seqFrom = p.seqFrom;
let seqTo = p.seqTo;
if (seqFrom === undefined && seqTo === undefined) {
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const ch = channels.find((c) => c.id === p.channelId);
const head = ch?.lastSeq ?? 0;
seqFrom = Math.max(1, head - limit + 1);
seqTo = head;
}
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
seqFrom,
seqTo,
limit,
});
return {
ok: true,
page: res.page,
messages: res.items.map((m) => ({
messageId: m.messageId,
seq: m.seq,
authorUserId: m.authorUserId,
content: m.content,
createdAt: m.createdAt,
isDeleted: m.isDeleted,
})),
};
},
}));
}