feat(triage): per-channel serial queue + HF on_call gate + observer skip
Three behavioral changes to inbound message handling to support the new triage flow: ## 1. Per-channel serial queue Replaces `void this.dispatch(...)` (fire-and-forget) with a per-channel chain so consecutive messages on the same channel are processed strictly in order — no concurrent model turns for the same channel. Other channels remain independent (parallelism preserved across channels). Implementation: `Map<channelId, Promise>` where each new task awaits the previous. The map entry self-cleans when the chain settles AND no newer task has overwritten it. ## 2. HF on_call gate (triage + wake=true only) Before dispatching a triage wake to the on-duty agent, hit HF `GET /calendar/agent/status?agent_id=...`. If the agent isn't currently on_call, the message is pushed to a per-agent gated queue instead of dispatched — no model turn fires. Status check is cached for 5s to amortise across rapid triage bursts. When a subsequent triage message arrives and the agent IS on_call by that point, the gated queue drains FIFO (re-enqueued through the same per-channel chain so order is kept) before the new message dispatches. Drained queue is in-memory only; on gateway restart the underlying Fabric messages get re-fetched via the connect-time history sweep. ## 3. Triage observer skip (wake=false) Triage messages that arrive with wakeup=false are admin observers — by spec they MUST NOT enter the agent's session history. Skipped entirely (no recordInboundSession call). The next time this agent legitimately wakes for triage, their context contains only past wakeups + their own outgoing messages — no observer-side chatter from other agents. For NON-triage channels the legacy "record-as-history" stays — those keep their full channel conversation available for later wakes. ## Env - HF_API_BASE_URL — defaults `https://monitor.hangman-lab.top` - HF_CLAW_IDENTIFIER — defaults to `os.hostname()` 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
144
src/inbound.ts
144
src/inbound.ts
@@ -67,6 +67,109 @@ export class FabricInbound {
|
|||||||
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
|
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
|
||||||
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
|
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
|
||||||
|
|
||||||
|
// Per-channel serial work queue. Every inbound socket message for a
|
||||||
|
// channel awaits the previous task for that same channel, so model
|
||||||
|
// turns never interleave. Map key = channelId; value is the tail of
|
||||||
|
// the chain (an in-flight promise the next task awaits).
|
||||||
|
//
|
||||||
|
// Why per-channel and not per-agent: a single agent may sit in
|
||||||
|
// several triage / general channels; we want each channel to flow at
|
||||||
|
// its own speed but the SAME channel's traffic to be strictly serial.
|
||||||
|
// For dm and discuss the queue also serialises but those traditionally
|
||||||
|
// had at-most-one-in-flight anyway via the turn engine.
|
||||||
|
private channelChains = new Map<string, Promise<void>>();
|
||||||
|
|
||||||
|
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
|
||||||
|
// agent/status round-trip off the hot path for back-to-back triage
|
||||||
|
// messages. Short TTL because status flips are rare-but-meaningful.
|
||||||
|
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
|
||||||
|
private static readonly AGENT_STATUS_TTL_MS = 5_000;
|
||||||
|
|
||||||
|
// Triage messages that arrived while the on-duty agent wasn't on_call
|
||||||
|
// — sit here until either (a) the agent becomes on_call and the next
|
||||||
|
// triage arrival drains them, or (b) the gateway restarts (lost; ok
|
||||||
|
// because the underlying Fabric messages are persisted and re-fetched
|
||||||
|
// on agent reconnect's history sweep).
|
||||||
|
private pendingTriageGated: Array<{
|
||||||
|
agentId: string;
|
||||||
|
g: { nodeId: string; endpoint: string };
|
||||||
|
channelId: string;
|
||||||
|
m: FabricMessage;
|
||||||
|
session: FabricSession;
|
||||||
|
}> = [];
|
||||||
|
|
||||||
|
// Schedule `task` to run after every previous task on the same
|
||||||
|
// channel has completed. Returns the promise so callers can await
|
||||||
|
// their own result if they need to; the chain itself is fire-and-
|
||||||
|
// forget from the socket.on handler.
|
||||||
|
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
|
||||||
|
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
|
||||||
|
const next = prev.then(task).catch((err) => {
|
||||||
|
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
|
||||||
|
});
|
||||||
|
this.channelChains.set(channelId, next);
|
||||||
|
// Best-effort cleanup so the Map doesn't grow without bound for
|
||||||
|
// long-running gateways: drop the entry when the chain settles, but
|
||||||
|
// only if it's still the latest reference (newer enqueue may have
|
||||||
|
// overwritten it in the meantime).
|
||||||
|
void next.finally(() => {
|
||||||
|
if (this.channelChains.get(channelId) === next) {
|
||||||
|
this.channelChains.delete(channelId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hit HF backend to check whether `agentId` is currently on_call.
|
||||||
|
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
|
||||||
|
// on_call" — triage stays gated rather than risking a confused wake.
|
||||||
|
private async checkAgentOnCall(agentId: string): Promise<boolean> {
|
||||||
|
const cached = this.agentStatusCache.get(agentId);
|
||||||
|
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
|
||||||
|
return cached.onCall;
|
||||||
|
}
|
||||||
|
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
|
||||||
|
const claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim() || (await import('os')).hostname();
|
||||||
|
let onCall = false;
|
||||||
|
try {
|
||||||
|
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
|
||||||
|
const res = await fetch(url, {
|
||||||
|
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
|
||||||
|
});
|
||||||
|
if (res.ok) {
|
||||||
|
const data = (await res.json()) as { status?: string };
|
||||||
|
onCall = (data.status ?? '').toLowerCase() === 'on_call';
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
|
||||||
|
}
|
||||||
|
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
|
||||||
|
return onCall;
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIFO drain of all triage-gated messages for `agentId` (called when
|
||||||
|
// we just learned they're on_call). Each drained message is dispatched
|
||||||
|
// through its own channel chain so per-channel serial order is kept.
|
||||||
|
private async drainGatedFor(agentId: string): Promise<void> {
|
||||||
|
const keep: typeof this.pendingTriageGated = [];
|
||||||
|
const drain: typeof this.pendingTriageGated = [];
|
||||||
|
for (const item of this.pendingTriageGated) {
|
||||||
|
if (item.agentId === agentId) drain.push(item);
|
||||||
|
else keep.push(item);
|
||||||
|
}
|
||||||
|
if (drain.length === 0) return;
|
||||||
|
this.pendingTriageGated = keep;
|
||||||
|
for (const item of drain) {
|
||||||
|
this.log.info(
|
||||||
|
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
|
||||||
|
);
|
||||||
|
// Re-enqueue via the per-channel chain so ordering is preserved.
|
||||||
|
this.enqueueChannelTask(item.channelId, async () => {
|
||||||
|
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Return a fresh guild access token for the agent, re-authenticating with
|
// Return a fresh guild access token for the agent, re-authenticating with
|
||||||
// the agent's Fabric API key when the cached session is stale. Falls back
|
// the agent's Fabric API key when the cached session is stale. Falls back
|
||||||
// to the connect-time session token if re-login fails.
|
// to the connect-time session token if re-login fails.
|
||||||
@@ -232,7 +335,33 @@ export class FabricInbound {
|
|||||||
if (this.seen.has(key)) return;
|
if (this.seen.has(key)) return;
|
||||||
this.seen.add(key);
|
this.seen.add(key);
|
||||||
if (this.seen.size > 5000) this.seen.clear();
|
if (this.seen.size > 5000) this.seen.clear();
|
||||||
void this.dispatch(agentId, g, channelId, m, session);
|
// Per-channel serial queue. Prevents concurrent model turns for
|
||||||
|
// the same channel — important for triage where a second wake
|
||||||
|
// arriving mid-reply would interleave with the in-flight one.
|
||||||
|
this.enqueueChannelTask(channelId, async () => {
|
||||||
|
// Triage on_call gate: if the on-duty agent isn't currently
|
||||||
|
// on_call per HF, don't dispatch yet — just sit on the
|
||||||
|
// per-channel queue. Subsequent triage messages will recheck;
|
||||||
|
// when the agent becomes on_call, the next arrival drains.
|
||||||
|
//
|
||||||
|
// Also handles: triage + wake=true must verify status before
|
||||||
|
// committing to a model turn. Non-triage and triage observer
|
||||||
|
// (wake=false) skip the gate.
|
||||||
|
if (m.xType === 'triage' && m.wakeup === true) {
|
||||||
|
const onCall = await this.checkAgentOnCall(agentId);
|
||||||
|
if (!onCall) {
|
||||||
|
this.log.info(
|
||||||
|
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
|
||||||
|
);
|
||||||
|
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Drain any previously-gated messages (FIFO) before this one,
|
||||||
|
// now that we know the agent is on_call.
|
||||||
|
await this.drainGatedFor(agentId);
|
||||||
|
}
|
||||||
|
await this.dispatch(agentId, g, channelId, m, session);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
socket.connect();
|
socket.connect();
|
||||||
this.sockets.push(socket);
|
this.sockets.push(socket);
|
||||||
@@ -337,6 +466,19 @@ export class FabricInbound {
|
|||||||
// any message that isn't the agent's own (already filtered above) is
|
// any message that isn't the agent's own (already filtered above) is
|
||||||
// always delivered to the model.
|
// always delivered to the model.
|
||||||
if (m.xType !== 'dm' && m.wakeup !== true) {
|
if (m.xType !== 'dm' && m.wakeup !== true) {
|
||||||
|
// Triage exception: non-wake messages (admin observer) MUST NOT
|
||||||
|
// enter the agent's session at all. The next time the agent
|
||||||
|
// wakes for a triage message, their context should contain only
|
||||||
|
// their own past wakeups + their own outgoing messages — never
|
||||||
|
// the observer-only chatter from other agents. For non-triage
|
||||||
|
// channels keep the legacy "record-as-history" so a later wake
|
||||||
|
// sees the full channel conversation.
|
||||||
|
if (m.xType === 'triage') {
|
||||||
|
this.log.info(
|
||||||
|
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
||||||
await core.channel.session.recordInboundSession({
|
await core.channel.session.recordInboundSession({
|
||||||
storePath,
|
storePath,
|
||||||
|
|||||||
Reference in New Issue
Block a user