feat(fabric): dynamic-subscription via fabric-register openclaw tool
The plugin manifest declared `fabric-register` as a tool name but
tools.ts never registered it — recruitment fell through to the
standalone `/root/.openclaw/bin/fabric-register` binary, which writes
~/.openclaw/fabric-identity.json correctly but exits without notifying
the running plugin. That left fabric inbound's subscription as a
connect-time snapshot: every new agent required a gateway restart
between `new-agent` and the interview's sub-discussion or the message
had no socket to dispatch on.
Wire the full path:
- `FabricInbound.addAccount(entry)` — login, upsert identity, open socket(s),
track per-agent so removeAccount can teardown cleanly. Idempotent: a
second call replaces the previous socket (used post-onboard when the
agent rotates off the shared `interviewee` placeholder onto its own
apikey).
- `FabricInbound.removeAccount(agentId)` — disconnect sockets, clear
timers + per-agent caches.
- `__fabric.addAccount` / `removeAccount` — cross-plugin bridge so the
`fabric-register` tool can reach the live FabricInbound instance from
its tool handler context.
- `fabric-register` openclaw tool — validates apiKey, calls
`__fabric.addAccount`, returns `{ok, fabricUserId, displayName}`.
Accepts `agentId` arg so recruitment can bind on behalf of a
freshly-created agent before that agent has a session of its own.
Removes the "restart the gateway" advice from the ctxGuild
"agent not registered" error message — operators should now call the
tool path instead.
After this lands + the ClawSkills register-agent script flip (separate
commit), `recruitment.new-agent` -> interviewer sub-discussion runs
without a gateway restart in between.
This commit is contained in:
20
index.ts
20
index.ts
@@ -96,13 +96,29 @@ export default defineChannelPluginEntry({
|
|||||||
// fall back to "assume DM" — fail closed on unknown.
|
// fall back to "assume DM" — fail closed on unknown.
|
||||||
{
|
{
|
||||||
const _G = globalThis as Record<string, unknown>;
|
const _G = globalThis as Record<string, unknown>;
|
||||||
_G['__fabric'] = { getChannelType };
|
_G['__fabric'] = {
|
||||||
|
getChannelType,
|
||||||
|
// Dynamic-subscription bridges: tools (notably `fabric-register`)
|
||||||
|
// call these to add/remove an account's inbound socket without
|
||||||
|
// a gateway restart. Both delegate to the live FabricInbound
|
||||||
|
// instance via the module-level `inbound` closure variable; the
|
||||||
|
// closures stay valid across gateway_start / gateway_stop
|
||||||
|
// because we re-assign the variable, not the property.
|
||||||
|
addAccount: async (entry: { agentId: string; fabricApiKey: string }) => {
|
||||||
|
if (!inbound) throw new Error('fabric inbound not ready yet (gateway not started?)');
|
||||||
|
await inbound.addAccount(entry);
|
||||||
|
},
|
||||||
|
removeAccount: (agentId: string) => {
|
||||||
|
if (!inbound) return;
|
||||||
|
inbound.removeAccount(agentId);
|
||||||
|
},
|
||||||
|
};
|
||||||
// Flush channel-meta cache when the gateway shuts down so
|
// Flush channel-meta cache when the gateway shuts down so
|
||||||
// recently-recorded xType entries don't get lost.
|
// recently-recorded xType entries don't get lost.
|
||||||
api.on('gateway_stop', () => {
|
api.on('gateway_stop', () => {
|
||||||
try { flushChannelMeta(); } catch { /* ignore */ }
|
try { flushChannelMeta(); } catch { /* ignore */ }
|
||||||
});
|
});
|
||||||
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
|
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)');
|
||||||
}
|
}
|
||||||
|
|
||||||
api.on('gateway_start', () => {
|
api.on('gateway_start', () => {
|
||||||
|
|||||||
@@ -77,6 +77,12 @@ function findFabricBindingAccountId(cfg: unknown, agentId: string): string | und
|
|||||||
|
|
||||||
export class FabricInbound {
|
export class FabricInbound {
|
||||||
private sockets: Socket[] = [];
|
private sockets: Socket[] = [];
|
||||||
|
// Per-agent socket + timer tracking. Enables `removeAccount(agentId)`
|
||||||
|
// to tear down ONE agent without restarting the whole inbound. New
|
||||||
|
// sockets get appended on `connectAgent`; both maps are emptied by
|
||||||
|
// `stop()`.
|
||||||
|
private socketsByAgent = new Map<string, Socket[]>();
|
||||||
|
private timersByAgent = new Map<string, NodeJS.Timeout[]>();
|
||||||
private seen = new Set<string>();
|
private seen = new Set<string>();
|
||||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||||
@@ -287,6 +293,71 @@ export class FabricInbound {
|
|||||||
this.channelSyncTimers = [];
|
this.channelSyncTimers = [];
|
||||||
for (const s of this.sockets) s.disconnect();
|
for (const s of this.sockets) s.disconnect();
|
||||||
this.sockets = [];
|
this.sockets = [];
|
||||||
|
this.socketsByAgent.clear();
|
||||||
|
this.timersByAgent.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bring up ONE new account at runtime (no gateway restart).
|
||||||
|
*
|
||||||
|
* Mirrors what `start()` does per entry: login to Center, upsert the
|
||||||
|
* identity registry, open the socket(s). Idempotent: re-calling with
|
||||||
|
* the same agentId tears down the previous socket(s) first so the
|
||||||
|
* fresh apikey replaces the stale one (recruitment onboard rotates
|
||||||
|
* the agent from the shared `interviewee` placeholder to a real
|
||||||
|
* per-agent apikey — the old `interviewee` socket must drop before
|
||||||
|
* the new one comes up or the agent ends up subscribed to both users
|
||||||
|
* at once).
|
||||||
|
*
|
||||||
|
* Used by the `fabric-register` openclaw tool to make recruitment
|
||||||
|
* end-to-end without a gateway restart between `new-agent` and the
|
||||||
|
* interview's sub-discussion dispatch.
|
||||||
|
*/
|
||||||
|
async addAccount(entry: { agentId: string; fabricApiKey: string }): Promise<void> {
|
||||||
|
if (this.socketsByAgent.has(entry.agentId)) {
|
||||||
|
this.removeAccount(entry.agentId);
|
||||||
|
}
|
||||||
|
const session = await this.client.agentLogin(entry.fabricApiKey);
|
||||||
|
this.identity.upsert({
|
||||||
|
agentId: entry.agentId,
|
||||||
|
fabricApiKey: entry.fabricApiKey,
|
||||||
|
fabricUserId: session.user.id,
|
||||||
|
displayName: session.user.name,
|
||||||
|
});
|
||||||
|
await this.connectAgent(entry.agentId, session);
|
||||||
|
this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tear down ONE account's sockets + timers without touching others.
|
||||||
|
* Caller is responsible for any identity-registry cleanup; this only
|
||||||
|
* drops the live socket subscription so the agent stops receiving
|
||||||
|
* Fabric pushes.
|
||||||
|
*/
|
||||||
|
removeAccount(agentId: string): void {
|
||||||
|
const sockets = this.socketsByAgent.get(agentId);
|
||||||
|
if (sockets) {
|
||||||
|
for (const s of sockets) {
|
||||||
|
try { s.disconnect(); } catch { /* socket already dead */ }
|
||||||
|
// Also remove from the flat list so `stop()` doesn't double-close.
|
||||||
|
const idx = this.sockets.indexOf(s);
|
||||||
|
if (idx !== -1) this.sockets.splice(idx, 1);
|
||||||
|
}
|
||||||
|
this.socketsByAgent.delete(agentId);
|
||||||
|
}
|
||||||
|
const timers = this.timersByAgent.get(agentId);
|
||||||
|
if (timers) {
|
||||||
|
for (const t of timers) {
|
||||||
|
clearInterval(t);
|
||||||
|
const idx = this.channelSyncTimers.indexOf(t);
|
||||||
|
if (idx !== -1) this.channelSyncTimers.splice(idx, 1);
|
||||||
|
}
|
||||||
|
this.timersByAgent.delete(agentId);
|
||||||
|
}
|
||||||
|
this.firstGuildByAgent.delete(agentId);
|
||||||
|
this.tokenCache.delete(agentId);
|
||||||
|
this.agentStatusCache.delete(agentId);
|
||||||
|
this.log.info(`fabric: agent ${agentId} dynamically removed`);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -452,6 +523,9 @@ export class FabricInbound {
|
|||||||
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
||||||
);
|
);
|
||||||
this.channelSyncTimers.push(syncTimer);
|
this.channelSyncTimers.push(syncTimer);
|
||||||
|
const agentTimers = this.timersByAgent.get(agentId) ?? [];
|
||||||
|
agentTimers.push(syncTimer);
|
||||||
|
this.timersByAgent.set(agentId, agentTimers);
|
||||||
socket.on('message.created', (m: FabricMessage) => {
|
socket.on('message.created', (m: FabricMessage) => {
|
||||||
const channelId = m.channelId ?? '';
|
const channelId = m.channelId ?? '';
|
||||||
if (!channelId) return;
|
if (!channelId) return;
|
||||||
@@ -497,6 +571,11 @@ export class FabricInbound {
|
|||||||
});
|
});
|
||||||
socket.connect();
|
socket.connect();
|
||||||
this.sockets.push(socket);
|
this.sockets.push(socket);
|
||||||
|
// Track per-agent so addAccount/removeAccount can teardown
|
||||||
|
// independently without disturbing other agents.
|
||||||
|
const agentSockets = this.socketsByAgent.get(agentId) ?? [];
|
||||||
|
agentSockets.push(socket);
|
||||||
|
this.socketsByAgent.set(agentId, agentSockets);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
78
src/tools.ts
78
src/tools.ts
@@ -47,9 +47,9 @@ export function registerFabricTools(
|
|||||||
const entry = identity.findByAgentId(agentId);
|
const entry = identity.findByAgentId(agentId);
|
||||||
if (!entry)
|
if (!entry)
|
||||||
throw new Error(
|
throw new Error(
|
||||||
`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
|
`agent ${agentId} not registered — call the openclaw \`fabric-register\` ` +
|
||||||
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
|
`tool (apiKey: <fak_…>, agentId: ${agentId}); the dynamic-subscription ` +
|
||||||
`channels.fabric.accounts.${agentId}); then restart the gateway`,
|
`path brings the socket up immediately, no gateway restart needed`,
|
||||||
);
|
);
|
||||||
const session = await client.agentLogin(entry.fabricApiKey);
|
const session = await client.agentLogin(entry.fabricApiKey);
|
||||||
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
|
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
|
||||||
@@ -58,10 +58,74 @@ export function registerFabricTools(
|
|||||||
return { session, guild, token };
|
return { session, guild, token };
|
||||||
};
|
};
|
||||||
|
|
||||||
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
|
// Bind an agent's Fabric API key — validates the key against Center,
|
||||||
// It's a one-time step done out-of-band via the installed script
|
// upserts ~/.openclaw/fabric-identity.json, AND brings up the inbound
|
||||||
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
|
// socket immediately via the live FabricInbound instance (no gateway
|
||||||
// or via static config (channels.fabric.accounts.<agentId>).
|
// restart). The standalone binary `~/.openclaw/bin/fabric-register`
|
||||||
|
// still exists for one-time bootstrap before the gateway runs, but
|
||||||
|
// recruitment's `register-agent` script should prefer this tool path
|
||||||
|
// so the new agent's socket is live before `interviewer` fires.
|
||||||
|
api.registerTool((ctx: Ctx) => ({
|
||||||
|
name: 'fabric-register',
|
||||||
|
description:
|
||||||
|
'Bind an agent to a Fabric Center API key. Validates the key, writes ' +
|
||||||
|
'the entry to ~/.openclaw/fabric-identity.json, and starts a live ' +
|
||||||
|
'inbound socket immediately so the agent receives Fabric pushes ' +
|
||||||
|
'without a gateway restart. Caller defaults to the current agent; ' +
|
||||||
|
'pass `agentId` to bind on behalf of another agent (recruitment use).',
|
||||||
|
parameters: {
|
||||||
|
type: 'object',
|
||||||
|
additionalProperties: false,
|
||||||
|
required: ['apiKey'],
|
||||||
|
properties: {
|
||||||
|
apiKey: { type: 'string', description: 'Fabric Center API key (`fak_…`)' },
|
||||||
|
agentId: {
|
||||||
|
type: 'string',
|
||||||
|
description:
|
||||||
|
'Agent to register. Defaults to the calling agent (ctx.agentId). ' +
|
||||||
|
'Recruitment onboarding may override this when wiring a freshly ' +
|
||||||
|
'created agent before that agent has a session of its own.',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
execute: async (_id: string, p: { apiKey: string; agentId?: string }) => {
|
||||||
|
const agentId = p.agentId ?? ctx.agentId;
|
||||||
|
if (!agentId) return { ok: false, error: 'no agent context (pass agentId)' };
|
||||||
|
if (!p.apiKey || typeof p.apiKey !== 'string') {
|
||||||
|
return { ok: false, error: 'apiKey required' };
|
||||||
|
}
|
||||||
|
// Delegate to FabricInbound.addAccount via the cross-plugin bridge.
|
||||||
|
// The bridge is installed in index.ts when inbound spins up; if it's
|
||||||
|
// not present yet, the gateway is still starting and the caller should
|
||||||
|
// retry (rare path — only hit during the gateway_start window).
|
||||||
|
const fabricApi = (globalThis as Record<string, unknown>)['__fabric'] as
|
||||||
|
| { addAccount?: (entry: { agentId: string; fabricApiKey: string }) => Promise<void> }
|
||||||
|
| undefined;
|
||||||
|
if (!fabricApi?.addAccount) {
|
||||||
|
return {
|
||||||
|
ok: false,
|
||||||
|
error:
|
||||||
|
'fabric inbound not ready (gateway still starting?). Fall back to ' +
|
||||||
|
'~/.openclaw/bin/fabric-register or retry after a few seconds.',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await fabricApi.addAccount({ agentId, fabricApiKey: p.apiKey });
|
||||||
|
} catch (err) {
|
||||||
|
return {
|
||||||
|
ok: false,
|
||||||
|
error: `fabric-register failed: ${String(err)}`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
const entry = identity.findByAgentId(agentId);
|
||||||
|
return {
|
||||||
|
ok: true,
|
||||||
|
agentId,
|
||||||
|
fabricUserId: entry?.fabricUserId,
|
||||||
|
displayName: entry?.displayName,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
|
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
|
||||||
api.registerTool((ctx: Ctx) => ({
|
api.registerTool((ctx: Ctx) => ({
|
||||||
|
|||||||
Reference in New Issue
Block a user