Compare commits
9 Commits
26c12533fb
...
fix/inboun
| Author | SHA1 | Date | |
|---|---|---|---|
| d1d5ad10ca | |||
| 92945b777d | |||
| 8774cfd7cc | |||
| ab126825ef | |||
| bb63a57384 | |||
| fc6edaabfd | |||
| c03562046d | |||
| fac6debfa5 | |||
| aaabb0ddb0 |
41
README.md
41
README.md
@@ -59,14 +59,25 @@ Two ways, both write the same identity registry the transport reads:
|
||||
It validates the key against Center, then writes
|
||||
`~/.openclaw/fabric-identity.json`. One-time and persistent — *not* per
|
||||
login; the plugin's transport logs in and stays connected on its own.
|
||||
`AGENT_ID` env wins; otherwise `--agent-id` is required. Other flags:
|
||||
`--center`, `--identity-file`, `--openclaw-path` (env equivalents:
|
||||
`FABRIC_API_KEY`, `FABRIC_CENTER_API_BASE`, `FABRIC_IDENTITY_FILE`,
|
||||
`OPENCLAW_PATH`). Restart the gateway afterwards.
|
||||
**Only `AGENT_ID` is read from the environment** — if unset, `--agent-id`
|
||||
is required. `--api-key` is flag-only (never from the env). Other flags:
|
||||
`--center`, `--identity-file`, `--openclaw-path` (sensible defaults;
|
||||
`--center` also falls back to `openclaw.json`). Restart the gateway
|
||||
afterwards.
|
||||
|
||||
## Config
|
||||
|
||||
- `channels.fabric.centerApiBase` — e.g. `http://localhost:7001/api`
|
||||
- `channels.fabric.commandsSyncKey` — **required**; must equal the guild's
|
||||
`FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY` (Guild C-2). Read it from the
|
||||
guild with `docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js`.
|
||||
Sourced from config only — never from the environment.
|
||||
- `channels.fabric.coalesce` — default `true`. OpenClaw splits one agent
|
||||
turn whose blocks are `text → thinking/tool → text` into multiple
|
||||
`deliver()` calls; this buffers them and posts ONE Fabric message at the
|
||||
deterministic turn boundary (right after the inbound reply dispatch
|
||||
resolves — no hooks, no timers, no idle guessing). `false` = raw
|
||||
per-segment posting.
|
||||
- `channels.fabric.accounts.<agentId>` = `{ fabricApiKey, enabled }`
|
||||
(**agent = account**; the account id is the OpenClaw agentId)
|
||||
- plugin `identityFilePath` — default `~/.openclaw/fabric-identity.json`
|
||||
@@ -90,6 +101,28 @@ Then `openclaw gateway restart`.
|
||||
`create-report-channel` (report) / `create-discussion-channel` (discuss)
|
||||
- `discussion-complete` — post a summary, then close the channel
|
||||
(closed → history readable; new posts → `409`)
|
||||
- `fabric-canvas` — manage a channel's single pinned canvas document; one
|
||||
tool, four `action`s: `read` (current canvas or null) · `share`
|
||||
(create/replace; caller becomes sharer) · `update` (edit in place;
|
||||
sharer-only) · `close` (remove; sharer-only). `share` needs
|
||||
`title`/`format`(`md`|`html`|`text`)/`source`.
|
||||
- `fabric-channel` — channel membership; one tool, three `action`s:
|
||||
`members` (list the channel's member userIds) · `join` (this agent
|
||||
joins) · `leave` (this agent leaves).
|
||||
|
||||
## Slash-command catalog
|
||||
|
||||
On `gateway_start` the plugin syncs OpenClaw's native-command catalog to
|
||||
each guild (`command-sync.ts`: `listNativeCommandSpecsForConfig` +
|
||||
`findCommandByNativeName`, dynamic arg `choices` snapshotted via
|
||||
`resolveCommandArgChoices`) → `PUT /api/commands`. This is exactly the data
|
||||
Discord registers as slash commands; Fabric's frontend uses it for `/`
|
||||
autocomplete. Fabric deliberately does **not** advertise the
|
||||
`nativeCommands` channel capability — it stays a text-command surface, so a
|
||||
`/<cmd>` message is delivered normally and OpenClaw's command system
|
||||
executes it (text-command + command session + auth), reusing the standard
|
||||
inbound path. Only `/no-reply` and `/force-proceed` stay
|
||||
server-intercepted by the guild.
|
||||
|
||||
## Install / build
|
||||
|
||||
|
||||
@@ -14,14 +14,15 @@
|
||||
* fabric-register --api-key fak_xxx # uses $AGENT_ID
|
||||
* fabric-register --agent-id echo --api-key fak_xxx
|
||||
*
|
||||
* Flags / env:
|
||||
* --agent-id <id> (or env AGENT_ID; one of them required)
|
||||
* --api-key <fak_…> (or env FABRIC_API_KEY; required)
|
||||
* --center <url> (or env FABRIC_CENTER_API_BASE; else openclaw.json;
|
||||
* else http://localhost:7001/api)
|
||||
* --identity-file <path> (or env FABRIC_IDENTITY_FILE; else
|
||||
* ~/.openclaw/fabric-identity.json)
|
||||
* --openclaw-path <dir> (or env OPENCLAW_PATH; else ~/.openclaw)
|
||||
* Only AGENT_ID is read from the environment; everything else is a flag.
|
||||
*
|
||||
* Flags:
|
||||
* --agent-id <id> required unless the AGENT_ID env var is set
|
||||
* --api-key <fak_…> required (flag only — never from the environment)
|
||||
* --center <url> else openclaw.json channels.fabric.centerApiBase;
|
||||
* else http://localhost:7001/api
|
||||
* --identity-file <path> default ~/.openclaw/fabric-identity.json
|
||||
* --openclaw-path <dir> default ~/.openclaw
|
||||
* -h | --help
|
||||
*/
|
||||
import { readFileSync, writeFileSync, mkdirSync, existsSync } from 'node:fs';
|
||||
@@ -51,14 +52,16 @@ const HELP = `fabric-register — bind an OpenClaw agent to a Fabric Center API
|
||||
fabric-register --api-key fak_xxx # agent id from $AGENT_ID
|
||||
fabric-register --agent-id <id> --api-key fak_xxx
|
||||
|
||||
--agent-id <id> required unless $AGENT_ID is set
|
||||
--api-key <fak_…> required (or env FABRIC_API_KEY)
|
||||
--center <url> Center API base (or env FABRIC_CENTER_API_BASE;
|
||||
else openclaw.json channels.fabric.centerApiBase;
|
||||
else http://localhost:7001/api)
|
||||
--agent-id <id> required unless the AGENT_ID env var is set
|
||||
--api-key <fak_…> required (flag only — never read from the env)
|
||||
--center <url> Center API base (else openclaw.json
|
||||
channels.fabric.centerApiBase; else
|
||||
http://localhost:7001/api)
|
||||
--identity-file <path> default ~/.openclaw/fabric-identity.json
|
||||
--openclaw-path <dir> default ~/.openclaw
|
||||
-h, --help
|
||||
|
||||
Only AGENT_ID is taken from the environment; everything else is a flag.
|
||||
`;
|
||||
|
||||
function fail(msg) {
|
||||
@@ -81,22 +84,17 @@ async function main() {
|
||||
fail('no agent id: set the AGENT_ID environment variable or pass --agent-id <id>');
|
||||
}
|
||||
|
||||
const apiKey =
|
||||
(typeof a['api-key'] === 'string' && a['api-key'].trim()) ||
|
||||
(process.env.FABRIC_API_KEY && process.env.FABRIC_API_KEY.trim());
|
||||
if (!apiKey) fail('missing --api-key <fak_…> (or env FABRIC_API_KEY)');
|
||||
// api key: flag ONLY — never from the environment.
|
||||
const apiKey = typeof a['api-key'] === 'string' && a['api-key'].trim();
|
||||
if (!apiKey) fail('missing --api-key <fak_…> (flag only)');
|
||||
|
||||
const openclawPath = resolve(
|
||||
(typeof a['openclaw-path'] === 'string' && a['openclaw-path']) ||
|
||||
process.env.OPENCLAW_PATH ||
|
||||
join(homedir(), '.openclaw'),
|
||||
);
|
||||
|
||||
// center api base: flag > env > openclaw.json > default
|
||||
let center =
|
||||
(typeof a.center === 'string' && a.center) ||
|
||||
process.env.FABRIC_CENTER_API_BASE ||
|
||||
'';
|
||||
// center api base: flag > openclaw.json > default
|
||||
let center = (typeof a.center === 'string' && a.center) || '';
|
||||
if (!center) {
|
||||
try {
|
||||
const cfg = JSON.parse(readFileSync(join(openclawPath, 'openclaw.json'), 'utf8'));
|
||||
@@ -110,7 +108,6 @@ async function main() {
|
||||
|
||||
const identityFile = resolve(
|
||||
(typeof a['identity-file'] === 'string' && a['identity-file']) ||
|
||||
process.env.FABRIC_IDENTITY_FILE ||
|
||||
join(openclawPath, 'fabric-identity.json'),
|
||||
);
|
||||
|
||||
|
||||
8
dist/fabric/index.js
vendored
8
dist/fabric/index.js
vendored
@@ -5,11 +5,13 @@
|
||||
// the OpenClawPluginApi for runtime startup (transport + tools).
|
||||
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
|
||||
import { fabricChannelPlugin } from './src/channel.js';
|
||||
import { flushAllFabric } from './src/coalesce.js';
|
||||
import { FabricInbound } from './src/inbound.js';
|
||||
import { listEnabledFabricAccounts } from './src/accounts.js';
|
||||
import { registerFabricTools } from './src/tools.js';
|
||||
import { FabricClient } from './src/fabric-client.js';
|
||||
import { IdentityRegistry } from './src/identity.js';
|
||||
import { syncFabricCommands } from './src/command-sync.js';
|
||||
import path from 'node:path';
|
||||
import os from 'node:os';
|
||||
let runtimeRef = null;
|
||||
@@ -57,8 +59,14 @@ export default defineChannelPluginEntry({
|
||||
inbound = new FabricInbound(runtimeRef, api.config, client, identity, api.logger, accounts);
|
||||
void inbound.start();
|
||||
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
|
||||
void syncFabricCommands(client, cfg, accounts, api.logger);
|
||||
});
|
||||
// Note: the per-turn coalesce flush happens deterministically in
|
||||
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
|
||||
// is the real "all deliveries done" boundary; the agent_end hook fires
|
||||
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
|
||||
api.on('gateway_stop', () => {
|
||||
void flushAllFabric();
|
||||
inbound?.stop();
|
||||
inbound = null;
|
||||
});
|
||||
|
||||
11
dist/fabric/src/accounts.js
vendored
11
dist/fabric/src/accounts.js
vendored
@@ -7,6 +7,17 @@ const DEFAULT_CENTER = 'http://localhost:7001/api';
|
||||
function section(cfg) {
|
||||
return cfg.channels?.fabric ?? {};
|
||||
}
|
||||
// The commands-sync shared secret (channel-level only). Empty string when
|
||||
// unconfigured — callers decide how to handle (slash-command sync is then
|
||||
// rejected by the guild).
|
||||
export function resolveCommandsSyncKey(cfg) {
|
||||
return (section(cfg).commandsSyncKey ?? '').trim();
|
||||
}
|
||||
// Whether to coalesce a split agent turn into one Fabric message
|
||||
// (channel-level). Default true.
|
||||
export function resolveCoalesce(cfg) {
|
||||
return (cfg.channels?.fabric ?? {}).coalesce !== false;
|
||||
}
|
||||
export function listFabricAccountIds(cfg) {
|
||||
const accts = section(cfg).accounts ?? {};
|
||||
const ids = Object.keys(accts);
|
||||
|
||||
5
dist/fabric/src/channel.js
vendored
5
dist/fabric/src/channel.js
vendored
@@ -137,7 +137,10 @@ export const fabricChannelPlugin = createChatChannelPlugin({
|
||||
attachedResults: {
|
||||
channel: 'fabric',
|
||||
sendText: async (ctx) => {
|
||||
// openclaw passes config under cfg or config depending on path
|
||||
// openclaw passes config under cfg or config depending on path.
|
||||
// Note: inbound agent replies go through inbound.ts `deliver`
|
||||
// (where turn coalescing happens). This path is for any direct
|
||||
// outbound sends and posts immediately.
|
||||
const cfg = (ctx.cfg ?? ctx.config ?? {});
|
||||
try {
|
||||
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);
|
||||
|
||||
75
dist/fabric/src/coalesce.js
vendored
Normal file
75
dist/fabric/src/coalesce.js
vendored
Normal file
@@ -0,0 +1,75 @@
|
||||
// Deterministic turn coalescer.
|
||||
//
|
||||
// OpenClaw calls the Fabric `deliver` callback once per assistant text
|
||||
// segment; a thinking/tool block between two text blocks is a delivery
|
||||
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
|
||||
// multiple deliver() calls. There is no turn id on the delivery, so we
|
||||
// BUFFER segments by Fabric channelId and post the merged message when the
|
||||
// turn truly ends. The flush is driven by inbound.ts right after
|
||||
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
|
||||
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
|
||||
// agent_end hook, which fires before deliver()). `coalesce=false` posts
|
||||
// each segment immediately.
|
||||
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
|
||||
export function normChannelId(x) {
|
||||
const s = String(x ?? '');
|
||||
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
|
||||
}
|
||||
const pendingByChannel = new Map();
|
||||
async function flushChannel(channelId, reason) {
|
||||
const p = pendingByChannel.get(channelId);
|
||||
if (!p)
|
||||
return;
|
||||
pendingByChannel.delete(channelId);
|
||||
clearTimeout(p.safety);
|
||||
const text = p.parts.join('\n\n').trim();
|
||||
if (!text)
|
||||
return;
|
||||
try {
|
||||
await p.post(text);
|
||||
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
|
||||
}
|
||||
catch (e) {
|
||||
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
|
||||
}
|
||||
}
|
||||
// Buffer one delivered segment (or send immediately when coalesce=false).
|
||||
// `post` performs the real Fabric postMessage with the caller's already
|
||||
// resolved guild/token; on flush it is called once with the merged text.
|
||||
export async function enqueueDelivery(params) {
|
||||
const cid = normChannelId(params.channelId);
|
||||
const text = (params.text ?? '').trim();
|
||||
if (!text)
|
||||
return;
|
||||
if (!params.coalesce) {
|
||||
await params.post(text);
|
||||
return;
|
||||
}
|
||||
const existing = pendingByChannel.get(cid);
|
||||
if (existing) {
|
||||
existing.parts.push(text);
|
||||
existing.post = params.post; // freshest guild/token closure
|
||||
existing.log = params.log;
|
||||
}
|
||||
else {
|
||||
pendingByChannel.set(cid, {
|
||||
parts: [text],
|
||||
post: params.post,
|
||||
log: params.log,
|
||||
safety: setTimeout(() => void flushChannel(cid, 'safety-timeout'), SAFETY_FLUSH_MS),
|
||||
});
|
||||
}
|
||||
}
|
||||
// Called by the agent_end hook with the hook ctx's channelId (bare or
|
||||
// fabric:-prefixed). Deterministic per-turn boundary.
|
||||
export async function flushFabricForChannel(rawChannelId) {
|
||||
const cid = normChannelId(rawChannelId);
|
||||
if (cid)
|
||||
await flushChannel(cid, 'dispatch-end');
|
||||
}
|
||||
// gateway_stop: flush anything still buffered.
|
||||
export async function flushAllFabric() {
|
||||
for (const cid of [...pendingByChannel.keys()]) {
|
||||
await flushChannel(cid, 'gateway_stop');
|
||||
}
|
||||
}
|
||||
109
dist/fabric/src/command-sync.js
vendored
Normal file
109
dist/fabric/src/command-sync.js
vendored
Normal file
@@ -0,0 +1,109 @@
|
||||
// Build the Fabric slash-command catalog from OpenClaw's native-command
|
||||
// specs (the same source Discord uses to register slash commands) and push
|
||||
// it to each connected guild. Fabric is a TEXT-command surface: a /<cmd>
|
||||
// message is delivered normally and OpenClaw's command system executes it —
|
||||
// this catalog only drives the frontend `/` autocomplete, so we resolve any
|
||||
// dynamic arg `choices` to a static snapshot here (like Discord does at
|
||||
// registration time).
|
||||
import { listNativeCommandSpecsForConfig, findCommandByNativeName, resolveCommandArgChoices, } from 'openclaw/plugin-sdk/native-command-registry';
|
||||
import { resolveCommandsSyncKey } from './accounts.js';
|
||||
function normChoice(c) {
|
||||
if (typeof c === 'string')
|
||||
return { value: c, label: c };
|
||||
const o = c;
|
||||
return { value: String(o.value ?? ''), label: String(o.label ?? o.value ?? '') };
|
||||
}
|
||||
export function buildFabricCommandSpecs(cfg) {
|
||||
const specs = listNativeCommandSpecsForConfig(cfg, {
|
||||
provider: 'fabric',
|
||||
});
|
||||
return specs.map((s) => {
|
||||
// ChatCommandDefinition (for argsParsing + dynamic choices provider)
|
||||
const def = findCommandByNativeName(s.name, 'fabric');
|
||||
const args = (s.args ?? []).map((a) => {
|
||||
const raw = a.choices;
|
||||
let choices = null;
|
||||
if (Array.isArray(raw)) {
|
||||
choices = raw.map(normChoice);
|
||||
}
|
||||
else if (typeof raw === 'function' && def) {
|
||||
try {
|
||||
const r = resolveCommandArgChoices({
|
||||
command: def,
|
||||
arg: a,
|
||||
cfg: cfg,
|
||||
provider: 'fabric',
|
||||
});
|
||||
choices = r.map((x) => ({ value: x.value, label: x.label }));
|
||||
}
|
||||
catch {
|
||||
choices = null;
|
||||
}
|
||||
}
|
||||
return {
|
||||
name: String(a.name ?? ''),
|
||||
description: String(a.description ?? ''),
|
||||
type: String(a.type ?? 'string'),
|
||||
required: !!a.required,
|
||||
captureRemaining: !!a.captureRemaining,
|
||||
preferAutocomplete: !!a.preferAutocomplete,
|
||||
choices,
|
||||
};
|
||||
});
|
||||
return {
|
||||
name: s.name,
|
||||
nativeName: s.name,
|
||||
description: s.description,
|
||||
acceptsArgs: !!s.acceptsArgs,
|
||||
args,
|
||||
argsParsing: def?.argsParsing ?? 'positional',
|
||||
};
|
||||
});
|
||||
}
|
||||
// Push the catalog to every guild the known agents belong to (idempotent;
|
||||
// the catalog is OpenClaw-global, so one PUT per guild is enough).
|
||||
export async function syncFabricCommands(client, cfg, accounts, log) {
|
||||
// Guild C-2: the sync key comes from the channel config only (schema
|
||||
// marks it required). Without it the guild rejects the catalog write.
|
||||
const syncKey = resolveCommandsSyncKey(cfg);
|
||||
if (!syncKey) {
|
||||
log.warn('fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
|
||||
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)');
|
||||
return;
|
||||
}
|
||||
let specs;
|
||||
try {
|
||||
specs = buildFabricCommandSpecs(cfg);
|
||||
}
|
||||
catch (err) {
|
||||
log.warn(`fabric: build command specs failed: ${String(err)}`);
|
||||
return;
|
||||
}
|
||||
if (!specs.length)
|
||||
return;
|
||||
const done = new Set();
|
||||
for (const a of accounts) {
|
||||
let session;
|
||||
try {
|
||||
session = await client.agentLogin(a.fabricApiKey);
|
||||
}
|
||||
catch {
|
||||
continue;
|
||||
}
|
||||
for (const g of session.guilds) {
|
||||
if (done.has(g.nodeId))
|
||||
continue;
|
||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||
if (!tok)
|
||||
continue;
|
||||
try {
|
||||
await client.syncCommands(g.endpoint, tok, specs, syncKey);
|
||||
done.add(g.nodeId);
|
||||
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
|
||||
}
|
||||
catch (err) {
|
||||
log.warn(`fabric: command sync failed ${g.nodeId}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
55
dist/fabric/src/fabric-client.js
vendored
55
dist/fabric/src/fabric-client.js
vendored
@@ -21,6 +21,25 @@ export class FabricClient {
|
||||
}
|
||||
return (await res.json());
|
||||
}
|
||||
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
|
||||
// (Fabric returns an empty body when a channel has no canvas).
|
||||
async req(method, url, auth, body, extraHeaders) {
|
||||
const res = await fetch(url, {
|
||||
method,
|
||||
headers: {
|
||||
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
|
||||
...(auth ? { authorization: `Bearer ${auth}` } : {}),
|
||||
...(extraHeaders ?? {}),
|
||||
},
|
||||
body: body !== undefined ? JSON.stringify(body) : undefined,
|
||||
});
|
||||
if (!res.ok) {
|
||||
const text = await res.text().catch(() => '');
|
||||
throw new Error(`${method} ${url} -> ${res.status} ${text}`);
|
||||
}
|
||||
const text = await res.text();
|
||||
return (text ? JSON.parse(text) : null);
|
||||
}
|
||||
// Exchange an agent API key for a Fabric user session (+ guild tokens).
|
||||
agentLogin(apiKey) {
|
||||
return this.post(`${this.centerApiBase}/auth/agent/login`, { apiKey });
|
||||
@@ -50,4 +69,40 @@ export class FabricClient {
|
||||
joinChannel(guildEndpoint, guildToken, channelId) {
|
||||
return this.post(`${guildEndpoint}/api/channels/${channelId}/join`, {}, guildToken);
|
||||
}
|
||||
leaveChannel(guildEndpoint, guildToken, channelId) {
|
||||
return this.post(`${guildEndpoint}/api/channels/${channelId}/leave`, {}, guildToken);
|
||||
}
|
||||
// Register the OpenClaw slash-command catalog with this guild (idempotent
|
||||
// full replace). The frontend GETs it for `/` autocomplete; execution
|
||||
// still flows as a normal /<cmd> message into OpenClaw's command system.
|
||||
syncCommands(guildEndpoint, guildToken, commands, syncKey) {
|
||||
// Guild C-2: the shared key is sourced from the channel config
|
||||
// (channels.fabric.commandsSyncKey) and must equal the guild's
|
||||
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
|
||||
return this.req('PUT', `${guildEndpoint}/api/commands`, guildToken, { commands }, syncKey ? { 'x-commands-sync-key': syncKey } : undefined);
|
||||
}
|
||||
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
|
||||
channelMembers(guildEndpoint, guildToken, channelId) {
|
||||
return this.req('GET', `${guildEndpoint}/api/channels/${channelId}/members`, guildToken);
|
||||
}
|
||||
// ---- channel canvas (one pinned doc per channel) ----
|
||||
canvasUrl(endpoint, channelId) {
|
||||
return `${endpoint}/api/channels/${channelId}/canvas`;
|
||||
}
|
||||
// null when the channel has no canvas
|
||||
getCanvas(endpoint, token, channelId) {
|
||||
return this.req('GET', this.canvasUrl(endpoint, channelId), token);
|
||||
}
|
||||
// share / replace (caller becomes the sharer)
|
||||
shareCanvas(endpoint, token, channelId, body) {
|
||||
return this.req('PUT', this.canvasUrl(endpoint, channelId), token, body);
|
||||
}
|
||||
// update in place (original sharer only — else the guild returns 403)
|
||||
updateCanvas(endpoint, token, channelId, body) {
|
||||
return this.req('PATCH', this.canvasUrl(endpoint, channelId), token, body);
|
||||
}
|
||||
// remove ("close") the canvas (original sharer only)
|
||||
removeCanvas(endpoint, token, channelId) {
|
||||
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
|
||||
}
|
||||
}
|
||||
|
||||
96
dist/fabric/src/inbound.js
vendored
96
dist/fabric/src/inbound.js
vendored
@@ -3,6 +3,8 @@ import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { io } from 'socket.io-client';
|
||||
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
||||
import { resolveCoalesce } from './accounts.js';
|
||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||
export class FabricInbound {
|
||||
core;
|
||||
cfg;
|
||||
@@ -12,6 +14,14 @@ export class FabricInbound {
|
||||
accounts;
|
||||
sockets = [];
|
||||
seen = new Set();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
// at connect time — any channel the agent joins later (e.g. a fresh DM
|
||||
// created by another user) is unreachable until the gateway restarts.
|
||||
channelSyncTimers = [];
|
||||
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
|
||||
// poll. 60s keeps the lag bounded without hammering the backend.
|
||||
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
|
||||
// Guild access tokens are short-lived (~15 min). The socket survives via
|
||||
// socket.io reconnect, but the token captured at connect time goes stale,
|
||||
// so HTTP calls (attachment download, posting the reply) start 401ing.
|
||||
@@ -73,6 +83,9 @@ export class FabricInbound {
|
||||
}
|
||||
}
|
||||
stop() {
|
||||
for (const t of this.channelSyncTimers)
|
||||
clearInterval(t);
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets)
|
||||
s.disconnect();
|
||||
this.sockets = [];
|
||||
@@ -88,19 +101,61 @@ export class FabricInbound {
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
const joinAll = async () => {
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
// on `connect` seeds it; the periodic resync diffs against it so we
|
||||
// only emit `join_channel` for genuinely new channels (and
|
||||
// `leave_channel` for ones the agent is no longer in).
|
||||
const joined = new Set();
|
||||
const syncChannels = async (kind) => {
|
||||
let freshTok;
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
|
||||
const channels = res.ok ? (await res.json()) : [];
|
||||
for (const c of channels)
|
||||
socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
|
||||
}
|
||||
catch {
|
||||
/* best effort */
|
||||
freshTok = tok;
|
||||
}
|
||||
const authTok = freshTok ?? tok;
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
|
||||
if (!res.ok)
|
||||
return;
|
||||
const channels = (await res.json());
|
||||
const current = new Set(channels.map((c) => c.id));
|
||||
let added = 0;
|
||||
let removed = 0;
|
||||
for (const id of current) {
|
||||
if (!joined.has(id)) {
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
for (const id of [...joined]) {
|
||||
if (!current.has(id)) {
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (kind === 'initial') {
|
||||
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
|
||||
}
|
||||
else if (added > 0 || removed > 0) {
|
||||
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
|
||||
}
|
||||
}
|
||||
catch {
|
||||
/* best effort — next tick will retry */
|
||||
}
|
||||
};
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('connect', () => {
|
||||
// On every (re)connect the server forgets prior subscriptions, so
|
||||
// reset our local view and seed from a fresh fetch.
|
||||
joined.clear();
|
||||
void syncChannels('initial');
|
||||
});
|
||||
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
socket.on('message.created', (m) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId)
|
||||
@@ -199,7 +254,11 @@ export class FabricInbound {
|
||||
// the woken speaker emits a normal message or /no-reply). We still
|
||||
// record the message into the agent's session so it has the full
|
||||
// channel conversation as context whenever it IS later woken.
|
||||
if (m.wakeup !== true) {
|
||||
//
|
||||
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
|
||||
// any message that isn't the agent's own (already filtered above) is
|
||||
// always delivered to the model.
|
||||
if (m.xType !== 'dm' && m.wakeup !== true) {
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
||||
await core.channel.session.recordInboundSession({
|
||||
storePath,
|
||||
@@ -244,8 +303,16 @@ export class FabricInbound {
|
||||
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
|
||||
if (!text || !gt)
|
||||
return;
|
||||
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
|
||||
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
|
||||
// Buffer segments; the merged message is posted right after
|
||||
// dispatch returns (the deterministic turn boundary, see the
|
||||
// finally below). Disable per channel: channels.fabric.coalesce.
|
||||
await enqueueDelivery({
|
||||
channelId,
|
||||
text,
|
||||
coalesce: resolveCoalesce(this.cfg),
|
||||
post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id),
|
||||
log: (m) => this.log.info(m),
|
||||
});
|
||||
},
|
||||
onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
|
||||
onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
|
||||
@@ -270,5 +337,12 @@ export class FabricInbound {
|
||||
catch (err) {
|
||||
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
|
||||
}
|
||||
finally {
|
||||
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
|
||||
// resolves AFTER every deliver() call of this turn has run, so the
|
||||
// buffer now holds all segments — flush them as ONE Fabric message.
|
||||
// No hooks, no timers, no idle guessing.
|
||||
await flushFabricForChannel(channelId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
113
dist/fabric/src/tools.js
vendored
113
dist/fabric/src/tools.js
vendored
@@ -89,4 +89,117 @@ export function registerFabricTools(api, client, identity) {
|
||||
return { ok: true, closed: true };
|
||||
},
|
||||
}));
|
||||
// fabric-canvas: share / update / read / close the channel's single
|
||||
// pinned canvas document (one tool, four actions). update/close are
|
||||
// sharer-only server-side (the guild returns 403 otherwise).
|
||||
api.registerTool((ctx) => ({
|
||||
name: 'fabric-canvas',
|
||||
description: "Manage a channel's pinned canvas document. action: " +
|
||||
"read (current canvas or null) | share (create/replace; you become " +
|
||||
'the sharer) | update (edit in place; sharer only) | close (remove; ' +
|
||||
'sharer only).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['action', 'guildNodeId', 'channelId'],
|
||||
properties: {
|
||||
action: { type: 'string', enum: ['read', 'share', 'update', 'close'] },
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
title: { type: 'string', description: 'share: required; update: optional' },
|
||||
format: {
|
||||
type: 'string',
|
||||
enum: ['md', 'html', 'text'],
|
||||
description: 'share: required; update: optional',
|
||||
},
|
||||
source: {
|
||||
type: 'string',
|
||||
description: 'document body. share: required; update: optional',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const ep = guild.endpoint;
|
||||
switch (p.action) {
|
||||
case 'read': {
|
||||
const canvas = await client.getCanvas(ep, token, p.channelId);
|
||||
return { ok: true, canvas };
|
||||
}
|
||||
case 'share': {
|
||||
if (!p.title || !p.format || p.source === undefined) {
|
||||
return { ok: false, error: 'share requires title, format, and source' };
|
||||
}
|
||||
const canvas = await client.shareCanvas(ep, token, p.channelId, {
|
||||
title: p.title,
|
||||
format: p.format,
|
||||
source: p.source,
|
||||
});
|
||||
return { ok: true, canvas };
|
||||
}
|
||||
case 'update': {
|
||||
const body = {};
|
||||
if (p.title !== undefined)
|
||||
body.title = p.title;
|
||||
if (p.format !== undefined)
|
||||
body.format = p.format;
|
||||
if (p.source !== undefined)
|
||||
body.source = p.source;
|
||||
if (Object.keys(body).length === 0) {
|
||||
return { ok: false, error: 'update needs at least one of title/format/source' };
|
||||
}
|
||||
const canvas = await client.updateCanvas(ep, token, p.channelId, body);
|
||||
return { ok: true, canvas };
|
||||
}
|
||||
case 'close': {
|
||||
await client.removeCanvas(ep, token, p.channelId);
|
||||
return { ok: true, removed: true };
|
||||
}
|
||||
default:
|
||||
return { ok: false, error: `unknown action ${String(p.action)}` };
|
||||
}
|
||||
},
|
||||
}));
|
||||
// fabric-channel: channel membership (one tool, three actions).
|
||||
api.registerTool((ctx) => ({
|
||||
name: 'fabric-channel',
|
||||
description: 'Channel membership. action: members (list channel member userIds) | ' +
|
||||
'join (this agent joins the channel) | leave (this agent leaves).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['action', 'guildNodeId', 'channelId'],
|
||||
properties: {
|
||||
action: { type: 'string', enum: ['members', 'join', 'leave'] },
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
},
|
||||
},
|
||||
execute: async (p) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId)
|
||||
return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const ep = guild.endpoint;
|
||||
switch (p.action) {
|
||||
case 'members': {
|
||||
const members = await client.channelMembers(ep, token, p.channelId);
|
||||
return { ok: true, members };
|
||||
}
|
||||
case 'join': {
|
||||
await client.joinChannel(ep, token, p.channelId);
|
||||
return { ok: true, joined: true };
|
||||
}
|
||||
case 'leave': {
|
||||
await client.leaveChannel(ep, token, p.channelId);
|
||||
return { ok: true, left: true };
|
||||
}
|
||||
default:
|
||||
return { ok: false, error: `unknown action ${String(p.action)}` };
|
||||
}
|
||||
},
|
||||
}));
|
||||
}
|
||||
|
||||
10
index.ts
10
index.ts
@@ -6,11 +6,13 @@
|
||||
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
|
||||
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
|
||||
import { fabricChannelPlugin } from './src/channel.js';
|
||||
import { flushAllFabric } from './src/coalesce.js';
|
||||
import { FabricInbound } from './src/inbound.js';
|
||||
import { listEnabledFabricAccounts } from './src/accounts.js';
|
||||
import { registerFabricTools } from './src/tools.js';
|
||||
import { FabricClient } from './src/fabric-client.js';
|
||||
import { IdentityRegistry } from './src/identity.js';
|
||||
import { syncFabricCommands } from './src/command-sync.js';
|
||||
import path from 'node:path';
|
||||
import os from 'node:os';
|
||||
|
||||
@@ -36,7 +38,7 @@ export default defineChannelPluginEntry({
|
||||
config?: unknown;
|
||||
pluginConfig?: { identityFilePath?: string };
|
||||
logger: { info: (m: string) => void; warn: (m: string) => void };
|
||||
on: (ev: string, fn: () => void) => void;
|
||||
on: (ev: string, fn: (...args: unknown[]) => unknown) => void;
|
||||
registerTool: (d: unknown) => void;
|
||||
};
|
||||
const cfg = (api.config ?? {}) as { channels?: { fabric?: { centerApiBase?: string } } };
|
||||
@@ -82,9 +84,15 @@ export default defineChannelPluginEntry({
|
||||
);
|
||||
void inbound.start();
|
||||
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
|
||||
void syncFabricCommands(client, cfg, accounts, api.logger);
|
||||
});
|
||||
|
||||
// Note: the per-turn coalesce flush happens deterministically in
|
||||
// inbound.ts right after dispatchInboundReplyWithBase resolves (that
|
||||
// is the real "all deliveries done" boundary; the agent_end hook fires
|
||||
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
|
||||
api.on('gateway_stop', () => {
|
||||
void flushAllFabric();
|
||||
inbound?.stop();
|
||||
inbound = null;
|
||||
});
|
||||
|
||||
@@ -39,6 +39,15 @@
|
||||
"type": "string",
|
||||
"description": "Fabric Center API base, e.g. http://localhost:7001/api"
|
||||
},
|
||||
"commandsSyncKey": {
|
||||
"type": "string",
|
||||
"minLength": 1,
|
||||
"description": "Shared secret that must equal the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY. Required to register the slash-command catalog (Guild C-2). Read it from the guild via: docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js"
|
||||
},
|
||||
"coalesce": {
|
||||
"type": "boolean",
|
||||
"description": "Merge a split agent turn (text → thinking/tool → text) into ONE Fabric message. Flushed deterministically on the agent_end hook. Default true; false = raw per-segment posting."
|
||||
},
|
||||
"dmSecurity": { "type": "string" },
|
||||
"dmPolicy": { "type": "string" },
|
||||
"enabled": { "type": "boolean" },
|
||||
@@ -59,10 +68,12 @@
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["commandsSyncKey"]
|
||||
},
|
||||
"uiHints": {
|
||||
"centerApiBase": { "label": "Center API base" }
|
||||
"centerApiBase": { "label": "Center API base" },
|
||||
"commandsSyncKey": { "label": "Commands sync key" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,15 @@ export type FabricAccountConfig = {
|
||||
|
||||
export type FabricChannelConfig = {
|
||||
centerApiBase?: string;
|
||||
// Shared secret matching the guild's FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY
|
||||
// (Guild C-2). Required by the channel config schema; sourced from config
|
||||
// only — never from the environment.
|
||||
commandsSyncKey?: string;
|
||||
// Coalesce an agent turn that OpenClaw split into multiple deliveries
|
||||
// (text → thinking/tool → text => N sendText calls) into ONE Fabric
|
||||
// message. The flush boundary is the deterministic `agent_end` hook (not
|
||||
// a timer). Default true; set false for raw per-segment posting.
|
||||
coalesce?: boolean;
|
||||
accounts?: Record<string, FabricAccountConfig>;
|
||||
defaultAccount?: string;
|
||||
} & FabricAccountConfig;
|
||||
@@ -35,6 +44,19 @@ function section(cfg: Cfg): FabricChannelConfig {
|
||||
return cfg.channels?.fabric ?? {};
|
||||
}
|
||||
|
||||
// The commands-sync shared secret (channel-level only). Empty string when
|
||||
// unconfigured — callers decide how to handle (slash-command sync is then
|
||||
// rejected by the guild).
|
||||
export function resolveCommandsSyncKey(cfg: Cfg): string {
|
||||
return (section(cfg).commandsSyncKey ?? '').trim();
|
||||
}
|
||||
|
||||
// Whether to coalesce a split agent turn into one Fabric message
|
||||
// (channel-level). Default true.
|
||||
export function resolveCoalesce(cfg: Cfg): boolean {
|
||||
return (cfg.channels?.fabric ?? {}).coalesce !== false;
|
||||
}
|
||||
|
||||
export function listFabricAccountIds(cfg: Cfg): string[] {
|
||||
const accts = section(cfg).accounts ?? {};
|
||||
const ids = Object.keys(accts);
|
||||
|
||||
@@ -159,7 +159,10 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
|
||||
cfg?: unknown;
|
||||
config?: unknown;
|
||||
}) => {
|
||||
// openclaw passes config under cfg or config depending on path
|
||||
// openclaw passes config under cfg or config depending on path.
|
||||
// Note: inbound agent replies go through inbound.ts `deliver`
|
||||
// (where turn coalescing happens). This path is for any direct
|
||||
// outbound sends and posts immediately.
|
||||
const cfg = (ctx.cfg ?? ctx.config ?? {}) as AnyCfg;
|
||||
try {
|
||||
const r = await sendToFabric(cfg, ctx.accountId ?? null, ctx.to, ctx.text);
|
||||
|
||||
93
src/coalesce.ts
Normal file
93
src/coalesce.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
// Deterministic turn coalescer.
|
||||
//
|
||||
// OpenClaw calls the Fabric `deliver` callback once per assistant text
|
||||
// segment; a thinking/tool block between two text blocks is a delivery
|
||||
// boundary, so one agent turn of `text → thinking/tool → text` arrives as
|
||||
// multiple deliver() calls. There is no turn id on the delivery, so we
|
||||
// BUFFER segments by Fabric channelId and post the merged message when the
|
||||
// turn truly ends. The flush is driven by inbound.ts right after
|
||||
// `dispatchInboundReplyWithBase` resolves — that only happens AFTER every
|
||||
// deliver() of the turn, a deterministic boundary (NOT a timer, NOT the
|
||||
// agent_end hook, which fires before deliver()). `coalesce=false` posts
|
||||
// each segment immediately.
|
||||
|
||||
const SAFETY_FLUSH_MS = 120_000; // leak-guard only; not the flush mechanism
|
||||
|
||||
export function normChannelId(x: string | null | undefined): string {
|
||||
const s = String(x ?? '');
|
||||
return s.startsWith('fabric:') ? s.slice('fabric:'.length) : s;
|
||||
}
|
||||
|
||||
type Pending = {
|
||||
parts: string[];
|
||||
post: (text: string) => Promise<void>;
|
||||
log?: (m: string) => void;
|
||||
safety: ReturnType<typeof setTimeout>;
|
||||
};
|
||||
const pendingByChannel = new Map<string, Pending>();
|
||||
|
||||
async function flushChannel(channelId: string, reason: string): Promise<void> {
|
||||
const p = pendingByChannel.get(channelId);
|
||||
if (!p) return;
|
||||
pendingByChannel.delete(channelId);
|
||||
clearTimeout(p.safety);
|
||||
const text = p.parts.join('\n\n').trim();
|
||||
if (!text) return;
|
||||
try {
|
||||
await p.post(text);
|
||||
p.log?.(`fabric: flushed ${p.parts.length} segment(s) channel=${channelId} (${reason})`);
|
||||
} catch (e) {
|
||||
p.log?.(`fabric: flush FAILED channel=${channelId} (${reason}): ${String(e)}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Buffer one delivered segment (or send immediately when coalesce=false).
|
||||
// `post` performs the real Fabric postMessage with the caller's already
|
||||
// resolved guild/token; on flush it is called once with the merged text.
|
||||
export async function enqueueDelivery(params: {
|
||||
channelId: string;
|
||||
text: string;
|
||||
coalesce: boolean;
|
||||
post: (text: string) => Promise<void>;
|
||||
log?: (m: string) => void;
|
||||
}): Promise<void> {
|
||||
const cid = normChannelId(params.channelId);
|
||||
const text = (params.text ?? '').trim();
|
||||
if (!text) return;
|
||||
if (!params.coalesce) {
|
||||
await params.post(text);
|
||||
return;
|
||||
}
|
||||
const existing = pendingByChannel.get(cid);
|
||||
if (existing) {
|
||||
existing.parts.push(text);
|
||||
existing.post = params.post; // freshest guild/token closure
|
||||
existing.log = params.log;
|
||||
} else {
|
||||
pendingByChannel.set(cid, {
|
||||
parts: [text],
|
||||
post: params.post,
|
||||
log: params.log,
|
||||
safety: setTimeout(
|
||||
() => void flushChannel(cid, 'safety-timeout'),
|
||||
SAFETY_FLUSH_MS,
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Called by the agent_end hook with the hook ctx's channelId (bare or
|
||||
// fabric:-prefixed). Deterministic per-turn boundary.
|
||||
export async function flushFabricForChannel(
|
||||
rawChannelId: string | null | undefined,
|
||||
): Promise<void> {
|
||||
const cid = normChannelId(rawChannelId);
|
||||
if (cid) await flushChannel(cid, 'dispatch-end');
|
||||
}
|
||||
|
||||
// gateway_stop: flush anything still buffered.
|
||||
export async function flushAllFabric(): Promise<void> {
|
||||
for (const cid of [...pendingByChannel.keys()]) {
|
||||
await flushChannel(cid, 'gateway_stop');
|
||||
}
|
||||
}
|
||||
149
src/command-sync.ts
Normal file
149
src/command-sync.ts
Normal file
@@ -0,0 +1,149 @@
|
||||
// Build the Fabric slash-command catalog from OpenClaw's native-command
|
||||
// specs (the same source Discord uses to register slash commands) and push
|
||||
// it to each connected guild. Fabric is a TEXT-command surface: a /<cmd>
|
||||
// message is delivered normally and OpenClaw's command system executes it —
|
||||
// this catalog only drives the frontend `/` autocomplete, so we resolve any
|
||||
// dynamic arg `choices` to a static snapshot here (like Discord does at
|
||||
// registration time).
|
||||
import {
|
||||
listNativeCommandSpecsForConfig,
|
||||
findCommandByNativeName,
|
||||
resolveCommandArgChoices,
|
||||
} from 'openclaw/plugin-sdk/native-command-registry';
|
||||
import type { FabricClient } from './fabric-client.js';
|
||||
import { resolveCommandsSyncKey } from './accounts.js';
|
||||
|
||||
type Logger = { info: (m: string) => void; warn: (m: string) => void };
|
||||
|
||||
type FabricArg = {
|
||||
name: string;
|
||||
description: string;
|
||||
type: string;
|
||||
required: boolean;
|
||||
captureRemaining: boolean;
|
||||
preferAutocomplete: boolean;
|
||||
choices: Array<{ value: string; label: string }> | null;
|
||||
};
|
||||
type FabricCommand = {
|
||||
name: string;
|
||||
nativeName: string;
|
||||
description: string;
|
||||
acceptsArgs: boolean;
|
||||
args: FabricArg[];
|
||||
argsParsing: string;
|
||||
};
|
||||
|
||||
function normChoice(c: unknown): { value: string; label: string } {
|
||||
if (typeof c === 'string') return { value: c, label: c };
|
||||
const o = c as { value?: string; label?: string };
|
||||
return { value: String(o.value ?? ''), label: String(o.label ?? o.value ?? '') };
|
||||
}
|
||||
|
||||
export function buildFabricCommandSpecs(cfg: unknown): FabricCommand[] {
|
||||
const specs = listNativeCommandSpecsForConfig(cfg as never, {
|
||||
provider: 'fabric',
|
||||
}) as Array<{
|
||||
name: string;
|
||||
description: string;
|
||||
acceptsArgs?: boolean;
|
||||
args?: Array<Record<string, unknown>>;
|
||||
}>;
|
||||
|
||||
return specs.map((s) => {
|
||||
// ChatCommandDefinition (for argsParsing + dynamic choices provider)
|
||||
const def = findCommandByNativeName(s.name, 'fabric') as
|
||||
| { argsParsing?: string; args?: Array<Record<string, unknown>> }
|
||||
| undefined;
|
||||
|
||||
const args: FabricArg[] = (s.args ?? []).map((a) => {
|
||||
const raw = a.choices;
|
||||
let choices: Array<{ value: string; label: string }> | null = null;
|
||||
if (Array.isArray(raw)) {
|
||||
choices = raw.map(normChoice);
|
||||
} else if (typeof raw === 'function' && def) {
|
||||
try {
|
||||
const r = resolveCommandArgChoices({
|
||||
command: def as never,
|
||||
arg: a as never,
|
||||
cfg: cfg as never,
|
||||
provider: 'fabric',
|
||||
}) as Array<{ value: string; label: string }>;
|
||||
choices = r.map((x) => ({ value: x.value, label: x.label }));
|
||||
} catch {
|
||||
choices = null;
|
||||
}
|
||||
}
|
||||
return {
|
||||
name: String(a.name ?? ''),
|
||||
description: String(a.description ?? ''),
|
||||
type: String(a.type ?? 'string'),
|
||||
required: !!a.required,
|
||||
captureRemaining: !!a.captureRemaining,
|
||||
preferAutocomplete: !!a.preferAutocomplete,
|
||||
choices,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
name: s.name,
|
||||
nativeName: s.name,
|
||||
description: s.description,
|
||||
acceptsArgs: !!s.acceptsArgs,
|
||||
args,
|
||||
argsParsing: def?.argsParsing ?? 'positional',
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
// Push the catalog to every guild the known agents belong to (idempotent;
|
||||
// the catalog is OpenClaw-global, so one PUT per guild is enough).
|
||||
export async function syncFabricCommands(
|
||||
client: FabricClient,
|
||||
cfg: unknown,
|
||||
accounts: Array<{ agentId: string; fabricApiKey: string }>,
|
||||
log: Logger,
|
||||
): Promise<void> {
|
||||
// Guild C-2: the sync key comes from the channel config only (schema
|
||||
// marks it required). Without it the guild rejects the catalog write.
|
||||
const syncKey = resolveCommandsSyncKey(cfg as never);
|
||||
if (!syncKey) {
|
||||
log.warn(
|
||||
'fabric: channels.fabric.commandsSyncKey is not set — skipping ' +
|
||||
'slash-command sync (set it to the guild FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY)',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let specs: FabricCommand[];
|
||||
try {
|
||||
specs = buildFabricCommandSpecs(cfg);
|
||||
} catch (err) {
|
||||
log.warn(`fabric: build command specs failed: ${String(err)}`);
|
||||
return;
|
||||
}
|
||||
if (!specs.length) return;
|
||||
|
||||
const done = new Set<string>();
|
||||
for (const a of accounts) {
|
||||
let session;
|
||||
try {
|
||||
session = await client.agentLogin(a.fabricApiKey);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
for (const g of session.guilds) {
|
||||
if (done.has(g.nodeId)) continue;
|
||||
const tok = session.guildAccessTokens.find(
|
||||
(t) => t.guildNodeId === g.nodeId,
|
||||
)?.token;
|
||||
if (!tok) continue;
|
||||
try {
|
||||
await client.syncCommands(g.endpoint, tok, specs, syncKey);
|
||||
done.add(g.nodeId);
|
||||
log.info(`fabric: synced ${specs.length} slash command(s) -> ${g.nodeId}`);
|
||||
} catch (err) {
|
||||
log.warn(`fabric: command sync failed ${g.nodeId}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -29,6 +29,32 @@ export class FabricClient {
|
||||
return (await res.json()) as T;
|
||||
}
|
||||
|
||||
// Generic JSON request (GET/PUT/PATCH/DELETE). Empty 2xx body -> null
|
||||
// (Fabric returns an empty body when a channel has no canvas).
|
||||
private async req<T>(
|
||||
method: string,
|
||||
url: string,
|
||||
auth?: string,
|
||||
body?: unknown,
|
||||
extraHeaders?: Record<string, string>,
|
||||
): Promise<T> {
|
||||
const res = await fetch(url, {
|
||||
method,
|
||||
headers: {
|
||||
...(body !== undefined ? { 'content-type': 'application/json' } : {}),
|
||||
...(auth ? { authorization: `Bearer ${auth}` } : {}),
|
||||
...(extraHeaders ?? {}),
|
||||
},
|
||||
body: body !== undefined ? JSON.stringify(body) : undefined,
|
||||
});
|
||||
if (!res.ok) {
|
||||
const text = await res.text().catch(() => '');
|
||||
throw new Error(`${method} ${url} -> ${res.status} ${text}`);
|
||||
}
|
||||
const text = await res.text();
|
||||
return (text ? JSON.parse(text) : null) as T;
|
||||
}
|
||||
|
||||
// Exchange an agent API key for a Fabric user session (+ guild tokens).
|
||||
agentLogin(apiKey: string): Promise<FabricSession> {
|
||||
return this.post<FabricSession>(`${this.centerApiBase}/auth/agent/login`, { apiKey });
|
||||
@@ -86,4 +112,95 @@ export class FabricClient {
|
||||
joinChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
|
||||
return this.post(`${guildEndpoint}/api/channels/${channelId}/join`, {}, guildToken);
|
||||
}
|
||||
|
||||
leaveChannel(guildEndpoint: string, guildToken: string, channelId: string): Promise<unknown> {
|
||||
return this.post(`${guildEndpoint}/api/channels/${channelId}/leave`, {}, guildToken);
|
||||
}
|
||||
|
||||
// Register the OpenClaw slash-command catalog with this guild (idempotent
|
||||
// full replace). The frontend GETs it for `/` autocomplete; execution
|
||||
// still flows as a normal /<cmd> message into OpenClaw's command system.
|
||||
syncCommands(
|
||||
guildEndpoint: string,
|
||||
guildToken: string,
|
||||
commands: unknown[],
|
||||
syncKey: string,
|
||||
): Promise<unknown> {
|
||||
// Guild C-2: the shared key is sourced from the channel config
|
||||
// (channels.fabric.commandsSyncKey) and must equal the guild's
|
||||
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY for the catalog write.
|
||||
return this.req(
|
||||
'PUT',
|
||||
`${guildEndpoint}/api/commands`,
|
||||
guildToken,
|
||||
{ commands },
|
||||
syncKey ? { 'x-commands-sync-key': syncKey } : undefined,
|
||||
);
|
||||
}
|
||||
|
||||
// [{ userId, bypass }] — bypass is true only for discuss/work bypass-list
|
||||
channelMembers(
|
||||
guildEndpoint: string,
|
||||
guildToken: string,
|
||||
channelId: string,
|
||||
): Promise<Array<{ userId: string; bypass?: boolean }>> {
|
||||
return this.req(
|
||||
'GET',
|
||||
`${guildEndpoint}/api/channels/${channelId}/members`,
|
||||
guildToken,
|
||||
);
|
||||
}
|
||||
|
||||
// ---- channel canvas (one pinned doc per channel) ----
|
||||
|
||||
private canvasUrl(endpoint: string, channelId: string): string {
|
||||
return `${endpoint}/api/channels/${channelId}/canvas`;
|
||||
}
|
||||
|
||||
// null when the channel has no canvas
|
||||
getCanvas(
|
||||
endpoint: string,
|
||||
token: string,
|
||||
channelId: string,
|
||||
): Promise<FabricCanvas | null> {
|
||||
return this.req('GET', this.canvasUrl(endpoint, channelId), token);
|
||||
}
|
||||
|
||||
// share / replace (caller becomes the sharer)
|
||||
shareCanvas(
|
||||
endpoint: string,
|
||||
token: string,
|
||||
channelId: string,
|
||||
body: CanvasInput,
|
||||
): Promise<FabricCanvas> {
|
||||
return this.req('PUT', this.canvasUrl(endpoint, channelId), token, body);
|
||||
}
|
||||
|
||||
// update in place (original sharer only — else the guild returns 403)
|
||||
updateCanvas(
|
||||
endpoint: string,
|
||||
token: string,
|
||||
channelId: string,
|
||||
body: Partial<CanvasInput>,
|
||||
): Promise<FabricCanvas> {
|
||||
return this.req('PATCH', this.canvasUrl(endpoint, channelId), token, body);
|
||||
}
|
||||
|
||||
// remove ("close") the canvas (original sharer only)
|
||||
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
|
||||
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
|
||||
}
|
||||
}
|
||||
|
||||
export type CanvasFormat = 'md' | 'html' | 'text';
|
||||
export type CanvasInput = { title: string; format: CanvasFormat; source: string };
|
||||
export type FabricCanvas = {
|
||||
channelId: string;
|
||||
sharerUserId: string;
|
||||
title: string;
|
||||
format: CanvasFormat;
|
||||
source: string;
|
||||
version: number;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
101
src/inbound.ts
101
src/inbound.ts
@@ -5,6 +5,8 @@ import { io, type Socket } from 'socket.io-client';
|
||||
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
||||
import type { FabricClient, FabricSession } from './fabric-client.js';
|
||||
import type { IdentityRegistry } from './identity.js';
|
||||
import { resolveCoalesce } from './accounts.js';
|
||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||
|
||||
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
|
||||
// channels (nextcloud-talk) drive the kernel:
|
||||
@@ -42,11 +44,22 @@ type FabricMessage = {
|
||||
channelId?: string;
|
||||
attachments?: FabricAttachment[];
|
||||
wakeup?: boolean;
|
||||
// x-type of the channel (sent on message.created). 'dm' bypasses the
|
||||
// wakeup gate: any message that isn't the agent's own is delivered.
|
||||
xType?: string;
|
||||
};
|
||||
|
||||
export class FabricInbound {
|
||||
private sockets: Socket[] = [];
|
||||
private seen = new Set<string>();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
// at connect time — any channel the agent joins later (e.g. a fresh DM
|
||||
// created by another user) is unreachable until the gateway restarts.
|
||||
private channelSyncTimers: NodeJS.Timeout[] = [];
|
||||
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
|
||||
// poll. 60s keeps the lag bounded without hammering the backend.
|
||||
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
|
||||
// Guild access tokens are short-lived (~15 min). The socket survives via
|
||||
// socket.io reconnect, but the token captured at connect time goes stale,
|
||||
// so HTTP calls (attachment download, posting the reply) start 401ing.
|
||||
@@ -114,6 +127,8 @@ export class FabricInbound {
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
for (const t of this.channelSyncTimers) clearInterval(t);
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets) s.disconnect();
|
||||
this.sockets = [];
|
||||
}
|
||||
@@ -128,20 +143,67 @@ export class FabricInbound {
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
const joinAll = async () => {
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
// on `connect` seeds it; the periodic resync diffs against it so we
|
||||
// only emit `join_channel` for genuinely new channels (and
|
||||
// `leave_channel` for ones the agent is no longer in).
|
||||
const joined = new Set<string>();
|
||||
const syncChannels = async (kind: 'initial' | 'resync') => {
|
||||
let freshTok: string | undefined;
|
||||
try {
|
||||
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
|
||||
} catch {
|
||||
freshTok = tok;
|
||||
}
|
||||
const authTok = freshTok ?? tok;
|
||||
try {
|
||||
const res = await fetch(
|
||||
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
|
||||
{ headers: { authorization: `Bearer ${tok}` } },
|
||||
{ headers: { authorization: `Bearer ${authTok}` } },
|
||||
);
|
||||
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
|
||||
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
if (!res.ok) return;
|
||||
const channels = (await res.json()) as Array<{ id: string }>;
|
||||
const current = new Set(channels.map((c) => c.id));
|
||||
let added = 0;
|
||||
let removed = 0;
|
||||
for (const id of current) {
|
||||
if (!joined.has(id)) {
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
for (const id of [...joined]) {
|
||||
if (!current.has(id)) {
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (kind === 'initial') {
|
||||
this.log.info(
|
||||
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
|
||||
);
|
||||
} else if (added > 0 || removed > 0) {
|
||||
this.log.info(
|
||||
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
/* best effort */
|
||||
/* best effort — next tick will retry */
|
||||
}
|
||||
};
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('connect', () => {
|
||||
// On every (re)connect the server forgets prior subscriptions, so
|
||||
// reset our local view and seed from a fresh fetch.
|
||||
joined.clear();
|
||||
void syncChannels('initial');
|
||||
});
|
||||
const syncTimer = setInterval(
|
||||
() => void syncChannels('resync'),
|
||||
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
||||
);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
socket.on('message.created', (m: FabricMessage) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId) return;
|
||||
@@ -250,7 +312,11 @@ export class FabricInbound {
|
||||
// the woken speaker emits a normal message or /no-reply). We still
|
||||
// record the message into the agent's session so it has the full
|
||||
// channel conversation as context whenever it IS later woken.
|
||||
if (m.wakeup !== true) {
|
||||
//
|
||||
// Exception: dm channels are 1:1 — there is no turn/wakeup gating;
|
||||
// any message that isn't the agent's own (already filtered above) is
|
||||
// always delivered to the model.
|
||||
if (m.xType !== 'dm' && m.wakeup !== true) {
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
||||
await core.channel.session.recordInboundSession({
|
||||
storePath,
|
||||
@@ -301,8 +367,17 @@ export class FabricInbound {
|
||||
const text = (payload?.text ?? '').trim();
|
||||
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
|
||||
if (!text || !gt) return;
|
||||
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
|
||||
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
|
||||
// Buffer segments; the merged message is posted right after
|
||||
// dispatch returns (the deterministic turn boundary, see the
|
||||
// finally below). Disable per channel: channels.fabric.coalesce.
|
||||
await enqueueDelivery({
|
||||
channelId,
|
||||
text,
|
||||
coalesce: resolveCoalesce(this.cfg as never),
|
||||
post: (t) =>
|
||||
this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id) as Promise<void>,
|
||||
log: (m) => this.log.info(m),
|
||||
});
|
||||
},
|
||||
onRecordError: (err: unknown) =>
|
||||
this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
|
||||
@@ -327,6 +402,12 @@ export class FabricInbound {
|
||||
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
|
||||
} catch (err) {
|
||||
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
|
||||
} finally {
|
||||
// Deterministic per-turn boundary: dispatchInboundReplyWithBase only
|
||||
// resolves AFTER every deliver() call of this turn has run, so the
|
||||
// buffer now holds all segments — flush them as ONE Fabric message.
|
||||
// No hooks, no timers, no idle guessing.
|
||||
await flushFabricForChannel(channelId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
123
src/tools.ts
123
src/tools.ts
@@ -120,4 +120,127 @@ export function registerFabricTools(
|
||||
return { ok: true, closed: true };
|
||||
},
|
||||
}));
|
||||
|
||||
// fabric-canvas: share / update / read / close the channel's single
|
||||
// pinned canvas document (one tool, four actions). update/close are
|
||||
// sharer-only server-side (the guild returns 403 otherwise).
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-canvas',
|
||||
description:
|
||||
"Manage a channel's pinned canvas document. action: " +
|
||||
"read (current canvas or null) | share (create/replace; you become " +
|
||||
'the sharer) | update (edit in place; sharer only) | close (remove; ' +
|
||||
'sharer only).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['action', 'guildNodeId', 'channelId'],
|
||||
properties: {
|
||||
action: { type: 'string', enum: ['read', 'share', 'update', 'close'] },
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
title: { type: 'string', description: 'share: required; update: optional' },
|
||||
format: {
|
||||
type: 'string',
|
||||
enum: ['md', 'html', 'text'],
|
||||
description: 'share: required; update: optional',
|
||||
},
|
||||
source: {
|
||||
type: 'string',
|
||||
description: 'document body. share: required; update: optional',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (p: {
|
||||
action: 'read' | 'share' | 'update' | 'close';
|
||||
guildNodeId: string;
|
||||
channelId: string;
|
||||
title?: string;
|
||||
format?: 'md' | 'html' | 'text';
|
||||
source?: string;
|
||||
}) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const ep = guild.endpoint;
|
||||
switch (p.action) {
|
||||
case 'read': {
|
||||
const canvas = await client.getCanvas(ep, token, p.channelId);
|
||||
return { ok: true, canvas };
|
||||
}
|
||||
case 'share': {
|
||||
if (!p.title || !p.format || p.source === undefined) {
|
||||
return { ok: false, error: 'share requires title, format, and source' };
|
||||
}
|
||||
const canvas = await client.shareCanvas(ep, token, p.channelId, {
|
||||
title: p.title,
|
||||
format: p.format,
|
||||
source: p.source,
|
||||
});
|
||||
return { ok: true, canvas };
|
||||
}
|
||||
case 'update': {
|
||||
const body: Partial<{ title: string; format: 'md' | 'html' | 'text'; source: string }> = {};
|
||||
if (p.title !== undefined) body.title = p.title;
|
||||
if (p.format !== undefined) body.format = p.format;
|
||||
if (p.source !== undefined) body.source = p.source;
|
||||
if (Object.keys(body).length === 0) {
|
||||
return { ok: false, error: 'update needs at least one of title/format/source' };
|
||||
}
|
||||
const canvas = await client.updateCanvas(ep, token, p.channelId, body);
|
||||
return { ok: true, canvas };
|
||||
}
|
||||
case 'close': {
|
||||
await client.removeCanvas(ep, token, p.channelId);
|
||||
return { ok: true, removed: true };
|
||||
}
|
||||
default:
|
||||
return { ok: false, error: `unknown action ${String(p.action)}` };
|
||||
}
|
||||
},
|
||||
}));
|
||||
|
||||
// fabric-channel: channel membership (one tool, three actions).
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
name: 'fabric-channel',
|
||||
description:
|
||||
'Channel membership. action: members (list channel member userIds) | ' +
|
||||
'join (this agent joins the channel) | leave (this agent leaves).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['action', 'guildNodeId', 'channelId'],
|
||||
properties: {
|
||||
action: { type: 'string', enum: ['members', 'join', 'leave'] },
|
||||
guildNodeId: { type: 'string' },
|
||||
channelId: { type: 'string' },
|
||||
},
|
||||
},
|
||||
execute: async (p: {
|
||||
action: 'members' | 'join' | 'leave';
|
||||
guildNodeId: string;
|
||||
channelId: string;
|
||||
}) => {
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||
const ep = guild.endpoint;
|
||||
switch (p.action) {
|
||||
case 'members': {
|
||||
const members = await client.channelMembers(ep, token, p.channelId);
|
||||
return { ok: true, members };
|
||||
}
|
||||
case 'join': {
|
||||
await client.joinChannel(ep, token, p.channelId);
|
||||
return { ok: true, joined: true };
|
||||
}
|
||||
case 'leave': {
|
||||
await client.leaveChannel(ep, token, p.channelId);
|
||||
return { ok: true, left: true };
|
||||
}
|
||||
default:
|
||||
return { ok: false, error: `unknown action ${String(p.action)}` };
|
||||
}
|
||||
},
|
||||
}));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user