/**
 * presence-sync — read each connected agent's HF status (via the
 * cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
 * HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
 * `PUT /api/agents/:userId/presence` so the backend can apply
 * busy-discard on `announce`-type channel deliveries.
 *
 * Push model: we only PUT when an agent's status actually changes
 * (since the last push). The HF-side accessor has its own TTL cache
 * to absorb the every-30s polling.
 *
 * Auth: the endpoint sits behind ApiKeyGuard (global APP_GUARD per
 * app.module.js) which expects `Authorization: Bearer <guildAccessToken>`
 * — NOT the agent's fabricApiKey directly. So before each PUT we do
 * a fresh agent-login (or reuse a cached token if still within its
 * 15-min JWT TTL) and pull the guildAccessToken matching the target
 * guild. Status changes are rare enough that login overhead is fine.
 *
 * If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
 * is a no-op — Fabric backend defaults presence to 'unknown' which is
 * treated as not-busy. Announce-channel delivery still works; busy
 * filtering simply doesn't kick in.
 */
import type { FabricClient } from './fabric-client.js';

/** Agent status values as reported by the HF status bridge. */
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';

/**
 * Shape of the cross-plugin accessor published on
 * `globalThis.__hfAgentStatus`. Resolves to `undefined` when no status
 * is available for the agent; callers treat that as "nothing to push".
 */
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };

/** Minimal logging surface used by the presence-sync loop. */
type Logger = { info: (m: string) => void; warn: (m: string) => void };

/** One agent account whose presence is mirrored to a guild backend. */
export interface PresenceSyncAccount {
  agentId: string;
  fabricUserId: string; // the agent's Fabric Center user id (UUID)
  guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/
  guildNodeId: string; // which guildAccessTokens[].guildNodeId to pick
  fabricApiKey: string; // existing per-account key (used for agent-login)
}

// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000; // 13 minutes, in ms

// Upper bound for one presence PUT. Without it a hung connection would
// keep a tick (and therefore the `inflight` guard) pending forever,
// silently stopping every later tick.
const PUT_TIMEOUT_MS = 10_000;

/** A guild access token plus the local time after which it must be refreshed. */
interface CachedToken {
  token: string;
  expiresAt: number; // epoch ms
}

/**
 * Periodically mirrors each configured agent's HF status to its guild
 * backend, issuing a PUT only when the status differs from the last
 * one that was pushed successfully.
 */
export class PresenceSync {
  private timer: ReturnType<typeof setInterval> | null = null;
  private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
  private readonly accounts = new Map<string, PresenceSyncAccount>();
  private readonly tokenCache = new Map<string, CachedToken>(); // by agentId

  // Mutex flag: a tick iterates accounts serially with `await` on each
  // agent-login + PUT round-trip, so a single tick can easily run 20+s
  // when there are many accounts. setInterval(intervalMs) does NOT wait
  // for the previous tick to finish — without this guard the next tick
  // fires on top of a still-running one and two parallel iterations
  // PUT the same agentId within milliseconds. That tipped the backend's
  // first-time-insert race (separate fix in Fabric.Backend.Guild) into
  // 500s on prod. Guarded ticks just skip a beat instead.
  private inflight = false;

  constructor(private readonly logger: Logger, private readonly client: FabricClient) {}

  /**
   * Replace the set of accounts to sync.
   *
   * Cached state (last pushed status, guild token) is kept only for
   * accounts that are still present with identical settings; entries for
   * removed or re-pointed accounts are dropped so a stale token or a
   * stale "already pushed" marker cannot outlive a configuration change.
   *
   * @param accounts the full new account list; on duplicate `agentId`s the last one wins
   */
  setAccounts(accounts: PresenceSyncAccount[]): void {
    const next = new Map<string, PresenceSyncAccount>();
    for (const a of accounts) next.set(a.agentId, a);

    const cachedIds = new Set<string>([...this.lastStatus.keys(), ...this.tokenCache.keys()]);
    for (const agentId of cachedIds) {
      const prev = this.accounts.get(agentId);
      const cur = next.get(agentId);
      const unchanged =
        prev !== undefined &&
        cur !== undefined &&
        prev.fabricUserId === cur.fabricUserId &&
        prev.guildBaseUrl === cur.guildBaseUrl &&
        prev.guildNodeId === cur.guildNodeId &&
        prev.fabricApiKey === cur.fabricApiKey;
      if (!unchanged) {
        this.lastStatus.delete(agentId);
        this.tokenCache.delete(agentId);
      }
    }

    this.accounts.clear();
    for (const [agentId, a] of next) this.accounts.set(agentId, a);
  }

  /**
   * Start the polling loop. Calling it while already running is a no-op.
   *
   * @param intervalMs delay between ticks in milliseconds (default 30s)
   */
  start(intervalMs = 30_000): void {
    if (this.timer) return;
    this.timer = setInterval(() => this.runTick(), intervalMs);
    // run once immediately so initial state lands fast
    this.runTick();
  }

  /** Stop the polling loop. A tick that is already running finishes on its own. */
  stop(): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  /**
   * Run one tick and log (never rethrow) a failure. Shared by the
   * interval callback and the immediate first run so that neither can
   * surface as an unhandled promise rejection.
   */
  private runTick(): void {
    this.tick().catch((err: unknown) =>
      this.logger.warn(`fabric: presence-sync error: ${String(err)}`),
    );
  }

  /**
   * Fetch a fresh guildAccessToken for `acct`, caching it under the
   * agentId until just before its JWT expiry. Returns null on login
   * failure or if the session has no matching guild — caller skips
   * the PUT (the reason is logged here).
   */
  private async ensureGuildToken(acct: PresenceSyncAccount): Promise<string | null> {
    const now = Date.now();
    const cached = this.tokenCache.get(acct.agentId);
    if (cached && cached.expiresAt > now) return cached.token;

    let session;
    try {
      session = await this.client.agentLogin(acct.fabricApiKey);
    } catch (err) {
      this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
      return null;
    }

    // Be defensive about the session shape: a response without a
    // `guildAccessTokens` array is reported as "no guild token" instead
    // of throwing and aborting the sync of every remaining account.
    const tokens = session?.guildAccessTokens;
    const entry = Array.isArray(tokens)
      ? tokens.find((g) => g?.guildNodeId === acct.guildNodeId)
      : undefined;
    if (!entry?.token) {
      this.logger.warn(
        `fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`,
      );
      return null;
    }

    // `now` was sampled before the login round-trip, so the cached
    // lifetime errs on the short side.
    this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
    return entry.token;
  }

  /** Run one sync pass unless a previous pass is still in progress. */
  private async tick(): Promise<void> {
    // Mutex: see the `inflight` field declaration for the why. Drop
    // overlapping ticks rather than letting them run concurrently —
    // status is gated by `lastStatus !== bridge.get`, so skipping a
    // beat costs nothing the next beat won't catch.
    if (this.inflight) return;
    this.inflight = true;
    try {
      await this.tickInner();
    } finally {
      this.inflight = false;
    }
  }

  /** Sync every configured account once, serially. */
  private async tickInner(): Promise<void> {
    const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
    if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip

    // Iterate a snapshot: setAccounts() may clear and refill the map
    // while this loop is suspended on an await.
    for (const acct of Array.from(this.accounts.values())) {
      try {
        await this.syncAccount(bridge, acct);
      } catch (err) {
        // Safety net: one account's failure must not starve the rest.
        this.logger.warn(`fabric: presence-sync ${acct.agentId} failed: ${String(err)}`);
      }
    }
  }

  /** Push `acct`'s current status to its guild backend if it changed since the last successful push. */
  private async syncAccount(bridge: Bridge, acct: PresenceSyncAccount): Promise<void> {
    const { agentId } = acct;

    let status: HfStatus | undefined;
    try {
      status = await bridge.get(agentId);
    } catch {
      // Deliberately best-effort: skip this agent for this tick and
      // try again on the next one.
      return;
    }
    if (!status) return;
    if (this.lastStatus.get(agentId) === status) return; // no change → no PUT

    const guildToken = await this.ensureGuildToken(acct);
    if (!guildToken) return;

    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), PUT_TIMEOUT_MS);
    try {
      // Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
      // APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
      // — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
      // x-api-key and got 401 "missing bearer token" forever. The /api
      // prefix is required because the guild backend sets a global
      // 'api' prefix in main.ts setGlobalPrefix('api').
      const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
      const res = await fetch(url, {
        method: 'PUT',
        headers: {
          'content-type': 'application/json',
          authorization: `Bearer ${guildToken}`,
        },
        body: JSON.stringify({ status, source: 'hf-plugin' }),
        signal: controller.signal,
      });
      if (res.ok) {
        // Only a confirmed push updates lastStatus, so failed PUTs are
        // retried on the next tick.
        this.lastStatus.set(agentId, status);
        this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
      } else {
        // 401 here usually means the cached token went stale unexpectedly
        // (server-side rotation or clock skew) — drop the cache so the
        // next tick re-logs-in.
        if (res.status === 401) this.tokenCache.delete(agentId);
        this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
      }
    } catch (err) {
      // Includes the abort raised when PUT_TIMEOUT_MS elapses.
      this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
    } finally {
      clearTimeout(timeout);
    }
  }
}