Compare commits
7 Commits
fix/presen
...
b659dadb9e
| Author | SHA1 | Date | |
|---|---|---|---|
| b659dadb9e | |||
| 20e55849eb | |||
| d47d3467df | |||
| 7dc70522d1 | |||
| 2acb084ee4 | |||
| 9419d270e5 | |||
| 79b29db26c |
13
dist/fabric/src/channel.js
vendored
13
dist/fabric/src/channel.js
vendored
@@ -117,6 +117,19 @@ export const fabricChannelPlugin = createChatChannelPlugin({
|
|||||||
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId),
|
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg, accountId),
|
||||||
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg),
|
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg),
|
||||||
isConfigured: (account) => Boolean(account.fabricApiKey),
|
isConfigured: (account) => Boolean(account.fabricApiKey),
|
||||||
|
// openclaw's channelManager.getRuntimeSnapshot() — called every minute
|
||||||
|
// by the channel-health-monitor — defaults `configured: true` when the
|
||||||
|
// plugin doesn't expose describeAccount (see applyDescribedAccountFields
|
||||||
|
// in server-channels). Without this, fabric's synthetic 'default'
|
||||||
|
// account (returned by listFabricAccountIds when channels.fabric.accounts
|
||||||
|
// is empty — the prod shape) gets snapshot {enabled:true, configured:true,
|
||||||
|
// running:false} → isManagedAccount=true → not-running → restart loop
|
||||||
|
// every ~10 min, logging `[fabric:default] health-monitor: restarting`.
|
||||||
|
// Mirror isConfigured here so the snapshot truthfully reports false for
|
||||||
|
// any account without a fabricApiKey.
|
||||||
|
describeAccount: (account) => ({
|
||||||
|
configured: Boolean(account.fabricApiKey),
|
||||||
|
}),
|
||||||
},
|
},
|
||||||
// Minimal setup adapter: Fabric is configured directly under
|
// Minimal setup adapter: Fabric is configured directly under
|
||||||
// channels.fabric.* (no interactive wizard). applyAccountConfig is the
|
// channels.fabric.* (no interactive wizard). applyAccountConfig is the
|
||||||
|
|||||||
19
dist/fabric/src/inbound.js
vendored
19
dist/fabric/src/inbound.js
vendored
@@ -261,9 +261,26 @@ export class FabricInbound {
|
|||||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||||
if (!tok)
|
if (!tok)
|
||||||
continue;
|
continue;
|
||||||
|
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
|
||||||
|
// on every (re)connect. The single-shot `auth: { token: tok }` shape
|
||||||
|
// captured the token in closure: after socket.io's silent auto-reconnect
|
||||||
|
// the backend got the same JWT that expired ~15 min into the session
|
||||||
|
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
|
||||||
|
// the application layer. The client's `connect` event still fired (TCP
|
||||||
|
// succeeded), so the plugin happily ran the channel-resync, emitted
|
||||||
|
// `join_channel` into the void, and logged "joined N channel(s)" while
|
||||||
|
// the backend was actually broadcasting message.created to a room with
|
||||||
|
// zero subscribers. End user symptom: DMs to agents silently dropped.
|
||||||
const socket = io(`${g.endpoint}/realtime`, {
|
const socket = io(`${g.endpoint}/realtime`, {
|
||||||
transports: ['websocket'],
|
transports: ['websocket'],
|
||||||
auth: { token: tok },
|
auth: (cb) => {
|
||||||
|
// Best-effort fresh token; on transient failure fall back to the
|
||||||
|
// last known good one. tokenCache also keeps HTTP calls (attachment
|
||||||
|
// download / reply post) from 401'ing in the same window.
|
||||||
|
this.freshGuildToken(agentId, g.nodeId, session)
|
||||||
|
.then((fresh) => cb({ token: fresh ?? tok }))
|
||||||
|
.catch(() => cb({ token: tok }));
|
||||||
|
},
|
||||||
autoConnect: false,
|
autoConnect: false,
|
||||||
});
|
});
|
||||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||||
|
|||||||
24
dist/fabric/src/presence-sync.js
vendored
24
dist/fabric/src/presence-sync.js
vendored
@@ -8,6 +8,15 @@ export class PresenceSync {
|
|||||||
lastStatus = new Map(); // by agentId
|
lastStatus = new Map(); // by agentId
|
||||||
accounts = new Map();
|
accounts = new Map();
|
||||||
tokenCache = new Map(); // by agentId
|
tokenCache = new Map(); // by agentId
|
||||||
|
// Mutex flag: a tick iterates accounts serially with `await` on each
|
||||||
|
// agent-login + PUT round-trip, so a single tick can easily run 20+s
|
||||||
|
// when there are many accounts. setInterval(intervalMs) does NOT wait
|
||||||
|
// for the previous tick to finish — without this guard the next tick
|
||||||
|
// fires on top of a still-running one and two parallel iterations
|
||||||
|
// PUT the same agentId within milliseconds. That tipped the backend's
|
||||||
|
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
|
||||||
|
// 500s on prod. Guarded ticks just skip a beat instead.
|
||||||
|
inflight = false;
|
||||||
constructor(logger, client) {
|
constructor(logger, client) {
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
@@ -60,6 +69,21 @@ export class PresenceSync {
|
|||||||
return entry.token;
|
return entry.token;
|
||||||
}
|
}
|
||||||
async tick() {
|
async tick() {
|
||||||
|
// Mutex: see the `inflight` field declaration for the why. Drop
|
||||||
|
// overlapping ticks rather than letting them run concurrently —
|
||||||
|
// status is gated by `lastStatus !== bridge.get`, so skipping a
|
||||||
|
// beat costs nothing the next beat won't catch.
|
||||||
|
if (this.inflight)
|
||||||
|
return;
|
||||||
|
this.inflight = true;
|
||||||
|
try {
|
||||||
|
await this.tickInner();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
this.inflight = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async tickInner() {
|
||||||
const bridge = globalThis['__hfAgentStatus'];
|
const bridge = globalThis['__hfAgentStatus'];
|
||||||
if (!bridge || typeof bridge.get !== 'function')
|
if (!bridge || typeof bridge.get !== 'function')
|
||||||
return; // HF plugin not loaded — skip
|
return; // HF plugin not loaded — skip
|
||||||
|
|||||||
@@ -153,6 +153,19 @@ export const fabricChannelPlugin = createChatChannelPlugin<ResolvedFabricAccount
|
|||||||
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg as never, accountId),
|
resolveAccount: (cfg, accountId) => resolveFabricAccount(cfg as never, accountId),
|
||||||
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg as never),
|
defaultAccountId: (cfg) => resolveDefaultFabricAccountId(cfg as never),
|
||||||
isConfigured: (account: ResolvedFabricAccount) => Boolean(account.fabricApiKey),
|
isConfigured: (account: ResolvedFabricAccount) => Boolean(account.fabricApiKey),
|
||||||
|
// openclaw's channelManager.getRuntimeSnapshot() — called every minute
|
||||||
|
// by the channel-health-monitor — defaults `configured: true` when the
|
||||||
|
// plugin doesn't expose describeAccount (see applyDescribedAccountFields
|
||||||
|
// in server-channels). Without this, fabric's synthetic 'default'
|
||||||
|
// account (returned by listFabricAccountIds when channels.fabric.accounts
|
||||||
|
// is empty — the prod shape) gets snapshot {enabled:true, configured:true,
|
||||||
|
// running:false} → isManagedAccount=true → not-running → restart loop
|
||||||
|
// every ~10 min, logging `[fabric:default] health-monitor: restarting`.
|
||||||
|
// Mirror isConfigured here so the snapshot truthfully reports false for
|
||||||
|
// any account without a fabricApiKey.
|
||||||
|
describeAccount: (account: ResolvedFabricAccount) => ({
|
||||||
|
configured: Boolean(account.fabricApiKey),
|
||||||
|
}),
|
||||||
},
|
},
|
||||||
// Minimal setup adapter: Fabric is configured directly under
|
// Minimal setup adapter: Fabric is configured directly under
|
||||||
// channels.fabric.* (no interactive wizard). applyAccountConfig is the
|
// channels.fabric.* (no interactive wizard). applyAccountConfig is the
|
||||||
|
|||||||
@@ -325,9 +325,26 @@ export class FabricInbound {
|
|||||||
for (const g of session.guilds) {
|
for (const g of session.guilds) {
|
||||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||||
if (!tok) continue;
|
if (!tok) continue;
|
||||||
|
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
|
||||||
|
// on every (re)connect. The single-shot `auth: { token: tok }` shape
|
||||||
|
// captured the token in closure: after socket.io's silent auto-reconnect
|
||||||
|
// the backend got the same JWT that expired ~15 min into the session
|
||||||
|
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
|
||||||
|
// the application layer. The client's `connect` event still fired (TCP
|
||||||
|
// succeeded), so the plugin happily ran the channel-resync, emitted
|
||||||
|
// `join_channel` into the void, and logged "joined N channel(s)" while
|
||||||
|
// the backend was actually broadcasting message.created to a room with
|
||||||
|
// zero subscribers. End user symptom: DMs to agents silently dropped.
|
||||||
const socket = io(`${g.endpoint}/realtime`, {
|
const socket = io(`${g.endpoint}/realtime`, {
|
||||||
transports: ['websocket'],
|
transports: ['websocket'],
|
||||||
auth: { token: tok },
|
auth: (cb) => {
|
||||||
|
// Best-effort fresh token; on transient failure fall back to the
|
||||||
|
// last known good one. tokenCache also keeps HTTP calls (attachment
|
||||||
|
// download / reply post) from 401'ing in the same window.
|
||||||
|
this.freshGuildToken(agentId, g.nodeId, session)
|
||||||
|
.then((fresh) => cb({ token: fresh ?? tok }))
|
||||||
|
.catch(() => cb({ token: tok }));
|
||||||
|
},
|
||||||
autoConnect: false,
|
autoConnect: false,
|
||||||
});
|
});
|
||||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||||
|
|||||||
@@ -50,6 +50,16 @@ export class PresenceSync {
|
|||||||
private readonly accounts = new Map<string, PresenceSyncAccount>();
|
private readonly accounts = new Map<string, PresenceSyncAccount>();
|
||||||
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
|
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
|
||||||
|
|
||||||
|
// Mutex flag: a tick iterates accounts serially with `await` on each
|
||||||
|
// agent-login + PUT round-trip, so a single tick can easily run 20+s
|
||||||
|
// when there are many accounts. setInterval(intervalMs) does NOT wait
|
||||||
|
// for the previous tick to finish — without this guard the next tick
|
||||||
|
// fires on top of a still-running one and two parallel iterations
|
||||||
|
// PUT the same agentId within milliseconds. That tipped the backend's
|
||||||
|
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
|
||||||
|
// 500s on prod. Guarded ticks just skip a beat instead.
|
||||||
|
private inflight = false;
|
||||||
|
|
||||||
constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
|
constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
|
||||||
|
|
||||||
setAccounts(accounts: PresenceSyncAccount[]): void {
|
setAccounts(accounts: PresenceSyncAccount[]): void {
|
||||||
@@ -103,6 +113,20 @@ export class PresenceSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async tick(): Promise<void> {
|
private async tick(): Promise<void> {
|
||||||
|
// Mutex: see the `inflight` field declaration for the why. Drop
|
||||||
|
// overlapping ticks rather than letting them run concurrently —
|
||||||
|
// status is gated by `lastStatus !== bridge.get`, so skipping a
|
||||||
|
// beat costs nothing the next beat won't catch.
|
||||||
|
if (this.inflight) return;
|
||||||
|
this.inflight = true;
|
||||||
|
try {
|
||||||
|
await this.tickInner();
|
||||||
|
} finally {
|
||||||
|
this.inflight = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async tickInner(): Promise<void> {
|
||||||
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
|
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
|
||||||
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
|
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user