4 Commits

Author SHA1 Message Date
7dc70522d1 fix(inbound): refresh socket.io auth on (re)connect via callback
Backend issues short-lived guildAccessToken (TTL=900s). The previous
`auth: { token: tok }` shape captured the JWT once in connectAgent's
closure: after socket.io's auto-reconnect the backend kept getting the
same expired JWT and silently rejected the handshake at the application
layer (RealtimeGateway logs 'socket rejected: <id>'). The client's
'connect' event still fired (TCP succeeded) so the plugin happily ran
the channel-resync, emitted join_channel into the void, and logged
'joined N channel(s)' while the backend was actually broadcasting
message.created to a room with zero subscribers. End-user symptom:
DMs/group messages to agents silently dropped 15 min after gateway
start, with no error anywhere on the agent side.

Switch to the callback form, which socket.io re-evaluates on every
(re)connect — same call site we already use for the HTTP path via
freshGuildToken/tokenCache.

Verified in sim (commit 2acb084 + this patch):
1. Connect new DM channel + post msg -> dispatch + reply ✓
2. `docker restart fabric-backend-guild` to force socket disconnect
3. Plugin reconnects automatically and logs
   'fabric: agent recruiter joined 12 channel(s) on sim-guild-1' ✓
   (without the fix this reconnect was silently rejected; sim used to
    log 'WARN socket rejected: <id>' on the guild backend)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 13:50:24 +01:00
h z
2acb084ee4 fix(presence-sync): tick mutex (#8) 2026-05-26 02:06:21 +00:00
9419d270e5 fix(presence-sync): tick mutex so setInterval overlap can't spawn parallel ticks
The presence-sync tick iterates accounts serially with await on each
agent-login + PUT round-trip — a single tick can easily run 20+s when
there are several accounts. setInterval(intervalMs) does NOT wait for
the previous tick to finish, so on a busy gateway the next tick fires
on top of a still-running one and two parallel iterations each PUT
the same agentId within ~10 ms. That tipped the guild backend's
first-time-insert race (separate fix in nav/Fabric.Backend.Guild) into
500s on prod (caught in t2 gateway 2026-05-25 23:23:35Z; 6 of 6 agents
showed paired log lines 4-10 ms apart for the same agent → idle).

Fix: a simple `inflight` boolean. tick() returns immediately if
already running; the next interval beat catches up. lastStatus !==
bridge.get gating already means status changes catch the next tick
anyway, so skipping a beat costs nothing the next beat won't fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 02:25:08 +01:00
h z
79b29db26c fix(presence-sync): /api prefix + Bearer guildAccessToken (#7) 2026-05-25 23:17:45 +00:00
4 changed files with 84 additions and 2 deletions

View File

@@ -261,9 +261,26 @@ export class FabricInbound {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) if (!tok)
continue; continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, { const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'], transports: ['websocket'],
auth: { token: tok }, auth: (cb) => {
// Best-effort fresh token; on transient failure fall back to the
// last known good one. tokenCache also keeps HTTP calls (attachment
// download / reply post) from 401'ing in the same window.
this.freshGuildToken(agentId, g.nodeId, session)
.then((fresh) => cb({ token: fresh ?? tok }))
.catch(() => cb({ token: tok }));
},
autoConnect: false, autoConnect: false,
}); });
// Tracked socket.io rooms for this (agent, guild). The initial fetch // Tracked socket.io rooms for this (agent, guild). The initial fetch

View File

@@ -8,6 +8,15 @@ export class PresenceSync {
lastStatus = new Map(); // by agentId lastStatus = new Map(); // by agentId
accounts = new Map(); accounts = new Map();
tokenCache = new Map(); // by agentId tokenCache = new Map(); // by agentId
// Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
inflight = false;
constructor(logger, client) { constructor(logger, client) {
this.logger = logger; this.logger = logger;
this.client = client; this.client = client;
@@ -60,6 +69,21 @@ export class PresenceSync {
return entry.token; return entry.token;
} }
async tick() { async tick() {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight)
return;
this.inflight = true;
try {
await this.tickInner();
}
finally {
this.inflight = false;
}
}
async tickInner() {
const bridge = globalThis['__hfAgentStatus']; const bridge = globalThis['__hfAgentStatus'];
if (!bridge || typeof bridge.get !== 'function') if (!bridge || typeof bridge.get !== 'function')
return; // HF plugin not loaded — skip return; // HF plugin not loaded — skip

View File

@@ -325,9 +325,26 @@ export class FabricInbound {
for (const g of session.guilds) { for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) continue; if (!tok) continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, { const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'], transports: ['websocket'],
auth: { token: tok }, auth: (cb) => {
// Best-effort fresh token; on transient failure fall back to the
// last known good one. tokenCache also keeps HTTP calls (attachment
// download / reply post) from 401'ing in the same window.
this.freshGuildToken(agentId, g.nodeId, session)
.then((fresh) => cb({ token: fresh ?? tok }))
.catch(() => cb({ token: tok }));
},
autoConnect: false, autoConnect: false,
}); });
// Tracked socket.io rooms for this (agent, guild). The initial fetch // Tracked socket.io rooms for this (agent, guild). The initial fetch

View File

@@ -50,6 +50,16 @@ export class PresenceSync {
private readonly accounts = new Map<string, PresenceSyncAccount>(); private readonly accounts = new Map<string, PresenceSyncAccount>();
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
// Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
private inflight = false;
constructor(private readonly logger: Logger, private readonly client: FabricClient) {} constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
setAccounts(accounts: PresenceSyncAccount[]): void { setAccounts(accounts: PresenceSyncAccount[]): void {
@@ -103,6 +113,20 @@ export class PresenceSync {
} }
private async tick(): Promise<void> { private async tick(): Promise<void> {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight) return;
this.inflight = true;
try {
await this.tickInner();
} finally {
this.inflight = false;
}
}
private async tickInner(): Promise<void> {
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined; const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip