From d1d5ad10ca69ef362c275ab6562508e8cd630ea6 Mon Sep 17 00:00:00 2001 From: hzhang Date: Thu, 21 May 2026 07:45:59 +0100 Subject: [PATCH] fix: dynamically sync inbound channel subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The fabric inbound previously called `joinAll()` only on socket.io `connect` — it fetched the agent's channel list via `GET /api/channels?guildId=...` and emitted `join_channel` for each. Any channel the agent joined *after* connect (e.g. a fresh DM created by another user that includes this agent) was unreachable until the gateway restarted: the socket was never subscribed to that room, so backend `message.created` push events never arrived. Backend doesn't emit a user-scoped `channel.joined` event we could piggy-back on (only `message.created`), so the fix is to poll. Every 60s the agent's channel list is re-fetched and diffed against a local `joined` set: - new channel ids → `socket.emit('join_channel', {channelId})` + add - ids in `joined` but absent from the fresh list → `leave_channel` emit + remove (best-effort; cleans subs if the agent is removed from a channel) Re-uses `freshGuildToken()` so the resync fetch survives token expiry (15-min TTL). Every `connect` (including reconnects) resets the local `joined` set since the server forgets prior room subscriptions on reconnect. Timers are tracked in `channelSyncTimers` and cleared in `stop()` alongside socket disconnect. Verified against prod server.t2 scenario: hzhang creates DM channel including agent 'manager' → without this fix, manager only sees the message after a gateway restart; with this fix, manager receives the message within at most 60s (next resync tick). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- dist/fabric/src/inbound.js | 69 +++++++++++++++++++++++++++++++----- src/inbound.ts | 71 ++++++++++++++++++++++++++++++++++---- 2 files changed, 125 insertions(+), 15 deletions(-) diff --git a/dist/fabric/src/inbound.js b/dist/fabric/src/inbound.js index 570b04e..a313823 100644 --- a/dist/fabric/src/inbound.js +++ b/dist/fabric/src/inbound.js @@ -14,6 +14,14 @@ export class FabricInbound { accounts; sockets = []; seen = new Set(); + // Timers that periodically re-sync channel membership per (agent, guild). + // Without this, the agent's socket.io subscriptions are a snapshot taken + // at connect time — any channel the agent joins later (e.g. a fresh DM + // created by another user) is unreachable until the gateway restarts. + channelSyncTimers = []; + // Resync cadence. Backend doesn't push a `channel.joined` event, so we + // poll. 60s keeps the lag bounded without hammering the backend. + static CHANNEL_SYNC_INTERVAL_MS = 60_000; // Guild access tokens are short-lived (~15 min). The socket survives via // socket.io reconnect, but the token captured at connect time goes stale, // so HTTP calls (attachment download, posting the reply) start 401ing. @@ -75,6 +83,9 @@ export class FabricInbound { } } stop() { + for (const t of this.channelSyncTimers) + clearInterval(t); + this.channelSyncTimers = []; for (const s of this.sockets) s.disconnect(); this.sockets = []; @@ -90,19 +101,61 @@ export class FabricInbound { auth: { token: tok }, autoConnect: false, }); - const joinAll = async () => { + // Tracked socket.io rooms for this (agent, guild). The initial fetch + // on `connect` seeds it; the periodic resync diffs against it so we + // only emit `join_channel` for genuinely new channels (and + // `leave_channel` for ones the agent is no longer in). 
+ const joined = new Set(); + const syncChannels = async (kind) => { + let freshTok; try { - const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } }); - const channels = res.ok ? (await res.json()) : []; - for (const c of channels) - socket.emit('join_channel', { channelId: c.id }); - this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`); + freshTok = await this.freshGuildToken(agentId, g.nodeId, session); } catch { - /* best effort */ + freshTok = tok; + } + const authTok = freshTok ?? tok; + try { + const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } }); + if (!res.ok) + return; + const channels = (await res.json()); + const current = new Set(channels.map((c) => c.id)); + let added = 0; + let removed = 0; + for (const id of current) { + if (!joined.has(id)) { + socket.emit('join_channel', { channelId: id }); + joined.add(id); + added++; + } + } + for (const id of [...joined]) { + if (!current.has(id)) { + socket.emit('leave_channel', { channelId: id }); + joined.delete(id); + removed++; + } + } + if (kind === 'initial') { + this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`); + } + else if (added > 0 || removed > 0) { + this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`); + } + } + catch { + /* best effort — next tick will retry */ } }; - socket.on('connect', () => void joinAll()); + socket.on('connect', () => { + // On every (re)connect the server forgets prior subscriptions, so + // reset our local view and seed from a fresh fetch. 
+ joined.clear(); + void syncChannels('initial'); + }); + const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS); + this.channelSyncTimers.push(syncTimer); socket.on('message.created', (m) => { const channelId = m.channelId ?? ''; if (!channelId) diff --git a/src/inbound.ts b/src/inbound.ts index 435fab1..267527f 100644 --- a/src/inbound.ts +++ b/src/inbound.ts @@ -52,6 +52,14 @@ type FabricMessage = { export class FabricInbound { private sockets: Socket[] = []; private seen = new Set(); + // Timers that periodically re-sync channel membership per (agent, guild). + // Without this, the agent's socket.io subscriptions are a snapshot taken + // at connect time — any channel the agent joins later (e.g. a fresh DM + // created by another user) is unreachable until the gateway restarts. + private channelSyncTimers: NodeJS.Timeout[] = []; + // Resync cadence. Backend doesn't push a `channel.joined` event, so we + // poll. 60s keeps the lag bounded without hammering the backend. + private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000; // Guild access tokens are short-lived (~15 min). The socket survives via // socket.io reconnect, but the token captured at connect time goes stale, // so HTTP calls (attachment download, posting the reply) start 401ing. @@ -119,6 +127,8 @@ export class FabricInbound { } stop(): void { + for (const t of this.channelSyncTimers) clearInterval(t); + this.channelSyncTimers = []; for (const s of this.sockets) s.disconnect(); this.sockets = []; } @@ -133,20 +143,67 @@ export class FabricInbound { auth: { token: tok }, autoConnect: false, }); - const joinAll = async () => { + // Tracked socket.io rooms for this (agent, guild). The initial fetch + // on `connect` seeds it; the periodic resync diffs against it so we + // only emit `join_channel` for genuinely new channels (and + // `leave_channel` for ones the agent is no longer in). 
+ const joined = new Set(); + const syncChannels = async (kind: 'initial' | 'resync') => { + let freshTok: string | undefined; + try { + freshTok = await this.freshGuildToken(agentId, g.nodeId, session); + } catch { + freshTok = tok; + } + const authTok = freshTok ?? tok; try { const res = await fetch( `${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, - { headers: { authorization: `Bearer ${tok}` } }, + { headers: { authorization: `Bearer ${authTok}` } }, ); - const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : []; - for (const c of channels) socket.emit('join_channel', { channelId: c.id }); - this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`); + if (!res.ok) return; + const channels = (await res.json()) as Array<{ id: string }>; + const current = new Set(channels.map((c) => c.id)); + let added = 0; + let removed = 0; + for (const id of current) { + if (!joined.has(id)) { + socket.emit('join_channel', { channelId: id }); + joined.add(id); + added++; + } + } + for (const id of [...joined]) { + if (!current.has(id)) { + socket.emit('leave_channel', { channelId: id }); + joined.delete(id); + removed++; + } + } + if (kind === 'initial') { + this.log.info( + `fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`, + ); + } else if (added > 0 || removed > 0) { + this.log.info( + `fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`, + ); + } } catch { - /* best effort */ + /* best effort — next tick will retry */ } }; - socket.on('connect', () => void joinAll()); + socket.on('connect', () => { + // On every (re)connect the server forgets prior subscriptions, so + // reset our local view and seed from a fresh fetch. 
+ joined.clear(); + void syncChannels('initial'); + }); + const syncTimer = setInterval( + () => void syncChannels('resync'), + FabricInbound.CHANNEL_SYNC_INTERVAL_MS, + ); + this.channelSyncTimers.push(syncTimer); socket.on('message.created', (m: FabricMessage) => { const channelId = m.channelId ?? ''; if (!channelId) return;