From c68de029e454cc7d26cc9a5e8f625427764fa330 Mon Sep 17 00:00:00 2001 From: hanghang zhang Date: Fri, 22 May 2026 22:17:39 +0100 Subject: [PATCH 1/2] feat(triage): per-channel serial queue + HF on_call gate + observer skip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three behavioral changes to inbound message handling to support the new triage flow: ## 1. Per-channel serial queue Replaces `void this.dispatch(...)` (fire-and-forget) with a per-channel chain so consecutive messages on the same channel are processed strictly in order — no concurrent model turns for the same channel. Other channels remain independent (parallelism preserved across channels). Implementation: `Map` where each new task awaits the previous. The map entry self-cleans when the chain settles AND no newer task has overwritten it. ## 2. HF on_call gate (triage + wake=true only) Before dispatching a triage wake to the on-duty agent, hit HF `GET /calendar/agent/status?agent_id=...`. If the agent isn't currently on_call, the message is pushed to a per-agent gated queue instead of dispatched — no model turn fires. Status check is cached for 5s to amortise across rapid triage bursts. When a subsequent triage message arrives and the agent IS on_call by that point, the gated queue drains FIFO (re-enqueued through the same per-channel chain so order is kept) before the new message dispatches. Drained queue is in-memory only; on gateway restart the underlying Fabric messages get re-fetched via the connect-time history sweep. ## 3. Triage observer skip (wake=false) Triage messages that arrive with wakeup=false are admin observers — by spec they MUST NOT enter the agent's session history. Skipped entirely (no recordInboundSession call). The next time this agent legitimately wakes for triage, their context contains only past wakeups + their own outgoing messages — no observer-side chatter from other agents. For NON-triage channels the legacy "record-as-history" stays — those keep their full channel conversation available for later wakes. ## Env - HF_API_BASE_URL — defaults `https://monitor.hangman-lab.top` - HF_CLAW_IDENTIFIER — defaults to `os.hostname()` 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) --- src/inbound.ts | 144 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 143 insertions(+), 1 deletion(-) diff --git a/src/inbound.ts b/src/inbound.ts index 415c58a..d089448 100644 --- a/src/inbound.ts +++ b/src/inbound.ts @@ -67,6 +67,109 @@ export class FabricInbound { private tokenCache = new Map(); private static readonly TOKEN_TTL_MS = 8 * 60 * 1000; + // Per-channel serial work queue. Every inbound socket message for a + // channel awaits the previous task for that same channel, so model + // turns never interleave. Map key = channelId; value is the tail of + // the chain (an in-flight promise the next task awaits). + // + // Why per-channel and not per-agent: a single agent may sit in + // several triage / general channels; we want each channel to flow at + // its own speed but the SAME channel's traffic to be strictly serial. + // For dm and discuss the queue also serialises but those traditionally + // had at-most-one-in-flight anyway via the turn engine. + private channelChains = new Map>(); + + // Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/ + // agent/status round-trip off the hot path for back-to-back triage + // messages. Short TTL because status flips are rare-but-meaningful. + private agentStatusCache = new Map(); + private static readonly AGENT_STATUS_TTL_MS = 5_000; + + // Triage messages that arrived while the on-duty agent wasn't on_call + // — sit here until either (a) the agent becomes on_call and the next + // triage arrival drains them, or (b) the gateway restarts (lost; ok + // because the underlying Fabric messages are persisted and re-fetched + // on agent reconnect's history sweep). + private pendingTriageGated: Array<{ + agentId: string; + g: { nodeId: string; endpoint: string }; + channelId: string; + m: FabricMessage; + session: FabricSession; + }> = []; + + // Schedule `task` to run after every previous task on the same + // channel has completed. Returns the promise so callers can await + // their own result if they need to; the chain itself is fire-and- + // forget from the socket.on handler. + private enqueueChannelTask(channelId: string, task: () => Promise): Promise { + const prev = this.channelChains.get(channelId) ?? Promise.resolve(); + const next = prev.then(task).catch((err) => { + this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`); + }); + this.channelChains.set(channelId, next); + // Best-effort cleanup so the Map doesn't grow without bound for + // long-running gateways: drop the entry when the chain settles, but + // only if it's still the latest reference (newer enqueue may have + // overwritten it in the meantime). + void next.finally(() => { + if (this.channelChains.get(channelId) === next) { + this.channelChains.delete(channelId); + } + }); + return next; + } + + // Hit HF backend to check whether `agentId` is currently on_call. + // Cached for 5s. Failures (network, 404, etc.) are treated as "not + // on_call" — triage stays gated rather than risking a confused wake. + private async checkAgentOnCall(agentId: string): Promise { + const cached = this.agentStatusCache.get(agentId); + if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) { + return cached.onCall; + } + const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top'; + const claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim() || (await import('os')).hostname(); + let onCall = false; + try { + const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`; + const res = await fetch(url, { + headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw }, + }); + if (res.ok) { + const data = (await res.json()) as { status?: string }; + onCall = (data.status ?? '').toLowerCase() === 'on_call'; + } + } catch (err) { + this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`); + } + this.agentStatusCache.set(agentId, { onCall, at: Date.now() }); + return onCall; + } + + // FIFO drain of all triage-gated messages for `agentId` (called when + // we just learned they're on_call). Each drained message is dispatched + // through its own channel chain so per-channel serial order is kept. + private async drainGatedFor(agentId: string): Promise { + const keep: typeof this.pendingTriageGated = []; + const drain: typeof this.pendingTriageGated = []; + for (const item of this.pendingTriageGated) { + if (item.agentId === agentId) drain.push(item); + else keep.push(item); + } + if (drain.length === 0) return; + this.pendingTriageGated = keep; + for (const item of drain) { + this.log.info( + `fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`, + ); + // Re-enqueue via the per-channel chain so ordering is preserved. + this.enqueueChannelTask(item.channelId, async () => { + await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session); + }); + } + } + // Return a fresh guild access token for the agent, re-authenticating with // the agent's Fabric API key when the cached session is stale. Falls back // to the connect-time session token if re-login fails. @@ -232,7 +335,33 @@ export class FabricInbound { if (this.seen.has(key)) return; this.seen.add(key); if (this.seen.size > 5000) this.seen.clear(); - void this.dispatch(agentId, g, channelId, m, session); + // Per-channel serial queue. Prevents concurrent model turns for + // the same channel — important for triage where a second wake + // arriving mid-reply would interleave with the in-flight one. + this.enqueueChannelTask(channelId, async () => { + // Triage on_call gate: if the on-duty agent isn't currently + // on_call per HF, don't dispatch yet — just sit on the + // per-channel queue. Subsequent triage messages will recheck; + // when the agent becomes on_call, the next arrival drains. + // + // Also handles: triage + wake=true must verify status before + // committing to a model turn. Non-triage and triage observer + // (wake=false) skip the gate. + if (m.xType === 'triage' && m.wakeup === true) { + const onCall = await this.checkAgentOnCall(agentId); + if (!onCall) { + this.log.info( + `fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`, + ); + this.pendingTriageGated.push({ agentId, g, channelId, m, session }); + return; + } + // Drain any previously-gated messages (FIFO) before this one, + // now that we know the agent is on_call. + await this.drainGatedFor(agentId); + } + await this.dispatch(agentId, g, channelId, m, session); + }); }); socket.connect(); this.sockets.push(socket); @@ -337,6 +466,19 @@ export class FabricInbound { // any message that isn't the agent's own (already filtered above) is // always delivered to the model. if (m.xType !== 'dm' && m.wakeup !== true) { + // Triage exception: non-wake messages (admin observer) MUST NOT + // enter the agent's session at all. The next time the agent + // wakes for a triage message, their context should contain only + // their own past wakeups + their own outgoing messages — never + // the observer-only chatter from other agents. For non-triage + // channels keep the legacy "record-as-history" so a later wake + // sees the full channel conversation. + if (m.xType === 'triage') { + this.log.info( + `fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`, + ); + return; + } const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx); await core.channel.session.recordInboundSession({ storePath, -- 2.49.1 From b7e9d278cd6089a37309098ef46a07bcaf921c26 Mon Sep 17 00:00:00 2001 From: hanghang zhang Date: Fri, 22 May 2026 22:46:50 +0100 Subject: [PATCH 2/2] fix(triage): resolve claw_identifier via openclaw config (HF plugin's identifier) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit os.hostname() fallback is wrong in sim where container hostname (server.t2) doesn't match the HF agent row's claw_identifier (sim-t2). Add intermediate fallback that reads openclaw config plugins.harbor-forge.identifier — the same value the HF plugin uses for its outbound HF calls — keeping plugin and HF agent state aligned without a per-service-unit HF_CLAW_IDENTIFIER env override. Priority: 1. HF_CLAW_IDENTIFIER env (operator override) 2. openclaw config plugins.harbor-forge.identifier (NEW) 3. os.hostname() last-resort 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) --- src/inbound.ts | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/inbound.ts b/src/inbound.ts index d089448..06c1973 100644 --- a/src/inbound.ts +++ b/src/inbound.ts @@ -129,7 +129,34 @@ export class FabricInbound { return cached.onCall; } const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top'; - const claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim() || (await import('os')).hostname(); + // CLAW_IDENTIFIER resolution priority: + // 1. HF_CLAW_IDENTIFIER env (operator override) + // 2. openclaw config `plugins.harbor-forge.identifier` (what the HF + // plugin itself uses — keeps the two in sync without an extra + // env per service unit) + // 3. os.hostname() last-resort fallback (often wrong: e.g. sim + // container hostname is `server.t2` but HF agent row has + // `claw_identifier=sim-t2`; matching is mandatory for the HF + // backend's _require_agent() check) + let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim(); + if (!claw) { + try { + // openclaw config shape (verified in sim): + // { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } } + const cfg = this.cfg as { + plugins?: { entries?: Record }; + }; + const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier; + if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) { + claw = fromCfg.trim(); + } + } catch { + /* fall through to hostname */ + } + } + if (!claw) { + claw = (await import('os')).hostname(); + } let onCall = false; try { const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`; -- 2.49.1