feat(gateway): register fabric.register gateway method for live no-restart registration
Recruitment's register-agent step is a plain shell step (no LLM turn), so it cannot invoke the `fabric-register` TOOL (tool only fires inside an agent turn) and there is no `openclaw tools call` CLI. It previously fell back to the standalone bootstrap binary, which writes fabric-identity.json but cannot notify the running plugin -> the new agent's inbound socket only came up after a gateway restart. This adds an in-process gateway method `fabric.register` (scope: operator.write) whose handler runs inbound.addAccount: validates the key, persists identity, and brings the inbound socket up immediately. The script now calls `openclaw gateway call fabric.register --params ...` and only falls back to the bootstrap binary if the method is unavailable. Also resyncs committed dist/ to source (sub-discussion-hook/store + tools/ inbound were source-committed but their dist artifacts were stale). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
115
dist/fabric/src/inbound.js
vendored
115
dist/fabric/src/inbound.js
vendored
@@ -7,6 +7,25 @@ import { resolveCoalesce } from './accounts.js';
|
||||
import { fabricPeerRoutingForXType } from './channel.js';
|
||||
import { recordChannelType } from './channel-meta.js';
|
||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||
// Walk cfg.bindings for the entry that ties `agentId` to a fabric account.
|
||||
// Returns the binding's match.accountId (the slot label routing keys on);
|
||||
// returns undefined when the agent has no explicit fabric binding so the
|
||||
// caller can fall back to agentId without changing pre-existing semantics
|
||||
// for agents whose binding accountId == agent_id anyway.
|
||||
function findFabricBindingAccountId(cfg, agentId) {
|
||||
const bindings = cfg?.bindings;
|
||||
if (!Array.isArray(bindings))
|
||||
return undefined;
|
||||
for (const b of bindings) {
|
||||
if (b?.agentId === agentId &&
|
||||
b?.match?.channel === 'fabric' &&
|
||||
typeof b?.match?.accountId === 'string' &&
|
||||
b.match.accountId.length > 0) {
|
||||
return b.match.accountId;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
export class FabricInbound {
|
||||
core;
|
||||
cfg;
|
||||
@@ -15,6 +34,12 @@ export class FabricInbound {
|
||||
log;
|
||||
accounts;
|
||||
sockets = [];
|
||||
// Per-agent socket + timer tracking. Enables `removeAccount(agentId)`
|
||||
// to tear down ONE agent without restarting the whole inbound. New
|
||||
// sockets get appended on `connectAgent`; both maps are emptied by
|
||||
// `stop()`.
|
||||
socketsByAgent = new Map();
|
||||
timersByAgent = new Map();
|
||||
seen = new Set();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
@@ -210,6 +235,74 @@ export class FabricInbound {
|
||||
for (const s of this.sockets)
|
||||
s.disconnect();
|
||||
this.sockets = [];
|
||||
this.socketsByAgent.clear();
|
||||
this.timersByAgent.clear();
|
||||
}
|
||||
/**
|
||||
* Bring up ONE new account at runtime (no gateway restart).
|
||||
*
|
||||
* Mirrors what `start()` does per entry: login to Center, upsert the
|
||||
* identity registry, open the socket(s). Idempotent: re-calling with
|
||||
* the same agentId tears down the previous socket(s) first so the
|
||||
* fresh apikey replaces the stale one (recruitment onboard rotates
|
||||
* the agent from the shared `interviewee` placeholder to a real
|
||||
* per-agent apikey — the old `interviewee` socket must drop before
|
||||
* the new one comes up or the agent ends up subscribed to both users
|
||||
* at once).
|
||||
*
|
||||
* Used by the `fabric-register` openclaw tool to make recruitment
|
||||
* end-to-end without a gateway restart between `new-agent` and the
|
||||
* interview's sub-discussion dispatch.
|
||||
*/
|
||||
async addAccount(entry) {
|
||||
if (this.socketsByAgent.has(entry.agentId)) {
|
||||
this.removeAccount(entry.agentId);
|
||||
}
|
||||
const session = await this.client.agentLogin(entry.fabricApiKey);
|
||||
this.identity.upsert({
|
||||
agentId: entry.agentId,
|
||||
fabricApiKey: entry.fabricApiKey,
|
||||
fabricUserId: session.user.id,
|
||||
displayName: session.user.name,
|
||||
});
|
||||
await this.connectAgent(entry.agentId, session);
|
||||
this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`);
|
||||
}
|
||||
/**
|
||||
* Tear down ONE account's sockets + timers without touching others.
|
||||
* Caller is responsible for any identity-registry cleanup; this only
|
||||
* drops the live socket subscription so the agent stops receiving
|
||||
* Fabric pushes.
|
||||
*/
|
||||
removeAccount(agentId) {
|
||||
const sockets = this.socketsByAgent.get(agentId);
|
||||
if (sockets) {
|
||||
for (const s of sockets) {
|
||||
try {
|
||||
s.disconnect();
|
||||
}
|
||||
catch { /* socket already dead */ }
|
||||
// Also remove from the flat list so `stop()` doesn't double-close.
|
||||
const idx = this.sockets.indexOf(s);
|
||||
if (idx !== -1)
|
||||
this.sockets.splice(idx, 1);
|
||||
}
|
||||
this.socketsByAgent.delete(agentId);
|
||||
}
|
||||
const timers = this.timersByAgent.get(agentId);
|
||||
if (timers) {
|
||||
for (const t of timers) {
|
||||
clearInterval(t);
|
||||
const idx = this.channelSyncTimers.indexOf(t);
|
||||
if (idx !== -1)
|
||||
this.channelSyncTimers.splice(idx, 1);
|
||||
}
|
||||
this.timersByAgent.delete(agentId);
|
||||
}
|
||||
this.firstGuildByAgent.delete(agentId);
|
||||
this.tokenCache.delete(agentId);
|
||||
this.agentStatusCache.delete(agentId);
|
||||
this.log.info(`fabric: agent ${agentId} dynamically removed`);
|
||||
}
|
||||
/**
|
||||
* Per-account metadata harvested during `start()` — used by
|
||||
@@ -360,6 +453,9 @@ export class FabricInbound {
|
||||
});
|
||||
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
const agentTimers = this.timersByAgent.get(agentId) ?? [];
|
||||
agentTimers.push(syncTimer);
|
||||
this.timersByAgent.set(agentId, agentTimers);
|
||||
socket.on('message.created', (m) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId)
|
||||
@@ -407,6 +503,11 @@ export class FabricInbound {
|
||||
});
|
||||
socket.connect();
|
||||
this.sockets.push(socket);
|
||||
// Track per-agent so addAccount/removeAccount can teardown
|
||||
// independently without disturbing other agents.
|
||||
const agentSockets = this.socketsByAgent.get(agentId) ?? [];
|
||||
agentSockets.push(socket);
|
||||
this.socketsByAgent.set(agentId, agentSockets);
|
||||
}
|
||||
}
|
||||
// Download a message's attachments to a temp dir using the agent's guild
|
||||
@@ -463,10 +564,22 @@ export class FabricInbound {
|
||||
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
|
||||
// misclassifies the turn.
|
||||
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
|
||||
// resolveAgentRoute needs the *binding* accountId (the channel-side
|
||||
// slot name) — not the openclaw agentId. For most agents the binding
|
||||
// is `{agentId: X, match: {channel: fabric, accountId: X}}` so the
|
||||
// two coincide; but for shared-placeholder cases (e.g. the recruitment
|
||||
// `interviewee` slot bound to multiple agents over its lifetime) the
|
||||
// binding accountId is the slot label ("interviewee", "Neon", …) not
|
||||
// the agent_id. Passing agentId there returned bindings=0 and silently
|
||||
// fell back to `main`, hijacking sub-discussion turns. Look up the
|
||||
// agent's fabric binding accountId here; fall back to agentId when no
|
||||
// explicit binding exists (preserves prior behavior for agents with
|
||||
// no fabric binding declared).
|
||||
const bindingAccountId = findFabricBindingAccountId(this.cfg, agentId) ?? agentId;
|
||||
const route = core.channel.routing.resolveAgentRoute({
|
||||
cfg: this.cfg,
|
||||
channel: 'fabric',
|
||||
accountId: agentId,
|
||||
accountId: bindingAccountId,
|
||||
peer: { kind: peerKind, id: channelId },
|
||||
});
|
||||
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
||||
|
||||
Reference in New Issue
Block a user