8 Commits

Author SHA1 Message Date
7dc70522d1 fix(inbound): refresh socket.io auth on (re)connect via callback
Backend issues short-lived guildAccessToken (TTL=900s). The previous
`auth: { token: tok }` shape captured the JWT once in connectAgent's
closure: after socket.io's auto-reconnect the backend kept getting the
same expired JWT and silently rejected the handshake at the application
layer (RealtimeGateway logs 'socket rejected: <id>'). The client's
'connect' event still fired (TCP succeeded) so the plugin happily ran
the channel-resync, emitted join_channel into the void, and logged
'joined N channel(s)' while the backend was actually broadcasting
message.created to a room with zero subscribers. End-user symptom:
DMs/group messages to agents silently dropped 15 min after gateway
start, with no error anywhere on the agent side.

Switch to the callback form, which socket.io re-evaluates on every
(re)connect — same call site we already use for the HTTP path via
freshGuildToken/tokenCache.
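
A minimal sketch of the callback form, for illustration only. socket.io-client accepts `auth` either as a plain object or as a function that receives a callback, and the function form runs again for each connection attempt. The `AuthProvider` type is declared locally so the sketch runs without the library; `makeAuthProvider` and `fetchFreshToken` are hypothetical names, with `fetchFreshToken` standing in for `freshGuildToken`, whose real signature is not shown here.

```typescript
// Local model of socket.io-client's function-style `auth` option.
type AuthPayload = { token: string };
type AuthProvider = (cb: (data: AuthPayload) => void) => void;

// Hypothetical helper: resolves a fresh token per (re)connect and falls back
// to the last known token when the refresh returns nothing or fails.
function makeAuthProvider(
  fetchFreshToken: () => Promise<string | null | undefined>,
  lastKnownToken: string,
): AuthProvider {
  return (cb) => {
    fetchFreshToken()
      .then((fresh) => cb({ token: fresh ?? lastKnownToken }))
      .catch(() => cb({ token: lastKnownToken }));
  };
}
```

The provider would be passed as the `auth` option, for example `io(url, { auth: makeAuthProvider(refresh, tok) })`. The fallback keeps a transient login failure from blocking the handshake outright, at the cost of possibly presenting a stale token once.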

Verified in sim (commit 2acb084 + this patch):
1. Connect new DM channel + post msg -> dispatch + reply ✓
2. `docker restart fabric-backend-guild` to force socket disconnect
3. Plugin reconnects automatically and logs
   'fabric: agent recruiter joined 12 channel(s) on sim-guild-1' ✓
   (without the fix this reconnect was silently rejected; sim used to
    log 'WARN socket rejected: <id>' on the guild backend)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 13:50:24 +01:00
h z
2acb084ee4 fix(presence-sync): tick mutex (#8) 2026-05-26 02:06:21 +00:00
9419d270e5 fix(presence-sync): tick mutex so setInterval overlap can't spawn parallel ticks
The presence-sync tick iterates accounts serially with await on each
agent-login + PUT round-trip — a single tick can easily run 20+s when
there are several accounts. setInterval(intervalMs) does NOT wait for
the previous tick to finish, so on a busy gateway the next tick fires
on top of a still-running one and two parallel iterations each PUT
the same agentId within ~10 ms. That tipped the guild backend's
first-time-insert race (separate fix in nav/Fabric.Backend.Guild) into
500s on prod (caught in t2 gateway 2026-05-25 23:23:35Z; 6 of 6 agents
showed paired log lines 4-10 ms apart for the same agent → idle).

Fix: a simple `inflight` boolean. tick() returns immediately if a
previous tick is still running, and the next interval beat catches up.
A PUT is only sent when bridge.get() differs from lastStatus, so a
status change that lands during a skipped beat is still pushed on the
next one; skipping costs nothing.
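
The guard can be sketched in isolation as follows. `guardOverlap` is an illustrative name, not the plugin's API; the plugin keeps the flag as a class field instead of a closure variable.

```typescript
// Wraps an async task so that an invocation arriving while a previous one is
// still running is dropped instead of running concurrently.
function guardOverlap(task: () => Promise<void>): () => Promise<void> {
  let inflight = false;
  return async () => {
    if (inflight) return; // previous run still active: skip this beat
    inflight = true;
    try {
      await task();
    } finally {
      inflight = false; // always release, even if the task throws
    }
  };
}

// setInterval does not await its callback, so the guard is what prevents
// overlap:
//   const timer = setInterval(() => void guardedTick(), intervalMs);
```

Dropping the overlapping call, rather than queueing it, matches the reasoning above: a queued tick would only repeat work that the next regular beat performs anyway.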

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 02:25:08 +01:00
h z
79b29db26c fix(presence-sync): /api prefix + Bearer guildAccessToken (#7) 2026-05-25 23:17:45 +00:00
a87de27cff fix(presence-sync): use /api prefix + Bearer guildAccessToken (not x-api-key)
Two layered bugs in the presence-sync loop, both causing every PUT to
fail forever in prod:

1. **Missing /api prefix.** URL was `${guildBaseUrl}/agents/<id>/presence`
   but the guild backend sets a global prefix 'api' in main.ts
   `setGlobalPrefix('api')`. Every other REST call in this plugin
   (channel.ts channels list, fabric-client.ts postMessage, canvas)
   already prepends /api/ — only presence-sync missed it. Returned 404
   "Cannot PUT /agents/...".

2. **Wrong auth scheme.** Plugin sent `x-api-key: <fabricApiKey>`, but
   the endpoint sits behind the global APP_GUARD = ApiKeyGuard, which
   actually expects `Authorization: Bearer <guildAccessToken>` (despite
   its name — confusing naming on the backend side). With /api added,
   error became 401 "missing bearer token". Confirmed by `docker exec
   fabric-backend-guild grep APP_GUARD /app/dist/app.module.js` and
   manual curl: Bearer guild token → 200 OK.
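
Put together, the corrected request looks roughly like the sketch below. `buildPresenceRequest` is a hypothetical helper used only for illustration; the URL shape and header come from the two points above.

```typescript
// Builds the URL and headers for the presence PUT.
function buildPresenceRequest(
  guildBaseUrl: string,
  fabricUserId: string,
  guildAccessToken: string,
): { url: string; headers: Record<string, string> } {
  const base = guildBaseUrl.replace(/\/$/, ''); // tolerate one trailing slash
  return {
    // The global 'api' prefix must be part of the path.
    url: `${base}/api/agents/${encodeURIComponent(fabricUserId)}/presence`,
    headers: {
      'content-type': 'application/json',
      // Bearer guild token, not x-api-key.
      authorization: `Bearer ${guildAccessToken}`,
    },
  };
}
```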

**Fix**

- presence-sync.ts: do agent-login on demand to obtain a fresh
  guildAccessToken, cache it per-agent for 13 min (under the 15-min
  JWT TTL), use it as Bearer for the PUT. 401 response invalidates
  the cache so the next tick re-logs-in. Pushes are gated on status
  changes (rare), so the login overhead is negligible.

- inbound.ts: firstGuildEndpointByAgent → firstGuildByAgent storing
  both endpoint and nodeId (presence-sync needs nodeId to pick the
  right token out of guildAccessTokens[]).

- index.ts: pass FabricClient to PresenceSync constructor.
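
The caching rule from the first bullet can be sketched as a small class. `GuildTokenCache` and its injectable clock are assumptions made for testability; the plugin itself uses a plain Map field and `Date.now()`.

```typescript
const TOKEN_TTL_MS = 13 * 60 * 1000; // 13 min, under the 15-min JWT TTL

interface CachedToken {
  token: string;
  expiresAt: number; // epoch ms
}

class GuildTokenCache {
  private readonly entries = new Map<string, CachedToken>();
  private readonly now: () => number;

  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Returns the cached token, or null when absent or past its cache expiry.
  get(agentId: string): string | null {
    const hit = this.entries.get(agentId);
    return hit && hit.expiresAt > this.now() ? hit.token : null;
  }

  set(agentId: string, token: string): void {
    this.entries.set(agentId, { token, expiresAt: this.now() + TOKEN_TTL_MS });
  }

  // Called on a 401 so the next tick performs a fresh agent-login.
  invalidate(agentId: string): void {
    this.entries.delete(agentId);
  }
}
```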

**Verified in sim**

After restart, gateway log shows `fabric: presence-sync recruiter →
idle` (200 OK), zero failed PUTs, where previously it would log a 404
every ~5s per agent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 23:54:38 +01:00
h z
dabaa6e1f2 fix(inbound): route fabric DM channels as peer.kind=direct / ChatType=direct (#6) 2026-05-25 14:03:20 +00:00
b8e0e424fa fix(inbound): route fabric DM channels as peer.kind='direct' / ChatType='direct'
Inbound was hardcoding `peer: { kind: 'group' }` and `ChatType: 'group'`
for every fabric channel regardless of xType. As a result:

- sessionKey for a DM was `agent:<id>:fabric:group:<chan>` instead of
  `agent:<id>:fabric:direct:<chan>`
- ctx.ChatType='group' caused user-prompt metadata to render
  `is_group_chat: true` on a DM
- openclaw's `isDirectMessage()` check (ChatType==='direct') returned
  false, so DM-specific prompt and turn behavior never engaged

Caught by recruiter test in session 40c51de2: the model's thinking trace
acknowledged "fabric DM channel" (from the ClawPrompts chat-injector
hook) but the surrounding user-prompt metadata contradicted it with
`is_group_chat: true`, and the model reasoned its way out of running
`workflow_start`.

Fix factors a small helper `fabricPeerRoutingForXType` (and a cache-
backed `fabricPeerRoutingForChannel` for outbound) in channel.ts that
maps:
  - 'dm'  → { peerKind: 'direct', chatType: 'direct' }
  - rest  → { peerKind: 'group',  chatType: 'group' }   (no change)

Inbound uses m.xType directly (live, authoritative). Outbound has no
xType in its call signature, so it consults the channel-meta cache
populated by inbound (same `getChannelType` already exposed via
__fabric). Cache miss falls back to 'group' — the pre-fix default, no
regression. The proactive-DM-without-prior-inbound edge case still
routes that one outbound as 'group'; the next round agrees on 'direct'.
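
The cold-cache behavior can be illustrated with a self-contained sketch. The Map below is a hypothetical stand-in for the channel-meta cache, and `sessionKeyFor` only mirrors the key shape quoted earlier in this message; openclaw builds the real session key.

```typescript
type FabricPeerRouting = { peerKind: 'direct' | 'group'; chatType: 'direct' | 'group' };

function routingForXType(xType: string | null | undefined): FabricPeerRouting {
  return xType === 'dm'
    ? { peerKind: 'direct', chatType: 'direct' }
    : { peerKind: 'group', chatType: 'group' };
}

// Hypothetical stand-in for the channel-meta cache populated by inbound.
const channelTypes = new Map<string, string>();

function routingForChannel(channelId: string): FabricPeerRouting {
  // A cache miss yields undefined, which maps to 'group'.
  return routingForXType(channelTypes.get(channelId));
}

// Illustrative only: mirrors the 'agent:<id>:fabric:<kind>:<chan>' shape.
function sessionKeyFor(agentId: string, channelId: string, r: FabricPeerRouting): string {
  return `agent:${agentId}:fabric:${r.peerKind}:${channelId}`;
}
```

Before any inbound message has been seen, `routingForChannel('c1')` yields the group routing; once inbound records `'dm'` for that channel, inbound and outbound derive the same `direct` key.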

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 14:26:42 +01:00
h z
81a10f2a1f Merge #5 feat(channel-meta): __fabric.getChannelType 2026-05-25 10:38:22 +00:00
8 changed files with 311 additions and 52 deletions

@@ -94,7 +94,7 @@ export default defineChannelPluginEntry({
void inbound.start().then(() => {
if (!inbound)
return;
-presence = new PresenceSync(api.logger);
+presence = new PresenceSync(api.logger, client);
presence.setAccounts(inbound.getPresenceAccounts());
presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);

@@ -11,6 +11,15 @@
import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core';
import { FabricClient } from './fabric-client.js';
import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js';
import { getChannelType } from './channel-meta.js';
export function fabricPeerRoutingForXType(xType) {
if (xType === 'dm')
return { peerKind: 'direct', chatType: 'direct' };
return { peerKind: 'group', chatType: 'group' };
}
export function fabricPeerRoutingForChannel(channelId) {
return fabricPeerRoutingForXType(getChannelType(channelId));
}
// ---- target grammar: fabric:<channelId> ----
export function stripFabricTargetPrefix(raw) {
let s = (raw ?? '').trim();
@@ -38,13 +47,18 @@ export function resolveFabricOutboundSessionRoute(params) {
const id = stripFabricTargetPrefix(params.target);
if (!id)
return null;
// Consult the channel-meta cache populated by inbound — DM channels
// need peer.kind='direct' so the outbound session key matches the
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
// no regression on cold cache).
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
return buildChannelOutboundSessionRoute({
cfg: params.cfg,
agentId: params.agentId,
channel: 'fabric',
accountId: params.accountId,
-peer: { kind: 'group', id },
-chatType: 'group',
+peer: { kind: peerKind, id },
+chatType,
from: `fabric:channel:${id}`,
to: `fabric:${id}`,
});

@@ -4,6 +4,7 @@ import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
export class FabricInbound {
@@ -227,38 +228,59 @@ export class FabricInbound {
for (const entry of this.identity.list()) {
if (!entry.fabricUserId)
continue;
-const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
-if (!presenceGuildUrl)
+const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
+if (!presenceGuild)
continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
-guildBaseUrl: presenceGuildUrl,
+guildBaseUrl: presenceGuild.endpoint,
+guildNodeId: presenceGuild.nodeId,
fabricApiKey: entry.fabricApiKey,
});
}
return out;
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
-// guild per agent (used as the presence-push target).
-firstGuildEndpointByAgent = new Map();
+// guild per agent (used as the presence-push target). Stores both
+// endpoint and nodeId -- presence-sync needs both: endpoint to build
+// the URL, nodeId to pick the matching guildAccessToken from a fresh
+// agent-login response.
+firstGuildByAgent = new Map();
async connectAgent(agentId, session) {
const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
-if (!this.firstGuildEndpointByAgent.has(agentId)) {
+if (!this.firstGuildByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild)
-this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
+this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
}
for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok)
continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'],
-auth: { token: tok },
+auth: (cb) => {
+// Best-effort fresh token; on transient failure fall back to the
+// last known good one. tokenCache also keeps HTTP calls (attachment
+// download / reply post) from 401'ing in the same window.
+this.freshGuildToken(agentId, g.nodeId, session)
+.then((fresh) => cb({ token: fresh ?? tok }))
+.catch(() => cb({ token: tok }));
+},
autoConnect: false,
});
// Tracked socket.io rooms for this (agent, guild). The initial fetch
@@ -433,11 +455,19 @@ export class FabricInbound {
const core = this.core;
const cfg = this.cfg;
try {
// Route by xType. DM channels need peer.kind='direct' so openclaw
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
// and ctx.ChatType='direct') rather than as a multi-party group.
// Without this, the agent's user-prompt metadata says
// 'is_group_chat: true' on a DM and downstream prompt logic
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg,
channel: 'fabric',
accountId: agentId,
-peer: { kind: 'group', id: channelId },
+peer: { kind: peerKind, id: channelId },
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
@@ -451,7 +481,7 @@ export class FabricInbound {
To: `fabric:${channelId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId ?? agentId,
-ChatType: 'group',
+ChatType: chatType,
ConversationLabel: `fabric:${guild.nodeId}`,
SenderId: m.authorUserId ?? 'fabric',
Provider: 'fabric',

@@ -1,26 +1,25 @@
-/**
- * presence-sync -- read each connected agent's HF status (via the
- * cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
- * HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
- * `PUT /agents/:userId/presence` so the backend can apply busy-discard
- * on `announce`-type channel deliveries.
- *
- * Push model: we only PUT when an agent's status actually changes
- * (since the last push). The HF-side accessor has its own TTL cache
- * to absorb the every-30s polling.
- *
- * If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
- * is a no-op -- Fabric backend defaults presence to 'unknown' which is
- * treated as not-busy. Announce-channel delivery still works; busy
- * filtering simply doesn't kick in.
- */
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
export class PresenceSync {
logger;
client;
timer = null;
lastStatus = new Map(); // by agentId
accounts = new Map();
-constructor(logger) {
+tokenCache = new Map(); // by agentId
+// Mutex flag: a tick iterates accounts serially with `await` on each
+// agent-login + PUT round-trip, so a single tick can easily run 20+s
+// when there are many accounts. setInterval(intervalMs) does NOT wait
+// for the previous tick to finish -- without this guard the next tick
+// fires on top of a still-running one and two parallel iterations
+// PUT the same agentId within milliseconds. That tipped the backend's
+// first-time-insert race (separate fix in Fabric.Backend.Guild) into
+// 500s on prod. Guarded ticks just skip a beat instead.
+inflight = false;
+constructor(logger, client) {
this.logger = logger;
this.client = client;
}
setAccounts(accounts) {
this.accounts.clear();
@@ -42,7 +41,49 @@ export class PresenceSync {
this.timer = null;
}
}
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
async ensureGuildToken(acct) {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now)
return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
}
catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
async tick() {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight)
return;
this.inflight = true;
try {
await this.tickInner();
}
finally {
this.inflight = false;
}
}
async tickInner() {
const bridge = globalThis['__hfAgentStatus'];
if (!bridge || typeof bridge.get !== 'function')
return; // HF plugin not loaded — skip
@@ -58,13 +99,22 @@ export class PresenceSync {
continue;
if (this.lastStatus.get(agentId) === status)
continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken)
continue;
try {
-const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
+// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
+// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
+// -- NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
+// x-api-key and got 401 "missing bearer token" forever. The /api
+// prefix is required because the guild backend sets a global
+// 'api' prefix in main.ts setGlobalPrefix('api').
+const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
-'x-api-key': acct.fabricApiKey,
+authorization: `Bearer ${guildToken}`,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
@@ -73,6 +123,11 @@ export class PresenceSync {
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
}
else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401)
this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
}

@@ -116,7 +116,7 @@ export default defineChannelPluginEntry({
// their fabricUserId + first guild endpoint populated).
void inbound.start().then(() => {
if (!inbound) return;
-presence = new PresenceSync(api.logger);
+presence = new PresenceSync(api.logger, client);
presence.setAccounts(inbound.getPresenceAccounts());
presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);

@@ -21,6 +21,39 @@ import {
resolveDefaultFabricAccountId,
type ResolvedFabricAccount,
} from './accounts.js';
import { getChannelType } from './channel-meta.js';
/**
* Map a Fabric channel xType to an openclaw routing peer.kind / ChatType.
*
* Fabric distinguishes channels by xType ('dm' | 'triage' | 'group' |
* 'broadcast' | 'announce' | ...). Openclaw's session router only knows
* 'direct' | 'group' | 'channel'. We collapse:
* - 'dm' → 'direct' (1:1 conversation; agent always speaks)
* - rest → 'group' (multi-party; turn-engine gates speech)
*
* Sessions are keyed by peer.kind, so inbound and outbound MUST agree —
* otherwise the agent's outbound message lands in a different session
* than the inbound that triggered it and conversation state splits.
*
* Outbound has no live xType (the agent target is just a channelId), so
* it consults the channel-meta cache populated by inbound. Cache miss
* (channel never observed) falls back to 'group' — same as the pre-fix
* behavior, no regression on cold cache. The proactive-DM-first-message
* edge case (agent DMs a channel before any inbound) still lands as
* 'group' on that one outbound; the next inbound + outbound pair will
* agree on 'direct'.
*/
export type FabricPeerRouting = { peerKind: 'direct' | 'group'; chatType: 'direct' | 'group' };
export function fabricPeerRoutingForXType(xType: string | null | undefined): FabricPeerRouting {
if (xType === 'dm') return { peerKind: 'direct', chatType: 'direct' };
return { peerKind: 'group', chatType: 'group' };
}
export function fabricPeerRoutingForChannel(channelId: string): FabricPeerRouting {
return fabricPeerRoutingForXType(getChannelType(channelId));
}
type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown };
@@ -45,13 +78,18 @@ export function looksLikeFabricTargetId(raw: string): boolean {
export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
const id = stripFabricTargetPrefix(params.target);
if (!id) return null;
// Consult the channel-meta cache populated by inbound — DM channels
// need peer.kind='direct' so the outbound session key matches the
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
// no regression on cold cache).
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
return buildChannelOutboundSessionRoute({
cfg: params.cfg,
agentId: params.agentId,
channel: 'fabric',
accountId: params.accountId,
-peer: { kind: 'group', id },
-chatType: 'group',
+peer: { kind: peerKind, id },
+chatType,
from: `fabric:channel:${id}`,
to: `fabric:${id}`,
});

@@ -6,6 +6,7 @@ import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-
import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
@@ -280,17 +281,25 @@ export class FabricInbound {
agentId: string;
fabricUserId: string;
guildBaseUrl: string;
guildNodeId: string;
fabricApiKey: string;
}> {
-const out: Array<{ agentId: string; fabricUserId: string; guildBaseUrl: string; fabricApiKey: string }> = [];
+const out: Array<{
+agentId: string;
+fabricUserId: string;
+guildBaseUrl: string;
+guildNodeId: string;
+fabricApiKey: string;
+}> = [];
for (const entry of this.identity.list()) {
if (!entry.fabricUserId) continue;
-const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
-if (!presenceGuildUrl) continue;
+const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
+if (!presenceGuild) continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
-guildBaseUrl: presenceGuildUrl,
+guildBaseUrl: presenceGuild.endpoint,
+guildNodeId: presenceGuild.nodeId,
fabricApiKey: entry.fabricApiKey,
});
}
@@ -298,24 +307,44 @@ export class FabricInbound {
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
-// guild per agent (used as the presence-push target).
-private firstGuildEndpointByAgent = new Map<string, string>();
+// guild per agent (used as the presence-push target). Stores both
+// endpoint and nodeId -- presence-sync needs both: endpoint to build
+// the URL, nodeId to pick the matching guildAccessToken from a fresh
+// agent-login response.
+private firstGuildByAgent = new Map<string, { endpoint: string; nodeId: string }>();
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
-if (!this.firstGuildEndpointByAgent.has(agentId)) {
+if (!this.firstGuildByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
-if (firstGuild) this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
+if (firstGuild) this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
}
for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) continue;
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
// on every (re)connect. The single-shot `auth: { token: tok }` shape
// captured the token in closure: after socket.io's silent auto-reconnect
// the backend got the same JWT that expired ~15 min into the session
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
// the application layer. The client's `connect` event still fired (TCP
// succeeded), so the plugin happily ran the channel-resync, emitted
// `join_channel` into the void, and logged "joined N channel(s)" while
// the backend was actually broadcasting message.created to a room with
// zero subscribers. End user symptom: DMs to agents silently dropped.
const socket = io(`${g.endpoint}/realtime`, {
transports: ['websocket'],
-auth: { token: tok },
+auth: (cb) => {
+// Best-effort fresh token; on transient failure fall back to the
+// last known good one. tokenCache also keeps HTTP calls (attachment
+// download / reply post) from 401'ing in the same window.
+this.freshGuildToken(agentId, g.nodeId, session)
+.then((fresh) => cb({ token: fresh ?? tok }))
+.catch(() => cb({ token: tok }));
+},
autoConnect: false,
});
// Tracked socket.io rooms for this (agent, guild). The initial fetch
@@ -504,11 +533,19 @@ export class FabricInbound {
const core = this.core as Core & Record<string, unknown>;
const cfg = this.cfg as { session?: { store?: unknown } };
try {
// Route by xType. DM channels need peer.kind='direct' so openclaw
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
// and ctx.ChatType='direct') rather than as a multi-party group.
// Without this, the agent's user-prompt metadata says
// 'is_group_chat: true' on a DM and downstream prompt logic
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg,
channel: 'fabric',
accountId: agentId,
-peer: { kind: 'group', id: channelId },
+peer: { kind: peerKind, id: channelId },
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
@@ -523,7 +560,7 @@ export class FabricInbound {
To: `fabric:${channelId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId ?? agentId,
-ChatType: 'group',
+ChatType: chatType,
ConversationLabel: `fabric:${guild.nodeId}`,
SenderId: m.authorUserId ?? 'fabric',
Provider: 'fabric',

@@ -2,18 +2,26 @@
* presence-sync — read each connected agent's HF status (via the
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
- * `PUT /agents/:userId/presence` so the backend can apply busy-discard
- * on `announce`-type channel deliveries.
+ * `PUT /api/agents/:userId/presence` so the backend can apply
+ * busy-discard on `announce`-type channel deliveries.
*
* Push model: we only PUT when an agent's status actually changes
* (since the last push). The HF-side accessor has its own TTL cache
* to absorb the every-30s polling.
*
* Auth: the endpoint sits behind ApiKeyGuard (global APP_GUARD per
* app.module.js) which expects `Authorization: Bearer <guild-token>`
* — NOT the agent's fabricApiKey directly. So before each PUT we do
* a fresh agent-login (or reuse a cached token if still within its
* 15-min JWT TTL) and pull the guildAccessToken matching the target
* guild. Status changes are rare enough that login overhead is fine.
*
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
* is a no-op — Fabric backend defaults presence to 'unknown' which is
* treated as not-busy. Announce-channel delivery still works; busy
* filtering simply doesn't kick in.
*/
import type { FabricClient } from './fabric-client.js';
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
@@ -23,15 +31,36 @@ export interface PresenceSyncAccount {
agentId: string;
fabricUserId: string; // the agent's Fabric Center user id (UUID)
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
-fabricApiKey: string; // existing per-account key
+guildNodeId: string; // which guildAccessTokens[].guildNodeId to pick
+fabricApiKey: string; // existing per-account key (used for agent-login)
}
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
// safely inside the window even if a tick runs late.
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
interface CachedToken {
token: string;
expiresAt: number; // epoch ms
}
export class PresenceSync {
private timer: ReturnType<typeof setInterval> | null = null;
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
private readonly accounts = new Map<string, PresenceSyncAccount>();
+private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
-constructor(private readonly logger: Logger) {}
+// Mutex flag: a tick iterates accounts serially with `await` on each
+// agent-login + PUT round-trip, so a single tick can easily run 20+s
+// when there are many accounts. setInterval(intervalMs) does NOT wait
+// for the previous tick to finish -- without this guard the next tick
+// fires on top of a still-running one and two parallel iterations
+// PUT the same agentId within milliseconds. That tipped the backend's
+// first-time-insert race (separate fix in Fabric.Backend.Guild) into
+// 500s on prod. Guarded ticks just skip a beat instead.
+private inflight = false;
+constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
setAccounts(accounts: PresenceSyncAccount[]): void {
this.accounts.clear();
@@ -54,7 +83,50 @@ export class PresenceSync {
}
}
/**
* Fetch a fresh guildAccessToken for `acct`, caching it under the
* agentId until just before its JWT expiry. Returns null on login
* failure or if the session has no matching guild — caller logs +
* skips the PUT.
*/
private async ensureGuildToken(acct: PresenceSyncAccount): Promise<string | null> {
const now = Date.now();
const cached = this.tokenCache.get(acct.agentId);
if (cached && cached.expiresAt > now) return cached.token;
let session;
try {
session = await this.client.agentLogin(acct.fabricApiKey);
} catch (err) {
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
return null;
}
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
if (!entry?.token) {
this.logger.warn(
`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`,
);
return null;
}
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
return entry.token;
}
private async tick(): Promise<void> {
// Mutex: see the `inflight` field declaration for the why. Drop
// overlapping ticks rather than letting them run concurrently —
// status is gated by `lastStatus !== bridge.get`, so skipping a
// beat costs nothing the next beat won't catch.
if (this.inflight) return;
this.inflight = true;
try {
await this.tickInner();
} finally {
this.inflight = false;
}
}
private async tickInner(): Promise<void> {
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
@@ -68,13 +140,22 @@ export class PresenceSync {
if (!status) continue;
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
const guildToken = await this.ensureGuildToken(acct);
if (!guildToken) continue;
try {
-const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
+// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
+// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
+// -- NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
+// x-api-key and got 401 "missing bearer token" forever. The /api
+// prefix is required because the guild backend sets a global
+// 'api' prefix in main.ts setGlobalPrefix('api').
+const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
-'x-api-key': acct.fabricApiKey,
+authorization: `Bearer ${guildToken}`,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
@@ -82,6 +163,10 @@ export class PresenceSync {
this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
} else {
// 401 here usually means the cached token went stale unexpectedly
// (server-side rotation or clock skew) — drop the cache so the
// next tick re-logs-in.
if (res.status === 401) this.tokenCache.delete(agentId);
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
} catch (err) {