12 Commits

Author SHA1 Message Date
d1d5ad10ca fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
92945b777d feat(fabric): dm channels deliver any non-self message (no wakeup gate)
inbound: FabricMessage gains xType; the wakeup gate is bypassed when
xType==='dm' (self messages are already filtered upstream), so a 1:1
dm always reaches the model regardless of wakeup metadata.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 09:18:20 +01:00
8774cfd7cc feat(fabric): coalesce a split agent turn into ONE message (deterministic)
OpenClaw delivers an agent turn whose blocks are text -> thinking/tool
-> text via multiple inbound deliver() calls (a non-text block is a
delivery boundary), so one turn became N Fabric messages.

Fix: buffer deliver() segments per channel (src/coalesce.ts) and flush
them as ONE postMessage at a deterministic boundary — the finally after
dispatchInboundReplyWithBase() resolves, which provably runs only after
every deliver() of the turn (verified: deliver,deliver -> dispatch
returned -> flush). No hooks, no timers, no idle guessing. The
agent_end hook was rejected: it fires BEFORE deliver(). gateway_stop
flushes any leftover; a long safety timeout is a leak-guard only.
channels.fabric.coalesce=false restores raw per-segment posting.

Verified on local openclaw + Fabric with a fake text/thinking/text
model: single trigger -> exactly one merged message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 22:15:46 +01:00
ab126825ef feat(security): commandsSyncKey is a required channel-config field (Guild C-2)
The slash-command sync secret now comes from
channels.fabric.commandsSyncKey (configSchema marks it required) and
is no longer read from FABRIC_COMMANDS_SYNC_KEY env. command-sync
resolves it from config and threads it into client.syncCommands;
when absent, sync is skipped with a clear warning. README updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:44:25 +01:00
bb63a57384 feat(security): send x-commands-sync-key when configured (Guild C-2)
syncCommands attaches the FABRIC_COMMANDS_SYNC_KEY header when the
operator sets it, so the guild can restrict slash-command catalog
writes to this plugin. No-op / backward compatible when unset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:47:14 +01:00
fc6edaabfd docs: slash-command catalog section
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:15:03 +01:00
c03562046d feat(plugin): sync OpenClaw slash-command catalog to Fabric
- command-sync.ts: buildFabricCommandSpecs(cfg) reads OpenClaw native
  command specs via openclaw/plugin-sdk/native-command-registry
  (listNativeCommandSpecsForConfig + findCommandByNativeName), resolves
  dynamic arg choices to a static snapshot (resolveCommandArgChoices) —
  same data Discord registers as slash commands.
- syncFabricCommands(): on gateway_start, after inbound starts, PUT the
  catalog to each connected guild (FabricClient.syncCommands ->
  PUT /api/commands; idempotent, one per guild).
- Fabric stays a TEXT-command surface (no nativeCommands capability):
  execution still flows as a /<cmd> message into OpenClaw's command
  system; this catalog only drives frontend autocomplete.

Verified: 41 specs built (args/choices incl. dynamic), synced to
test-guild1, GET /api/commands round-trips count=41.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:06:22 +01:00
fac6debfa5 feat(plugin): fabric-channel tool (members / join / leave)
One tool, three actions backed by FabricClient channelMembers (GET
/channels/:id/members -> [{userId,bypass}]), joinChannel, and new
leaveChannel (POST /channels/:id/leave).

Verified: client-level smoke against the running guild — members
initial=[tester], after join echo2 present, after leave echo2 gone.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 15:33:47 +01:00
aaabb0ddb0 feat(plugin): fabric-canvas tool; fabric-register env=AGENT_ID only
- bin/fabric-register.mjs: only AGENT_ID is read from the environment;
  --api-key is flag-only (no FABRIC_API_KEY); dropped FABRIC_CENTER_API_BASE
  / FABRIC_IDENTITY_FILE / OPENCLAW_PATH env fallbacks (flags + sensible
  defaults; --center still falls back to openclaw.json).
- New fabric-canvas tool (one tool, four actions): read / share / update /
  close the channel's single pinned canvas. Backed by FabricClient
  get/share/update/removeCanvas (GET/PUT/PATCH/DELETE; empty 2xx body ->
  null). update/close are sharer-only server-side.
- README updated.

Verified: client-level smoke against the running guild —
read(empty→null) → share(v1) → read → update(v2) → close(→null) all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 15:28:13 +01:00
26c12533fb refactor(plugin): fabric-register is a script, not a tool
Binding an agent's Fabric API key was an OpenClaw tool; make it a
self-contained Node script installed to ~/.openclaw/bin/fabric-register
instead.

- bin/fabric-register.mjs: no plugin deps; AGENT_ID env wins, else
  --agent-id required; --api-key validated via POST /auth/agent/login;
  on success upserts ~/.openclaw/fabric-identity.json (format matches
  IdentityRegistry). Flags/env for center, identity-file, openclaw-path.
- install.mjs: copy the script to ~/.openclaw/bin (chmod 0755) on
  install, remove on uninstall; Next-steps updated.
- tools.ts: drop the fabric-register tool; ctxGuild error now points to
  the script / static accounts config.
- README updated.

Verified: missing-id -> exit 2; --agent-id and AGENT_ID both bind and
write a valid identity file; bad key -> 401, no write.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 13:12:48 +01:00
9d0fa1d5c8 docs: rewrite README to match current architecture
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 12:53:25 +01:00
892db9f9be feat(plugin): record non-wakeup messages as session history (no model)
Previously a non-wakeup message returned immediately and was fully
discarded — the agent kept zero record of it, so when later woken in a
discuss/work channel it replied without the conversation context.

Now non-wakeup messages are ingested into the agent's OpenClaw session
via recordInboundSession (createIfMissing) WITHOUT dispatch: the real
model is not invoked and nothing is sent back to Fabric. This is
correct for the turn engine — only the woken speaker emits a normal
message or /no-reply; non-woken agents stay silent — while still
giving the agent full channel context whenever it IS woken.

Verified live: report-channel (all recipients wakeup=false) message
logs 'recorded (no wakeup, history only)' with 0 dispatch/deliver/
posted; wakeup path unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 10:42:36 +01:00
20 changed files with 1455 additions and 135 deletions

132
README.md
View File

@@ -1,65 +1,139 @@
# Fabric.OpenclawPlugin
An **OpenClaw channel plugin** that connects OpenClaw agents to a Fabric guild.
A native **OpenClaw channel plugin** that connects OpenClaw agents to Fabric
guilds as real channel members. Independent of OpenClaw's source — it only
uses the public plugin SDK.
## Model
- `kind: "channel"` plugin (like the bundled discord channel). OpenClaw **core
owns dispatch** (inbound → agent run) and the reply pipeline via the channel
turn kernel `runtime.channel.turn.run(...)`.
- Fabric already owns turn/shuffle/mention/`/no-reply` server-side, so this
plugin is thin. Fabric's per-recipient **`wakeup`** maps to channel-turn
**admission**:
- `wakeup === true``dispatch` (agent runs, may reply)
- otherwise → `{ kind: "drop", recordHistory: true }` (kept as context)
- **No sidecar, no fake no-reply model, no `before_model_resolve` gating.**
- `kind: "channel"` plugin (like the bundled discord channel). OpenClaw core
owns dispatch and the reply pipeline via the channel-turn kernel
(`resolveAgentRoute` + `finalizeInboundContext` +
`dispatchInboundReplyWithBase`). Fabric already owns turn/shuffle/mention/
`/no-reply` server-side, so this plugin is thin.
- Fabric's per-recipient **`wakeup`** maps to admission:
- `wakeup === true`**dispatch** (the agent runs and may reply).
- `wakeup !== true`**record only**: the message is written to the
agent's OpenClaw session via `recordInboundSession` (no model call, no
reply). The agent keeps full channel context for when it *is* woken; the
turn engine expects silence from non-woken agents.
- Replies are forced to **automatic** delivery
(`replyOptions.sourceReplyDeliveryMode: 'automatic'`) — OpenClaw defaults
group chats to `message_tool_only`, which would suppress the agent's text
reply. Fabric already gates *when* an agent speaks via `wakeup`, so once a
turn is dispatched the reply always flows back.
- One Fabric socket per agent identity. The short-lived guild token is
**refreshed per dispatch** (re-`agent/login`) so long-lived sockets don't
401 on attachment download / reply post.
## Files to agents
When an inbound message has `attachments[]`, the plugin downloads each file
(with the agent's guild token) to a temp dir and sets local
`MediaPaths`/`MediaTypes` on the inbound context so the agent receives the
files. `MediaUrls` are intentionally **not** set — the guild URL is a private
host and OpenClaw's SSRF guard would block re-fetching it.
## Auth
Each agent has a Fabric Center **API key** (mint via Center CLI:
`node dist/cli.js user apikey --email <agent-email>`). The key is exchanged for
a user session (`POST /auth/agent/login`) used to receive (socket) and post
replies. Bind a key to an agent via the `fabric-register` tool, or pre-populate
the identity file.
`node dist/cli.js user apikey --email <agent-email>`). The key is exchanged
for a user session (`POST /auth/agent/login`).
### Binding a key to an agent (one-time)
Two ways, both write the same identity registry the transport reads:
1. **Static config** — set `channels.fabric.accounts.<agentId> =
{ fabricApiKey, enabled }`. The agent never runs anything.
2. **`fabric-register` script** — installed to `~/.openclaw/bin/fabric-register`
by the installer (it is **not** an OpenClaw tool):
```bash
# agent id from $AGENT_ID (set in the agent runtime):
~/.openclaw/bin/fabric-register --api-key fak_…
# or pass it explicitly:
~/.openclaw/bin/fabric-register --agent-id <agent> --api-key fak_…
```
It validates the key against Center, then writes
`~/.openclaw/fabric-identity.json`. One-time and persistent — *not* per
login; the plugin's transport logs in and stays connected on its own.
**Only `AGENT_ID` is read from the environment** — if unset, `--agent-id`
is required. `--api-key` is flag-only (never from the env). Other flags:
`--center`, `--identity-file`, `--openclaw-path` (sensible defaults;
`--center` also falls back to `openclaw.json`). Restart the gateway
afterwards.
## Config
- `channels.fabric.centerApiBase` — e.g. `http://localhost:7001/api`
- `channels.fabric.commandsSyncKey` — **required**; must equal the guild's
`FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY` (Guild C-2). Read it from the
guild with `docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js`.
Sourced from config only — never from the environment.
- `channels.fabric.coalesce` — default `true`. OpenClaw splits one agent
turn whose blocks are `text → thinking/tool → text` into multiple
`deliver()` calls; this buffers them and posts ONE Fabric message at the
deterministic turn boundary (right after the inbound reply dispatch
resolves — no hooks, no timers, no idle guessing). `false` = raw
per-segment posting.
- `channels.fabric.accounts.<agentId>` = `{ fabricApiKey, enabled }`
(agent = account; the account id is the openclaw agentId)
(**agent = account**; the account id is the OpenClaw agentId)
- plugin `identityFilePath` — default `~/.openclaw/fabric-identity.json`
(`{ entries: [{ agentId, fabricApiKey }] }`)
### Required: route binding (account → agent)
openclaw routes a channel turn to an agent via `cfg.bindings`. Without a
Fabric binding it falls back to the default agent. Add one per account:
OpenClaw routes a channel turn via `cfg.bindings`; without a Fabric binding
it falls back to the default agent. One per account:
```json
{ "agentId": "<agent>", "match": { "channel": "fabric", "accountId": "<account>" } }
```
e.g. `openclaw config set bindings '[…, {"agentId":"echo","match":{"channel":"fabric","accountId":"echo"}}]' --json`,
then `openclaw gateway restart`.
Then `openclaw gateway restart`.
## Tools
- `fabric-register` — bind this agent's Fabric API key
(Key binding is **not** a tool — see *Binding a key to an agent* above.)
- `create-chat-channel` (general) / `create-work-channel` (work) /
`create-report-channel` (report) / `create-discussion-channel` (discuss)
- `discussion-complete` — post a summary then close the channel
(Fabric `POST /channels/:id/close`; closed → history readable, posts → 409)
- `discussion-complete` — post a summary, then close the channel
(closed → history readable; new posts → `409`)
- `fabric-canvas` — manage a channel's single pinned canvas document; one
tool, four `action`s: `read` (current canvas or null) · `share`
(create/replace; caller becomes sharer) · `update` (edit in place;
sharer-only) · `close` (remove; sharer-only). `share` needs
`title`/`format`(`md`|`html`|`text`)/`source`.
- `fabric-channel` — channel membership; one tool, three `action`s:
`members` (list the channel's member userIds) · `join` (this agent
joins) · `leave` (this agent leaves).
## Transport (Phase 1 = B1)
## Slash-command catalog
One Fabric socket per agent identity, in the plugin runtime. Firehose (B2) is
a later drop-in behind the same `dispatch()` seam.
On `gateway_start` the plugin syncs OpenClaw's native-command catalog to
each guild (`command-sync.ts`: `listNativeCommandSpecsForConfig` +
`findCommandByNativeName`, dynamic arg `choices` snapshotted via
`resolveCommandArgChoices`) → `PUT /api/commands`. This is exactly the data
Discord registers as slash commands; Fabric's frontend uses it for `/`
autocomplete. Fabric deliberately does **not** advertise the
`nativeCommands` channel capability — it stays a text-command surface, so a
`/<cmd>` message is delivered normally and OpenClaw's command system
executes it (text-command + command session + auth), reusing the standard
inbound path. Only `/no-reply` and `/force-proceed` stay
server-intercepted by the guild.
## Build
## Install / build
```bash
npm install && npm run build
node install.mjs # build + copy to ~/.openclaw/plugins/fabric + configure
```
> The plugin compiles against the host's OpenClaw SDK
> (`openclaw/plugin-sdk/*`); build inside the OpenClaw plugin environment.
`install.mjs` mirrors the PaddedCell-style installer (also `--uninstall`).
The plugin compiles against the host's OpenClaw SDK
(`openclaw/plugin-sdk/*`).
> Transport is Phase 1 (one socket per agent). A firehose variant (B2) is a
> later drop-in behind the same `dispatch()` seam.

161
bin/fabric-register.mjs Normal file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/env node
/**
* fabric-register — bind an OpenClaw agent to a Fabric Center API key.
*
* One-time, self-contained (no plugin deps). Installed to
* ~/.openclaw/bin/fabric-register by the plugin installer.
*
* AGENT_ID is read from the environment if set; otherwise --agent-id is
* required. The API key is validated against Center (POST
* /auth/agent/login) and, on success, written to the plugin's identity
* file so the Fabric channel plugin can connect that agent.
*
* Usage:
* fabric-register --api-key fak_xxx # uses $AGENT_ID
* fabric-register --agent-id echo --api-key fak_xxx
*
* Only AGENT_ID is read from the environment; everything else is a flag.
*
* Flags:
* --agent-id <id> required unless the AGENT_ID env var is set
* --api-key <fak_…> required (flag only — never from the environment)
* --center <url> else openclaw.json channels.fabric.centerApiBase;
* else http://localhost:7001/api
* --identity-file <path> default ~/.openclaw/fabric-identity.json
* --openclaw-path <dir> default ~/.openclaw
* -h | --help
*/
import { readFileSync, writeFileSync, mkdirSync, existsSync } from 'node:fs';
import { dirname, join, resolve } from 'node:path';
import { homedir } from 'node:os';
function parseArgs(argv) {
const out = {};
for (let i = 0; i < argv.length; i++) {
const a = argv[i];
if (a === '-h' || a === '--help') out.help = true;
else if (a.startsWith('--')) {
const key = a.slice(2);
const v = argv[i + 1];
if (v === undefined || v.startsWith('--')) out[key] = true;
else {
out[key] = v;
i++;
}
}
}
return out;
}
const HELP = `fabric-register — bind an OpenClaw agent to a Fabric Center API key
fabric-register --api-key fak_xxx # agent id from $AGENT_ID
fabric-register --agent-id <id> --api-key fak_xxx
--agent-id <id> required unless the AGENT_ID env var is set
--api-key <fak_…> required (flag only — never read from the env)
--center <url> Center API base (else openclaw.json
channels.fabric.centerApiBase; else
http://localhost:7001/api)
--identity-file <path> default ~/.openclaw/fabric-identity.json
--openclaw-path <dir> default ~/.openclaw
-h, --help
Only AGENT_ID is taken from the environment; everything else is a flag.
`;
function fail(msg) {
console.error(`fabric-register: ${msg}`);
process.exit(2);
}
async function main() {
const a = parseArgs(process.argv.slice(2));
if (a.help) {
process.stdout.write(HELP);
return;
}
// agent id: env AGENT_ID wins; else --agent-id is required.
const agentId =
(process.env.AGENT_ID && process.env.AGENT_ID.trim()) ||
(typeof a['agent-id'] === 'string' && a['agent-id'].trim());
if (!agentId) {
fail('no agent id: set the AGENT_ID environment variable or pass --agent-id <id>');
}
// api key: flag ONLY — never from the environment.
const apiKey = typeof a['api-key'] === 'string' && a['api-key'].trim();
if (!apiKey) fail('missing --api-key <fak_…> (flag only)');
const openclawPath = resolve(
(typeof a['openclaw-path'] === 'string' && a['openclaw-path']) ||
join(homedir(), '.openclaw'),
);
// center api base: flag > openclaw.json > default
let center = (typeof a.center === 'string' && a.center) || '';
if (!center) {
try {
const cfg = JSON.parse(readFileSync(join(openclawPath, 'openclaw.json'), 'utf8'));
center = cfg?.channels?.fabric?.centerApiBase || '';
} catch {
/* fall through to default */
}
}
if (!center) center = 'http://localhost:7001/api';
center = center.replace(/\/+$/, '');
const identityFile = resolve(
(typeof a['identity-file'] === 'string' && a['identity-file']) ||
join(openclawPath, 'fabric-identity.json'),
);
// 1) validate the key against Center (also resolves the Fabric identity)
let session;
try {
const res = await fetch(`${center}/auth/agent/login`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ apiKey }),
});
if (!res.ok) {
const t = await res.text().catch(() => '');
fail(`Center rejected the key: POST ${center}/auth/agent/login -> ${res.status} ${t}`);
}
session = await res.json();
} catch (e) {
fail(`could not reach Center at ${center}: ${e?.message || e}`);
}
// 2) upsert into the identity file (merge by agentId)
let file = { entries: [] };
if (existsSync(identityFile)) {
try {
const parsed = JSON.parse(readFileSync(identityFile, 'utf8'));
if (parsed && Array.isArray(parsed.entries)) file = parsed;
} catch {
/* corrupt -> overwrite */
}
}
const entry = {
agentId,
fabricApiKey: apiKey,
fabricUserId: session?.user?.id,
displayName: session?.user?.name,
};
const i = file.entries.findIndex((e) => e && e.agentId === agentId);
if (i >= 0) file.entries[i] = { ...file.entries[i], ...entry };
else file.entries.push(entry);
mkdirSync(dirname(identityFile), { recursive: true });
writeFileSync(identityFile, JSON.stringify(file, null, 2));
console.log(
`fabric-register: bound agent "${agentId}" -> Fabric user ` +
`${session?.user?.email || session?.user?.id} (${identityFile}). ` +
`Restart the gateway to connect: openclaw gateway restart`,
);
}
main().catch((e) => fail(e?.message || String(e)));

View File

@@ -5,11 +5,13 @@
// the OpenClawPluginApi for runtime startup (transport + tools).
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import path from 'node:path';
import os from 'node:os';
let runtimeRef = null;
@@ -57,8 +59,14 @@ export default defineChannelPluginEntry({
inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts);
void inbound.start();
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
// Note: the per-turn coalesce flush happens deterministically in
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
// is the real "all deliveries done" boundary; the agent_end hook fires
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
inbound?.stop();
inbound = null;
});

View File

@@ -7,6 +7,17 @@ const DEFAULT_CENTER = 'http://localhost:7001/api';
function section(cfg) {
return cfg.channels?.fabric ?? {};
}
// The commands-sync shared secret (channel-level only). Empty string when
// unconfigured — callers decide how to handle (slash-command sync is then
// rejected by the guild).
export function resolveCommandsSyncKey(cfg) {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg) {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg) {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -137,7 +137,10 @@ export const fabricChannelPlugin = createChatChannelPlugin({
attachedResults: {
channel: 'fabric',
sendText: async (ctx) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {});
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

75
dist/fabric/src/coalesce.js vendored Normal file
View File

@@ -0,0 +1,75 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x) {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
const pendingByChannel = new Map();
async function flushChannel(channelId, reason) {
const p = pendingByChannel.get(channelId);
if (!p)
return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text)
return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
}
catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params) {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text)
return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
}
else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(() => void flushChannel(cid, 'safety-timeout'), SAFETY_FLUSH_MS),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(rawChannelId) {
const cid = normChannelId(rawChannelId);
if (cid)
await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric() {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

109
dist/fabric/src/command-sync.js vendored Normal file
View File

@@ -0,0 +1,109 @@
// Build the Fabric slash-command catalog from OpenClaw's native-command
// specs (the same source Discord uses to register slash commands) and push
// it to each connected guild. Fabric is a TEXT-command surface: a /<cmd>
// message is delivered normally and OpenClaw's command system executes it —
// this catalog only drives the frontend `/` autocomplete, so we resolve any
// dynamic arg `choices` to a static snapshot here (like Discord does at
// registration time).
import { listNativeCommandSpecsForConfig, findCommandByNativeName, resolveCommandArgChoices, } from 'openclaw/plugin-sdk/native-command-registry';
import { resolveCommandsSyncKey } from './accounts.js';
function normChoice(c) {
if (typeof c === 'string')
return { value: c, label: c };
const o = c;
return { value: String(o.value ?? ''), label: String(o.label ?? o.value ?? '') };
}
export function buildFabricCommandSpecs(cfg) {
const specs = listNativeCommandSpecsForConfig(cfg, {
provider: 'fabric',
});
return specs.map((s) => {
// ChatCommandDefinition (for argsParsing + dynamic choices provider)
const def = findCommandByNativeName(s.name, 'fabric');
const args = (s.args ?? []).map((a) => {
const raw = a.choices;
let choices = null;
if (Array.isArray(raw)) {
choices = raw.map(normChoice);
}
else if (typeof raw === 'function' && def) {
try {
const r = resolveCommandArgChoices({
command: def,
arg: a,
cfg: cfg,
provider: 'fabric',
});
choices = r.map((x) => ({ value: x.value, label: x.label }));
}
catch {
choices = null;
}
}
return {
name: String(a.name ?? ''),
description: String(a.description ?? ''),
type: String(a.type ?? 'string'),
required: !!a.required,
captureRemaining: !!a.captureRemaining,
preferAutocomplete: !!a.preferAutocomplete,
choices,
};
});
return {
name: s.name,
nativeName: s.name,
description: s.description,
acceptsArgs: !!s.acceptsArgs,
args,
argsParsing: def?.argsParsing ?? 'positional',
};
});
}
// Push the catalog to every guild the known agents belong to (idempotent;
// the catalog is OpenClaw-global, so one PUT per guild is enough).
export async function syncFabricCommands(client, cfg, accounts, log) {
// Guild C-2: the sync key comes from the channel config only (schema
// marks it required). Without it the guild rejects the catalog write.
const syncKey = resolveCommandsSyncKey(cfg);
if (!syncKey) {
log.warn('fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)');
return;
}
let specs;
try {
specs = buildFabricCommandSpecs(cfg);
}
catch (err) {
log.warn(`fabric: build command specs failed: ${String(err)}`);
return;
}
if (!specs.length)
return;
const done = new Set();
for (const a of accounts) {
let session;
try {
session = await client.agentLogin(a.fabricApiKey);
}
catch {
continue;
}
for (const g of session.guilds) {
if (done.has(g.nodeId))
continue;
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok)
continue;
try {
await client.syncCommands(g.endpoint, tok, specs, syncKey);
done.add(g.nodeId);
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
}
catch (err) {
log.warn(`fabric: command sync failed ${g.nodeId}: ${String(err)}`);
}
}
}
}

View File

@@ -21,6 +21,25 @@ export class FabricClient {
}
return (await res.json());
}
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
// (Fabric returns an empty body when a channel has no canvas).
async req(method, url, auth, body, extraHeaders) {
const res = await fetch(url, {
method,
headers: {
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
...(auth ? { authorization: `Bearer ${auth}` } : {}),
...(extraHeaders ?? {}),
},
body: body !== undefined ? JSON.stringify(body) : undefined,
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`${method} ${url} -> ${res.status} ${text}`);
}
const text = await res.text();
return (text ? JSON.parse(text) : null);
}
// Exchange an agent API key for a Fabric user session (+ guild tokens).
agentLogin(apiKey) {
return this.post(`${this.centerApiBase}/auth/agent/login`, { apiKey });
@@ -50,4 +69,40 @@ export class FabricClient {
joinChannel(guildEndpoint, guildToken, channelId) {
return this.post(`${guildEndpoint}/api/channels/${channelId}/join`, {}, guildToken);
}
leaveChannel(guildEndpoint, guildToken, channelId) {
return this.post(`${guildEndpoint}/api/channels/${channelId}/leave`, {}, guildToken);
}
// Register the OpenClaw slash-command catalog with this guild (idempotent
// full replace). The frontend GETs it for `/` autocomplete; execution
// still flows as a normal /<cmd> message into OpenClaw's command system.
syncCommands(guildEndpoint, guildToken, commands, syncKey) {
// Guild C-2: the shared key is sourced from the channel config
// (channels.fabric.commandsSyncKey) and must equal the guild's
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
return this.req('PUT', `${guildEndpoint}/api/commands`, guildToken, { commands }, syncKey ? { 'x-commands-sync-key': syncKey } : undefined);
}
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
channelMembers(guildEndpoint, guildToken, channelId) {
return this.req('GET', `${guildEndpoint}/api/channels/${channelId}/members`, guildToken);
}
// ---- channel canvas (one pinned doc per channel) ----
canvasUrl(endpoint, channelId) {
return `${endpoint}/api/channels/${channelId}/canvas`;
}
// null when the channel has no canvas
getCanvas(endpoint, token, channelId) {
return this.req('GET', this.canvasUrl(endpoint, channelId), token);
}
// share / replace (caller becomes the sharer)
shareCanvas(endpoint, token, channelId, body) {
return this.req('PUT', this.canvasUrl(endpoint, channelId), token, body);
}
// update in place (original sharer only — else the guild returns 403)
updateCanvas(endpoint, token, channelId, body) {
return this.req('PATCH', this.canvasUrl(endpoint, channelId), token, body);
}
// remove ("close") the canvas (original sharer only)
removeCanvas(endpoint, token, channelId) {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
}

View File

@@ -3,6 +3,8 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
export class FabricInbound {
core;
cfg;
@@ -12,6 +14,14 @@ export class FabricInbound {
accounts;
sockets = [];
seen = new Set();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
channelSyncTimers = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
@@ -73,6 +83,9 @@ export class FabricInbound {
}
}
stop() {
for (const t of this.channelSyncTimers)
clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets)
s.disconnect();
this.sockets = [];
@@ -88,19 +101,61 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set();
const syncChannels = async (kind) => {
let freshTok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
const channels = res.ok ? (await res.json()) : [];
for (const c of channels)
socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
}
catch {
/* best effort */
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
if (!res.ok)
return;
const channels = (await res.json());
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
}
else if (added > 0 || removed > 0) {
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
}
}
catch {
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => {
const channelId = m.channelId ?? '';
if (!channelId)
@@ -162,12 +217,6 @@ export class FabricInbound {
return out;
}
async dispatch(agentId, guild, channelId, m, session) {
// wakeup === false -> drop (Fabric already decided this agent is silent)
if (m.wakeup !== true) {
this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const core = this.core;
const cfg = this.cfg;
try {
@@ -180,11 +229,7 @@ export class FabricInbound {
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
// Fetch any uploaded files for the agent: download to a temp dir and
// hand openclaw local MediaPaths (+types) so the model receives them.
const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
const ctxPayload = core.channel.reply.finalizeInboundContext({
const baseCtx = {
Body: m.content,
BodyForAgent: m.content,
RawBody: m.content,
@@ -202,6 +247,36 @@ export class FabricInbound {
Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(),
OriginatingChannel: 'fabric',
OriginatingTo: `fabric:${channelId}`,
};
// Non-wakeup: Fabric has already decided this agent is NOT the speaker
// this round. Do NOT run the model and do NOT send anything back — the
// discuss/work turn engine expects silence from non-woken agents (only
// the woken speaker emits a normal message or /no-reply). We still
// record the message into the agent's session so it has the full
// channel conversation as context whenever it IS later woken.
//
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,
sessionKey: route.sessionKey,
ctx: ctxPayload,
createIfMissing: true,
onRecordError: (err) => this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
});
this.log.info(`fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
// Fetch any uploaded files for the agent: download to a temp dir and
// hand openclaw local MediaPaths (+types) so the model receives them.
const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
const ctxPayload = core.channel.reply.finalizeInboundContext({
...baseCtx,
// Provide ONLY local paths. The guild file URL is on a private host
// (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
// passing MediaUrls is both redundant (we already downloaded the
@@ -228,8 +303,16 @@ export class FabricInbound {
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt)
return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg),
post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id),
log: (m) => this.log.info(m),
});
},
onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
@@ -254,5 +337,12 @@ export class FabricInbound {
catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
}
finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}

View File

@@ -9,7 +9,9 @@ export function registerFabricTools(api, client, identity) {
const ctxGuild = async (agentId, guildNodeId) => {
const entry = identity.findByAgentId(agentId);
if (!entry)
throw new Error(`agent ${agentId} not registered (call fabric-register)`);
throw new Error(`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
`channels.fabric.accounts.${agentId}); then restart the gateway`);
const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
const token = session.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
@@ -17,32 +19,10 @@ export function registerFabricTools(api, client, identity) {
throw new Error(`agent not a member of guild ${guildNodeId}`);
return { session, guild, token };
};
// fabric-register: bind this agent to a Fabric API key.
api.registerTool((ctx) => ({
name: 'fabric-register',
description: "Register this agent's Fabric API key (minted via Center CLI `user apikey`).",
parameters: {
type: 'object',
additionalProperties: false,
required: ['fabricApiKey'],
properties: {
fabricApiKey: { type: 'string', description: 'Fabric Center API key (fak_…)' },
},
},
execute: async (params) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const session = await client.agentLogin(params.fabricApiKey);
identity.upsert({
agentId,
fabricApiKey: params.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
return { ok: true, user: session.user };
},
}));
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
// It's a one-time step done out-of-band via the installed script
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
// or via static config (channels.fabric.accounts.<agentId>).
const makeCreate = (kind) => api.registerTool((ctx) => ({
name: `create-${kind}-channel`,
description: `Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}).`,
@@ -109,4 +89,117 @@ export function registerFabricTools(api, client, identity) {
return { ok: true, closed: true };
},
}));
// fabric-canvas: share / update / read / close the channel's single
// pinned canvas document (one tool, four actions). update/close are
// sharer-only server-side (the guild returns 403 otherwise).
api.registerTool((ctx) => ({
name: 'fabric-canvas',
description: "Manage a channel's pinned canvas document. action: " +
"read (current canvas or null) | share (create/replace; you become " +
'the sharer) | update (edit in place; sharer only) | close (remove; ' +
'sharer only).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['read', 'share', 'update', 'close'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
title: { type: 'string', description: 'share: required; update: optional' },
format: {
type: 'string',
enum: ['md', 'html', 'text'],
description: 'share: required; update: optional',
},
source: {
type: 'string',
description: 'document body. share: required; update: optional',
},
},
},
execute: async (p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'read': {
const canvas = await client.getCanvas(ep, token, p.channelId);
return { ok: true, canvas };
}
case 'share': {
if (!p.title || !p.format || p.source === undefined) {
return { ok: false, error: 'share requires title, format, and source' };
}
const canvas = await client.shareCanvas(ep, token, p.channelId, {
title: p.title,
format: p.format,
source: p.source,
});
return { ok: true, canvas };
}
case 'update': {
const body = {};
if (p.title !== undefined)
body.title = p.title;
if (p.format !== undefined)
body.format = p.format;
if (p.source !== undefined)
body.source = p.source;
if (Object.keys(body).length === 0) {
return { ok: false, error: 'update needs at least one of title/format/source' };
}
const canvas = await client.updateCanvas(ep, token, p.channelId, body);
return { ok: true, canvas };
}
case 'close': {
await client.removeCanvas(ep, token, p.channelId);
return { ok: true, removed: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
// fabric-channel: channel membership (one tool, three actions).
api.registerTool((ctx) => ({
name: 'fabric-channel',
description: 'Channel membership. action: members (list channel member userIds) | ' +
'join (this agent joins the channel) | leave (this agent leaves).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['members', 'join', 'leave'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
},
},
execute: async (p) => {
const agentId = ctx.agentId;
if (!agentId)
return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'members': {
const members = await client.channelMembers(ep, token, p.channelId);
return { ok: true, members };
}
case 'join': {
await client.joinChannel(ep, token, p.channelId);
return { ok: true, joined: true };
}
case 'leave': {
await client.leaveChannel(ep, token, p.channelId);
return { ok: true, left: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
}

View File

@@ -6,11 +6,13 @@
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import path from 'node:path';
import os from 'node:os';
@@ -36,7 +38,7 @@ export default defineChannelPluginEntry({
config?: unknown;
pluginConfig?: { identityFilePath?: string };
logger: { info: (m: string) => void; warn: (m: string) => void };
on: (ev: string, fn: () => void) => void;
on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
registerTool: (d: unknown) => void;
};
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
@@ -82,9 +84,15 @@ export default defineChannelPluginEntry({
);
void inbound.start();
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
// Note: the per-turn coalesce flush happens deterministically in
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
// is the real "all deliveries done" boundary; the agent_end hook fires
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
inbound?.stop();
inbound = null;
});

View File

@@ -10,7 +10,7 @@
*/
import { execSync } from 'child_process';
import { existsSync, mkdirSync, copyFileSync, readdirSync, rmSync } from 'fs';
import { existsSync, mkdirSync, copyFileSync, readdirSync, rmSync, chmodSync } from 'fs';
import { dirname, join, resolve } from 'path';
import { fileURLToPath } from 'url';
import { homedir } from 'os';
@@ -109,12 +109,30 @@ function build() {
ok('compiled -> dist/fabric');
}
function binTarget(base) {
return join(base, 'bin', 'fabric-register');
}
function installBinScript(base) {
const src = join(__dirname, 'bin', 'fabric-register.mjs');
const dst = binTarget(base);
mkdirSync(dirname(dst), { recursive: true });
copyFileSync(src, dst);
chmodSync(dst, 0o755);
ok(`fabric-register -> ${dst}`);
}
function clearInstall(base) {
const dest = join(base, 'plugins', PLUGIN_ID);
if (existsSync(dest)) {
rmSync(dest, { recursive: true, force: true });
ok(`removed ${dest}`);
}
const bin = binTarget(base);
if (existsSync(bin)) {
rmSync(bin, { force: true });
ok(`removed ${bin}`);
}
}
function cleanupConfig(base) {
const dest = join(base, 'plugins', PLUGIN_ID);
@@ -149,6 +167,7 @@ function install() {
ok(`plugin files -> ${dest}`);
exec('npm install --omit=dev', { cwd: dest, silent: !opt.verbose });
ok('runtime deps installed');
installBinScript(base);
return { base, dest };
}
@@ -200,8 +219,10 @@ function main() {
console.log('');
log('Install complete. Next:', 'blue');
log(' 1. Mint an agent key: (in Center) node dist/cli.js user apikey --email <agent-email>', 'cyan');
log(' 2. openclaw gateway restart', 'cyan');
log(' 3. As an agent, call the fabric-register tool with that key', 'cyan');
log(' 2. Bind it to an agent (one-time), either:', 'cyan');
log(' AGENT_ID=<agent> ~/.openclaw/bin/fabric-register --api-key <fak_…>', 'cyan');
log(' (or pass --agent-id <agent>; or set channels.fabric.accounts.<agent>)', 'cyan');
log(' 3. openclaw gateway restart', 'cyan');
console.log('');
} catch (e) {
log(`\nInstall failed: ${e.message}`, 'red');

View File

@@ -39,6 +39,15 @@
"type": "string",
"description": "Fabric Center API base, e.g. http://localhost:7001/api"
},
"commandsSyncKey": {
"type": "string",
"minLength": 1,
"description": "Shared secret that must equal the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY. Required to register the slash-command catalog (Guild C-2). Read it from the guild via: docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js"
},
"coalesce": {
"type": "boolean",
"description": "Merge a split agent turn (text → thinking/tool → text) into ONE Fabric message. Flushed deterministically on the agent_end hook. Default true; false = raw per-segment posting."
},
"dmSecurity": { "type": "string" },
"dmPolicy": { "type": "string" },
"enabled": { "type": "boolean" },
@@ -59,10 +68,12 @@
}
}
}
}
},
"required": ["commandsSyncKey"]
},
"uiHints": {
"centerApiBase": { "label": "Center API base" }
"centerApiBase": { "label": "Center API base" },
"commandsSyncKey": { "label": "Commands sync key" }
}
}
}

View File

@@ -14,6 +14,15 @@ export type FabricAccountConfig = {
export type FabricChannelConfig = {
centerApiBase?: string;
// Shared secret matching the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY
// (Guild C-2). Required by the channel config schema; sourced from config
// only — never from the environment.
commandsSyncKey?: string;
// Coalesce an agent turn that OpenClaw split into multiple deliveries
// (text → thinking/tool → text => N sendText calls) into ONE Fabric
// message. The flush boundary is the deterministic `agent_end` hook (not
// a timer). Default true; set false for raw per-segment posting.
coalesce?: boolean;
accounts?: Record<string, FabricAccountConfig>;
defaultAccount?: string;
} & FabricAccountConfig;
@@ -35,6 +44,19 @@ function section(cfg: Cfg): FabricChannelConfig {
return cfg.channels?.fabric ?? {};
}
// The commands-sync shared secret (channel-level only). Empty string when
// unconfigured — callers decide how to handle (slash-command sync is then
// rejected by the guild).
export function resolveCommandsSyncKey(cfg: Cfg): string {
return (section(cfg).commandsSyncKey ?? '').trim();
}
// Whether to coalesce a split agent turn into one Fabric message
// (channel-level). Default true.
export function resolveCoalesce(cfg: Cfg): boolean {
return (cfg.channels?.fabric ?? {}).coalesce !== false;
}
export function listFabricAccountIds(cfg: Cfg): string[] {
const accts = section(cfg).accounts ?? {};
const ids = Object.keys(accts);

View File

@@ -159,7 +159,10 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
cfg?: unknown;
config?: unknown;
}) => {
// openclaw passes config under cfg or config depending on path
// openclaw passes config under cfg or config depending on path.
// Note: inbound agent replies go through inbound.ts `deliver`
// (where turn coalescing happens). This path is for any direct
// outbound sends and posts immediately.
const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg;
try {
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);

93
src/coalesce.ts Normal file
View File

@@ -0,0 +1,93 @@
// Deterministic turn coalescer.
//
// OpenClaw calls the Fabric `deliver` callback once per assistant text
// segment; a thinking/tool block between two text blocks is a delivery
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
// multiple deliver() calls. There is no turn id on the delivery, so we
// BUFFER segments by Fabric channelId and post the merged message when the
// turn truly ends. The flush is driven by inbound.ts right after
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
// agent_end hook, which fires before deliver()). `coalesce=false` posts
// each segment immediately.
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
export function normChannelId(x: string | null | undefined): string {
const s = String(x ?? '');
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
}
type Pending = {
parts: string[];
post: (text: string) => Promise<void>;
log?: (m: string) => void;
safety: ReturnType<typeof setTimeout>;
};
const pendingByChannel = new Map<string, Pending>();
async function flushChannel(channelId: string, reason: string): Promise<void> {
const p = pendingByChannel.get(channelId);
if (!p) return;
pendingByChannel.delete(channelId);
clearTimeout(p.safety);
const text = p.parts.join('\n\n').trim();
if (!text) return;
try {
await p.post(text);
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
} catch (e) {
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
}
}
// Buffer one delivered segment (or send immediately when coalesce=false).
// `post` performs the real Fabric postMessage with the caller's already
// resolved guild/token; on flush it is called once with the merged text.
export async function enqueueDelivery(params: {
channelId: string;
text: string;
coalesce: boolean;
post: (text: string) => Promise<void>;
log?: (m: string) => void;
}): Promise<void> {
const cid = normChannelId(params.channelId);
const text = (params.text ?? '').trim();
if (!text) return;
if (!params.coalesce) {
await params.post(text);
return;
}
const existing = pendingByChannel.get(cid);
if (existing) {
existing.parts.push(text);
existing.post = params.post; // freshest guild/token closure
existing.log = params.log;
} else {
pendingByChannel.set(cid, {
parts: [text],
post: params.post,
log: params.log,
safety: setTimeout(
() => void flushChannel(cid, 'safety-timeout'),
SAFETY_FLUSH_MS,
),
});
}
}
// Called by the agent_end hook with the hook ctx's channelId (bare or
// fabric:-prefixed). Deterministic per-turn boundary.
export async function flushFabricForChannel(
rawChannelId: string | null | undefined,
): Promise<void> {
const cid = normChannelId(rawChannelId);
if (cid) await flushChannel(cid, 'dispatch-end');
}
// gateway_stop: flush anything still buffered.
export async function flushAllFabric(): Promise<void> {
for (const cid of [...pendingByChannel.keys()]) {
await flushChannel(cid, 'gateway_stop');
}
}

149
src/command-sync.ts Normal file
View File

@@ -0,0 +1,149 @@
// Build the Fabric slash-command catalog from OpenClaw's native-command
// specs (the same source Discord uses to register slash commands) and push
// it to each connected guild. Fabric is a TEXT-command surface: a /<cmd>
// message is delivered normally and OpenClaw's command system executes it —
// this catalog only drives the frontend `/` autocomplete, so we resolve any
// dynamic arg `choices` to a static snapshot here (like Discord does at
// registration time).
import {
listNativeCommandSpecsForConfig,
findCommandByNativeName,
resolveCommandArgChoices,
} from 'openclaw/plugin-sdk/native-command-registry';
import type { FabricClient } from './fabric-client.js';
import { resolveCommandsSyncKey } from './accounts.js';
type Logger = { info: (m: string) => void; warn: (m: string) => void };
type FabricArg = {
name: string;
description: string;
type: string;
required: boolean;
captureRemaining: boolean;
preferAutocomplete: boolean;
choices: Array<{ value: string; label: string }> | null;
};
type FabricCommand = {
name: string;
nativeName: string;
description: string;
acceptsArgs: boolean;
args: FabricArg[];
argsParsing: string;
};
function normChoice(c: unknown): { value: string; label: string } {
if (typeof c === 'string') return { value: c, label: c };
const o = c as { value?: string; label?: string };
return { value: String(o.value ?? ''), label: String(o.label ?? o.value ?? '') };
}
export function buildFabricCommandSpecs(cfg: unknown): FabricCommand[] {
const specs = listNativeCommandSpecsForConfig(cfg as never, {
provider: 'fabric',
}) as Array<{
name: string;
description: string;
acceptsArgs?: boolean;
args?: Array<Record<string, unknown>>;
}>;
return specs.map((s) => {
// ChatCommandDefinition (for argsParsing + dynamic choices provider)
const def = findCommandByNativeName(s.name, 'fabric') as
| { argsParsing?: string; args?: Array<Record<string, unknown>> }
| undefined;
const args: FabricArg[] = (s.args ?? []).map((a) => {
const raw = a.choices;
let choices: Array<{ value: string; label: string }> | null = null;
if (Array.isArray(raw)) {
choices = raw.map(normChoice);
} else if (typeof raw === 'function' && def) {
try {
const r = resolveCommandArgChoices({
command: def as never,
arg: a as never,
cfg: cfg as never,
provider: 'fabric',
}) as Array<{ value: string; label: string }>;
choices = r.map((x) => ({ value: x.value, label: x.label }));
} catch {
choices = null;
}
}
return {
name: String(a.name ?? ''),
description: String(a.description ?? ''),
type: String(a.type ?? 'string'),
required: !!a.required,
captureRemaining: !!a.captureRemaining,
preferAutocomplete: !!a.preferAutocomplete,
choices,
};
});
return {
name: s.name,
nativeName: s.name,
description: s.description,
acceptsArgs: !!s.acceptsArgs,
args,
argsParsing: def?.argsParsing ?? 'positional',
};
});
}
// Push the catalog to every guild the known agents belong to (idempotent;
// the catalog is OpenClaw-global, so one PUT per guild is enough).
export async function syncFabricCommands(
client: FabricClient,
cfg: unknown,
accounts: Array<{ agentId: string; fabricApiKey: string }>,
log: Logger,
): Promise<void> {
// Guild C-2: the sync key comes from the channel config only (schema
// marks it required). Without it the guild rejects the catalog write.
const syncKey = resolveCommandsSyncKey(cfg as never);
if (!syncKey) {
log.warn(
'fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)',
);
return;
}
let specs: FabricCommand[];
try {
specs = buildFabricCommandSpecs(cfg);
} catch (err) {
log.warn(`fabric: build command specs failed: ${String(err)}`);
return;
}
if (!specs.length) return;
const done = new Set<string>();
for (const a of accounts) {
let session;
try {
session = await client.agentLogin(a.fabricApiKey);
} catch {
continue;
}
for (const g of session.guilds) {
if (done.has(g.nodeId)) continue;
const tok = session.guildAccessTokens.find(
(t) => t.guildNodeId === g.nodeId,
)?.token;
if (!tok) continue;
try {
await client.syncCommands(g.endpoint, tok, specs, syncKey);
done.add(g.nodeId);
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
} catch (err) {
log.warn(`fabric: command sync failed ${g.nodeId}: ${String(err)}`);
}
}
}
}

View File

@@ -29,6 +29,32 @@ export class FabricClient {
return (await res.json()) as T;
}
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
// (Fabric returns an empty body when a channel has no canvas).
private async req<T>(
method: string,
url: string,
auth?: string,
body?: unknown,
extraHeaders?: Record<string, string>,
): Promise<T> {
const res = await fetch(url, {
method,
headers: {
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
...(auth ? { authorization: `Bearer ${auth}` } : {}),
...(extraHeaders ?? {}),
},
body: body !== undefined ? JSON.stringify(body) : undefined,
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`${method} ${url} -> ${res.status} ${text}`);
}
const text = await res.text();
return (text ? JSON.parse(text) : null) as T;
}
// Exchange an agent API key for a Fabric user session (+ guild tokens).
agentLogin(apiKey: string): Promise<FabricSession> {
return this.post<FabricSession>(`${this.centerApiBase}/auth/agent/login`, { apiKey });
@@ -86,4 +112,95 @@ export class FabricClient {
joinChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
return this.post(`${guildEndpoint}/api/channels/${channelId}/join`, {}, guildToken);
}
leaveChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
return this.post(`${guildEndpoint}/api/channels/${channelId}/leave`, {}, guildToken);
}
// Register the OpenClaw slash-command catalog with this guild (idempotent
// full replace). The frontend GETs it for `/` autocomplete; execution
// still flows as a normal /<cmd> message into OpenClaw's command system.
syncCommands(
guildEndpoint: string,
guildToken: string,
commands: unknown[],
syncKey: string,
): Promise<unknown> {
// Guild C-2: the shared key is sourced from the channel config
// (channels.fabric.commandsSyncKey) and must equal the guild's
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
return this.req(
'PUT',
`${guildEndpoint}/api/commands`,
guildToken,
{ commands },
syncKey ? { 'x-commands-sync-key': syncKey } : undefined,
);
}
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
channelMembers(
guildEndpoint: string,
guildToken: string,
channelId: string,
): Promise<Array<{ userId: string; bypass?: boolean }>> {
return this.req(
'GET',
`${guildEndpoint}/api/channels/${channelId}/members`,
guildToken,
);
}
// ---- channel canvas (one pinned doc per channel) ----
private canvasUrl(endpoint: string, channelId: string): string {
return `${endpoint}/api/channels/${channelId}/canvas`;
}
// null when the channel has no canvas
getCanvas(
endpoint: string,
token: string,
channelId: string,
): Promise<FabricCanvas | null> {
return this.req('GET', this.canvasUrl(endpoint, channelId), token);
}
// share / replace (caller becomes the sharer)
shareCanvas(
endpoint: string,
token: string,
channelId: string,
body: CanvasInput,
): Promise<FabricCanvas> {
return this.req('PUT', this.canvasUrl(endpoint, channelId), token, body);
}
// update in place (original sharer only — else the guild returns 403)
updateCanvas(
endpoint: string,
token: string,
channelId: string,
body: Partial<CanvasInput>,
): Promise<FabricCanvas> {
return this.req('PATCH', this.canvasUrl(endpoint, channelId), token, body);
}
// remove ("close") the canvas (original sharer only)
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
}
export type CanvasFormat = 'md' | 'html' | 'text';
export type CanvasInput = { title: string; format: CanvasFormat; source: string };
export type FabricCanvas = {
channelId: string;
sharerUserId: string;
title: string;
format: CanvasFormat;
source: string;
version: number;
createdAt: string;
updatedAt: string;
};

View File

@@ -5,6 +5,8 @@ import { io, type Socket } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
// channels (nextcloud-talk) drive the kernel:
@@ -16,7 +18,16 @@ import type { IdentityRegistry } from './identity.js';
type Core = {
channel: {
routing: { resolveAgentRoute: (p: unknown) => { agentId: string; sessionKey: string; accountId?: string } };
session: { resolveStorePath: (store: unknown, o: { agentId: string }) => string };
session: {
resolveStorePath: (store: unknown, o: { agentId: string }) => string;
recordInboundSession: (p: {
storePath: string;
sessionKey: string;
ctx: unknown;
createIfMissing?: boolean;
onRecordError: (e: unknown) => void;
}) => Promise<unknown>;
};
reply: { finalizeInboundContext: (p: Record<string, unknown>) => unknown };
};
};
@@ -33,11 +44,22 @@ type FabricMessage = {
channelId?: string;
attachments?: FabricAttachment[];
wakeup?: boolean;
// x-type of the channel (sent on message.created). 'dm' bypasses the
// wakeup gate: any message that isn't the agent's own is delivered.
xType?: string;
};
export class FabricInbound {
private sockets: Socket[] = [];
private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
private channelSyncTimers: NodeJS.Timeout[] = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
@@ -105,6 +127,8 @@ export class FabricInbound {
}
stop(): void {
for (const t of this.channelSyncTimers) clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets) s.disconnect();
this.sockets = [];
}
@@ -119,20 +143,67 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set<string>();
const syncChannels = async (kind: 'initial' | 'resync') => {
let freshTok: string | undefined;
try {
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
} catch {
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
{ headers: { authorization: `Bearer ${tok}` } },
{ headers: { authorization: `Bearer ${authTok}` } },
);
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
if (!res.ok) return;
const channels = (await res.json()) as Array<{ id: string }>;
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
);
} else if (added > 0 || removed > 0) {
this.log.info(
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
);
}
} catch {
/* best effort */
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
const syncTimer = setInterval(
() => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? '';
if (!channelId) return;
@@ -202,13 +273,6 @@ export class FabricInbound {
m: FabricMessage,
session: FabricSession,
): Promise<void> {
// wakeup === false -> drop (Fabric already decided this agent is silent)
if (m.wakeup !== true) {
this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const core = this.core as Core & Record<string, unknown>;
const cfg = this.cfg as { session?: { store?: unknown } };
try {
@@ -222,13 +286,7 @@ export class FabricInbound {
agentId: route.agentId,
});
const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
// Fetch any uploaded files for the agent: download to a temp dir and
// hand openclaw local MediaPaths (+types) so the model receives them.
const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
const ctxPayload = core.channel.reply.finalizeInboundContext({
const baseCtx: Record<string, unknown> = {
Body: m.content,
BodyForAgent: m.content,
RawBody: m.content,
@@ -246,6 +304,43 @@ export class FabricInbound {
Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(),
OriginatingChannel: 'fabric',
OriginatingTo: `fabric:${channelId}`,
};
// Non-wakeup: Fabric has already decided this agent is NOT the speaker
// this round. Do NOT run the model and do NOT send anything back — the
// discuss/work turn engine expects silence from non-woken agents (only
// the woken speaker emits a normal message or /no-reply). We still
// record the message into the agent's session so it has the full
// channel conversation as context whenever it IS later woken.
//
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,
sessionKey: route.sessionKey,
ctx: ctxPayload,
createIfMissing: true,
onRecordError: (err: unknown) =>
this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
});
this.log.info(
`fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`,
);
return;
}
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
// Fetch any uploaded files for the agent: download to a temp dir and
// hand openclaw local MediaPaths (+types) so the model receives them.
const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
const ctxPayload = core.channel.reply.finalizeInboundContext({
...baseCtx,
// Provide ONLY local paths. The guild file URL is on a private host
// (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
// passing MediaUrls is both redundant (we already downloaded the
@@ -272,8 +367,17 @@ export class FabricInbound {
const text = (payload?.text ?? '').trim();
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
if (!text || !gt) return;
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
// Buffer segments; the merged message is posted right after
// dispatch returns (the deterministic turn boundary, see the
// finally below). Disable per channel: channels.fabric.coalesce.
await enqueueDelivery({
channelId,
text,
coalesce: resolveCoalesce(this.cfg as never),
post: (t) =>
this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id) as Promise<void>,
log: (m) => this.log.info(m),
});
},
onRecordError: (err: unknown) =>
this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
@@ -298,6 +402,12 @@ export class FabricInbound {
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
} catch (err) {
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
} finally {
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
// resolves AFTER every deliver() call of this turn has run, so the
// buffer now holds all segments — flush them as ONE Fabric message.
// No hooks, no timers, no idle guessing.
await flushFabricForChannel(channelId);
}
}
}

View File

@@ -25,7 +25,12 @@ export function registerFabricTools(
// Resolve the calling agent's Fabric session + a guild's token/endpoint.
const ctxGuild = async (agentId: string, guildNodeId: string) => {
const entry = identity.findByAgentId(agentId);
if (!entry) throw new Error(`agent ${agentId} not registered (call fabric-register)`);
if (!entry)
throw new Error(
`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
`channels.fabric.accounts.${agentId}); then restart the gateway`,
);
const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
const token = session.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
@@ -33,31 +38,10 @@ export function registerFabricTools(
return { session, guild, token };
};
// fabric-register: bind this agent to a Fabric API key.
api.registerTool((ctx: Ctx) => ({
name: 'fabric-register',
description: "Register this agent's Fabric API key (minted via Center CLI `user apikey`).",
parameters: {
type: 'object',
additionalProperties: false,
required: ['fabricApiKey'],
properties: {
fabricApiKey: { type: 'string', description: 'Fabric Center API key (fak_…)' },
},
},
execute: async (params: { fabricApiKey: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const session = await client.agentLogin(params.fabricApiKey);
identity.upsert({
agentId,
fabricApiKey: params.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
return { ok: true, user: session.user };
},
}));
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
// It's a one-time step done out-of-band via the installed script
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
// or via static config (channels.fabric.accounts.<agentId>).
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
api.registerTool((ctx: Ctx) => ({
@@ -136,4 +120,127 @@ export function registerFabricTools(
return { ok: true, closed: true };
},
}));
// fabric-canvas: share / update / read / close the channel's single
// pinned canvas document (one tool, four actions). update/close are
// sharer-only server-side (the guild returns 403 otherwise).
api.registerTool((ctx: Ctx) => ({
name: 'fabric-canvas',
description:
"Manage a channel's pinned canvas document. action: " +
"read (current canvas or null) | share (create/replace; you become " +
'the sharer) | update (edit in place; sharer only) | close (remove; ' +
'sharer only).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['read', 'share', 'update', 'close'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
title: { type: 'string', description: 'share: required; update: optional' },
format: {
type: 'string',
enum: ['md', 'html', 'text'],
description: 'share: required; update: optional',
},
source: {
type: 'string',
description: 'document body. share: required; update: optional',
},
},
},
execute: async (p: {
action: 'read' | 'share' | 'update' | 'close';
guildNodeId: string;
channelId: string;
title?: string;
format?: 'md' | 'html' | 'text';
source?: string;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'read': {
const canvas = await client.getCanvas(ep, token, p.channelId);
return { ok: true, canvas };
}
case 'share': {
if (!p.title || !p.format || p.source === undefined) {
return { ok: false, error: 'share requires title, format, and source' };
}
const canvas = await client.shareCanvas(ep, token, p.channelId, {
title: p.title,
format: p.format,
source: p.source,
});
return { ok: true, canvas };
}
case 'update': {
const body: Partial<{ title: string; format: 'md' | 'html' | 'text'; source: string }> = {};
if (p.title !== undefined) body.title = p.title;
if (p.format !== undefined) body.format = p.format;
if (p.source !== undefined) body.source = p.source;
if (Object.keys(body).length === 0) {
return { ok: false, error: 'update needs at least one of title/format/source' };
}
const canvas = await client.updateCanvas(ep, token, p.channelId, body);
return { ok: true, canvas };
}
case 'close': {
await client.removeCanvas(ep, token, p.channelId);
return { ok: true, removed: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
// fabric-channel: channel membership (one tool, three actions).
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel',
description:
'Channel membership. action: members (list channel member userIds) | ' +
'join (this agent joins the channel) | leave (this agent leaves).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['action', 'guildNodeId', 'channelId'],
properties: {
action: { type: 'string', enum: ['members', 'join', 'leave'] },
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
},
},
execute: async (p: {
action: 'members' | 'join' | 'leave';
guildNodeId: string;
channelId: string;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const ep = guild.endpoint;
switch (p.action) {
case 'members': {
const members = await client.channelMembers(ep, token, p.channelId);
return { ok: true, members };
}
case 'join': {
await client.joinChannel(ep, token, p.channelId);
return { ok: true, joined: true };
}
case 'leave': {
await client.leaveChannel(ep, token, p.channelId);
return { ok: true, left: true };
}
default:
return { ok: false, error: `unknown action ${String(p.action)}` };
}
},
}));
}