feat(plugin): fabric-guild-list + fabric-channel-set-purpose tools + purpose on existing tools

Adds two agent-facing tools that close the discoverability loop:

  - fabric-guild-list — enumerates guilds the agent belongs to with
    name + purpose + status (no api calls beyond the existing agentLogin
    response). Optional nameFilter/purposeFilter for narrowing.
  - fabric-channel-set-purpose — PATCH /api/channels/:id { purpose }
    so agents can backfill or update an existing channel's purpose.

Extends existing tools:
  - fabric-channel-list now returns purpose on each row.
  - create-{chat,work,report,discussion}-channel accept optional purpose.

FabricClient + FabricSession type changes carry the new field through.
Manifest contracts.tools updated (jiti loader needs both manifest entry
and onStartup activation to register).

Lets workflows that previously needed hardcoded channel ids instead say
'find a guild whose purpose mentions debate, then a channel of x_type
announce whose purpose covers public debate broadcasts.'
This commit is contained in:
h z
2026-05-23 19:22:10 +01:00
parent 6fe06f55dd
commit 5ff464a055
8 changed files with 746 additions and 6 deletions

View File

@@ -28,6 +28,125 @@ export class FabricInbound {
// Re-login per agent on a short TTL to keep a fresh token.
tokenCache = new Map();
static TOKEN_TTL_MS = 8 * 60 * 1000;
// Per-channel serial work queue. Every inbound socket message for a
// channel awaits the previous task for that same channel, so model
// turns never interleave. Map key = channelId; value is the tail of
// the chain (an in-flight promise the next task awaits).
//
// Why per-channel and not per-agent: a single agent may sit in
// several triage / general channels; we want each channel to flow at
// its own speed but the SAME channel's traffic to be strictly serial.
// For dm and discuss the queue also serialises but those traditionally
// had at-most-one-in-flight anyway via the turn engine.
channelChains = new Map();
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
// agent/status round-trip off the hot path for back-to-back triage
// messages. Short TTL because status flips are rare-but-meaningful.
agentStatusCache = new Map();
static AGENT_STATUS_TTL_MS = 5_000;
// Triage messages that arrived while the on-duty agent wasn't on_call
// — sit here until either (a) the agent becomes on_call and the next
// triage arrival drains them, or (b) the gateway restarts (lost; ok
// because the underlying Fabric messages are persisted and re-fetched
// on agent reconnect's history sweep).
pendingTriageGated = [];
// Schedule `task` to run after every previous task on the same
// channel has completed. Returns the promise so callers can await
// their own result if they need to; the chain itself is fire-and-
// forget from the socket.on handler.
enqueueChannelTask(channelId, task) {
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
const next = prev.then(task).catch((err) => {
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
});
this.channelChains.set(channelId, next);
// Best-effort cleanup so the Map doesn't grow without bound for
// long-running gateways: drop the entry when the chain settles, but
// only if it's still the latest reference (newer enqueue may have
// overwritten it in the meantime).
void next.finally(() => {
if (this.channelChains.get(channelId) === next) {
this.channelChains.delete(channelId);
}
});
return next;
}
// Hit HF backend to check whether `agentId` is currently on_call.
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
// on_call" — triage stays gated rather than risking a confused wake.
async checkAgentOnCall(agentId) {
const cached = this.agentStatusCache.get(agentId);
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
return cached.onCall;
}
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
// CLAW_IDENTIFIER resolution priority:
// 1. HF_CLAW_IDENTIFIER env (operator override)
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
// plugin itself uses — keeps the two in sync without an extra
// env per service unit)
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
// container hostname is `server.t2` but HF agent row has
// `claw_identifier=sim-t2`; matching is mandatory for the HF
// backend's _require_agent() check)
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
if (!claw) {
try {
// openclaw config shape (verified in sim):
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
const cfg = this.cfg;
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
claw = fromCfg.trim();
}
}
catch {
/* fall through to hostname */
}
}
if (!claw) {
claw = (await import('os')).hostname();
}
let onCall = false;
try {
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
const res = await fetch(url, {
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
});
if (res.ok) {
const data = (await res.json());
onCall = (data.status ?? '').toLowerCase() === 'on_call';
}
}
catch (err) {
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
}
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
return onCall;
}
// FIFO drain of all triage-gated messages for `agentId` (called when
// we just learned they're on_call). Each drained message is dispatched
// through its own channel chain so per-channel serial order is kept.
async drainGatedFor(agentId) {
const keep = [];
const drain = [];
for (const item of this.pendingTriageGated) {
if (item.agentId === agentId)
drain.push(item);
else
keep.push(item);
}
if (drain.length === 0)
return;
this.pendingTriageGated = keep;
for (const item of drain) {
this.log.info(`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`);
// Re-enqueue via the per-channel chain so ordering is preserved.
this.enqueueChannelTask(item.channelId, async () => {
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
});
}
}
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
@@ -90,8 +209,48 @@ export class FabricInbound {
s.disconnect();
this.sockets = [];
}
/**
* Per-account metadata harvested during `start()` — used by
* PresenceSync to know where to push each agent's HF status.
*
* `fabricUserId` is filled from `session.user.id` after agent-login.
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
* guild presence push is a future concern; for sim/prod-v1 each agent
* is in one guild).
*
* Returns ONLY agents that successfully connected — failed-login
* agents have no fabricUserId yet and are excluded.
*/
getPresenceAccounts() {
const out = [];
for (const entry of this.identity.list()) {
if (!entry.fabricUserId)
continue;
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
if (!presenceGuildUrl)
continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuildUrl,
fabricApiKey: entry.fabricApiKey,
});
}
return out;
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target).
firstGuildEndpointByAgent = new Map();
async connectAgent(agentId, session) {
const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildEndpointByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild)
this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
}
for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok)
@@ -190,7 +349,31 @@ export class FabricInbound {
this.seen.add(key);
if (this.seen.size > 5000)
this.seen.clear();
void this.dispatch(agentId, g, channelId, m, session);
// Per-channel serial queue. Prevents concurrent model turns for
// the same channel — important for triage where a second wake
// arriving mid-reply would interleave with the in-flight one.
this.enqueueChannelTask(channelId, async () => {
// Triage on_call gate: if the on-duty agent isn't currently
// on_call per HF, don't dispatch yet — just sit on the
// per-channel queue. Subsequent triage messages will recheck;
// when the agent becomes on_call, the next arrival drains.
//
// Also handles: triage + wake=true must verify status before
// committing to a model turn. Non-triage and triage observer
// (wake=false) skip the gate.
if (m.xType === 'triage' && m.wakeup === true) {
const onCall = await this.checkAgentOnCall(agentId);
if (!onCall) {
this.log.info(`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`);
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
return;
}
// Drain any previously-gated messages (FIFO) before this one,
// now that we know the agent is on_call.
await this.drainGatedFor(agentId);
}
await this.dispatch(agentId, g, channelId, m, session);
});
});
socket.connect();
this.sockets.push(socket);
@@ -281,6 +464,17 @@ export class FabricInbound {
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
// Triage exception: non-wake messages (admin observer) MUST NOT
// enter the agent's session at all. The next time the agent
// wakes for a triage message, their context should contain only
// their own past wakeups + their own outgoing messages — never
// the observer-only chatter from other agents. For non-triage
// channels keep the legacy "record-as-history" so a later wake
// sees the full channel conversation.
if (m.xType === 'triage') {
this.log.info(`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`);
return;
}
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,