1 Commits

Author SHA1 Message Date
d1d5ad10ca fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
2 changed files with 125 additions and 15 deletions

View File

@@ -14,6 +14,14 @@ export class FabricInbound {
accounts; accounts;
sockets = []; sockets = [];
seen = new Set(); seen = new Set();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
channelSyncTimers = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via // Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale, // socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing. // so HTTP calls (attachment download, posting the reply) start 401ing.
@@ -75,6 +83,9 @@ export class FabricInbound {
} }
} }
stop() { stop() {
for (const t of this.channelSyncTimers)
clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets) for (const s of this.sockets)
s.disconnect(); s.disconnect();
this.sockets = []; this.sockets = [];
@@ -90,19 +101,61 @@ export class FabricInbound {
auth: { token: tok }, auth: { token: tok },
autoConnect: false, autoConnect: false,
}); });
const joinAll = async () => { // Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set();
const syncChannels = async (kind) => {
let freshTok;
try { try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } }); freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
const channels = res.ok ? (await res.json()) : [];
for (const c of channels)
socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
} }
catch { catch {
/* best effort */ freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
if (!res.ok)
return;
const channels = (await res.json());
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
}
else if (added > 0 || removed > 0) {
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
}
}
catch {
/* best effort — next tick will retry */
} }
}; };
socket.on('connect', () => void joinAll()); socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => { socket.on('message.created', (m) => {
const channelId = m.channelId ?? ''; const channelId = m.channelId ?? '';
if (!channelId) if (!channelId)

View File

@@ -52,6 +52,14 @@ type FabricMessage = {
export class FabricInbound { export class FabricInbound {
private sockets: Socket[] = []; private sockets: Socket[] = [];
private seen = new Set<string>(); private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
private channelSyncTimers: NodeJS.Timeout[] = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via // Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale, // socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing. // so HTTP calls (attachment download, posting the reply) start 401ing.
@@ -119,6 +127,8 @@ export class FabricInbound {
} }
stop(): void { stop(): void {
for (const t of this.channelSyncTimers) clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets) s.disconnect(); for (const s of this.sockets) s.disconnect();
this.sockets = []; this.sockets = [];
} }
@@ -133,20 +143,67 @@ export class FabricInbound {
auth: { token: tok }, auth: { token: tok },
autoConnect: false, autoConnect: false,
}); });
const joinAll = async () => { // Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set<string>();
const syncChannels = async (kind: 'initial' | 'resync') => {
let freshTok: string | undefined;
try {
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
} catch {
freshTok = tok;
}
const authTok = freshTok ?? tok;
try { try {
const res = await fetch( const res = await fetch(
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, `${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
{ headers: { authorization: `Bearer ${tok}` } }, { headers: { authorization: `Bearer ${authTok}` } },
); );
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : []; if (!res.ok) return;
for (const c of channels) socket.emit('join_channel', { channelId: c.id }); const channels = (await res.json()) as Array<{ id: string }>;
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`); const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
);
} else if (added > 0 || removed > 0) {
this.log.info(
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
);
}
} catch { } catch {
/* best effort */ /* best effort — next tick will retry */
} }
}; };
socket.on('connect', () => void joinAll()); socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
const syncTimer = setInterval(
() => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m: FabricMessage) => { socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? ''; const channelId = m.channelId ?? '';
if (!channelId) return; if (!channelId) return;