Compare commits
4 Commits
fix/routin
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 40c9cb5740 | |||
| fc2ab628b2 | |||
| 893b93198d | |||
| 260d50196b |
20
index.ts
20
index.ts
@@ -96,13 +96,29 @@ export default defineChannelPluginEntry({
|
||||
// fall back to "assume DM" — fail closed on unknown.
|
||||
{
|
||||
const _G = globalThis as Record<string, unknown>;
|
||||
_G['__fabric'] = { getChannelType };
|
||||
_G['__fabric'] = {
|
||||
getChannelType,
|
||||
// Dynamic-subscription bridges: tools (notably `fabric-register`)
|
||||
// call these to add/remove an account's inbound socket without
|
||||
// a gateway restart. Both delegate to the live FabricInbound
|
||||
// instance via the module-level `inbound` closure variable; the
|
||||
// closures stay valid across gateway_start / gateway_stop
|
||||
// because we re-assign the variable, not the property.
|
||||
addAccount: async (entry: { agentId: string; fabricApiKey: string }) => {
|
||||
if (!inbound) throw new Error('fabric inbound not ready yet (gateway not started?)');
|
||||
await inbound.addAccount(entry);
|
||||
},
|
||||
removeAccount: (agentId: string) => {
|
||||
if (!inbound) return;
|
||||
inbound.removeAccount(agentId);
|
||||
},
|
||||
};
|
||||
// Flush channel-meta cache when the gateway shuts down so
|
||||
// recently-recorded xType entries don't get lost.
|
||||
api.on('gateway_stop', () => {
|
||||
try { flushChannelMeta(); } catch { /* ignore */ }
|
||||
});
|
||||
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
|
||||
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)');
|
||||
}
|
||||
|
||||
api.on('gateway_start', () => {
|
||||
|
||||
@@ -77,6 +77,12 @@ function findFabricBindingAccountId(cfg: unknown, agentId: string): string | und
|
||||
|
||||
export class FabricInbound {
|
||||
private sockets: Socket[] = [];
|
||||
// Per-agent socket + timer tracking. Enables `removeAccount(agentId)`
|
||||
// to tear down ONE agent without restarting the whole inbound. New
|
||||
// sockets get appended on `connectAgent`; both maps are emptied by
|
||||
// `stop()`.
|
||||
private socketsByAgent = new Map<string, Socket[]>();
|
||||
private timersByAgent = new Map<string, NodeJS.Timeout[]>();
|
||||
private seen = new Set<string>();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
@@ -287,6 +293,71 @@ export class FabricInbound {
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets) s.disconnect();
|
||||
this.sockets = [];
|
||||
this.socketsByAgent.clear();
|
||||
this.timersByAgent.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Bring up ONE new account at runtime (no gateway restart).
|
||||
*
|
||||
* Mirrors what `start()` does per entry: login to Center, upsert the
|
||||
* identity registry, open the socket(s). Idempotent: re-calling with
|
||||
* the same agentId tears down the previous socket(s) first so the
|
||||
* fresh apikey replaces the stale one (recruitment onboard rotates
|
||||
* the agent from the shared `interviewee` placeholder to a real
|
||||
* per-agent apikey — the old `interviewee` socket must drop before
|
||||
* the new one comes up or the agent ends up subscribed to both users
|
||||
* at once).
|
||||
*
|
||||
* Used by the `fabric-register` openclaw tool to make recruitment
|
||||
* end-to-end without a gateway restart between `new-agent` and the
|
||||
* interview's sub-discussion dispatch.
|
||||
*/
|
||||
async addAccount(entry: { agentId: string; fabricApiKey: string }): Promise<void> {
|
||||
if (this.socketsByAgent.has(entry.agentId)) {
|
||||
this.removeAccount(entry.agentId);
|
||||
}
|
||||
const session = await this.client.agentLogin(entry.fabricApiKey);
|
||||
this.identity.upsert({
|
||||
agentId: entry.agentId,
|
||||
fabricApiKey: entry.fabricApiKey,
|
||||
fabricUserId: session.user.id,
|
||||
displayName: session.user.name,
|
||||
});
|
||||
await this.connectAgent(entry.agentId, session);
|
||||
this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tear down ONE account's sockets + timers without touching others.
|
||||
* Caller is responsible for any identity-registry cleanup; this only
|
||||
* drops the live socket subscription so the agent stops receiving
|
||||
* Fabric pushes.
|
||||
*/
|
||||
removeAccount(agentId: string): void {
|
||||
const sockets = this.socketsByAgent.get(agentId);
|
||||
if (sockets) {
|
||||
for (const s of sockets) {
|
||||
try { s.disconnect(); } catch { /* socket already dead */ }
|
||||
// Also remove from the flat list so `stop()` doesn't double-close.
|
||||
const idx = this.sockets.indexOf(s);
|
||||
if (idx !== -1) this.sockets.splice(idx, 1);
|
||||
}
|
||||
this.socketsByAgent.delete(agentId);
|
||||
}
|
||||
const timers = this.timersByAgent.get(agentId);
|
||||
if (timers) {
|
||||
for (const t of timers) {
|
||||
clearInterval(t);
|
||||
const idx = this.channelSyncTimers.indexOf(t);
|
||||
if (idx !== -1) this.channelSyncTimers.splice(idx, 1);
|
||||
}
|
||||
this.timersByAgent.delete(agentId);
|
||||
}
|
||||
this.firstGuildByAgent.delete(agentId);
|
||||
this.tokenCache.delete(agentId);
|
||||
this.agentStatusCache.delete(agentId);
|
||||
this.log.info(`fabric: agent ${agentId} dynamically removed`);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -452,6 +523,9 @@ export class FabricInbound {
|
||||
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
||||
);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
const agentTimers = this.timersByAgent.get(agentId) ?? [];
|
||||
agentTimers.push(syncTimer);
|
||||
this.timersByAgent.set(agentId, agentTimers);
|
||||
socket.on('message.created', (m: FabricMessage) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId) return;
|
||||
@@ -497,6 +571,11 @@ export class FabricInbound {
|
||||
});
|
||||
socket.connect();
|
||||
this.sockets.push(socket);
|
||||
// Track per-agent so addAccount/removeAccount can teardown
|
||||
// independently without disturbing other agents.
|
||||
const agentSockets = this.socketsByAgent.get(agentId) ?? [];
|
||||
agentSockets.push(socket);
|
||||
this.socketsByAgent.set(agentId, agentSockets);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
141
src/tools.ts
141
src/tools.ts
@@ -35,6 +35,30 @@ const GREETING_DELAY_MS = (() => {
|
||||
return Number.isFinite(v) && v >= 0 ? v : 500;
|
||||
})();
|
||||
|
||||
// PaddedCell tools-cache integration (cross-runtime alignment with
|
||||
// Plexum decision #37; openclaw lacks a before_outgoing_tools hook so we
|
||||
// opt in per-plugin). All Fabric tools are gate-able by default — agents
|
||||
// must `dynamic-cache-tools` them before the model sees them.
|
||||
// `gatedRegister` wraps openclaw's api.registerTool: each factory invocation
|
||||
// (1) registers the tool's name+description into PaddedCell's catalog
|
||||
// so dynamic-list-tools / dynamic-search-tools surface it
|
||||
// (2) returns null if the per-session cache doesn't include the name
|
||||
// Buffer-drain pattern handles plugin load order — if PaddedCell hasn't
|
||||
// loaded yet, the stub default-allows + queues catalog entries; PaddedCell
|
||||
// drains the queue when it installs the real API.
|
||||
function ensurePaddedStub(): void {
|
||||
const g = globalThis as unknown as { __padded?: Record<string, unknown> & { _pendingCatalog?: Array<{ name: string; description: string }>; allowTool?: unknown; registerCatalogEntry?: unknown } };
|
||||
if (g.__padded) return;
|
||||
const buf: Array<{ name: string; description: string }> = [];
|
||||
g.__padded = {
|
||||
_pendingCatalog: buf,
|
||||
registerCatalogEntry(name: string, description: string): void {
|
||||
buf.push({ name, description });
|
||||
},
|
||||
allowTool: () => true, // fail-open until PaddedCell installs the real API
|
||||
};
|
||||
}
|
||||
|
||||
export function registerFabricTools(
|
||||
api: ToolApi,
|
||||
client: FabricClient,
|
||||
@@ -42,14 +66,29 @@ export function registerFabricTools(
|
||||
store: SubDiscussionStore,
|
||||
cfg: ToolsCfg,
|
||||
): void {
|
||||
ensurePaddedStub();
|
||||
const seenForCatalog = new Set<string>();
|
||||
const gatedRegister = (factory: (ctx: Ctx) => unknown | null): void => {
|
||||
api.registerTool((ctx: Ctx) => {
|
||||
const tool = factory(ctx) as { name?: string; description?: string } | null;
|
||||
if (!tool || !tool.name) return tool;
|
||||
const padded = (globalThis as unknown as { __padded?: { allowTool?: (n: string, c: Ctx) => boolean; registerCatalogEntry?: (n: string, d: string) => void } }).__padded;
|
||||
if (padded?.registerCatalogEntry && !seenForCatalog.has(tool.name)) {
|
||||
padded.registerCatalogEntry(tool.name, tool.description ?? '');
|
||||
seenForCatalog.add(tool.name);
|
||||
}
|
||||
if (padded?.allowTool && !padded.allowTool(tool.name, ctx)) return null;
|
||||
return tool;
|
||||
});
|
||||
};
|
||||
// Resolve the calling agent's Fabric session + a guild's token/endpoint.
|
||||
const ctxGuild = async (agentId: string, guildNodeId: string) => {
|
||||
const entry = identity.findByAgentId(agentId);
|
||||
if (!entry)
|
||||
throw new Error(
|
||||
`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
|
||||
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
|
||||
`channels.fabric.accounts.${agentId}); then restart the gateway`,
|
||||
`agent ${agentId} not registered — call the openclaw \`fabric-register\` ` +
|
||||
`tool (apiKey: <fak_…>, agentId: ${agentId}); the dynamic-subscription ` +
|
||||
`path brings the socket up immediately, no gateway restart needed`,
|
||||
);
|
||||
const session = await client.agentLogin(entry.fabricApiKey);
|
||||
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
|
||||
@@ -58,13 +97,77 @@ export function registerFabricTools(
|
||||
return { session, guild, token };
|
||||
};
|
||||
|
||||
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
|
||||
// It's a one-time step done out-of-band via the installed script
|
||||
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
|
||||
// or via static config (channels.fabric.accounts.<agentId>).
|
||||
// Bind an agent's Fabric API key — validates the key against Center,
|
||||
// upserts ~/.openclaw/fabric-identity.json, AND brings up the inbound
|
||||
// socket immediately via the live FabricInbound instance (no gateway
|
||||
// restart). The standalone binary `~/.openclaw/bin/fabric-register`
|
||||
// still exists for one-time bootstrap before the gateway runs, but
|
||||
// recruitment's `register-agent` script should prefer this tool path
|
||||
// so the new agent's socket is live before `interviewer` fires.
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-register',
|
||||
description:
|
||||
'Bind an agent to a Fabric Center API key. Validates the key, writes ' +
|
||||
'the entry to ~/.openclaw/fabric-identity.json, and starts a live ' +
|
||||
'inbound socket immediately so the agent receives Fabric pushes ' +
|
||||
'without a gateway restart. Caller defaults to the current agent; ' +
|
||||
'pass `agentId` to bind on behalf of another agent (recruitment use).',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
additionalProperties: false,
|
||||
required: ['apiKey'],
|
||||
properties: {
|
||||
apiKey: { type: 'string', description: 'Fabric Center API key (`fak_…`)' },
|
||||
agentId: {
|
||||
type: 'string',
|
||||
description:
|
||||
'Agent to register. Defaults to the calling agent (ctx.agentId). ' +
|
||||
'Recruitment onboarding may override this when wiring a freshly ' +
|
||||
'created agent before that agent has a session of its own.',
|
||||
},
|
||||
},
|
||||
},
|
||||
execute: async (_id: string, p: { apiKey: string; agentId?: string }) => {
|
||||
const agentId = p.agentId ?? ctx.agentId;
|
||||
if (!agentId) return { ok: false, error: 'no agent context (pass agentId)' };
|
||||
if (!p.apiKey || typeof p.apiKey !== 'string') {
|
||||
return { ok: false, error: 'apiKey required' };
|
||||
}
|
||||
// Delegate to FabricInbound.addAccount via the cross-plugin bridge.
|
||||
// The bridge is installed in index.ts when inbound spins up; if it's
|
||||
// not present yet, the gateway is still starting and the caller should
|
||||
// retry (rare path — only hit during the gateway_start window).
|
||||
const fabricApi = (globalThis as Record<string, unknown>)['__fabric'] as
|
||||
| { addAccount?: (entry: { agentId: string; fabricApiKey: string }) => Promise<void> }
|
||||
| undefined;
|
||||
if (!fabricApi?.addAccount) {
|
||||
return {
|
||||
ok: false,
|
||||
error:
|
||||
'fabric inbound not ready (gateway still starting?). Fall back to ' +
|
||||
'~/.openclaw/bin/fabric-register or retry after a few seconds.',
|
||||
};
|
||||
}
|
||||
try {
|
||||
await fabricApi.addAccount({ agentId, fabricApiKey: p.apiKey });
|
||||
} catch (err) {
|
||||
return {
|
||||
ok: false,
|
||||
error: `fabric-register failed: ${String(err)}`,
|
||||
};
|
||||
}
|
||||
const entry = identity.findByAgentId(agentId);
|
||||
return {
|
||||
ok: true,
|
||||
agentId,
|
||||
fabricUserId: entry?.fabricUserId,
|
||||
displayName: entry?.displayName,
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: `create-${kind}-channel`,
|
||||
description:
|
||||
`Create a Fabric ${kind} channel (x_type=${X_BY_KIND[kind]}). ` +
|
||||
@@ -120,7 +223,7 @@ export function registerFabricTools(
|
||||
|
||||
// discussion-complete: post a summary then close the channel (Guild
|
||||
// /channels/:id/close — history stays readable, new posts -> 409).
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'discussion-complete',
|
||||
description: 'Conclude a discussion: post a summary then close the channel.',
|
||||
parameters: {
|
||||
@@ -178,7 +281,7 @@ export function registerFabricTools(
|
||||
// session prompt whenever a turn in this channel fires — so the
|
||||
// two roles see different instructions, no shared guide file.
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'create-sub-discussion',
|
||||
description:
|
||||
'Open a host-driven sub-discussion channel (x_type=discuss) hanging off your current channel, ' +
|
||||
@@ -299,7 +402,7 @@ export function registerFabricTools(
|
||||
// author, not the host's personal account — and can wake the host on
|
||||
// the parent channel to continue whatever workflow opened the sub.
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'close-sub-discussion',
|
||||
description:
|
||||
'Close a sub-discussion channel you opened (host-only) and write a callback to the parent ' +
|
||||
@@ -396,7 +499,7 @@ export function registerFabricTools(
|
||||
// fabric-canvas: share / update / read / close the channel's single
|
||||
// pinned canvas document (one tool, four actions). update/close are
|
||||
// sharer-only server-side (the guild returns 403 otherwise).
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-canvas',
|
||||
description:
|
||||
"Manage a channel's pinned canvas document. action: " +
|
||||
@@ -473,7 +576,7 @@ export function registerFabricTools(
|
||||
}));
|
||||
|
||||
// fabric-channel: channel membership (one tool, three actions).
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-channel',
|
||||
description:
|
||||
'Channel membership. action: members (list channel member userIds) | ' +
|
||||
@@ -525,7 +628,7 @@ export function registerFabricTools(
|
||||
// workload to #agents-room, or triage agent following up on an
|
||||
// already-routed task by commenting in #updates.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-send-message',
|
||||
description:
|
||||
'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
|
||||
@@ -570,7 +673,7 @@ export function registerFabricTools(
|
||||
// config → tool returns ok:false with a clear error, no fall-through
|
||||
// to regular agent posting.
|
||||
// ───────────────────────────────────────────────────────────────────
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-send-sys-msg',
|
||||
description:
|
||||
'Send a SYSTEM-AUTHORED message into a Fabric channel (author = guild sentinel, not you). ' +
|
||||
@@ -644,7 +747,7 @@ export function registerFabricTools(
|
||||
// agent is a member of. Returns id / name / xType per channel so the
|
||||
// agent can pick a channelId for fabric-send-message etc.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-channel-list',
|
||||
description:
|
||||
'List channels visible to the calling agent in a guild. Optional ' +
|
||||
@@ -705,7 +808,7 @@ export function registerFabricTools(
|
||||
// workflow says "find the right guild for X" — pick by purpose,
|
||||
// then fabric-channel-list to find the right channel inside it.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-guild-list',
|
||||
description:
|
||||
'List guilds the calling agent is a member of. Returns ' +
|
||||
@@ -766,7 +869,7 @@ export function registerFabricTools(
|
||||
// channel must be public). Use this to backfill purpose on existing
|
||||
// channels, or to refine it after a channel's role evolves.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-channel-set-purpose',
|
||||
description:
|
||||
"Set or update a channel's free-form purpose description. " +
|
||||
@@ -809,7 +912,7 @@ export function registerFabricTools(
|
||||
// gated; verify a previous message went through; lookup recent
|
||||
// duplicates before opening a new task in triage.
|
||||
// -----------------------------------------------------------------
|
||||
api.registerTool((ctx: Ctx) => ({
|
||||
gatedRegister((ctx: Ctx) => ({
|
||||
name: 'fabric-message-history',
|
||||
description:
|
||||
"Read a channel's recent message history. Omit seqFrom/seqTo to " +
|
||||
|
||||
Reference in New Issue
Block a user