Merge pull request 'fix: dynamically sync inbound channel subscriptions' (#1) from fix/inbound-dynamic-channel-sync into main
This commit is contained in:
69
dist/fabric/src/inbound.js
vendored
69
dist/fabric/src/inbound.js
vendored
@@ -14,6 +14,14 @@ export class FabricInbound {
|
||||
accounts;
|
||||
sockets = [];
|
||||
seen = new Set();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
// at connect time — any channel the agent joins later (e.g. a fresh DM
|
||||
// created by another user) is unreachable until the gateway restarts.
|
||||
channelSyncTimers = [];
|
||||
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
|
||||
// poll. 60s keeps the lag bounded without hammering the backend.
|
||||
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
|
||||
// Guild access tokens are short-lived (~15 min). The socket survives via
|
||||
// socket.io reconnect, but the token captured at connect time goes stale,
|
||||
// so HTTP calls (attachment download, posting the reply) start 401ing.
|
||||
@@ -75,6 +83,9 @@ export class FabricInbound {
|
||||
}
|
||||
}
|
||||
stop() {
|
||||
for (const t of this.channelSyncTimers)
|
||||
clearInterval(t);
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets)
|
||||
s.disconnect();
|
||||
this.sockets = [];
|
||||
@@ -90,19 +101,61 @@ export class FabricInbound {
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
const joinAll = async () => {
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
// on `connect` seeds it; the periodic resync diffs against it so we
|
||||
// only emit `join_channel` for genuinely new channels (and
|
||||
// `leave_channel` for ones the agent is no longer in).
|
||||
const joined = new Set();
|
||||
const syncChannels = async (kind) => {
|
||||
let freshTok;
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
|
||||
const channels = res.ok ? (await res.json()) : [];
|
||||
for (const c of channels)
|
||||
socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
|
||||
}
|
||||
catch {
|
||||
/* best effort */
|
||||
freshTok = tok;
|
||||
}
|
||||
const authTok = freshTok ?? tok;
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
|
||||
if (!res.ok)
|
||||
return;
|
||||
const channels = (await res.json());
|
||||
const current = new Set(channels.map((c) => c.id));
|
||||
let added = 0;
|
||||
let removed = 0;
|
||||
for (const id of current) {
|
||||
if (!joined.has(id)) {
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
for (const id of [...joined]) {
|
||||
if (!current.has(id)) {
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (kind === 'initial') {
|
||||
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
|
||||
}
|
||||
else if (added > 0 || removed > 0) {
|
||||
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
|
||||
}
|
||||
}
|
||||
catch {
|
||||
/* best effort — next tick will retry */
|
||||
}
|
||||
};
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('connect', () => {
|
||||
// On every (re)connect the server forgets prior subscriptions, so
|
||||
// reset our local view and seed from a fresh fetch.
|
||||
joined.clear();
|
||||
void syncChannels('initial');
|
||||
});
|
||||
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
socket.on('message.created', (m) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId)
|
||||
|
||||
@@ -52,6 +52,14 @@ type FabricMessage = {
|
||||
export class FabricInbound {
|
||||
private sockets: Socket[] = [];
|
||||
private seen = new Set<string>();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
// at connect time — any channel the agent joins later (e.g. a fresh DM
|
||||
// created by another user) is unreachable until the gateway restarts.
|
||||
private channelSyncTimers: NodeJS.Timeout[] = [];
|
||||
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
|
||||
// poll. 60s keeps the lag bounded without hammering the backend.
|
||||
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
|
||||
// Guild access tokens are short-lived (~15 min). The socket survives via
|
||||
// socket.io reconnect, but the token captured at connect time goes stale,
|
||||
// so HTTP calls (attachment download, posting the reply) start 401ing.
|
||||
@@ -119,6 +127,8 @@ export class FabricInbound {
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
for (const t of this.channelSyncTimers) clearInterval(t);
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets) s.disconnect();
|
||||
this.sockets = [];
|
||||
}
|
||||
@@ -133,20 +143,67 @@ export class FabricInbound {
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
const joinAll = async () => {
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
// on `connect` seeds it; the periodic resync diffs against it so we
|
||||
// only emit `join_channel` for genuinely new channels (and
|
||||
// `leave_channel` for ones the agent is no longer in).
|
||||
const joined = new Set<string>();
|
||||
const syncChannels = async (kind: 'initial' | 'resync') => {
|
||||
let freshTok: string | undefined;
|
||||
try {
|
||||
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
|
||||
} catch {
|
||||
freshTok = tok;
|
||||
}
|
||||
const authTok = freshTok ?? tok;
|
||||
try {
|
||||
const res = await fetch(
|
||||
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
|
||||
{ headers: { authorization: `Bearer ${tok}` } },
|
||||
{ headers: { authorization: `Bearer ${authTok}` } },
|
||||
);
|
||||
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
|
||||
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
if (!res.ok) return;
|
||||
const channels = (await res.json()) as Array<{ id: string }>;
|
||||
const current = new Set(channels.map((c) => c.id));
|
||||
let added = 0;
|
||||
let removed = 0;
|
||||
for (const id of current) {
|
||||
if (!joined.has(id)) {
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
for (const id of [...joined]) {
|
||||
if (!current.has(id)) {
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (kind === 'initial') {
|
||||
this.log.info(
|
||||
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
|
||||
);
|
||||
} else if (added > 0 || removed > 0) {
|
||||
this.log.info(
|
||||
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
/* best effort */
|
||||
/* best effort — next tick will retry */
|
||||
}
|
||||
};
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('connect', () => {
|
||||
// On every (re)connect the server forgets prior subscriptions, so
|
||||
// reset our local view and seed from a fresh fetch.
|
||||
joined.clear();
|
||||
void syncChannels('initial');
|
||||
});
|
||||
const syncTimer = setInterval(
|
||||
() => void syncChannels('resync'),
|
||||
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
||||
);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
socket.on('message.created', (m: FabricMessage) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId) return;
|
||||
|
||||
Reference in New Issue
Block a user