1 Commits

Author SHA1 Message Date
d1d5ad10ca fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
2 changed files with 0 additions and 42 deletions

View File

@@ -154,28 +154,6 @@ export class FabricInbound {
joined.clear();
void syncChannels('initial');
});
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt) => {
const id = evt?.channelId;
if (!id || joined.has(id))
return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt) => {
const id = evt?.channelId;
if (!id || !joined.has(id))
return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
this.channelSyncTimers.push(syncTimer);
socket.on('message.created', (m) => {

View File

@@ -199,26 +199,6 @@ export class FabricInbound {
joined.clear();
void syncChannels('initial');
});
// Push-based membership events from the backend (companion to
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
// server tells us this user was added to / removed from a
// channel, we sub/unsub the socket.io room immediately — no
// 60s wait for the polling resync. Polling remains as a safety
// net for missed events.
socket.on('channel.joined', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || joined.has(id)) return;
socket.emit('join_channel', { channelId: id });
joined.add(id);
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
socket.on('channel.left', (evt: { channelId?: string }) => {
const id = evt?.channelId;
if (!id || !joined.has(id)) return;
socket.emit('leave_channel', { channelId: id });
joined.delete(id);
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
});
const syncTimer = setInterval(
() => void syncChannels('resync'),
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,