diff --git a/dist/fabric/src/inbound.js b/dist/fabric/src/inbound.js index 570b04e..a313823 100644 --- a/dist/fabric/src/inbound.js +++ b/dist/fabric/src/inbound.js @@ -14,6 +14,14 @@ export class FabricInbound { accounts; sockets = []; seen = new Set(); + // Timers that periodically re-sync channel membership per (agent, guild). + // Without this, the agent's socket.io subscriptions are a snapshot taken + // at connect time — any channel the agent joins later (e.g. a fresh DM + // created by another user) is unreachable until the gateway restarts. + channelSyncTimers = []; + // Resync cadence. Backend doesn't push a `channel.joined` event, so we + // poll. 60s keeps the lag bounded without hammering the backend. + static CHANNEL_SYNC_INTERVAL_MS = 60_000; // Guild access tokens are short-lived (~15 min). The socket survives via // socket.io reconnect, but the token captured at connect time goes stale, // so HTTP calls (attachment download, posting the reply) start 401ing. @@ -75,6 +83,9 @@ export class FabricInbound { } } stop() { + for (const t of this.channelSyncTimers) + clearInterval(t); + this.channelSyncTimers = []; for (const s of this.sockets) s.disconnect(); this.sockets = []; @@ -90,19 +101,61 @@ export class FabricInbound { auth: { token: tok }, autoConnect: false, }); - const joinAll = async () => { + // Tracked socket.io rooms for this (agent, guild). The initial fetch + // on `connect` seeds it; the periodic resync diffs against it so we + // only emit `join_channel` for genuinely new channels (and + // `leave_channel` for ones the agent is no longer in). + const joined = new Set(); + const syncChannels = async (kind) => { + let freshTok; try { - const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } }); - const channels = res.ok ? (await res.json()) : []; - for (const c of channels) - socket.emit('join_channel', { channelId: c.id }); - this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`); + freshTok = await this.freshGuildToken(agentId, g.nodeId, session); } catch { - /* best effort */ + freshTok = tok; + } + const authTok = freshTok ?? tok; + try { + const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } }); + if (!res.ok) + return; + const channels = (await res.json()); + const current = new Set(channels.map((c) => c.id)); + let added = 0; + let removed = 0; + for (const id of current) { + if (!joined.has(id)) { + socket.emit('join_channel', { channelId: id }); + joined.add(id); + added++; + } + } + for (const id of [...joined]) { + if (!current.has(id)) { + socket.emit('leave_channel', { channelId: id }); + joined.delete(id); + removed++; + } + } + if (kind === 'initial') { + this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`); + } + else if (added > 0 || removed > 0) { + this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`); + } + } + catch { + /* best effort — next tick will retry */ } }; - socket.on('connect', () => void joinAll()); + socket.on('connect', () => { + // On every (re)connect the server forgets prior subscriptions, so + // reset our local view and seed from a fresh fetch. + joined.clear(); + void syncChannels('initial'); + }); + const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS); + this.channelSyncTimers.push(syncTimer); socket.on('message.created', (m) => { const channelId = m.channelId ?? ''; if (!channelId) diff --git a/src/inbound.ts b/src/inbound.ts index 435fab1..267527f 100644 --- a/src/inbound.ts +++ b/src/inbound.ts @@ -52,6 +52,14 @@ type FabricMessage = { export class FabricInbound { private sockets: Socket[] = []; private seen = new Set(); + // Timers that periodically re-sync channel membership per (agent, guild). + // Without this, the agent's socket.io subscriptions are a snapshot taken + // at connect time — any channel the agent joins later (e.g. a fresh DM + // created by another user) is unreachable until the gateway restarts. + private channelSyncTimers: NodeJS.Timeout[] = []; + // Resync cadence. Backend doesn't push a `channel.joined` event, so we + // poll. 60s keeps the lag bounded without hammering the backend. + private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000; // Guild access tokens are short-lived (~15 min). The socket survives via // socket.io reconnect, but the token captured at connect time goes stale, // so HTTP calls (attachment download, posting the reply) start 401ing. @@ -119,6 +127,8 @@ export class FabricInbound { } stop(): void { + for (const t of this.channelSyncTimers) clearInterval(t); + this.channelSyncTimers = []; for (const s of this.sockets) s.disconnect(); this.sockets = []; } @@ -133,20 +143,67 @@ export class FabricInbound { auth: { token: tok }, autoConnect: false, }); - const joinAll = async () => { + // Tracked socket.io rooms for this (agent, guild). The initial fetch + // on `connect` seeds it; the periodic resync diffs against it so we + // only emit `join_channel` for genuinely new channels (and + // `leave_channel` for ones the agent is no longer in). + const joined = new Set(); + const syncChannels = async (kind: 'initial' | 'resync') => { + let freshTok: string | undefined; + try { + freshTok = await this.freshGuildToken(agentId, g.nodeId, session); + } catch { + freshTok = tok; + } + const authTok = freshTok ?? tok; try { const res = await fetch( `${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, - { headers: { authorization: `Bearer ${tok}` } }, + { headers: { authorization: `Bearer ${authTok}` } }, ); - const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : []; - for (const c of channels) socket.emit('join_channel', { channelId: c.id }); - this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`); + if (!res.ok) return; + const channels = (await res.json()) as Array<{ id: string }>; + const current = new Set(channels.map((c) => c.id)); + let added = 0; + let removed = 0; + for (const id of current) { + if (!joined.has(id)) { + socket.emit('join_channel', { channelId: id }); + joined.add(id); + added++; + } + } + for (const id of [...joined]) { + if (!current.has(id)) { + socket.emit('leave_channel', { channelId: id }); + joined.delete(id); + removed++; + } + } + if (kind === 'initial') { + this.log.info( + `fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`, + ); + } else if (added > 0 || removed > 0) { + this.log.info( + `fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`, + ); + } } catch { - /* best effort */ + /* best effort — next tick will retry */ } }; - socket.on('connect', () => void joinAll()); + socket.on('connect', () => { + // On every (re)connect the server forgets prior subscriptions, so + // reset our local view and seed from a fresh fetch. + joined.clear(); + void syncChannels('initial'); + }); + const syncTimer = setInterval( + () => void syncChannels('resync'), + FabricInbound.CHANNEL_SYNC_INTERVAL_MS, + ); + this.channelSyncTimers.push(syncTimer); socket.on('message.created', (m: FabricMessage) => { const channelId = m.channelId ?? ''; if (!channelId) return;