1 Commits

Author SHA1 Message Date
d1d5ad10ca fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
2 changed files with 1 additions and 212 deletions

View File

@@ -154,28 +154,6 @@ export class FabricInbound {
joined.clear(); joined.clear();
void syncChannels('initial'); void syncChannels('initial');
}); });
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt) => {
const id = evt?.channelId;
if (!id || joined.has(id))
return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt) => {
const id = evt?.channelId;
if (!id || !joined.has(id))
return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS); const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer); this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => { socket.on('message.created', (m) => {

View File

@@ -67,136 +67,6 @@ export class FabricInbound {
private tokenCache = new Map<string, { session: FabricSession; at: number }>(); private tokenCache = new Map<string, { session: FabricSession; at: number }>();
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000; private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
// Per-channel serial work queue. Every inbound socket message for a
// channel awaits the previous task for that same channel, so model
// turns never interleave. Map key = channelId; value is the tail of
// the chain (an in-flight promise the next task awaits).
//
// Why per-channel and not per-agent: a single agent may sit in
// several triage / general channels; we want each channel to flow at
// its own speed but the SAME channel's traffic to be strictly serial.
// For dm and discuss the queue also serialises but those traditionally
// had at-most-one-in-flight anyway via the turn engine.
private channelChains = new Map<string, Promise<void>>();
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
// agent/status round-trip off the hot path for back-to-back triage
// messages. Short TTL because status flips are rare-but-meaningful.
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
private static readonly AGENT_STATUS_TTL_MS = 5_000;
// Triage messages that arrived while the on-duty agent wasn't on_call
// — sit here until either (a) the agent becomes on_call and the next
// triage arrival drains them, or (b) the gateway restarts (lost; ok
// because the underlying Fabric messages are persisted and re-fetched
// on agent reconnect's history sweep).
private pendingTriageGated: Array<{
agentId: string;
g: { nodeId: string; endpoint: string };
channelId: string;
m: FabricMessage;
session: FabricSession;
}> = [];
// Schedule `task` to run after every previous task on the same
// channel has completed. Returns the promise so callers can await
// their own result if they need to; the chain itself is fire-and-
// forget from the socket.on handler.
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
const next = prev.then(task).catch((err) => {
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
});
this.channelChains.set(channelId, next);
// Best-effort cleanup so the Map doesn't grow without bound for
// long-running gateways: drop the entry when the chain settles, but
// only if it's still the latest reference (newer enqueue may have
// overwritten it in the meantime).
void next.finally(() => {
if (this.channelChains.get(channelId) === next) {
this.channelChains.delete(channelId);
}
});
return next;
}
// Hit HF backend to check whether `agentId` is currently on_call.
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
// on_call" — triage stays gated rather than risking a confused wake.
private async checkAgentOnCall(agentId: string): Promise<boolean> {
const cached = this.agentStatusCache.get(agentId);
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
return cached.onCall;
}
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
// CLAW_IDENTIFIER resolution priority:
// 1. HF_CLAW_IDENTIFIER env (operator override)
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
// plugin itself uses — keeps the two in sync without an extra
// env per service unit)
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
// container hostname is `server.t2` but HF agent row has
// `claw_identifier=sim-t2`; matching is mandatory for the HF
// backend's _require_agent() check)
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
if (!claw) {
try {
// openclaw config shape (verified in sim):
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
const cfg = this.cfg as {
plugins?: { entries?: Record<string, { config?: { identifier?: string } }> };
};
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
claw = fromCfg.trim();
}
} catch {
/* fall through to hostname */
}
}
if (!claw) {
claw = (await import('os')).hostname();
}
let onCall = false;
try {
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
const res = await fetch(url, {
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
});
if (res.ok) {
const data = (await res.json()) as { status?: string };
onCall = (data.status ?? '').toLowerCase() === 'on_call';
}
} catch (err) {
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
}
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
return onCall;
}
// FIFO drain of all triage-gated messages for `agentId` (called when
// we just learned they're on_call). Each drained message is dispatched
// through its own channel chain so per-channel serial order is kept.
private async drainGatedFor(agentId: string): Promise<void> {
const keep: typeof this.pendingTriageGated = [];
const drain: typeof this.pendingTriageGated = [];
for (const item of this.pendingTriageGated) {
if (item.agentId === agentId) drain.push(item);
else keep.push(item);
}
if (drain.length === 0) return;
this.pendingTriageGated = keep;
for (const item of drain) {
this.log.info(
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
);
// Re-enqueue via the per-channel chain so ordering is preserved.
this.enqueueChannelTask(item.channelId, async () => {
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
});
}
}
// Return a fresh guild access token for the agent, re-authenticating with // Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back // the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails. // to the connect-time session token if re-login fails.
@@ -329,26 +199,6 @@ export class FabricInbound {
joined.clear(); joined.clear();
void syncChannels('initial'); void syncChannels('initial');
}); });
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || joined.has(id)) return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || !joined.has(id)) return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval( const syncTimer = setInterval(
() => void syncChannels('resync'), () => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS, FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
@@ -362,33 +212,7 @@ export class FabricInbound {
if (this.seen.has(key)) return; if (this.seen.has(key)) return;
this.seen.add(key); this.seen.add(key);
if (this.seen.size > 5000) this.seen.clear(); if (this.seen.size > 5000) this.seen.clear();
// Per-channel serial queue. Prevents concurrent model turns for void this.dispatch(agentId, g, channelId, m, session);
// the same channel — important for triage where a second wake
// arriving mid-reply would interleave with the in-flight one.
this.enqueueChannelTask(channelId, async () => {
// Triage on_call gate: if the on-duty agent isn't currently
// on_call per HF, don't dispatch yet — just sit on the
// per-channel queue. Subsequent triage messages will recheck;
// when the agent becomes on_call, the next arrival drains.
//
// Also handles: triage + wake=true must verify status before
// committing to a model turn. Non-triage and triage observer
// (wake=false) skip the gate.
if (m.xType === 'triage' && m.wakeup === true) {
const onCall = await this.checkAgentOnCall(agentId);
if (!onCall) {
this.log.info(
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
);
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
return;
}
// Drain any previously-gated messages (FIFO) before this one,
// now that we know the agent is on_call.
await this.drainGatedFor(agentId);
}
await this.dispatch(agentId, g, channelId, m, session);
});
}); });
socket.connect(); socket.connect();
this.sockets.push(socket); this.sockets.push(socket);
@@ -493,19 +317,6 @@ export class FabricInbound {
// any message that isn't the agent's own (already filtered above) is // any message that isn't the agent's own (already filtered above) is
// always delivered to the model. // always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) { if (m.xType !== 'dm' && m.wakeup !== true) {
// Triage exception: non-wake messages (admin observer) MUST NOT
// enter the agent's session at all. The next time the agent
// wakes for a triage message, their context should contain only
// their own past wakeups + their own outgoing messages — never
// the observer-only chatter from other agents. For non-triage
// channels keep the legacy "record-as-history" so a later wake
// sees the full channel conversation.
if (m.xType === 'triage') {
this.log.info(
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
);
return;
}
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx); const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({ await core.channel.session.recordInboundSession({
storePath, storePath,