4 Commits

Author SHA1 Message Date
h z
fc2ab628b2 Merge pull request 'feat(fabric): dynamic-subscription via fabric-register openclaw tool' (#12) from feat/dynamic-account-subscription into main 2026-06-01 07:57:10 +00:00
893b93198d feat(fabric): dynamic-subscription via fabric-register openclaw tool
The plugin manifest declared `fabric-register` as a tool name but
tools.ts never registered it — recruitment fell through to the
standalone `/root/.openclaw/bin/fabric-register` binary, which writes
~/.openclaw/fabric-identity.json correctly but exits without notifying
the running plugin. That left fabric inbound's subscription as a
connect-time snapshot: every new agent required a gateway restart
between `new-agent` and the interview's sub-discussion or the message
had no socket to dispatch on.

Wire the full path:
  - `FabricInbound.addAccount(entry)` — login, upsert identity, open socket(s),
    track per-agent so removeAccount can teardown cleanly. Idempotent: a
    second call replaces the previous socket (used post-onboard when the
    agent rotates off the shared `interviewee` placeholder onto its own
    apikey).
  - `FabricInbound.removeAccount(agentId)` — disconnect sockets, clear
    timers + per-agent caches.
  - `__fabric.addAccount` / `removeAccount` — cross-plugin bridge so the
    `fabric-register` tool can reach the live FabricInbound instance from
    its tool handler context.
  - `fabric-register` openclaw tool — validates apiKey, calls
    `__fabric.addAccount`, returns `{ok, fabricUserId, displayName}`.
    Accepts `agentId` arg so recruitment can bind on behalf of a
    freshly-created agent before that agent has a session of its own.

Removes the "restart the gateway" advice from the ctxGuild
"agent not registered" error message — operators should now call the
tool path instead.

After this lands + the ClawSkills register-agent script flip (separate
commit), `recruitment.new-agent` -> interviewer sub-discussion runs
without a gateway restart in between.
2026-06-01 08:56:53 +01:00
h z
260d50196b Merge pull request 'fix(routing): resolveAgentRoute uses binding.accountId, not agent_id' (#11) from fix/routing-use-binding-accountid into main 2026-05-31 19:33:13 +00:00
ea713064e1 fix(routing): resolveAgentRoute uses binding.accountId, not agent_id
`socket.on('message.created', ...)` dispatched with
`accountId: agentId` (the openclaw agent id, e.g. 'analyst2') instead
of the binding's `match.accountId` (the fabric account slot label, e.g.
'interviewee'). For most agents the binding is `{agentId: X, accountId: X}`
so the two coincide and the call works by accident. For shared-placeholder
slots (the recruitment `interviewee` apikey reused across pre-onboard
agents) the slot label is `interviewee` not the agent_id, so the lookup
returns bindings=0 and openclaw core silently falls back to the `main`
agent — which then handles the sub-discussion turn under main's
workspace identity. Symptom: every sub-discussion interview reply
masquerades as the human user's IDENTITY.md text.

Walk cfg.bindings for the entry that ties this agentId to a fabric
account; use its accountId. Fall back to agentId when the agent has
no explicit fabric binding declared (preserves prior behavior for
agents wired before the binding format was uniform).

Verified on prod-t2 recruitment retest 2026-05-31:
  Before: routing log `accountId=analyst2 ... bindings=0`, main session
          ran instead of analyst2.
  After:  `accountId=interviewee ... bindings=1`, analyst2 session ran
          (0 main sessions in sub channel).
2026-05-31 20:32:41 +01:00
3 changed files with 205 additions and 10 deletions

View File

@@ -96,13 +96,29 @@ export default defineChannelPluginEntry({
// fall back to "assume DM" — fail closed on unknown.
{
const _G = globalThis as Record<string, unknown>;
_G['__fabric'] = { getChannelType };
_G['__fabric'] = {
getChannelType,
// Dynamic-subscription bridges: tools (notably `fabric-register`)
// call these to add/remove an account's inbound socket without
// a gateway restart. Both delegate to the live FabricInbound
// instance via the module-level `inbound` closure variable; the
// closures stay valid across gateway_start / gateway_stop
// because we re-assign the variable, not the property.
addAccount: async (entry: { agentId: string; fabricApiKey: string }) => {
if (!inbound) throw new Error('fabric inbound not ready yet (gateway not started?)');
await inbound.addAccount(entry);
},
removeAccount: (agentId: string) => {
if (!inbound) return;
inbound.removeAccount(agentId);
},
};
// Flush channel-meta cache when the gateway shuts down so
// recently-recorded xType entries don't get lost.
api.on('gateway_stop', () => {
try { flushChannelMeta(); } catch { /* ignore */ }
});
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType + addAccount + removeAccount)');
}
api.on('gateway_start', () => {

View File

@@ -51,8 +51,38 @@ type FabricMessage = {
xType?: string;
};
// Walk cfg.bindings for the entry that ties `agentId` to a fabric account.
// Returns the binding's match.accountId (the slot label routing keys on);
// returns undefined when the agent has no explicit fabric binding so the
// caller can fall back to agentId without changing pre-existing semantics
// for agents whose binding accountId == agent_id anyway.
function findFabricBindingAccountId(cfg: unknown, agentId: string): string | undefined {
const bindings = (cfg as { bindings?: Array<{
agentId?: string;
match?: { channel?: string; accountId?: string };
}> })?.bindings;
if (!Array.isArray(bindings)) return undefined;
for (const b of bindings) {
if (
b?.agentId === agentId &&
b?.match?.channel === 'fabric' &&
typeof b?.match?.accountId === 'string' &&
b.match.accountId.length > 0
) {
return b.match.accountId;
}
}
return undefined;
}
export class FabricInbound {
private sockets: Socket[] = [];
// Per-agent socket + timer tracking. Enables `removeAccount(agentId)`
// to tear down ONE agent without restarting the whole inbound. New
// sockets get appended on `connectAgent`; both maps are emptied by
// `stop()`.
private socketsByAgent = new Map<string, Socket[]>();
private timersByAgent = new Map<string, NodeJS.Timeout[]>();
private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
@@ -263,6 +293,71 @@ export class FabricInbound {
this.channelSyncTimers = [];
for (const s of this.sockets) s.disconnect();
this.sockets = [];
this.socketsByAgent.clear();
this.timersByAgent.clear();
}
/**
* Bring up ONE new account at runtime (no gateway restart).
*
* Mirrors what `start()` does per entry: login to Center, upsert the
* identity registry, open the socket(s). Idempotent: re-calling with
* the same agentId tears down the previous socket(s) first so the
* fresh apikey replaces the stale one (recruitment onboard rotates
* the agent from the shared `interviewee` placeholder to a real
* per-agent apikey — the old `interviewee` socket must drop before
* the new one comes up or the agent ends up subscribed to both users
* at once).
*
* Used by the `fabric-register` openclaw tool to make recruitment
* end-to-end without a gateway restart between `new-agent` and the
* interview's sub-discussion dispatch.
*/
async addAccount(entry: { agentId: string; fabricApiKey: string }): Promise<void> {
if (this.socketsByAgent.has(entry.agentId)) {
this.removeAccount(entry.agentId);
}
const session = await this.client.agentLogin(entry.fabricApiKey);
this.identity.upsert({
agentId: entry.agentId,
fabricApiKey: entry.fabricApiKey,
fabricUserId: session.user.id,
displayName: session.user.name,
});
await this.connectAgent(entry.agentId, session);
this.log.info(`fabric: agent ${entry.agentId} dynamically added as ${session.user.email}`);
}
/**
* Tear down ONE account's sockets + timers without touching others.
* Caller is responsible for any identity-registry cleanup; this only
* drops the live socket subscription so the agent stops receiving
* Fabric pushes.
*/
removeAccount(agentId: string): void {
const sockets = this.socketsByAgent.get(agentId);
if (sockets) {
for (const s of sockets) {
try { s.disconnect(); } catch { /* socket already dead */ }
// Also remove from the flat list so `stop()` doesn't double-close.
const idx = this.sockets.indexOf(s);
if (idx !== -1) this.sockets.splice(idx, 1);
}
this.socketsByAgent.delete(agentId);
}
const timers = this.timersByAgent.get(agentId);
if (timers) {
for (const t of timers) {
clearInterval(t);
const idx = this.channelSyncTimers.indexOf(t);
if (idx !== -1) this.channelSyncTimers.splice(idx, 1);
}
this.timersByAgent.delete(agentId);
}
this.firstGuildByAgent.delete(agentId);
this.tokenCache.delete(agentId);
this.agentStatusCache.delete(agentId);
this.log.info(`fabric: agent ${agentId} dynamically removed`);
}
/**
@@ -428,6 +523,9 @@ export class FabricInbound {
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
);
this.channelSyncTimers.push(syncTimer);
const agentTimers = this.timersByAgent.get(agentId) ?? [];
agentTimers.push(syncTimer);
this.timersByAgent.set(agentId, agentTimers);
socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? '';
if (!channelId) return;
@@ -473,6 +571,11 @@ export class FabricInbound {
});
socket.connect();
this.sockets.push(socket);
// Track per-agent so addAccount/removeAccount can teardown
// independently without disturbing other agents.
const agentSockets = this.socketsByAgent.get(agentId) ?? [];
agentSockets.push(socket);
this.socketsByAgent.set(agentId, agentSockets);
}
}
@@ -541,10 +644,22 @@ export class FabricInbound {
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
// resolveAgentRoute needs the *binding* accountId (the channel-side
// slot name) — not the openclaw agentId. For most agents the binding
// is `{agentId: X, match: {channel: fabric, accountId: X}}` so the
// two coincide; but for shared-placeholder cases (e.g. the recruitment
// `interviewee` slot bound to multiple agents over its lifetime) the
// binding accountId is the slot label ("interviewee", "Neon", …) not
// the agent_id. Passing agentId there returned bindings=0 and silently
// fell back to `main`, hijacking sub-discussion turns. Look up the
// agent's fabric binding accountId here; fall back to agentId when no
// explicit binding exists (preserves prior behavior for agents with
// no fabric binding declared).
const bindingAccountId = findFabricBindingAccountId(this.cfg, agentId) ?? agentId;
const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg,
channel: 'fabric',
accountId: agentId,
accountId: bindingAccountId,
peer: { kind: peerKind, id: channelId },
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {

View File

@@ -47,9 +47,9 @@ export function registerFabricTools(
const entry = identity.findByAgentId(agentId);
if (!entry)
throw new Error(
`agent ${agentId} not registered — run: AGENT_ID=${agentId} ` +
`~/.openclaw/bin/fabric-register --api-key <fak_…> (or set ` +
`channels.fabric.accounts.${agentId}); then restart the gateway`,
`agent ${agentId} not registered — call the openclaw \`fabric-register\` ` +
`tool (apiKey: <fak_…>, agentId: ${agentId}); the dynamic-subscription ` +
`path brings the socket up immediately, no gateway restart needed`,
);
const session = await client.agentLogin(entry.fabricApiKey);
const guild = session.guilds.find((g) => g.nodeId === guildNodeId);
@@ -58,10 +58,74 @@ export function registerFabricTools(
return { session, guild, token };
};
// NOTE: binding an agent's Fabric API key is intentionally NOT a tool.
// It's a one-time step done out-of-band via the installed script
// ~/.openclaw/bin/fabric-register --api-key <fak_…> (AGENT_ID or --agent-id)
// or via static config (channels.fabric.accounts.<agentId>).
// Bind an agent's Fabric API key — validates the key against Center,
// upserts ~/.openclaw/fabric-identity.json, AND brings up the inbound
// socket immediately via the live FabricInbound instance (no gateway
// restart). The standalone binary `~/.openclaw/bin/fabric-register`
// still exists for one-time bootstrap before the gateway runs, but
// recruitment's `register-agent` script should prefer this tool path
// so the new agent's socket is live before `interviewer` fires.
api.registerTool((ctx: Ctx) => ({
name: 'fabric-register',
description:
'Bind an agent to a Fabric Center API key. Validates the key, writes ' +
'the entry to ~/.openclaw/fabric-identity.json, and starts a live ' +
'inbound socket immediately so the agent receives Fabric pushes ' +
'without a gateway restart. Caller defaults to the current agent; ' +
'pass `agentId` to bind on behalf of another agent (recruitment use).',
parameters: {
type: 'object',
additionalProperties: false,
required: ['apiKey'],
properties: {
apiKey: { type: 'string', description: 'Fabric Center API key (`fak_…`)' },
agentId: {
type: 'string',
description:
'Agent to register. Defaults to the calling agent (ctx.agentId). ' +
'Recruitment onboarding may override this when wiring a freshly ' +
'created agent before that agent has a session of its own.',
},
},
},
execute: async (_id: string, p: { apiKey: string; agentId?: string }) => {
const agentId = p.agentId ?? ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context (pass agentId)' };
if (!p.apiKey || typeof p.apiKey !== 'string') {
return { ok: false, error: 'apiKey required' };
}
// Delegate to FabricInbound.addAccount via the cross-plugin bridge.
// The bridge is installed in index.ts when inbound spins up; if it's
// not present yet, the gateway is still starting and the caller should
// retry (rare path — only hit during the gateway_start window).
const fabricApi = (globalThis as Record<string, unknown>)['__fabric'] as
| { addAccount?: (entry: { agentId: string; fabricApiKey: string }) => Promise<void> }
| undefined;
if (!fabricApi?.addAccount) {
return {
ok: false,
error:
'fabric inbound not ready (gateway still starting?). Fall back to ' +
'~/.openclaw/bin/fabric-register or retry after a few seconds.',
};
}
try {
await fabricApi.addAccount({ agentId, fabricApiKey: p.apiKey });
} catch (err) {
return {
ok: false,
error: `fabric-register failed: ${String(err)}`,
};
}
const entry = identity.findByAgentId(agentId);
return {
ok: true,
agentId,
fabricUserId: entry?.fabricUserId,
displayName: entry?.displayName,
};
},
}));
const makeCreate = (kind: 'chat' | 'work' | 'report' | 'discussion') =>
api.registerTool((ctx: Ctx) => ({