Files
Fabric.OpenclawPlugin/dist/fabric/src/presence-sync.js
hzhang 9419d270e5 fix(presence-sync): tick mutex so setInterval overlap can't spawn parallel ticks
The presence-sync tick iterates accounts serially with await on each
agent-login + PUT round-trip — a single tick can easily run 20+s when
there are several accounts. setInterval(intervalMs) does NOT wait for
the previous tick to finish, so on a busy gateway the next tick fires
on top of a still-running one and two parallel iterations each PUT
the same agentId within ~10 ms. That tipped the guild backend's
first-time-insert race (separate fix in nav/Fabric.Backend.Guild) into
500s on prod (caught in t2 gateway 2026-05-25 23:23:35Z; 6 of 6 agents
showed paired log lines 4-10 ms apart for the same agent → idle).

Fix: a simple `inflight` boolean. tick() returns immediately if
already running; the next interval beat catches up. lastStatus !==
bridge.get gating already means status changes catch the next tick
anyway, so skipping a beat costs nothing the next beat won't fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 02:25:08 +01:00

140 lines
5.7 KiB
JavaScript

// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
export class PresenceSync {
logger;
client;
timer = null;
lastStatus = new Map(); // by agentId
accounts = new Map();
tokenCache = new Map(); // by agentId
// Mutex flag: a tick iterates accounts serially with `await` on each
// agent-login + PUT round-trip, so a single tick can easily run 20+s
// when there are many accounts. setInterval(intervalMs) does NOT wait
// for the previous tick to finish — without this guard the next tick
// fires on top of a still-running one and two parallel iterations
// PUT the same agentId within milliseconds. That tipped the backend's
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
// 500s on prod. Guarded ticks just skip a beat instead.
inflight = false;
constructor(logger, client) {
this.logger = logger;
this.client = client;
}
setAccounts(accounts) {
this.accounts.clear();
for (const a of accounts)
this.accounts.set(a.agentId, a);
}
start(intervalMs = 30_000) {
if (this.timer)
return;
this.timer = setInterval(() => {
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
}, intervalMs);
// run once immediately so initial state lands fast
void this.tick();
}
stop() {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
async ensureGuildToken(acct) {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now)
return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
}
catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
async tick() {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight)
return;
this.inflight = true;
try {
await this.tickInner();
}
finally {
this.inflight = false;
}
}
async tickInner() {
const bridge = globalThis['__hfAgentStatus'];
if (!bridge || typeof bridge.get !== 'function')
return; // HF plugin not loaded — skip
for (const [agentId, acct] of this.accounts) {
let status;
try {
status = await bridge.get(agentId);
}
catch {
continue;
}
if (!status)
continue;
if (this.lastStatus.get(agentId) === status)
continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken)
continue;
try {
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
// x-api-key and got 401 "missing bearer token" forever. The /api
// prefix is required because the guild backend sets a global
// 'api' prefix in main.ts setGlobalPrefix('api').
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
authorization: `Bearer ${guildToken}`,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
if (res.ok) {
this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId}${status}`);
}
else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401)
this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
}
catch (err) {
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
}
}
}
}