17 Commits

Author SHA1 Message Date
d1d5ad10ca fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
92945b777d feat(fabric): dm channels deliver any non-self message (no wakeup gate)
inbound: FabricMessage gains xType; the wakeup gate is bypassed when
xType==='dm' (self messages are already filtered upstream), so a 1:1
dm always reaches the model regardless of wakeup metadata.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 09:18:20 +01:00
8774cfd7cc feat(fabric): coalesce a split agent turn into ONE message (deterministic)
OpenClaw delivers an agent turn whose blocks are text -> thinking/tool
-> text via multiple inbound deliver() calls (a non-text block is a
delivery boundary), so one turn became N Fabric messages.

Fix: buffer deliver() segments per channel (src/coalesce.ts) and flush
them as ONE postMessage at a deterministic boundary — the finally after
dispatchInboundReplyWithBase() resolves, which provably runs only after
every deliver() of the turn (verified: deliver,deliver -> dispatch
returned -> flush). No hooks, no timers, no idle guessing. The
agent_end hook was rejected: it fires BEFORE deliver(). gateway_stop
flushes any leftover; a long safety timeout is a leak-guard only.
channels.fabric.coalesce=false restores raw per-segment posting.

Verified on local openclaw + Fabric with a fake text/thinking/text
model: single trigger -> exactly one merged message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 22:15:46 +01:00
ab126825ef feat(security): commandsSyncKey is a required channel-config field (Guild C-2)
The slash-command sync secret now comes from
channels.fabric.commandsSyncKey (configSchema marks it required) and
is no longer read from FABRIC_COMMANDS_SYNC_KEY env. command-sync
resolves it from config and threads it into client.syncCommands;
when absent, sync is skipped with a clear warning. README updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:44:25 +01:00
bb63a57384 feat(security): send x-commands-sync-key when configured (Guild C-2)
syncCommands attaches the FABRIC_COMMANDS_SYNC_KEY header when the
operator sets it, so the guild can restrict slash-command catalog
writes to this plugin. No-op / backward compatible when unset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:47:14 +01:00
fc6edaabfd docs: slash-command catalog section
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:15:03 +01:00
c03562046d feat(plugin): sync OpenClaw slash-command catalog to Fabric
- command-sync.ts: buildFabricCommandSpecs(cfg) reads OpenClaw native
  command specs via openclaw/plugin-sdk/native-command-registry
  (listNativeCommandSpecsForConfig + findCommandByNativeName), resolves
  dynamic arg choices to a static snapshot (resolveCommandArgChoices) —
  same data Discord registers as slash commands.
- syncFabricCommands(): on gateway_start, after inbound starts, PUT the
  catalog to each connected guild (FabricClient.syncCommands ->
  PUT /api/commands; idempotent, one per guild).
- Fabric stays a TEXT-command surface (no nativeCommands capability):
  execution still flows as a /<cmd> message into OpenClaw's command
  system; this catalog only drives frontend autocomplete.

Verified: 41 specs built (args/choices incl. dynamic), synced to
test-guild1, GET /api/commands round-trips count=41.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:06:22 +01:00
fac6debfa5 feat(plugin): fabric-channel tool (members / join / leave)
One tool, three actions backed by FabricClient channelMembers (GET
/channels/:id/members -> [{userId,bypass}]), joinChannel, and new
leaveChannel (POST /channels/:id/leave).

Verified: client-level smoke against the running guild — members
initial=[tester], after join echo2 present, after leave echo2 gone.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 15:33:47 +01:00
aaabb0ddb0 feat(plugin): fabric-canvas tool; fabric-register env=AGENT_ID only
- bin/fabric-register.mjs: only AGENT_ID is read from the environment;
  --api-key is flag-only (no FABRIC_API_KEY); dropped FABRIC_CENTER_API_BASE
  / FABRIC_IDENTITY_FILE / OPENCLAW_PATH env fallbacks (flags + sensible
  defaults; --center still falls back to openclaw.json).
- New fabric-canvas tool (one tool, four actions): read / share / update /
  close the channel's single pinned canvas. Backed by FabricClient
  get/share/update/removeCanvas (GET/PUT/PATCH/DELETE; empty 2xx body ->
  null). update/close are sharer-only server-side.
- README updated.

Verified: client-level smoke against the running guild —
read(empty→null) → share(v1) → read → update(v2) → close(→null) all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 15:28:13 +01:00
26c12533fb refactor(plugin): fabric-register is a script, not a tool
Binding an agent's Fabric API key was an OpenClaw tool; make it a
self-contained Node script installed to ~/.openclaw/bin/fabric-register
instead.

- bin/fabric-register.mjs: no plugin deps; AGENT_ID env wins, else
  --agent-id required; --api-key validated via POST /auth/agent/login;
  on success upserts ~/.openclaw/fabric-identity.json (format matches
  IdentityRegistry). Flags/env for center, identity-file, openclaw-path.
- install.mjs: copy the script to ~/.openclaw/bin (chmod 0755) on
  install, remove on uninstall; Next-steps updated.
- tools.ts: drop the fabric-register tool; ctxGuild error now points to
  the script / static accounts config.
- README updated.

Verified: missing-id -> exit 2; --agent-id and AGENT_ID both bind and
write a valid identity file; bad key -> 401, no write.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 13:12:48 +01:00
9d0fa1d5c8 docs: rewrite README to match current architecture
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 12:53:25 +01:00
892db9f9be feat(plugin): record non-wakeup messages as session history (no model)
Previously a non-wakeup message returned immediately and was fully
discarded — the agent kept zero record of it, so when later woken in a
discuss/work channel it replied without the conversation context.

Now non-wakeup messages are ingested into the agent's OpenClaw session
via recordInboundSession (createIfMissing) WITHOUT dispatch: the real
model is not invoked and nothing is sent back to Fabric. This is
correct for the turn engine — only the woken speaker emits a normal
message or /no-reply; non-woken agents stay silent — while still
giving the agent full channel context whenever it IS woken.

Verified live: report-channel (all recipients wakeup=false) message
logs 'recorded (no wakeup, history only)' with 0 dispatch/deliver/
posted; wakeup path unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 10:42:36 +01:00
fc7efd0227 fix(plugin): force automatic source-reply delivery (fixes no-reply)
OpenClaw defaults group-chat replies to sourceReplyDeliveryMode
'message_tool_only', which suppresses auto-delivery of the agent's
text reply (it expects the agent to call a message tool). With
ChatType 'group', the Fabric plugin's deliver callback was therefore
NEVER invoked — the agent ran but no reply ever returned to Fabric.

Fabric already gates *when* an agent speaks via the per-recipient
wakeup flag, so once a turn is dispatched the reply must always flow
back. Pass replyOptions.sourceReplyDeliveryMode='automatic' so
OpenClaw delivers the agent's reply through  regardless of
the group default (source-reply-delivery-mode honors a truthy
requested mode).

Verified live end-to-end: human posts -> wakeup -> agent runs ->
'fabric: deliver' + 'fabric: posted reply' -> agent message appears
in the Fabric channel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 10:02:47 +01:00
d79a04b8a3 fix(plugin): refresh guild token per dispatch (fixes attachment 401)
Guild access tokens are short-lived (~15 min); the inbound socket
survives via socket.io reconnect but the token captured at connect
time goes stale, so attachment downloads (and reply posts) start
401ing on long-lived agents. Re-login with the agent's Fabric API key
on a short TTL and use the fresh token for fetch + post.

Verified live: 'fabric: fetched 1 attachment(s)' now succeeds where it
previously logged 'attachment fetch 401'.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 00:03:31 +01:00
cc655ffcc3 fix(plugin): pass only local MediaPaths (drop SSRF-blocked MediaUrls)
Live round-trip test showed openclaw's SSRF guard blocking the
localhost guild file URL passed via MediaUrls. We already download the
bytes with the agent's guild token, so MediaUrls is redundant and
noisy — provide only local MediaPaths/MediaTypes. Verified: plugin
logs 'fetched N attachment(s)' and the SSRF WARN is gone.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 21:50:34 +01:00
42228e0a23 chore(plugin): rebuild dist (file delivery)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 20:17:28 +01:00
2abd0000e6 feat(plugin): deliver uploaded files to the agent
When an inbound Fabric message has attachments, download each with the
agent's guild token to a temp dir and set MediaPaths/MediaTypes/
MediaUrls (+ singular) on the finalized inbound context so openclaw
hands the files to the model.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 20:17:17 +01:00
20 changed files with 1670 additions and 132 deletions

132
README.md
View File

@@ -1,65 +1,139 @@
# Fabric.OpenclawPlugin
An **OpenClaw channel plugin** that connects OpenClaw agents to a Fabric guild.
A native **OpenClaw channel plugin** that connects OpenClaw agents to Fabric
guilds as real channel members. Independent of OpenClaw's source — it only
uses the public plugin SDK.
## Model
- `kind: "channel"` plugin (like the bundled discord channel). OpenClaw **core
owns dispatch** (inbound → agent run) and the reply pipeline via the channel
turn kernel `runtime.channel.turn.run(...)`.
- Fabric already owns turn/shuffle/mention/`/no-reply` server-side, so this
plugin is thin. Fabric's per-recipient **`wakeup`** maps to channel-turn
**admission**:
- `wakeup === true``dispatch` (agent runs, may reply)
- otherwise → `{ kind: "drop", recordHistory: true }` (kept as context)
- **No sidecar, no fake no-reply model, no `before_model_resolve` gating.**
- `kind: "channel"` plugin (like the bundled discord channel). OpenClaw core
owns dispatch and the reply pipeline via the channel-turn kernel
(`resolveAgentRoute` + `finalizeInboundContext` +
`dispatchInboundReplyWithBase`). Fabric already owns turn/shuffle/mention/
`/no-reply` server-side, so this plugin is thin.
- Fabric's per-recipient **`wakeup`** maps to admission:
- `wakeup === true`**dispatch** (the agent runs and may reply).
- `wakeup !== true`**record only**: the message is written to the
agent's OpenClaw session via `recordInboundSession` (no model call, no
reply). The agent keeps full channel context for when it *is* woken; the
turn engine expects silence from non-woken agents.
- Replies are forced to **automatic** delivery
(`replyOptions.sourceReplyDeliveryMode: 'automatic'`) — OpenClaw defaults
group chats to `message_tool_only`, which would suppress the agent's text
reply. Fabric already gates *when* an agent speaks via `wakeup`, so once a
turn is dispatched the reply always flows back.
- One Fabric socket per agent identity. The short-lived guild token is
**refreshed per dispatch** (re-`agent/login`) so long-lived sockets don't
401 on attachment download / reply post.
## Files to agents
When an inbound message has `attachments[]`, the plugin downloads each file
(with the agent's guild token) to a temp dir and sets local
`MediaPaths`/`MediaTypes` on the inbound context so the agent receives the
files. `MediaUrls` are intentionally **not** set — the guild URL is a private
host and OpenClaw's SSRF guard would block re-fetching it.
## Auth
Each agent has a Fabric Center **API key** (mint via Center CLI:
`node dist/cli.js user apikey --email <agent-email>`). The key is exchanged for
a user session (`POST /auth/agent/login`) used to receive (socket) and post
replies. Bind a key to an agent via the `fabric-register` tool, or pre-populate
the identity file.
`node dist/cli.js user apikey --email <agent-email>`). The key is exchanged
for a user session (`POST /auth/agent/login`).
### Binding a key to an agent (one-time)
Two ways, both write the same identity registry the transport reads:
1. **Static config** — set `channels.fabric.accounts.<agentId> =
{ fabricApiKey, enabled }`. The agent never runs anything.
2. **`fabric-register` script** — installed to `~/.openclaw/bin/fabric-register`
by the installer (it is **not** an OpenClaw tool):
```bash
# agent id from $AGENT_ID (set in the agent runtime):
~/.openclaw/bin/fabric-register --api-key fak_…
# or pass it explicitly:
~/.openclaw/bin/fabric-register --agent-id <agent> --api-key fak_…
```
It validates the key against Center, then writes
`~/.openclaw/fabric-identity.json`. One-time and persistent — *not* per
login; the plugin's transport logs in and stays connected on its own.
**Only `AGENT_ID` is read from the environment** — if unset, `--agent-id`
is required. `--api-key` is flag-only (never from the env). Other flags:
`--center`, `--identity-file`, `--openclaw-path` (sensible defaults;
`--center` also falls back to `openclaw.json`). Restart the gateway
afterwards.
## Config
- `channels.fabric.centerApiBase` — e.g. `http://localhost:7001/api`
- `channels.fabric.commandsSyncKey` — **required**; must equal the guild's
`FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY` (Guild C-2). Read it from the
guild with `docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js`.
Sourced from config only — never from the environment.
- `channels.fabric.coalesce` — default `true`. OpenClaw splits one agent
turn whose blocks are `text → thinking/tool → text` into multiple
`deliver()` calls; this buffers them and posts ONE Fabric message at the
deterministic turn boundary (right after the inbound reply dispatch
resolves — no hooks, no timers, no idle guessing). `false` = raw
per-segment posting.
- `channels.fabric.accounts.<agentId>` = `{ fabricApiKey, enabled }`
(agent = account; the account id is the openclaw agentId)
(**agent = account**; the account id is the OpenClaw agentId)
- plugin `identityFilePath` — default `~/.openclaw/fabric-identity.json`
(`{ entries: [{ agentId, fabricApiKey }] }`)
### Required: route binding (account → agent)
openclaw routes a channel turn to an agent via `cfg.bindings`. Without a
Fabric binding it falls back to the default agent. Add one per account:
OpenClaw routes a channel turn via `cfg.bindings`; without a Fabric binding
it falls back to the default agent. One per account:
```json
{ "agentId": "<agent>", "match": { "channel": "fabric", "accountId": "<account>" } }
```
e.g. `openclaw config set bindings '[…, {"agentId":"echo","match":{"channel":"fabric","accountId":"echo"}}]' --json`,
then `openclaw gateway restart`.
Then `openclaw gateway restart`.
## Tools
- `fabric-register` — bind this agent's Fabric API key
(Key binding is **not** a tool — see *Binding a key to an agent* above.)
- `create-chat-channel` (general) / `create-work-channel` (work) /
`create-report-channel` (report) / `create-discussion-channel` (discuss)
- `discussion-complete` — post a summary then close the channel
(Fabric `POST /channels/:id/close`; closed → history readable, posts → 409)
- `discussion-complete` — post a summary, then close the channel
(closed → history readable; new posts → `409`)
- `fabric-canvas` — manage a channel's single pinned canvas document; one
tool, four `action`s: `read` (current canvas or null) · `share`
(create/replace; caller becomes sharer) · `update` (edit in place;
sharer-only) · `close` (remove; sharer-only). `share` needs
`title`/`format`(`md`|`html`|`text`)/`source`.
- `fabric-channel` — channel membership; one tool, three `action`s:
`members` (list the channel's member userIds) · `join` (this agent
joins) · `leave` (this agent leaves).
## Transport (Phase 1 = B1)
## Slash-command catalog
One Fabric socket per agent identity, in the plugin runtime. Firehose (B2) is
a later drop-in behind the same `dispatch()` seam.
On `gateway_start` the plugin syncs OpenClaw's native-command catalog to
each guild (`command-sync.ts`: `listNativeCommandSpecsForConfig` +
`findCommandByNativeName`, dynamic arg `choices` snapshotted via
`resolveCommandArgChoices`) → `PUT /api/commands`. This is exactly the data
Discord registers as slash commands; Fabric's frontend uses it for `/`
autocomplete. Fabric deliberately does **not** advertise the
`nativeCommands` channel capability — it stays a text-command surface, so a
`/<cmd>` message is delivered normally and OpenClaw's command system
executes it (text-command + command session + auth), reusing the standard
inbound path. Only `/no-reply` and `/force-proceed` stay
server-intercepted by the guild.
## Build
## Install / build
```bash
npm install && npm run build
node install.mjs # build + copy to ~/.openclaw/plugins/fabric + configure
```
> The plugin compiles against the host's OpenClaw SDK
> (`openclaw/plugin-sdk/*`); build inside the OpenClaw plugin environment.
`install.mjs` mirrors the PaddedCell-style installer (also `--uninstall`).
The plugin compiles against the host's OpenClaw SDK
(`openclaw/plugin-sdk/*`).
> Transport is Phase 1 (one socket per agent). A firehose variant (B2) is a
> later drop-in behind the same `dispatch()` seam.

161
bin/fabric-register.mjs Normal file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env node
/**
* fabric-register — bind an OpenClaw agent to a Fabric Center API key.
*
* One-time, self-contained (no plugin deps). Installed to
* ~/.openclaw/bin/fabric-register by the plugin installer.
*
* AGENT_ID is read from the environment if set; otherwise --agent-id is
* required. The API key is validated against Center (POST
* /auth/agent/login) and, on success, written to the plugin's identity
* file so the Fabric channel plugin can connect that agent.
*
* Usage:
* fabric-register --api-key fak_xxx # uses $AGENT_ID
* fabric-register --agent-id echo --api-key fak_xxx
*
* Only AGENT_ID is read from the environment; everything else is a flag.
*
* Flags:
* --agent-id <id> required unless the AGENT_ID env var is set
* --api-key <fak_…> required (flag only — never from the environment)
* --center <url> else openclaw.json channels.fabric.centerApiBase;
* else http://localhost:7001/api
* --identity-file <path> default ~/.openclaw/fabric-identity.json
* --openclaw-path <dir> default ~/.openclaw
* -h | --help
*/
import { readFileSync, writeFileSync, mkdirSync, existsSync } from 'node:fs';
import { dirname, join, resolve } from 'node:path';
import { homedir } from 'node:os';
function parseArgs(argv) {
const out = {};
for (let i = 0; i < argv.length; i++) {
const a = argv[i];
if (a === '-h' || a === '--help') out.help = true;
else if (a.startsWith('--')) {
const key = a.slice(2);
const v = argv[i + 1];
if (v === undefined || v.startsWith('--')) out[key] = true;
else {
out[key] = v;
i++;
}
}
}
return out;
}
const HELP = `fabric-register — bind an OpenClaw agent to a Fabric Center API key
fabric-register --api-key fak_xxx # agent id from $AGENT_ID
fabric-register --agent-id <id> --api-key fak_xxx
--agent-id <id> required unless the AGENT_ID env var is set
--api-key <fak_…> required (flag only — never read from the env)
--center <url> Center API base (else openclaw.json
channels.fabric.centerApiBase; else
http://localhost:7001/api)
--identity-file <path> default ~/.openclaw/fabric-identity.json
--openclaw-path <dir> default ~/.openclaw
-h, --help
Only AGENT_ID is taken from the environment; everything else is a flag.
`;
function fail(msg) {
console.error(`fabric-register: ${msg}`);
process.exit(2);
}
async function main() {
const a = parseArgs(process.argv.slice(2));
if (a.help) {
process.stdout.write(HELP);
return;
}
// agent id: env AGENT_ID wins; else --agent-id is required.
const agentId =
(process.env.AGENT_ID && process.env.AGENT_ID.trim()) ||
(typeof a['agent-id'] === 'string' && a['agent-id'].trim());
if (!agentId) {
fail('no agent id: set the AGENT_ID environment variable or pass --agent-id <id>');
}
// api key: flag ONLY — never from the environment.
const apiKey = typeof a['api-key'] === 'string' && a['api-key'].trim();
if (!apiKey) fail('missing --api-key <fak_…> (flag only)');
const openclawPath = resolve(
(typeof a['openclaw-path'] === 'string' && a['openclaw-path']) ||
join(homedir(), '.openclaw'),
);
// center api base: flag > openclaw.json > default
let center = (typeof a.center === 'string' && a.center) || '';
if (!center) {
try {
const cfg = JSON.parse(readFileSync(join(openclawPath, 'openclaw.json'), 'utf8'));
center = cfg?.channels?.fabric?.centerApiBase || '';
} catch {
/* fall through to default */
}
}
if (!center) center = 'http://localhost:7001/api';
center = center.replace(/\/+$/, '');
const identityFile = resolve(
(typeof a['identity-file'] === 'string' && a['identity-file']) ||
join(openclawPath, 'fabric-identity.json'),
);
// 1) validate the key against Center (also resolves the Fabric identity)
let session;
try {
const res = await fetch(`${center}/auth/agent/login`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ apiKey }),
});
if (!res.ok) {
const t = await res.text().catch(() => '');
fail(`Center rejected the key: POST ${center}/auth/agent/login -> ${res.status} ${t}`);
}
session = await res.json();
} catch (e) {
fail(`could not reach Center at ${center}: ${e?.message || e}`);
}
// 2) upsert into the identity file (merge by agentId)
let file = { entries: [] };
if (existsSync(identityFile)) {
try {
const parsed = JSON.parse(readFileSync(identityFile, 'utf8'));
if (parsed && Array.isArray(parsed.entries)) file = parsed;
} catch {
/* corrupt -> overwrite */
}
}
const entry = {
agentId,
fabricApiKey: apiKey,
fabricUserId: session?.user?.id,
displayName: session?.user?.name,
};
const i = file.entries.findIndex((e) => e && e.agentId === agentId);
if (i >= 0) file.entries[i] = { ...file.entries[i], ...entry };
else file.entries.push(entry);
mkdirSync(dirname(identityFile), { recursive: true });
writeFileSync(identityFile, JSON.stringify(file, null, 2));
console.log(
`fabric-register: bound agent "${agentId}" -> Fabric user ` +
`${session?.user?.email || session?.user?.id} (${identityFile}). ` +
`Restart the gateway to connect: openclaw gateway restart`,
);
}
main().catch((e) => fail(e?.message || String(e)));

View File

@@ -5,11 +5,13 @@
// the OpenClawPluginApi for runtime startup (transport + tools).
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import path from 'node:path';
import os from 'node:os';
let runtimeRef = null;
@@ -57,8 +59,14 @@ export default defineChannelPluginEntry({
inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts);
void inbound.start();
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
// Note: the per-turn coalesce flush happens deterministically in
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
// is the real "all deliveries done" boundary; the agent_end hook fires
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
inbound?.stop();
inbound = null;
});

View File

@@ -7,6 +7,17 @@ const DEFAULT_CENTER = 'http://localhost:7001/api';
function section(cfg) {
return cfg.channels?.fabric ?? {};
}
// The commands-sync shared secret (channel-level only). Empty string when
// unconfigured — callers decide how to handle (slash-command sync is then
// rejected by the guild).
export function resolveCommandsSyncKey(cfg) {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg) {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg) {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -137,7 +137,10 @@ export const fabricChannelPlugin = createChatChannelPlugin({
attachedResults: {
channel: 'fabric',
sendText: async (ctx) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {});
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

75
dist/fabric/src/coalesce.js vendored Normal file
View File

@@ -0,0 +1,75 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x) {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
const pendingByChannel = new Map();
async function flushChannel(channelId, reason) {
const p = pendingByChannel.get(channelId);
if (!p)
return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text)
return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
}
catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params) {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text)
return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
}
else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(() => void flushChannel(cid, 'safety-timeout'), SAFETY_FLUSH_MS),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(rawChannelId) {
const cid = normChannelId(rawChannelId);
if (cid)
await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric() {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

109
dist/fabric/src/command-sync.js vendored Normal file
View File

@@ -0,0 +1,109 @@
// Build the Fabric slash-command catalog from OpenClaw's native-command
// specs (the same source Discord uses to register slash commands) and push
// it to each connected guild. Fabric is a TEXT-command surface: a /<cmd>
// message is delivered normally and OpenClaw's command system executes it —
// this catalog only drives the frontend `/` autocomplete, so we resolve any
// dynamic arg `choices` to a static snapshot here (like Discord does at
// registration time).
import { listNativeCommandSpecsForConfig, findCommandByNativeName, resolveCommandArgChoices, } from 'openclaw/plugin-sdk/native-command-registry';
import { resolveCommandsSyncKey } from './accounts.js';
function normChoice(c) {
if (typeof c === 'string')
return { value: c, label: c };
const o = c;
return { value: String(o.value ?? ''), label: String(o.label ?? o.value ?? '') };
}
export function buildFabricCommandSpecs(cfg) {
const specs = listNativeCommandSpecsForConfig(cfg, {
provider: 'fabric',
});
return specs.map((s) => {
// ChatCommandDefinition (for argsParsing + dynamic choices provider)
const def = findCommandByNativeName(s.name, 'fabric');
const args = (s.args ?? []).map((a) => {
const raw = a.choices;
let choices = null;
if (Array.isArray(raw)) {
choices = raw.map(normChoice);
}
else if (typeof raw === 'function' && def) {
try {
const r = resolveCommandArgChoices({
command: def,
arg: a,
cfg: cfg,
provider: 'fabric',
});
choices = r.map((x) => ({ value: x.value, label: x.label }));
}
catch {
choices = null;
}
}
return {
name: String(a.name ?? ''),
description: String(a.description ?? ''),
type: String(a.type ?? 'string'),
required: !!a.required,
captureRemaining: !!a.captureRemaining,
preferAutocomplete: !!a.preferAutocomplete,
choices,
};
});
return {
name: s.name,
nativeName: s.name,
description: s.description,
acceptsArgs: !!s.acceptsArgs,
args,
argsParsing: def?.argsParsing ?? 'positional',
};
});
}
// Push the catalog to every guild the known agents belong to (idempotent;
// the catalog is OpenClaw-global, so one PUT per guild is enough).
export async function syncFabricCommands(client, cfg, accounts, log) {
// Guild C-2: the sync key comes from the channel config only (schema
// marks it required). Without it the guild rejects the catalog write.
const syncKey = resolveCommandsSyncKey(cfg);
if (!syncKey) {
log.warn('fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)');
return;
}
let specs;
try {
specs = buildFabricCommandSpecs(cfg);
}
catch (err) {
log.warn(`fabric: build command specs failed: ${String(err)}`);
return;
}
if (!specs.length)
return;
const done = new Set();
for (const a of accounts) {
let session;
try {
session = await client.agentLogin(a.fabricApiKey);
}
catch {
continue;
}
for (const g of session.guilds) {
if (done.has(g.nodeId))
continue;
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok)
continue;
try {
await client.syncCommands(g.endpoint, tok, specs, syncKey);
done.add(g.nodeId);
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
}
catch (err) {
log.warn(`fabric: command sync failed ${g.nodeId}: ${String(err)}`);
}
}
}
}

View File

@@ -21,6 +21,25 @@ export class FabricClient {
}
return (await res.json());
}
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
// (Fabric returns an empty body when a channel has no canvas).
async req(method, url, auth, body, extraHeaders) {
const res = await fetch(url, {
method,
headers: {
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
...(auth ? { authorization: `Bearer ${auth}` } : {}),
...(extraHeaders ?? {}),
},
body: body !== undefined ? JSON.stringify(body) : undefined,
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`${method} ${url} -> ${res.status} ${text}`);
}
const text = await res.text();
return (text ? JSON.parse(text) : null);
}
// Exchange an agent API key for a Fabric user session (+ guild tokens).
agentLogin(apiKey) {
return this.post(`${this.centerApiBase}/auth/agent/login`, { apiKey });
@@ -50,4 +69,40 @@ export class FabricClient {
joinChannel(guildEndpoint, guildToken, channelId) {
return this.post(`${guildEndpoint}/api/channels/${channelId}/join`, {}, guildToken);
}
leaveChannel(guildEndpoint, guildToken, channelId) {
return this.post(`${guildEndpoint}/api/channels/${channelId}/leave`, {}, guildToken);
}
// Register the OpenClaw slash-command catalog with this guild (idempotent
// full replace). The frontend GETs it for `/` autocomplete; execution
// still flows as a normal /<cmd> message into OpenClaw's command system.
syncCommands(guildEndpoint, guildToken, commands, syncKey) {
// Guild C-2: the shared key is sourced from the channel config
// (channels.fabric.commandsSyncKey) and must equal the guild's
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
return this.req('PUT', `${guildEndpoint}/api/commands`, guildToken, { commands }, syncKey ? { 'x-commands-sync-key': syncKey } : undefined);
}
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
channelMembers(guildEndpoint, guildToken, channelId) {
return this.req('GET', `${guildEndpoint}/api/channels/${channelId}/members`, guildToken);
}
// ---- channel canvas (one pinned doc per channel) ----
canvasUrl(endpoint, channelId) {
return `${endpoint}/api/channels/${channelId}/canvas`;
}
// null when the channel has no canvas
getCanvas(endpoint, token, channelId) {
return this.req('GET', this.canvasUrl(endpoint, channelId), token);
}
// share / replace (caller becomes the sharer)
shareCanvas(endpoint, token, channelId, body) {
return this.req('PUT', this.canvasUrl(endpoint, channelId), token, body);
}
// update in place (original sharer only — else the guild returns 403)
updateCanvas(endpoint, token, channelId, body) {
return this.req('PATCH', this.canvasUrl(endpoint, channelId), token, body);
}
// remove ("close") the canvas (original sharer only)
removeCanvas(endpoint, token, channelId) {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
}

View File

@@ -1,5 +1,10 @@
import { promises as fs } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
export class FabricInbound {
core;
cfg;
@@ -9,6 +14,43 @@ export class FabricInbound {
accounts;
sockets = [];
seen = new Set();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
channelSyncTimers = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
// Re-login per agent on a short TTL to keep a fresh token.
tokenCache = new Map();
static TOKEN_TTL_MS = 8 * 60 * 1000;
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
async freshGuildToken(agentId, guildNodeId, fallback) {
const pick = (s) => s.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
const now = Date.now();
const cached = this.tokenCache.get(agentId);
if (cached && now - cached.at < FabricInbound.TOKEN_TTL_MS) {
return pick(cached.session) ?? pick(fallback);
}
const apiKey = this.identity.findByAgentId(agentId)?.fabricApiKey;
if (apiKey) {
try {
const s = await this.client.agentLogin(apiKey);
this.tokenCache.set(agentId, { session: s, at: now });
return pick(s) ?? pick(fallback);
}
catch (err) {
this.log.warn(`fabric: token refresh failed agent=${agentId}: ${String(err)}`);
}
}
return pick(fallback);
}
constructor(core, // PluginRuntime
cfg, // OpenClawConfig
client, identity, log, accounts = []) {
@@ -41,6 +83,9 @@ export class FabricInbound {
}
}
stop() {
for (const t of this.channelSyncTimers)
clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets)
s.disconnect();
this.sockets = [];
@@ -56,19 +101,61 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set();
const syncChannels = async (kind) => {
let freshTok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
const channels = res.ok ? (await res.json()) : [];
for (const c of channels)
socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
}
catch {
/* best effort */
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
if (!res.ok)
return;
const channels = (await res.json());
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
}
else if (added > 0 || removed > 0) {
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
}
}
catch {
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => {
const channelId = m.channelId ?? '';
if (!channelId)
@@ -87,13 +174,49 @@ export class FabricInbound {
this.sockets.push(socket);
}
}
async dispatch(agentId, guild, channelId, m, session) {
// wakeup === false -> drop (Fabric already decided this agent is silent)
if (m.wakeup !== true) {
this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`);
return;
// Download a message's attachments to a temp dir using the agent's guild
// token; returns local paths/types/urls for the inbound media context.
async fetchAttachments(agentId, endpoint, token, m) {
const out = { paths: [], types: [], urls: [] };
const list = m.attachments ?? [];
if (!list.length || !token)
return out;
const dir = join(tmpdir(), `fabric-media-${agentId}-${m.messageId}`.replace(/[^\w.-]/g, '_'));
try {
await fs.mkdir(dir, { recursive: true });
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
catch {
return out;
}
let i = 0;
for (const a of list) {
try {
const abs = a.url.startsWith('http') ? a.url : `${endpoint}${a.url}`;
const res = await fetch(abs, { headers: { authorization: `Bearer ${token}` } });
if (!res.ok) {
this.log.warn(`fabric: attachment fetch ${res.status} ${abs}`);
continue;
}
const buf = Buffer.from(await res.arrayBuffer());
const safe = (a.name ?? `file-${i}`).replace(/[^\w.-]/g, '_').slice(0, 120) || `file-${i}`;
const p = join(dir, `${i}-${safe}`);
await fs.writeFile(p, buf);
out.paths.push(p);
out.types.push(a.mimeType ||
res.headers.get('content-type')?.split(';')[0] ||
'application/octet-stream');
out.urls.push(abs);
i++;
}
catch (err) {
this.log.warn(`fabric: attachment fetch failed agent=${agentId}: ${String(err)}`);
}
}
if (out.paths.length)
this.log.info(`fabric: fetched ${out.paths.length} attachment(s) agent=${agentId}`);
return out;
}
async dispatch(agentId, guild, channelId, m, session) {
const core = this.core;
const cfg = this.cfg;
try {
@@ -106,7 +229,7 @@ export class FabricInbound {
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
const baseCtx = {
Body: m.content,
BodyForAgent: m.content,
RawBody: m.content,
@@ -124,8 +247,49 @@ export class FabricInbound {
Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(),
OriginatingChannel: 'fabric',
OriginatingTo: `fabric:${channelId}`,
};
// Non-wakeup: Fabric has already decided this agent is NOT the speaker
// this round. Do NOT run the model and do NOT send anything back — the
// discuss/work turn engine expects silence from non-woken agents (only
// the woken speaker emits a normal message or /no-reply). We still
// record the message into the agent's session so it has the full
// channel conversation as context whenever it IS later woken.
//
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,
sessionKey: route.sessionKey,
ctx: ctxPayload,
createIfMissing: true,
onRecordError: (err) => this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
});
this.log.info(`fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
// Fetch any uploaded files for the agent: download to a temp dir and
// hand openclaw local MediaPaths (+types) so the model receives them.
const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
const ctxPayload = core.channel.reply.finalizeInboundContext({
...baseCtx,
// Provide ONLY local paths. The guild file URL is on a private host
// (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
// passing MediaUrls is both redundant (we already downloaded the
// bytes) and noisy. Local MediaPaths is the reliable delivery.
...(media.paths.length
? {
MediaPaths: media.paths,
MediaTypes: media.types,
MediaPath: media.paths[0],
MediaType: media.types[0],
}
: {}),
});
const gt = session.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token;
await dispatchInboundReplyWithBase({
cfg: this.cfg,
channel: 'fabric',
@@ -139,18 +303,46 @@ export class FabricInbound {
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt)
return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg),
post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id),
log: (m) => this.log.info(m),
});
},
onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
// Fabric has no length limit: deliver the whole reply as ONE message.
replyOptions: { disableBlockStreaming: true },
// - disableBlockStreaming: Fabric has no length limit, deliver the
// whole reply as ONE message.
// - sourceReplyDeliveryMode 'automatic': OpenClaw defaults group
// chats to "message_tool_only", which SUPPRESSES auto-delivery of
// the agent's text reply (it expects the agent to call a message
// tool). Fabric already gates *when* an agent speaks via the
// per-recipient wakeup flag, so once a turn is dispatched the
// reply must always flow back through `deliver`. Forcing
// 'automatic' overrides the group default so the reply is
// delivered. (source-reply-delivery-mode: a truthy `requested`
// wins unless it's message_tool_only with no tool available.)
replyOptions: {
disableBlockStreaming: true,
sourceReplyDeliveryMode: 'automatic',
},
});
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
}
catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
}
finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}

View File

@@ -9,7 +9,9 @@ export function registerFabricTools(api, client, identity) {
const ctxGuild = async (agentId, guildNodeId) => {
const entry = identity.findByAgentId(agentId);
if (!entry)
throw new Error(`agent ${agentId} not registered (call fabric-register)`);
throw new Error(`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
`channels.fabric.accounts.${agentId}); then restart the gateway`);
const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
const token = session.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
@@ -17,32 +19,10 @@ export function registerFabricTools(api, client, identity) {
throw new Error(`agent not a member of guild ${guildNodeId}`);
return { session, guild, token };
};
// fabric-register: bind this agent to a Fabric API key.
api.registerTool((ctx) => ({
name: 'fabric-register',
description: "Register this agent's Fabric API key (minted via Center CLI `user apikey`).",
parameters: {
type: 'object',
additionalProperties: false,
required: ['fabricApiKey'],
properties: {
fabricApiKey: { type: 'string', description: 'Fabric Center API key (fak_…)' },
},
},
execute: async (params) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const session = await client.agentLogin(params.fabricApiKey);
identity.upsert({
agentId,
fabricApiKey: params.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
return { ok: true, user: session.user };
},
}));
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
// It's a one-time step done out-of-band via the installed script
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
// or via static config (channels.fabric.accounts.<agentId>).
const makeCreate = (kind) => api.registerTool((ctx) => ({
name: `create-${kind}-channel`,
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
@@ -109,4 +89,117 @@ export function registerFabricTools(api, client, identity) {
return { ok: true, closed: true };
},
}));
// fabric-canvas: share / update / read / close the channel's single
// pinned canvas document (one tool, four actions). update/close are
// sharer-only server-side (the guild returns 403 otherwise).
api.registerTool((ctx) => ({
name: 'fabric-canvas',
description: "Manage a channel's pinned canvas document. action: " +
"read (current canvas or null) | share (create/replace; you become " +
'the sharer) | update (edit in place; sharer only) | close (remove; ' +
'sharer only).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['read', 'share', 'update', 'close'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
title: { type: 'string', description: 'share: required; update: optional' },
format: {
type: 'string',
enum: ['md', 'html', 'text'],
description: 'share: required; update: optional',
},
source: {
type: 'string',
description: 'document body. share: required; update: optional',
},
},
},
execute: async (p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'read': {
const canvas = await client.getCanvas(ep, token, p.channelId);
return { ok: true, canvas };
}
case 'share': {
if (!p.title || !p.format || p.source === undefined) {
return { ok: false, error: 'share requires title, format, and source' };
}
const canvas = await client.shareCanvas(ep, token, p.channelId, {
title: p.title,
format: p.format,
source: p.source,
});
return { ok: true, canvas };
}
case 'update': {
const body = {};
if (p.title !== undefined)
body.title = p.title;
if (p.format !== undefined)
body.format = p.format;
if (p.source !== undefined)
body.source = p.source;
if (Object.keys(body).length === 0) {
return { ok: false, error: 'update needs at least one of title/format/source' };
}
const canvas = await client.updateCanvas(ep, token, p.channelId, body);
return { ok: true, canvas };
}
case 'close': {
await client.removeCanvas(ep, token, p.channelId);
return { ok: true, removed: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
// fabric-channel: channel membership (one tool, three actions).
api.registerTool((ctx) => ({
name: 'fabric-channel',
description: 'Channel membership. action: members (list channel member userIds) | ' +
'join (this agent joins the channel) | leave (this agent leaves).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['members', 'join', 'leave'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
},
},
execute: async (p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'members': {
const members = await client.channelMembers(ep, token, p.channelId);
return { ok: true, members };
}
case 'join': {
await client.joinChannel(ep, token, p.channelId);
return { ok: true, joined: true };
}
case 'leave': {
await client.leaveChannel(ep, token, p.channelId);
return { ok: true, left: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
}

View File

@@ -6,11 +6,13 @@
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import path from 'node:path';
import os from 'node:os';
@@ -36,7 +38,7 @@ export default defineChannelPluginEntry({
config?: unknown;
pluginConfig?: { identityFilePath?: string };
logger: { info: (m: string) => void; warn: (m: string) => void };
on: (ev: string, fn: () => void) => void;
on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
registerTool: (d: unknown) => void;
};
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
@@ -82,9 +84,15 @@ export default defineChannelPluginEntry({
);
void inbound.start();
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
// Note: the per-turn coalesce flush happens deterministically in
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
// is the real "all deliveries done" boundary; the agent_end hook fires
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
inbound?.stop();
inbound = null;
});

View File

@@ -10,7 +10,7 @@
*/
import { execSync } from 'child_process';
import { existsSync, mkdirSync, copyFileSync, readdirSync, rmSync } from 'fs';
import { existsSync, mkdirSync, copyFileSync, readdirSync, rmSync, chmodSync } from 'fs';
import { dirname, join, resolve } from 'path';
import { fileURLToPath } from 'url';
import { homedir } from 'os';
@@ -109,12 +109,30 @@ function build() {
ok('compiled -> dist/fabric');
}
function binTarget(base) {
return join(base, 'bin', 'fabric-register');
}
function installBinScript(base) {
const src = join(__dirname, 'bin', 'fabric-register.mjs');
const dst = binTarget(base);
mkdirSync(dirname(dst), { recursive: true });
copyFileSync(src, dst);
chmodSync(dst, 0o755);
ok(`fabric-register -> ${dst}`);
}
function clearInstall(base) {
const dest = join(base, 'plugins', PLUGIN_ID);
if (existsSync(dest)) {
rmSync(dest, { recursive: true, force: true });
ok(`removed ${dest}`);
}
const bin = binTarget(base);
if (existsSync(bin)) {
rmSync(bin, { force: true });
ok(`removed ${bin}`);
}
}
function cleanupConfig(base) {
const dest = join(base, 'plugins', PLUGIN_ID);
@@ -149,6 +167,7 @@ function install() {
ok(`plugin files -> ${dest}`);
exec('npm install --omit=dev', { cwd: dest, silent: !opt.verbose });
ok('runtime deps installed');
installBinScript(base);
return { base, dest };
}
@@ -200,8 +219,10 @@ function main() {
console.log('');
log('Install complete. Next:', 'blue');
log(' 1. Mint an agent key: (in Center) node dist/cli.js user apikey --email <agent-email>', 'cyan');
log(' 2. openclaw gateway restart', 'cyan');
log(' 3. As an agent, call the fabric-register tool with that key', 'cyan');
log(' 2. Bind it to an agent (one-time), either:', 'cyan');
log(' AGENT_ID=<agent> ~/.openclaw/bin/fabric-register --api-key <fak_…>', 'cyan');
log(' (or pass --agent-id <agent>; or set channels.fabric.accounts.<agent>)', 'cyan');
log(' 3. openclaw gateway restart', 'cyan');
console.log('');
} catch (e) {
log(`\nInstall failed: ${e.message}`, 'red');

View File

@@ -39,6 +39,15 @@
"type": "string",
"description": "Fabric Center API base, e.g. http://localhost:7001/api"
},
"commandsSyncKey": {
"type": "string",
"minLength": 1,
"description": "Shared secret that must equal the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY. Required to register the slash-command catalog (Guild C-2). Read it from the guild via: docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js"
},
"coalesce": {
"type": "boolean",
"description": "Merge a split agent turn (text → thinking/tool → text) into ONE Fabric message. Flushed deterministically on the agent_end hook. Default true; false = raw per-segment posting."
},
"dmSecurity": { "type": "string" },
"dmPolicy": { "type": "string" },
"enabled": { "type": "boolean" },
@@ -59,10 +68,12 @@
}
}
}
}
},
"required": ["commandsSyncKey"]
},
"uiHints": {
"centerApiBase": { "label": "Center API base" }
"centerApiBase": { "label": "Center API base" },
"commandsSyncKey": { "label": "Commands sync key" }
}
}
}

View File

@@ -14,6 +14,15 @@ export type FabricAccountConfig = {
export type FabricChannelConfig = {
centerApiBase?: string;
// Shared secret matching the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY
// (Guild C-2). Required by the channel config schema; sourced from config
// only — never from the environment.
commandsSyncKey?: string;
// Coalesce an agent turn that OpenClaw split into multiple deliveries
// (text → thinking/tool → text => N sendText calls) into ONE Fabric
// message. The flush boundary is the deterministic `agent_end` hook (not
// a timer). Default true; set false for raw per-segment posting.
coalesce?: boolean;
accounts?: Record<string, FabricAccountConfig>;
defaultAccount?: string;
} & FabricAccountConfig;
@@ -35,6 +44,19 @@ function section(cfg: Cfg): FabricChannelConfig {
return cfg.channels?.fabric ?? {};
}
// The commands-sync shared secret (channel-level only). Empty string when
// unconfigured — callers decide how to handle (slash-command sync is then
// rejected by the guild).
export function resolveCommandsSyncKey(cfg: Cfg): string {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg: Cfg): boolean {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg: Cfg): string[] {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -159,7 +159,10 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
cfg?: unknown;
config?: unknown;
}) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg;
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

93
src/coalesce.ts Normal file
View File

@@ -0,0 +1,93 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x: string | null | undefined): string {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
type Pending = {
parts: string[];
post: (text: string) => Promise<void>;
log?: (m: string) => void;
safety: ReturnType<typeof setTimeout>;
};
const pendingByChannel = new Map<string, Pending>();
async function flushChannel(channelId: string, reason: string): Promise<void> {
const p = pendingByChannel.get(channelId);
if (!p) return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text) return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
} catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params: {
channelId: string;
text: string;
coalesce: boolean;
post: (text: string) => Promise<void>;
log?: (m: string) => void;
}): Promise<void> {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text) return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
} else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(
() => void flushChannel(cid, 'safety-timeout'),
SAFETY_FLUSH_MS,
),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(
rawChannelId: string | null | undefined,
): Promise<void> {
const cid = normChannelId(rawChannelId);
if (cid) await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric(): Promise<void> {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

149
src/command-sync.ts Normal file
View File

@@ -0,0 +1,149 @@
// Build the Fabric slash-command catalog from OpenClaw's native-command
// specs (the same source Discord uses to register slash commands) and push
// it to each connected guild. Fabric is a TEXT-command surface: a /<cmd>
// message is delivered normally and OpenClaw's command system executes it —
// this catalog only drives the frontend `/` autocomplete, so we resolve any
// dynamic arg `choices` to a static snapshot here (like Discord does at
// registration time).
import {
listNativeCommandSpecsForConfig,
findCommandByNativeName,
resolveCommandArgChoices,
} from 'openclaw/plugin-sdk/native-command-registry';
import type { FabricClient } from './fabric-client.js';
import { resolveCommandsSyncKey } from './accounts.js';
type Logger = { info: (m: string) => void; warn: (m: string) => void };
type FabricArg = {
name: string;
description: string;
type: string;
required: boolean;
captureRemaining: boolean;
preferAutocomplete: boolean;
choices: Array<{ value: string; label: string }> | null;
};
type FabricCommand = {
name: string;
nativeName: string;
description: string;
acceptsArgs: boolean;
args: FabricArg[];
argsParsing: string;
};
function normChoice(c: unknown): { value: string; label: string } {
if (typeof c === 'string') return { value: c, label: c };
const o = c as { value?: string; label?: string };
return { value: String(o.value ?? ''), label: String(o.label ?? o.value ?? '') };
}
export function buildFabricCommandSpecs(cfg: unknown): FabricCommand[] {
const specs = listNativeCommandSpecsForConfig(cfg as never, {
provider: 'fabric',
}) as Array<{
name: string;
description: string;
acceptsArgs?: boolean;
args?: Array<Record<string, unknown>>;
}>;
return specs.map((s) => {
// ChatCommandDefinition (for argsParsing + dynamic choices provider)
const def = findCommandByNativeName(s.name, 'fabric') as
| { argsParsing?: string; args?: Array<Record<string, unknown>> }
| undefined;
const args: FabricArg[] = (s.args ?? []).map((a) => {
const raw = a.choices;
let choices: Array<{ value: string; label: string }> | null = null;
if (Array.isArray(raw)) {
choices = raw.map(normChoice);
} else if (typeof raw === 'function' && def) {
try {
const r = resolveCommandArgChoices({
command: def as never,
arg: a as never,
cfg: cfg as never,
provider: 'fabric',
}) as Array<{ value: string; label: string }>;
choices = r.map((x) => ({ value: x.value, label: x.label }));
} catch {
choices = null;
}
}
return {
name: String(a.name ?? ''),
description: String(a.description ?? ''),
type: String(a.type ?? 'string'),
required: !!a.required,
captureRemaining: !!a.captureRemaining,
preferAutocomplete: !!a.preferAutocomplete,
choices,
};
});
return {
name: s.name,
nativeName: s.name,
description: s.description,
acceptsArgs: !!s.acceptsArgs,
args,
argsParsing: def?.argsParsing ?? 'positional',
};
});
}
// Push the catalog to every guild the known agents belong to (idempotent;
// the catalog is OpenClaw-global, so one PUT per guild is enough).
export async function syncFabricCommands(
client: FabricClient,
cfg: unknown,
accounts: Array<{ agentId: string; fabricApiKey: string }>,
log: Logger,
): Promise<void> {
// Guild C-2: the sync key comes from the channel config only (schema
// marks it required). Without it the guild rejects the catalog write.
const syncKey = resolveCommandsSyncKey(cfg as never);
if (!syncKey) {
log.warn(
'fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)',
);
return;
}
let specs: FabricCommand[];
try {
specs = buildFabricCommandSpecs(cfg);
} catch (err) {
log.warn(`fabric: build command specs failed: ${String(err)}`);
return;
}
if (!specs.length) return;
const done = new Set<string>();
for (const a of accounts) {
let session;
try {
session = await client.agentLogin(a.fabricApiKey);
} catch {
continue;
}
for (const g of session.guilds) {
if (done.has(g.nodeId)) continue;
const tok = session.guildAccessTokens.find(
(t) => t.guildNodeId === g.nodeId,
)?.token;
if (!tok) continue;
try {
await client.syncCommands(g.endpoint, tok, specs, syncKey);
done.add(g.nodeId);
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
} catch (err) {
log.warn(`fabric: command sync failed ${g.nodeId}: ${String(err)}`);
}
}
}
}

View File

@@ -29,6 +29,32 @@ export class FabricClient {
return (await res.json()) as T;
}
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
// (Fabric returns an empty body when a channel has no canvas).
private async req<T>(
method: string,
url: string,
auth?: string,
body?: unknown,
extraHeaders?: Record<string, string>,
): Promise<T> {
const res = await fetch(url, {
method,
headers: {
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
...(auth ? { authorization: `Bearer ${auth}` } : {}),
...(extraHeaders ?? {}),
},
body: body !== undefined ? JSON.stringify(body) : undefined,
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`${method} ${url} -> ${res.status} ${text}`);
}
const text = await res.text();
return (text ? JSON.parse(text) : null) as T;
}
// Exchange an agent API key for a Fabric user session (+ guild tokens).
agentLogin(apiKey: string): Promise<FabricSession> {
return this.post<FabricSession>(`${this.centerApiBase}/auth/agent/login`, { apiKey });
@@ -86,4 +112,95 @@ export class FabricClient {
joinChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
return this.post(`${guildEndpoint}/api/channels/${channelId}/join`, {}, guildToken);
}
leaveChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
return this.post(`${guildEndpoint}/api/channels/${channelId}/leave`, {}, guildToken);
}
// Register the OpenClaw slash-command catalog with this guild (idempotent
// full replace). The frontend GETs it for `/` autocomplete; execution
// still flows as a normal /<cmd> message into OpenClaw's command system.
syncCommands(
guildEndpoint: string,
guildToken: string,
commands: unknown[],
syncKey: string,
): Promise<unknown> {
// Guild C-2: the shared key is sourced from the channel config
// (channels.fabric.commandsSyncKey) and must equal the guild's
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
return this.req(
'PUT',
`${guildEndpoint}/api/commands`,
guildToken,
{ commands },
syncKey ? { 'x-commands-sync-key': syncKey } : undefined,
);
}
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
channelMembers(
guildEndpoint: string,
guildToken: string,
channelId: string,
): Promise<Array<{ userId: string; bypass?: boolean }>> {
return this.req(
'GET',
`${guildEndpoint}/api/channels/${channelId}/members`,
guildToken,
);
}
// ---- channel canvas (one pinned doc per channel) ----
private canvasUrl(endpoint: string, channelId: string): string {
return `${endpoint}/api/channels/${channelId}/canvas`;
}
// null when the channel has no canvas
getCanvas(
endpoint: string,
token: string,
channelId: string,
): Promise<FabricCanvas | null> {
return this.req('GET', this.canvasUrl(endpoint, channelId), token);
}
// share / replace (caller becomes the sharer)
shareCanvas(
endpoint: string,
token: string,
channelId: string,
body: CanvasInput,
): Promise<FabricCanvas> {
return this.req('PUT', this.canvasUrl(endpoint, channelId), token, body);
}
// update in place (original sharer only — else the guild returns 403)
updateCanvas(
endpoint: string,
token: string,
channelId: string,
body: Partial<CanvasInput>,
): Promise<FabricCanvas> {
return this.req('PATCH', this.canvasUrl(endpoint, channelId), token, body);
}
// remove ("close") the canvas (original sharer only)
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
}
export type CanvasFormat = 'md' | 'html' | 'text';
export type CanvasInput = { title: string; format: CanvasFormat; source: string };
export type FabricCanvas = {
channelId: string;
sharerUserId: string;
title: string;
format: CanvasFormat;
source: string;
version: number;
createdAt: string;
updatedAt: string;
};

View File

@@ -1,7 +1,12 @@
import { promises as fs } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { io, type Socket } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
// channels (nextcloud-talk) drive the kernel:
@@ -13,13 +18,23 @@ import type { IdentityRegistry } from './identity.js';
type Core = {
channel: {
routing: { resolveAgentRoute: (p: unknown) => { agentId: string; sessionKey: string; accountId?: string } };
session: { resolveStorePath: (store: unknown, o: { agentId: string }) => string };
session: {
resolveStorePath: (store: unknown, o: { agentId: string }) => string;
recordInboundSession: (p: {
storePath: string;
sessionKey: string;
ctx: unknown;
createIfMissing?: boolean;
onRecordError: (e: unknown) => void;
}) => Promise<unknown>;
};
reply: { finalizeInboundContext: (p: Record<string, unknown>) => unknown };
};
};
type Logger = { info: (m: string) => void; warn: (m: string) => void; error?: (m: string) => void };
type FabricAttachment = { url: string; name?: string; mimeType?: string };
type FabricMessage = {
messageId: string;
seq: number;
@@ -27,12 +42,58 @@ type FabricMessage = {
authorUserId?: string;
createdAt?: string;
channelId?: string;
attachments?: FabricAttachment[];
wakeup?: boolean;
// x-type of the channel (sent on message.created). 'dm' bypasses the
// wakeup gate: any message that isn't the agent's own is delivered.
xType?: string;
};
export class FabricInbound {
private sockets: Socket[] = [];
private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
private channelSyncTimers: NodeJS.Timeout[] = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
// Re-login per agent on a short TTL to keep a fresh token.
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
private async freshGuildToken(
agentId: string,
guildNodeId: string,
fallback: FabricSession,
): Promise<string | undefined> {
const pick = (s: FabricSession) =>
s.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
const now = Date.now();
const cached = this.tokenCache.get(agentId);
if (cached && now - cached.at < FabricInbound.TOKEN_TTL_MS) {
return pick(cached.session) ?? pick(fallback);
}
const apiKey = this.identity.findByAgentId(agentId)?.fabricApiKey;
if (apiKey) {
try {
const s = await this.client.agentLogin(apiKey);
this.tokenCache.set(agentId, { session: s, at: now });
return pick(s) ?? pick(fallback);
} catch (err) {
this.log.warn(`fabric: token refresh failed agent=${agentId}: ${String(err)}`);
}
}
return pick(fallback);
}
constructor(
private readonly core: unknown, // PluginRuntime
@@ -66,6 +127,8 @@ export class FabricInbound {
}
stop(): void {
for (const t of this.channelSyncTimers) clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets) s.disconnect();
this.sockets = [];
}
@@ -80,20 +143,67 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set<string>();
const syncChannels = async (kind: 'initial' | 'resync') => {
let freshTok: string | undefined;
try {
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
} catch {
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
{ headers: { authorization: `Bearer ${tok}` } },
{ headers: { authorization: `Bearer ${authTok}` } },
);
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
if (!res.ok) return;
const channels = (await res.json()) as Array<{ id: string }>;
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
);
} else if (added > 0 || removed > 0) {
this.log.info(
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
);
}
} catch {
/* best effort */
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
const syncTimer = setInterval(
() => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? '';
if (!channelId) return;
@@ -109,6 +219,53 @@ export class FabricInbound {
}
}
// Download a message's attachments to a temp dir using the agent's guild
// token; returns local paths/types/urls for the inbound media context.
private async fetchAttachments(
agentId: string,
endpoint: string,
token: string | undefined,
m: FabricMessage,
): Promise<{ paths: string[]; types: string[]; urls: string[] }> {
const out = { paths: [] as string[], types: [] as string[], urls: [] as string[] };
const list = m.attachments ?? [];
if (!list.length || !token) return out;
const dir = join(tmpdir(), `fabric-media-${agentId}-${m.messageId}`.replace(/[^\w.-]/g, '_'));
try {
await fs.mkdir(dir, { recursive: true });
} catch {
return out;
}
let i = 0;
for (const a of list) {
try {
const abs = a.url.startsWith('http') ? a.url : `${endpoint}${a.url}`;
const res = await fetch(abs, { headers: { authorization: `Bearer ${token}` } });
if (!res.ok) {
this.log.warn(`fabric: attachment fetch ${res.status} ${abs}`);
continue;
}
const buf = Buffer.from(await res.arrayBuffer());
const safe = (a.name ?? `file-${i}`).replace(/[^\w.-]/g, '_').slice(0, 120) || `file-${i}`;
const p = join(dir, `${i}-${safe}`);
await fs.writeFile(p, buf);
out.paths.push(p);
out.types.push(
a.mimeType ||
res.headers.get('content-type')?.split(';')[0] ||
'application/octet-stream',
);
out.urls.push(abs);
i++;
} catch (err) {
this.log.warn(`fabric: attachment fetch failed agent=${agentId}: ${String(err)}`);
}
}
if (out.paths.length)
this.log.info(`fabric: fetched ${out.paths.length} attachment(s) agent=${agentId}`);
return out;
}
private async dispatch(
agentId: string,
guild: { nodeId: string; endpoint: string },
@@ -116,13 +273,6 @@ export class FabricInbound {
m: FabricMessage,
session: FabricSession,
): Promise<void> {
// wakeup === false -> drop (Fabric already decided this agent is silent)
if (m.wakeup !== true) {
this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const core = this.core as Core & Record<string, unknown>;
const cfg = this.cfg as { session?: { store?: unknown } };
try {
@@ -135,7 +285,8 @@ export class FabricInbound {
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const ctxPayload = core.channel.reply.finalizeInboundContext({
const baseCtx: Record<string, unknown> = {
Body: m.content,
BodyForAgent: m.content,
RawBody: m.content,
@@ -153,9 +304,56 @@ export class FabricInbound {
Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(),
OriginatingChannel: 'fabric',
OriginatingTo: `fabric:${channelId}`,
});
};
const gt = session.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token;
// Non-wakeup: Fabric has already decided this agent is NOT the speaker
// this round. Do NOT run the model and do NOT send anything back — the
// discuss/work turn engine expects silence from non-woken agents (only
// the woken speaker emits a normal message or /no-reply). We still
// record the message into the agent's session so it has the full
// channel conversation as context whenever it IS later woken.
//
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,
sessionKey: route.sessionKey,
ctx: ctxPayload,
createIfMissing: true,
onRecordError: (err: unknown) =>
this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
});
this.log.info(
`fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`,
);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
// Fetch any uploaded files for the agent: download to a temp dir and
// hand openclaw local MediaPaths (+types) so the model receives them.
const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
const ctxPayload = core.channel.reply.finalizeInboundContext({
...baseCtx,
// Provide ONLY local paths. The guild file URL is on a private host
// (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
// passing MediaUrls is both redundant (we already downloaded the
// bytes) and noisy. Local MediaPaths is the reliable delivery.
...(media.paths.length
? {
MediaPaths: media.paths,
MediaTypes: media.types,
MediaPath: media.paths[0],
MediaType: media.types[0],
}
: {}),
});
await dispatchInboundReplyWithBase({
cfg: this.cfg as never,
@@ -169,19 +367,47 @@ export class FabricInbound {
const text = (payload?.text ?? '').trim();
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt) return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg as never),
post: (t) =>
this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id) as Promise<void>,
log: (m) => this.log.info(m),
});
},
onRecordError: (err: unknown) =>
this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
onDispatchError: (err: unknown, info: { kind: string }) =>
this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
// Fabric has no length limit: deliver the whole reply as ONE message.
replyOptions: { disableBlockStreaming: true } as never,
// - disableBlockStreaming: Fabric has no length limit, deliver the
// whole reply as ONE message.
// - sourceReplyDeliveryMode 'automatic': OpenClaw defaults group
// chats to "message_tool_only", which SUPPRESSES auto-delivery of
// the agent's text reply (it expects the agent to call a message
// tool). Fabric already gates *when* an agent speaks via the
// per-recipient wakeup flag, so once a turn is dispatched the
// reply must always flow back through `deliver`. Forcing
// 'automatic' overrides the group default so the reply is
// delivered. (source-reply-delivery-mode: a truthy `requested`
// wins unless it's message_tool_only with no tool available.)
replyOptions: {
disableBlockStreaming: true,
sourceReplyDeliveryMode: 'automatic',
} as never,
});
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
} catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
} finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}

View File

@@ -25,7 +25,12 @@ export function registerFabricTools(
// Resolve the calling agent's Fabric session + a guild's token/endpoint.
const ctxGuild = async (agentId: string, guildNodeId: string) => {
const entry = identity.findByAgentId(agentId);
if (!entry) throw new Error(`agent ${agentId} not registered (call fabric-register)`);
if (!entry)
throw new Error(
`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
`channels.fabric.accounts.${agentId}); then restart the gateway`,
);
const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
const token = session.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
@@ -33,31 +38,10 @@ export function registerFabricTools(
return { session, guild, token };
};
// fabric-register: bind this agent to a Fabric API key.
api.registerTool((ctx: Ctx) => ({
name: 'fabric-register',
description: "Register this agent's Fabric API key (minted via Center CLI `user apikey`).",
parameters: {
type: 'object',
additionalProperties: false,
required: ['fabricApiKey'],
properties: {
fabricApiKey: { type: 'string', description: 'Fabric Center API key (fak_…)' },
},
},
execute: async (params: { fabricApiKey: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const session = await client.agentLogin(params.fabricApiKey);
identity.upsert({
agentId,
fabricApiKey: params.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
return { ok: true, user: session.user };
},
}));
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
// It's a one-time step done out-of-band via the installed script
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
// or via static config (channels.fabric.accounts.<agentId>).
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
api.registerTool((ctx: Ctx) => ({
@@ -136,4 +120,127 @@ export function registerFabricTools(
return { ok: true, closed: true };
},
}));
// fabric-canvas: share / update / read / close the channel's single
// pinned canvas document (one tool, four actions). update/close are
// sharer-only server-side (the guild returns 403 otherwise).
api.registerTool((ctx: Ctx) => ({
name: 'fabric-canvas',
description:
"Manage a channel's pinned canvas document. action: " +
"read (current canvas or null) | share (create/replace; you become " +
'the sharer) | update (edit in place; sharer only) | close (remove; ' +
'sharer only).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['read', 'share', 'update', 'close'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
title: { type: 'string', description: 'share: required; update: optional' },
format: {
type: 'string',
enum: ['md', 'html', 'text'],
description: 'share: required; update: optional',
},
source: {
type: 'string',
description: 'document body. share: required; update: optional',
},
},
},
execute: async (p: {
action: 'read' | 'share' | 'update' | 'close';
guildNodeId: string;
channelId: string;
title?: string;
format?: 'md' | 'html' | 'text';
source?: string;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'read': {
const canvas = await client.getCanvas(ep, token, p.channelId);
return { ok: true, canvas };
}
case 'share': {
if (!p.title || !p.format || p.source === undefined) {
return { ok: false, error: 'share requires title, format, and source' };
}
const canvas = await client.shareCanvas(ep, token, p.channelId, {
title: p.title,
format: p.format,
source: p.source,
});
return { ok: true, canvas };
}
case 'update': {
const body: Partial<{ title: string; format: 'md' | 'html' | 'text'; source: string }> = {};
if (p.title !== undefined) body.title = p.title;
if (p.format !== undefined) body.format = p.format;
if (p.source !== undefined) body.source = p.source;
if (Object.keys(body).length === 0) {
return { ok: false, error: 'update needs at least one of title/format/source' };
}
const canvas = await client.updateCanvas(ep, token, p.channelId, body);
return { ok: true, canvas };
}
case 'close': {
await client.removeCanvas(ep, token, p.channelId);
return { ok: true, removed: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
// fabric-channel: channel membership (one tool, three actions).
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel',
description:
'Channel membership. action: members (list channel member userIds) | ' +
'join (this agent joins the channel) | leave (this agent leaves).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['members', 'join', 'leave'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
},
},
execute: async (p: {
action: 'members' | 'join' | 'leave';
guildNodeId: string;
channelId: string;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'members': {
const members = await client.channelMembers(ep, token, p.channelId);
return { ok: true, members };
}
case 'join': {
await client.joinChannel(ep, token, p.channelId);
return { ok: true, joined: true };
}
case 'leave': {
await client.leaveChannel(ep, token, p.channelId);
return { ok: true, left: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
}