fix: dynamically sync inbound channel subscriptions #1

Merged
hzhang merged 1 commit from fix/inbound-dynamic-channel-sync into main 2026-05-21 06:56:49 +00:00
2 changed files with 125 additions and 15 deletions
Showing only changes of commit 7d90ae4f2b - Show all commits

View File

@@ -14,6 +14,14 @@ export class FabricInbound {
accounts;
sockets = [];
seen = new Set();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
channelSyncTimers = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
@@ -75,6 +83,9 @@ export class FabricInbound {
}
}
stop() {
for (const t of this.channelSyncTimers)
clearInterval(t);
this.channelSyncTimers = [];
for (const s of this.sockets)
s.disconnect();
this.sockets = [];
@@ -90,19 +101,61 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set();
const syncChannels = async (kind) => {
let freshTok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
const channels = res.ok ? (await res.json()) : [];
for (const c of channels)
socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
}
catch {
/* best effort */
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
if (!res.ok)
return;
const channels = (await res.json());
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
}
else if (added > 0 || removed > 0) {
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
}
}
catch {
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => {
const channelId = m.channelId ?? '';
if (!channelId)

View File

@@ -52,6 +52,14 @@ type FabricMessage = {
export class FabricInbound {
private sockets: Socket[] = [];
private seen = new Set<string>();
// Timers that periodically re-sync channel membership per (agent, guild).
// Without this, the agent's socket.io subscriptions are a snapshot taken
// at connect time — any channel the agent joins later (e.g. a fresh DM
// created by another user) is unreachable until the gateway restarts.
private channelSyncTimers: NodeJS.Timeout[] = [];
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
// poll. 60s keeps the lag bounded without hammering the backend.
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
// Guild access tokens are short-lived (~15 min). The socket survives via
// socket.io reconnect, but the token captured at connect time goes stale,
// so HTTP calls (attachment download, posting the reply) start 401ing.
@@ -119,6 +127,8 @@ export class FabricInbound {
}
/**
 * Releases everything this instance is tracking: cancels every interval
 * held in `channelSyncTimers` and disconnects every socket held in
 * `sockets`, then resets both lists to empty.
 */
stop(): void {
// Cancel the periodic channel re-sync intervals so no further ticks fire.
for (const t of this.channelSyncTimers) clearInterval(t);
this.channelSyncTimers = [];
// Disconnect each tracked socket and drop the references.
for (const s of this.sockets) s.disconnect();
this.sockets = [];
}
@@ -133,20 +143,67 @@ export class FabricInbound {
auth: { token: tok },
autoConnect: false,
});
const joinAll = async () => {
// Tracked socket.io rooms for this (agent, guild). The initial fetch
// on `connect` seeds it; the periodic resync diffs against it so we
// only emit `join_channel` for genuinely new channels (and
// `leave_channel` for ones the agent is no longer in).
const joined = new Set<string>();
const syncChannels = async (kind: 'initial' | 'resync') => {
let freshTok: string | undefined;
try {
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
} catch {
freshTok = tok;
}
const authTok = freshTok ?? tok;
try {
const res = await fetch(
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
{ headers: { authorization: `Bearer ${tok}` } },
{ headers: { authorization: `Bearer ${authTok}` } },
);
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
if (!res.ok) return;
const channels = (await res.json()) as Array<{ id: string }>;
const current = new Set(channels.map((c) => c.id));
let added = 0;
let removed = 0;
for (const id of current) {
if (!joined.has(id)) {
socket.emit('join_channel', { channelId: id });
joined.add(id);
added++;
}
}
for (const id of [...joined]) {
if (!current.has(id)) {
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
removed++;
}
}
if (kind === 'initial') {
this.log.info(
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
);
} else if (added > 0 || removed > 0) {
this.log.info(
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
);
}
} catch {
/* best effort */
/* best effort — next tick will retry */
}
};
socket.on('connect', () => void joinAll());
socket.on('connect', () => {
// On every (re)connect the server forgets prior subscriptions, so
// reset our local view and seed from a fresh fetch.
joined.clear();
void syncChannels('initial');
});
const syncTimer = setInterval(
() => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? '';
if (!channelId) return;