Compare commits
9 Commits
c330571bcb
...
fix/socket
| Author | SHA1 | Date | |
|---|---|---|---|
| 7dc70522d1 | |||
| 2acb084ee4 | |||
| 9419d270e5 | |||
| 79b29db26c | |||
| a87de27cff | |||
| dabaa6e1f2 | |||
| b8e0e424fa | |||
| 81a10f2a1f | |||
| c5429129d9 |
26
dist/fabric/index.js
vendored
26
dist/fabric/index.js
vendored
@@ -6,6 +6,7 @@
|
|||||||
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
|
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
|
||||||
import { fabricChannelPlugin } from './src/channel.js';
|
import { fabricChannelPlugin } from './src/channel.js';
|
||||||
import { flushAllFabric } from './src/coalesce.js';
|
import { flushAllFabric } from './src/coalesce.js';
|
||||||
|
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
|
||||||
import { FabricInbound } from './src/inbound.js';
|
import { FabricInbound } from './src/inbound.js';
|
||||||
import { listEnabledFabricAccounts } from './src/accounts.js';
|
import { listEnabledFabricAccounts } from './src/accounts.js';
|
||||||
import { registerFabricTools } from './src/tools.js';
|
import { registerFabricTools } from './src/tools.js';
|
||||||
@@ -43,6 +44,29 @@ export default defineChannelPluginEntry({
|
|||||||
const client = new FabricClient(centerApiBase);
|
const client = new FabricClient(centerApiBase);
|
||||||
const identity = new IdentityRegistry(idFile);
|
const identity = new IdentityRegistry(idFile);
|
||||||
registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity);
|
registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity);
|
||||||
|
// Cross-plugin API: globalThis.__fabric
|
||||||
|
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
|
||||||
|
// injection to DM-typed channels only. The channel-meta cache is
|
||||||
|
// populated lazily from inbound (message.created carries xType) and
|
||||||
|
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
|
||||||
|
// very first DM after a fresh gateway start hits cache from the
|
||||||
|
// previous run rather than firing the injector on the wrong type.
|
||||||
|
//
|
||||||
|
// null return = channel never seen (cache cold). Callers MUST NOT
|
||||||
|
// fall back to "assume DM" — fail closed on unknown.
|
||||||
|
{
|
||||||
|
const _G = globalThis;
|
||||||
|
_G['__fabric'] = { getChannelType };
|
||||||
|
// Flush channel-meta cache when the gateway shuts down so
|
||||||
|
// recently-recorded xType entries don't get lost.
|
||||||
|
api.on('gateway_stop', () => {
|
||||||
|
try {
|
||||||
|
flushChannelMeta();
|
||||||
|
}
|
||||||
|
catch { /* ignore */ }
|
||||||
|
});
|
||||||
|
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
|
||||||
|
}
|
||||||
api.on('gateway_start', () => {
|
api.on('gateway_start', () => {
|
||||||
const _G = globalThis;
|
const _G = globalThis;
|
||||||
if (_G._fabricInboundStarted)
|
if (_G._fabricInboundStarted)
|
||||||
@@ -70,7 +94,7 @@ export default defineChannelPluginEntry({
|
|||||||
void inbound.start().then(() => {
|
void inbound.start().then(() => {
|
||||||
if (!inbound)
|
if (!inbound)
|
||||||
return;
|
return;
|
||||||
presence = new PresenceSync(api.logger);
|
presence = new PresenceSync(api.logger, client);
|
||||||
presence.setAccounts(inbound.getPresenceAccounts());
|
presence.setAccounts(inbound.getPresenceAccounts());
|
||||||
presence.start();
|
presence.start();
|
||||||
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
||||||
|
|||||||
105
dist/fabric/src/channel-meta.js
vendored
Normal file
105
dist/fabric/src/channel-meta.js
vendored
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
/**
|
||||||
|
* Channel-meta cache. Records (channelId → xType) for every fabric
|
||||||
|
* channel the gateway has seen at least one inbound message in.
|
||||||
|
*
|
||||||
|
* Populated lazily from inbound (`recordChannelType` is called for
|
||||||
|
* every `message.created` event with non-empty `xType`). Persisted to
|
||||||
|
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
|
||||||
|
* gateway restarts (so the very first DM after restart still gets the
|
||||||
|
* right xType without waiting for a fresh inbound).
|
||||||
|
*
|
||||||
|
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
|
||||||
|
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
|
||||||
|
* to xType==='dm' only.
|
||||||
|
*
|
||||||
|
* Failure mode: lookup misses (channel never seen / inbound dropped
|
||||||
|
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
|
||||||
|
* fall back to "assume DM" or you re-introduce the false-positive on
|
||||||
|
* group channels.
|
||||||
|
*/
|
||||||
|
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
|
||||||
|
import { dirname, join } from 'node:path';
|
||||||
|
import { homedir } from 'node:os';
|
||||||
|
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
|
||||||
|
let memory = new Map();
|
||||||
|
let loaded = false;
|
||||||
|
let dirty = false;
|
||||||
|
let flushTimer = null;
|
||||||
|
function load() {
|
||||||
|
if (loaded)
|
||||||
|
return;
|
||||||
|
loaded = true;
|
||||||
|
try {
|
||||||
|
if (!existsSync(CACHE_FILE))
|
||||||
|
return;
|
||||||
|
const raw = readFileSync(CACHE_FILE, 'utf8');
|
||||||
|
const parsed = JSON.parse(raw);
|
||||||
|
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
|
||||||
|
if (typeof k === 'string' && typeof v === 'string')
|
||||||
|
memory.set(k, v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch {
|
||||||
|
// ignore — start with empty cache on corruption
|
||||||
|
}
|
||||||
|
}
|
||||||
|
function scheduleFlush() {
|
||||||
|
if (flushTimer)
|
||||||
|
return;
|
||||||
|
// Debounce writes — many inbound messages may arrive in a burst.
|
||||||
|
// 250ms coalesces them; on gateway_stop the channel plugin can force
|
||||||
|
// a synchronous flush via flushChannelMeta().
|
||||||
|
flushTimer = setTimeout(() => {
|
||||||
|
flushTimer = null;
|
||||||
|
if (!dirty)
|
||||||
|
return;
|
||||||
|
dirty = false;
|
||||||
|
flushSync();
|
||||||
|
}, 250);
|
||||||
|
}
|
||||||
|
function flushSync() {
|
||||||
|
try {
|
||||||
|
const dir = dirname(CACHE_FILE);
|
||||||
|
if (!existsSync(dir))
|
||||||
|
mkdirSync(dir, { recursive: true });
|
||||||
|
const out = { channels: Object.fromEntries(memory) };
|
||||||
|
const tmp = CACHE_FILE + '.tmp';
|
||||||
|
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
|
||||||
|
renameSync(tmp, CACHE_FILE);
|
||||||
|
}
|
||||||
|
catch {
|
||||||
|
// swallow — cache is an optimization; loss-on-write is recoverable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/** Called by inbound on every message.created. xType empty → no-op. */
|
||||||
|
export function recordChannelType(channelId, xType) {
|
||||||
|
if (!channelId || !xType)
|
||||||
|
return;
|
||||||
|
load();
|
||||||
|
const existing = memory.get(channelId);
|
||||||
|
if (existing === xType)
|
||||||
|
return;
|
||||||
|
memory.set(channelId, xType);
|
||||||
|
dirty = true;
|
||||||
|
scheduleFlush();
|
||||||
|
}
|
||||||
|
/** Cross-plugin lookup. null when channel never seen / unknown. */
|
||||||
|
export function getChannelType(channelId) {
|
||||||
|
if (!channelId)
|
||||||
|
return null;
|
||||||
|
load();
|
||||||
|
return memory.get(channelId) ?? null;
|
||||||
|
}
|
||||||
|
/** Force-flush — called on plugin shutdown to make sure recently
|
||||||
|
* recorded entries hit disk before the gateway dies. */
|
||||||
|
export function flushChannelMeta() {
|
||||||
|
if (flushTimer) {
|
||||||
|
clearTimeout(flushTimer);
|
||||||
|
flushTimer = null;
|
||||||
|
}
|
||||||
|
if (dirty) {
|
||||||
|
dirty = false;
|
||||||
|
flushSync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
export const CHANNEL_META_PATH = CACHE_FILE;
|
||||||
18
dist/fabric/src/channel.js
vendored
18
dist/fabric/src/channel.js
vendored
@@ -11,6 +11,15 @@
|
|||||||
import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core';
|
import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core';
|
||||||
import { FabricClient } from './fabric-client.js';
|
import { FabricClient } from './fabric-client.js';
|
||||||
import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js';
|
import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js';
|
||||||
|
import { getChannelType } from './channel-meta.js';
|
||||||
|
export function fabricPeerRoutingForXType(xType) {
|
||||||
|
if (xType === 'dm')
|
||||||
|
return { peerKind: 'direct', chatType: 'direct' };
|
||||||
|
return { peerKind: 'group', chatType: 'group' };
|
||||||
|
}
|
||||||
|
export function fabricPeerRoutingForChannel(channelId) {
|
||||||
|
return fabricPeerRoutingForXType(getChannelType(channelId));
|
||||||
|
}
|
||||||
// ---- target grammar: fabric:<channelId> ----
|
// ---- target grammar: fabric:<channelId> ----
|
||||||
export function stripFabricTargetPrefix(raw) {
|
export function stripFabricTargetPrefix(raw) {
|
||||||
let s = (raw ?? '').trim();
|
let s = (raw ?? '').trim();
|
||||||
@@ -38,13 +47,18 @@ export function resolveFabricOutboundSessionRoute(params) {
|
|||||||
const id = stripFabricTargetPrefix(params.target);
|
const id = stripFabricTargetPrefix(params.target);
|
||||||
if (!id)
|
if (!id)
|
||||||
return null;
|
return null;
|
||||||
|
// Consult the channel-meta cache populated by inbound — DM channels
|
||||||
|
// need peer.kind='direct' so the outbound session key matches the
|
||||||
|
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
|
||||||
|
// no regression on cold cache).
|
||||||
|
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
|
||||||
return buildChannelOutboundSessionRoute({
|
return buildChannelOutboundSessionRoute({
|
||||||
cfg: params.cfg,
|
cfg: params.cfg,
|
||||||
agentId: params.agentId,
|
agentId: params.agentId,
|
||||||
channel: 'fabric',
|
channel: 'fabric',
|
||||||
accountId: params.accountId,
|
accountId: params.accountId,
|
||||||
peer: { kind: 'group', id },
|
peer: { kind: peerKind, id },
|
||||||
chatType: 'group',
|
chatType,
|
||||||
from: `fabric:channel:${id}`,
|
from: `fabric:channel:${id}`,
|
||||||
to: `fabric:${id}`,
|
to: `fabric:${id}`,
|
||||||
});
|
});
|
||||||
|
|||||||
58
dist/fabric/src/inbound.js
vendored
58
dist/fabric/src/inbound.js
vendored
@@ -4,6 +4,8 @@ import { join } from 'node:path';
|
|||||||
import { io } from 'socket.io-client';
|
import { io } from 'socket.io-client';
|
||||||
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
||||||
import { resolveCoalesce } from './accounts.js';
|
import { resolveCoalesce } from './accounts.js';
|
||||||
|
import { fabricPeerRoutingForXType } from './channel.js';
|
||||||
|
import { recordChannelType } from './channel-meta.js';
|
||||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||||
export class FabricInbound {
|
export class FabricInbound {
|
||||||
core;
|
core;
|
||||||
@@ -226,38 +228,59 @@ export class FabricInbound {
|
|||||||
for (const entry of this.identity.list()) {
|
for (const entry of this.identity.list()) {
|
||||||
if (!entry.fabricUserId)
|
if (!entry.fabricUserId)
|
||||||
continue;
|
continue;
|
||||||
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
|
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
|
||||||
if (!presenceGuildUrl)
|
if (!presenceGuild)
|
||||||
continue;
|
continue;
|
||||||
out.push({
|
out.push({
|
||||||
agentId: entry.agentId,
|
agentId: entry.agentId,
|
||||||
fabricUserId: entry.fabricUserId,
|
fabricUserId: entry.fabricUserId,
|
||||||
guildBaseUrl: presenceGuildUrl,
|
guildBaseUrl: presenceGuild.endpoint,
|
||||||
|
guildNodeId: presenceGuild.nodeId,
|
||||||
fabricApiKey: entry.fabricApiKey,
|
fabricApiKey: entry.fabricApiKey,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
||||||
// guild per agent (used as the presence-push target).
|
// guild per agent (used as the presence-push target). Stores both
|
||||||
firstGuildEndpointByAgent = new Map();
|
// endpoint and nodeId — presence-sync needs both: endpoint to build
|
||||||
|
// the URL, nodeId to pick the matching guildAccessToken from a fresh
|
||||||
|
// agent-login response.
|
||||||
|
firstGuildByAgent = new Map();
|
||||||
async connectAgent(agentId, session) {
|
async connectAgent(agentId, session) {
|
||||||
const selfUserId = session.user.id;
|
const selfUserId = session.user.id;
|
||||||
// First-guild capture for presence-sync push target. session.guilds is
|
// First-guild capture for presence-sync push target. session.guilds is
|
||||||
// already in priority order from Center; we take the first one with a
|
// already in priority order from Center; we take the first one with a
|
||||||
// valid endpoint and stop. Multi-guild presence is a future concern.
|
// valid endpoint and stop. Multi-guild presence is a future concern.
|
||||||
if (!this.firstGuildEndpointByAgent.has(agentId)) {
|
if (!this.firstGuildByAgent.has(agentId)) {
|
||||||
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
||||||
if (firstGuild)
|
if (firstGuild)
|
||||||
this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
|
this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
|
||||||
}
|
}
|
||||||
for (const g of session.guilds) {
|
for (const g of session.guilds) {
|
||||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||||
if (!tok)
|
if (!tok)
|
||||||
continue;
|
continue;
|
||||||
|
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
|
||||||
|
// on every (re)connect. The single-shot `auth: { token: tok }` shape
|
||||||
|
// captured the token in closure: after socket.io's silent auto-reconnect
|
||||||
|
// the backend got the same JWT that expired ~15 min into the session
|
||||||
|
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
|
||||||
|
// the application layer. The client's `connect` event still fired (TCP
|
||||||
|
// succeeded), so the plugin happily ran the channel-resync, emitted
|
||||||
|
// `join_channel` into the void, and logged "joined N channel(s)" while
|
||||||
|
// the backend was actually broadcasting message.created to a room with
|
||||||
|
// zero subscribers. End user symptom: DMs to agents silently dropped.
|
||||||
const socket = io(`${g.endpoint}/realtime`, {
|
const socket = io(`${g.endpoint}/realtime`, {
|
||||||
transports: ['websocket'],
|
transports: ['websocket'],
|
||||||
auth: { token: tok },
|
auth: (cb) => {
|
||||||
|
// Best-effort fresh token; on transient failure fall back to the
|
||||||
|
// last known good one. tokenCache also keeps HTTP calls (attachment
|
||||||
|
// download / reply post) from 401'ing in the same window.
|
||||||
|
this.freshGuildToken(agentId, g.nodeId, session)
|
||||||
|
.then((fresh) => cb({ token: fresh ?? tok }))
|
||||||
|
.catch(() => cb({ token: tok }));
|
||||||
|
},
|
||||||
autoConnect: false,
|
autoConnect: false,
|
||||||
});
|
});
|
||||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||||
@@ -341,6 +364,13 @@ export class FabricInbound {
|
|||||||
const channelId = m.channelId ?? '';
|
const channelId = m.channelId ?? '';
|
||||||
if (!channelId)
|
if (!channelId)
|
||||||
return;
|
return;
|
||||||
|
// Record xType into the channel-meta cache before self-author
|
||||||
|
// / dedup gates — channel type doesn't depend on who sent the
|
||||||
|
// message, and recording it on observer-only triage messages
|
||||||
|
// is still useful (the next consumer asking
|
||||||
|
// __fabric.getChannelType wants the answer regardless of
|
||||||
|
// whether THIS message was delivered to an agent).
|
||||||
|
recordChannelType(channelId, m.xType);
|
||||||
if (m.authorUserId && m.authorUserId === selfUserId)
|
if (m.authorUserId && m.authorUserId === selfUserId)
|
||||||
return;
|
return;
|
||||||
const key = `${agentId}:${m.messageId}`;
|
const key = `${agentId}:${m.messageId}`;
|
||||||
@@ -425,11 +455,19 @@ export class FabricInbound {
|
|||||||
const core = this.core;
|
const core = this.core;
|
||||||
const cfg = this.cfg;
|
const cfg = this.cfg;
|
||||||
try {
|
try {
|
||||||
|
// Route by xType. DM channels need peer.kind='direct' so openclaw
|
||||||
|
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
|
||||||
|
// and ctx.ChatType='direct') rather than as a multi-party group.
|
||||||
|
// Without this, the agent's user-prompt metadata says
|
||||||
|
// 'is_group_chat: true' on a DM and downstream prompt logic
|
||||||
|
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
|
||||||
|
// misclassifies the turn.
|
||||||
|
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
|
||||||
const route = core.channel.routing.resolveAgentRoute({
|
const route = core.channel.routing.resolveAgentRoute({
|
||||||
cfg: this.cfg,
|
cfg: this.cfg,
|
||||||
channel: 'fabric',
|
channel: 'fabric',
|
||||||
accountId: agentId,
|
accountId: agentId,
|
||||||
peer: { kind: 'group', id: channelId },
|
peer: { kind: peerKind, id: channelId },
|
||||||
});
|
});
|
||||||
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
||||||
agentId: route.agentId,
|
agentId: route.agentId,
|
||||||
@@ -443,7 +481,7 @@ export class FabricInbound {
|
|||||||
To: `fabric:${channelId}`,
|
To: `fabric:${channelId}`,
|
||||||
SessionKey: route.sessionKey,
|
SessionKey: route.sessionKey,
|
||||||
AccountId: route.accountId ?? agentId,
|
AccountId: route.accountId ?? agentId,
|
||||||
ChatType: 'group',
|
ChatType: chatType,
|
||||||
ConversationLabel: `fabric:${guild.nodeId}`,
|
ConversationLabel: `fabric:${guild.nodeId}`,
|
||||||
SenderId: m.authorUserId ?? 'fabric',
|
SenderId: m.authorUserId ?? 'fabric',
|
||||||
Provider: 'fabric',
|
Provider: 'fabric',
|
||||||
|
|||||||
93
dist/fabric/src/presence-sync.js
vendored
93
dist/fabric/src/presence-sync.js
vendored
@@ -1,26 +1,25 @@
|
|||||||
/**
|
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
|
||||||
* presence-sync — read each connected agent's HF status (via the
|
// safely inside the window even if a tick runs late.
|
||||||
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
|
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
|
||||||
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
|
|
||||||
* `PUT /agents/:userId/presence` so the backend can apply busy-discard
|
|
||||||
* on `announce`-type channel deliveries.
|
|
||||||
*
|
|
||||||
* Push model: we only PUT when an agent's status actually changes
|
|
||||||
* (since the last push). The HF-side accessor has its own TTL cache
|
|
||||||
* to absorb the every-30s polling.
|
|
||||||
*
|
|
||||||
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
|
|
||||||
* is a no-op — Fabric backend defaults presence to 'unknown' which is
|
|
||||||
* treated as not-busy. Announce-channel delivery still works; busy
|
|
||||||
* filtering simply doesn't kick in.
|
|
||||||
*/
|
|
||||||
export class PresenceSync {
|
export class PresenceSync {
|
||||||
logger;
|
logger;
|
||||||
|
client;
|
||||||
timer = null;
|
timer = null;
|
||||||
lastStatus = new Map(); // by agentId
|
lastStatus = new Map(); // by agentId
|
||||||
accounts = new Map();
|
accounts = new Map();
|
||||||
constructor(logger) {
|
tokenCache = new Map(); // by agentId
|
||||||
|
// Mutex flag: a tick iterates accounts serially with `await` on each
|
||||||
|
// agent-login + PUT round-trip, so a single tick can easily run 20+s
|
||||||
|
// when there are many accounts. setInterval(intervalMs) does NOT wait
|
||||||
|
// for the previous tick to finish — without this guard the next tick
|
||||||
|
// fires on top of a still-running one and two parallel iterations
|
||||||
|
// PUT the same agentId within milliseconds. That tipped the backend's
|
||||||
|
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
|
||||||
|
// 500s on prod. Guarded ticks just skip a beat instead.
|
||||||
|
inflight = false;
|
||||||
|
constructor(logger, client) {
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
|
this.client = client;
|
||||||
}
|
}
|
||||||
setAccounts(accounts) {
|
setAccounts(accounts) {
|
||||||
this.accounts.clear();
|
this.accounts.clear();
|
||||||
@@ -42,7 +41,49 @@ export class PresenceSync {
|
|||||||
this.timer = null;
|
this.timer = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Fetch a fresh guildAccessToken for `acct`, caching it under the
|
||||||
|
* agentId until just before its JWT expiry. Returns null on login
|
||||||
|
* failure or if the session has no matching guild — caller logs +
|
||||||
|
* skips the PUT.
|
||||||
|
*/
|
||||||
|
async ensureGuildToken(acct) {
|
||||||
|
const now = Date.now();
|
||||||
|
const cached = this.tokenCache.get(acct.agentId);
|
||||||
|
if (cached && cached.expiresAt > now)
|
||||||
|
return cached.token;
|
||||||
|
let session;
|
||||||
|
try {
|
||||||
|
session = await this.client.agentLogin(acct.fabricApiKey);
|
||||||
|
}
|
||||||
|
catch (err) {
|
||||||
|
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
|
||||||
|
if (!entry?.token) {
|
||||||
|
this.logger.warn(`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
|
||||||
|
return entry.token;
|
||||||
|
}
|
||||||
async tick() {
|
async tick() {
|
||||||
|
// Mutex: see the `inflight` field declaration for the why. Drop
|
||||||
|
// overlapping ticks rather than letting them run concurrently —
|
||||||
|
// status is gated by `lastStatus !== bridge.get`, so skipping a
|
||||||
|
// beat costs nothing the next beat won't catch.
|
||||||
|
if (this.inflight)
|
||||||
|
return;
|
||||||
|
this.inflight = true;
|
||||||
|
try {
|
||||||
|
await this.tickInner();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
this.inflight = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async tickInner() {
|
||||||
const bridge = globalThis['__hfAgentStatus'];
|
const bridge = globalThis['__hfAgentStatus'];
|
||||||
if (!bridge || typeof bridge.get !== 'function')
|
if (!bridge || typeof bridge.get !== 'function')
|
||||||
return; // HF plugin not loaded — skip
|
return; // HF plugin not loaded — skip
|
||||||
@@ -58,13 +99,22 @@ export class PresenceSync {
|
|||||||
continue;
|
continue;
|
||||||
if (this.lastStatus.get(agentId) === status)
|
if (this.lastStatus.get(agentId) === status)
|
||||||
continue; // no change → no PUT
|
continue; // no change → no PUT
|
||||||
|
const guildToken = await this.ensureGuildToken(acct);
|
||||||
|
if (!guildToken)
|
||||||
|
continue;
|
||||||
try {
|
try {
|
||||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
|
||||||
|
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
|
||||||
|
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
|
||||||
|
// x-api-key and got 401 "missing bearer token" forever. The /api
|
||||||
|
// prefix is required because the guild backend sets a global
|
||||||
|
// 'api' prefix in main.ts setGlobalPrefix('api').
|
||||||
|
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||||
const res = await fetch(url, {
|
const res = await fetch(url, {
|
||||||
method: 'PUT',
|
method: 'PUT',
|
||||||
headers: {
|
headers: {
|
||||||
'content-type': 'application/json',
|
'content-type': 'application/json',
|
||||||
'x-api-key': acct.fabricApiKey,
|
authorization: `Bearer ${guildToken}`,
|
||||||
},
|
},
|
||||||
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
||||||
});
|
});
|
||||||
@@ -73,6 +123,11 @@ export class PresenceSync {
|
|||||||
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
// 401 here usually means the cached token went stale unexpectedly
|
||||||
|
// (server-side rotation or clock skew) — drop the cache so the
|
||||||
|
// next tick re-logs-in.
|
||||||
|
if (res.status === 401)
|
||||||
|
this.tokenCache.delete(agentId);
|
||||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
10
dist/fabric/src/tools.js
vendored
10
dist/fabric/src/tools.js
vendored
@@ -311,10 +311,12 @@ export function registerFabricTools(api, client, identity) {
|
|||||||
api.registerTool((ctx) => ({
|
api.registerTool((ctx) => ({
|
||||||
name: 'fabric-guild-list',
|
name: 'fabric-guild-list',
|
||||||
description: 'List guilds the calling agent is a member of. Returns ' +
|
description: 'List guilds the calling agent is a member of. Returns ' +
|
||||||
'{nodeId, name, purpose, status} per row. `purpose` is a free-form ' +
|
'{nodeId, name, purpose, status} per row. ' +
|
||||||
"description of what each guild is for. Use this BEFORE " +
|
"`purpose` is a free-form description of what each guild is for — " +
|
||||||
'fabric-channel-list when a workflow asks you to pick the ' +
|
'pick the guild whose purpose matches your intent. Use this tool ' +
|
||||||
'right guild by intent (no guild ids hardcoded into workflows).',
|
'BEFORE fabric-channel-list when a workflow asks you to pick the ' +
|
||||||
|
'right guild by intent (e.g. "find a guild whose purpose mentions ' +
|
||||||
|
'debate broadcasts" → then list its announce-type channels).',
|
||||||
parameters: {
|
parameters: {
|
||||||
type: 'object',
|
type: 'object',
|
||||||
additionalProperties: false,
|
additionalProperties: false,
|
||||||
|
|||||||
24
index.ts
24
index.ts
@@ -7,6 +7,7 @@ import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
|
|||||||
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
|
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
|
||||||
import { fabricChannelPlugin } from './src/channel.js';
|
import { fabricChannelPlugin } from './src/channel.js';
|
||||||
import { flushAllFabric } from './src/coalesce.js';
|
import { flushAllFabric } from './src/coalesce.js';
|
||||||
|
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
|
||||||
import { FabricInbound } from './src/inbound.js';
|
import { FabricInbound } from './src/inbound.js';
|
||||||
import { listEnabledFabricAccounts } from './src/accounts.js';
|
import { listEnabledFabricAccounts } from './src/accounts.js';
|
||||||
import { registerFabricTools } from './src/tools.js';
|
import { registerFabricTools } from './src/tools.js';
|
||||||
@@ -62,6 +63,27 @@ export default defineChannelPluginEntry({
|
|||||||
identity,
|
identity,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Cross-plugin API: globalThis.__fabric
|
||||||
|
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
|
||||||
|
// injection to DM-typed channels only. The channel-meta cache is
|
||||||
|
// populated lazily from inbound (message.created carries xType) and
|
||||||
|
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
|
||||||
|
// very first DM after a fresh gateway start hits cache from the
|
||||||
|
// previous run rather than firing the injector on the wrong type.
|
||||||
|
//
|
||||||
|
// null return = channel never seen (cache cold). Callers MUST NOT
|
||||||
|
// fall back to "assume DM" — fail closed on unknown.
|
||||||
|
{
|
||||||
|
const _G = globalThis as Record<string, unknown>;
|
||||||
|
_G['__fabric'] = { getChannelType };
|
||||||
|
// Flush channel-meta cache when the gateway shuts down so
|
||||||
|
// recently-recorded xType entries don't get lost.
|
||||||
|
api.on('gateway_stop', () => {
|
||||||
|
try { flushChannelMeta(); } catch { /* ignore */ }
|
||||||
|
});
|
||||||
|
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
|
||||||
|
}
|
||||||
|
|
||||||
api.on('gateway_start', () => {
|
api.on('gateway_start', () => {
|
||||||
const _G = globalThis as Record<string, unknown>;
|
const _G = globalThis as Record<string, unknown>;
|
||||||
if (_G._fabricInboundStarted) return;
|
if (_G._fabricInboundStarted) return;
|
||||||
@@ -94,7 +116,7 @@ export default defineChannelPluginEntry({
|
|||||||
// their fabricUserId + first guild endpoint populated).
|
// their fabricUserId + first guild endpoint populated).
|
||||||
void inbound.start().then(() => {
|
void inbound.start().then(() => {
|
||||||
if (!inbound) return;
|
if (!inbound) return;
|
||||||
presence = new PresenceSync(api.logger);
|
presence = new PresenceSync(api.logger, client);
|
||||||
presence.setAccounts(inbound.getPresenceAccounts());
|
presence.setAccounts(inbound.getPresenceAccounts());
|
||||||
presence.start();
|
presence.start();
|
||||||
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
||||||
|
|||||||
108
src/channel-meta.ts
Normal file
108
src/channel-meta.ts
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
/**
|
||||||
|
* Channel-meta cache. Records (channelId → xType) for every fabric
|
||||||
|
* channel the gateway has seen at least one inbound message in.
|
||||||
|
*
|
||||||
|
* Populated lazily from inbound (`recordChannelType` is called for
|
||||||
|
* every `message.created` event with non-empty `xType`). Persisted to
|
||||||
|
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
|
||||||
|
* gateway restarts (so the very first DM after restart still gets the
|
||||||
|
* right xType without waiting for a fresh inbound).
|
||||||
|
*
|
||||||
|
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
|
||||||
|
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
|
||||||
|
* to xType==='dm' only.
|
||||||
|
*
|
||||||
|
* Failure mode: lookup misses (channel never seen / inbound dropped
|
||||||
|
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
|
||||||
|
* fall back to "assume DM" or you re-introduce the false-positive on
|
||||||
|
* group channels.
|
||||||
|
*/
|
||||||
|
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
|
||||||
|
import { dirname, join } from 'node:path';
|
||||||
|
import { homedir } from 'node:os';
|
||||||
|
|
||||||
|
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
|
||||||
|
|
||||||
|
interface ChannelMetaFile {
|
||||||
|
// channelId → xType ('dm' | 'triage' | 'group' | etc.)
|
||||||
|
channels: Record<string, string>;
|
||||||
|
}
|
||||||
|
|
||||||
|
let memory = new Map<string, string>();
|
||||||
|
let loaded = false;
|
||||||
|
let dirty = false;
|
||||||
|
let flushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
|
function load(): void {
|
||||||
|
if (loaded) return;
|
||||||
|
loaded = true;
|
||||||
|
try {
|
||||||
|
if (!existsSync(CACHE_FILE)) return;
|
||||||
|
const raw = readFileSync(CACHE_FILE, 'utf8');
|
||||||
|
const parsed = JSON.parse(raw) as ChannelMetaFile;
|
||||||
|
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
|
||||||
|
if (typeof k === 'string' && typeof v === 'string') memory.set(k, v);
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// ignore — start with empty cache on corruption
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function scheduleFlush(): void {
|
||||||
|
if (flushTimer) return;
|
||||||
|
// Debounce writes — many inbound messages may arrive in a burst.
|
||||||
|
// 250ms coalesces them; on gateway_stop the channel plugin can force
|
||||||
|
// a synchronous flush via flushChannelMeta().
|
||||||
|
flushTimer = setTimeout(() => {
|
||||||
|
flushTimer = null;
|
||||||
|
if (!dirty) return;
|
||||||
|
dirty = false;
|
||||||
|
flushSync();
|
||||||
|
}, 250);
|
||||||
|
}
|
||||||
|
|
||||||
|
function flushSync(): void {
|
||||||
|
try {
|
||||||
|
const dir = dirname(CACHE_FILE);
|
||||||
|
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
|
||||||
|
const out: ChannelMetaFile = { channels: Object.fromEntries(memory) };
|
||||||
|
const tmp = CACHE_FILE + '.tmp';
|
||||||
|
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
|
||||||
|
renameSync(tmp, CACHE_FILE);
|
||||||
|
} catch {
|
||||||
|
// swallow — cache is an optimization; loss-on-write is recoverable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Called by inbound on every message.created. xType empty → no-op. */
|
||||||
|
export function recordChannelType(channelId: string, xType: string | undefined): void {
|
||||||
|
if (!channelId || !xType) return;
|
||||||
|
load();
|
||||||
|
const existing = memory.get(channelId);
|
||||||
|
if (existing === xType) return;
|
||||||
|
memory.set(channelId, xType);
|
||||||
|
dirty = true;
|
||||||
|
scheduleFlush();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Cross-plugin lookup. null when channel never seen / unknown. */
|
||||||
|
export function getChannelType(channelId: string): string | null {
|
||||||
|
if (!channelId) return null;
|
||||||
|
load();
|
||||||
|
return memory.get(channelId) ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Force-flush — called on plugin shutdown to make sure recently
|
||||||
|
* recorded entries hit disk before the gateway dies. */
|
||||||
|
export function flushChannelMeta(): void {
|
||||||
|
if (flushTimer) {
|
||||||
|
clearTimeout(flushTimer);
|
||||||
|
flushTimer = null;
|
||||||
|
}
|
||||||
|
if (dirty) {
|
||||||
|
dirty = false;
|
||||||
|
flushSync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const CHANNEL_META_PATH = CACHE_FILE;
|
||||||
@@ -21,6 +21,39 @@ import {
|
|||||||
resolveDefaultFabricAccountId,
|
resolveDefaultFabricAccountId,
|
||||||
type ResolvedFabricAccount,
|
type ResolvedFabricAccount,
|
||||||
} from './accounts.js';
|
} from './accounts.js';
|
||||||
|
import { getChannelType } from './channel-meta.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map a Fabric channel xType to an openclaw routing peer.kind / ChatType.
|
||||||
|
*
|
||||||
|
* Fabric distinguishes channels by xType ('dm' | 'triage' | 'group' |
|
||||||
|
* 'broadcast' | 'announce' | ...). Openclaw's session router only knows
|
||||||
|
* 'direct' | 'group' | 'channel'. We collapse:
|
||||||
|
* - 'dm' → 'direct' (1:1 conversation; agent always speaks)
|
||||||
|
* - rest → 'group' (multi-party; turn-engine gates speech)
|
||||||
|
*
|
||||||
|
* Sessions are keyed by peer.kind, so inbound and outbound MUST agree —
|
||||||
|
* otherwise the agent's outbound message lands in a different session
|
||||||
|
* than the inbound that triggered it and conversation state splits.
|
||||||
|
*
|
||||||
|
* Outbound has no live xType (the agent target is just a channelId), so
|
||||||
|
* it consults the channel-meta cache populated by inbound. Cache miss
|
||||||
|
* (channel never observed) falls back to 'group' — same as the pre-fix
|
||||||
|
* behavior, no regression on cold cache. The proactive-DM-first-message
|
||||||
|
* edge case (agent DMs a channel before any inbound) still lands as
|
||||||
|
* 'group' on that one outbound; the next inbound + outbound pair will
|
||||||
|
* agree on 'direct'.
|
||||||
|
*/
|
||||||
|
export type FabricPeerRouting = { peerKind: 'direct' | 'group'; chatType: 'direct' | 'group' };
|
||||||
|
|
||||||
|
export function fabricPeerRoutingForXType(xType: string | null | undefined): FabricPeerRouting {
|
||||||
|
if (xType === 'dm') return { peerKind: 'direct', chatType: 'direct' };
|
||||||
|
return { peerKind: 'group', chatType: 'group' };
|
||||||
|
}
|
||||||
|
|
||||||
|
export function fabricPeerRoutingForChannel(channelId: string): FabricPeerRouting {
|
||||||
|
return fabricPeerRoutingForXType(getChannelType(channelId));
|
||||||
|
}
|
||||||
|
|
||||||
type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown };
|
type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown };
|
||||||
|
|
||||||
@@ -45,13 +78,18 @@ export function looksLikeFabricTargetId(raw: string): boolean {
|
|||||||
export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
|
export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
|
||||||
const id = stripFabricTargetPrefix(params.target);
|
const id = stripFabricTargetPrefix(params.target);
|
||||||
if (!id) return null;
|
if (!id) return null;
|
||||||
|
// Consult the channel-meta cache populated by inbound — DM channels
|
||||||
|
// need peer.kind='direct' so the outbound session key matches the
|
||||||
|
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
|
||||||
|
// no regression on cold cache).
|
||||||
|
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
|
||||||
return buildChannelOutboundSessionRoute({
|
return buildChannelOutboundSessionRoute({
|
||||||
cfg: params.cfg,
|
cfg: params.cfg,
|
||||||
agentId: params.agentId,
|
agentId: params.agentId,
|
||||||
channel: 'fabric',
|
channel: 'fabric',
|
||||||
accountId: params.accountId,
|
accountId: params.accountId,
|
||||||
peer: { kind: 'group', id },
|
peer: { kind: peerKind, id },
|
||||||
chatType: 'group',
|
chatType,
|
||||||
from: `fabric:channel:${id}`,
|
from: `fabric:channel:${id}`,
|
||||||
to: `fabric:${id}`,
|
to: `fabric:${id}`,
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-
|
|||||||
import type { FabricClient, FabricSession } from './fabric-client.js';
|
import type { FabricClient, FabricSession } from './fabric-client.js';
|
||||||
import type { IdentityRegistry } from './identity.js';
|
import type { IdentityRegistry } from './identity.js';
|
||||||
import { resolveCoalesce } from './accounts.js';
|
import { resolveCoalesce } from './accounts.js';
|
||||||
|
import { fabricPeerRoutingForXType } from './channel.js';
|
||||||
|
import { recordChannelType } from './channel-meta.js';
|
||||||
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
||||||
|
|
||||||
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
|
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
|
||||||
@@ -279,17 +281,25 @@ export class FabricInbound {
|
|||||||
agentId: string;
|
agentId: string;
|
||||||
fabricUserId: string;
|
fabricUserId: string;
|
||||||
guildBaseUrl: string;
|
guildBaseUrl: string;
|
||||||
|
guildNodeId: string;
|
||||||
fabricApiKey: string;
|
fabricApiKey: string;
|
||||||
}> {
|
}> {
|
||||||
const out: Array<{ agentId: string; fabricUserId: string; guildBaseUrl: string; fabricApiKey: string }> = [];
|
const out: Array<{
|
||||||
|
agentId: string;
|
||||||
|
fabricUserId: string;
|
||||||
|
guildBaseUrl: string;
|
||||||
|
guildNodeId: string;
|
||||||
|
fabricApiKey: string;
|
||||||
|
}> = [];
|
||||||
for (const entry of this.identity.list()) {
|
for (const entry of this.identity.list()) {
|
||||||
if (!entry.fabricUserId) continue;
|
if (!entry.fabricUserId) continue;
|
||||||
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
|
const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
|
||||||
if (!presenceGuildUrl) continue;
|
if (!presenceGuild) continue;
|
||||||
out.push({
|
out.push({
|
||||||
agentId: entry.agentId,
|
agentId: entry.agentId,
|
||||||
fabricUserId: entry.fabricUserId,
|
fabricUserId: entry.fabricUserId,
|
||||||
guildBaseUrl: presenceGuildUrl,
|
guildBaseUrl: presenceGuild.endpoint,
|
||||||
|
guildNodeId: presenceGuild.nodeId,
|
||||||
fabricApiKey: entry.fabricApiKey,
|
fabricApiKey: entry.fabricApiKey,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -297,24 +307,44 @@ export class FabricInbound {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
||||||
// guild per agent (used as the presence-push target).
|
// guild per agent (used as the presence-push target). Stores both
|
||||||
private firstGuildEndpointByAgent = new Map<string, string>();
|
// endpoint and nodeId — presence-sync needs both: endpoint to build
|
||||||
|
// the URL, nodeId to pick the matching guildAccessToken from a fresh
|
||||||
|
// agent-login response.
|
||||||
|
private firstGuildByAgent = new Map<string, { endpoint: string; nodeId: string }>();
|
||||||
|
|
||||||
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
|
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
|
||||||
const selfUserId = session.user.id;
|
const selfUserId = session.user.id;
|
||||||
// First-guild capture for presence-sync push target. session.guilds is
|
// First-guild capture for presence-sync push target. session.guilds is
|
||||||
// already in priority order from Center; we take the first one with a
|
// already in priority order from Center; we take the first one with a
|
||||||
// valid endpoint and stop. Multi-guild presence is a future concern.
|
// valid endpoint and stop. Multi-guild presence is a future concern.
|
||||||
if (!this.firstGuildEndpointByAgent.has(agentId)) {
|
if (!this.firstGuildByAgent.has(agentId)) {
|
||||||
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
||||||
if (firstGuild) this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
|
if (firstGuild) this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
|
||||||
}
|
}
|
||||||
for (const g of session.guilds) {
|
for (const g of session.guilds) {
|
||||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||||
if (!tok) continue;
|
if (!tok) continue;
|
||||||
|
// Use the *callback* form of `auth` so socket.io re-evaluates the JWT
|
||||||
|
// on every (re)connect. The single-shot `auth: { token: tok }` shape
|
||||||
|
// captured the token in closure: after socket.io's silent auto-reconnect
|
||||||
|
// the backend got the same JWT that expired ~15 min into the session
|
||||||
|
// (guildAccessToken TTL = 900s) and silently rejected the handshake at
|
||||||
|
// the application layer. The client's `connect` event still fired (TCP
|
||||||
|
// succeeded), so the plugin happily ran the channel-resync, emitted
|
||||||
|
// `join_channel` into the void, and logged "joined N channel(s)" while
|
||||||
|
// the backend was actually broadcasting message.created to a room with
|
||||||
|
// zero subscribers. End user symptom: DMs to agents silently dropped.
|
||||||
const socket = io(`${g.endpoint}/realtime`, {
|
const socket = io(`${g.endpoint}/realtime`, {
|
||||||
transports: ['websocket'],
|
transports: ['websocket'],
|
||||||
auth: { token: tok },
|
auth: (cb) => {
|
||||||
|
// Best-effort fresh token; on transient failure fall back to the
|
||||||
|
// last known good one. tokenCache also keeps HTTP calls (attachment
|
||||||
|
// download / reply post) from 401'ing in the same window.
|
||||||
|
this.freshGuildToken(agentId, g.nodeId, session)
|
||||||
|
.then((fresh) => cb({ token: fresh ?? tok }))
|
||||||
|
.catch(() => cb({ token: tok }));
|
||||||
|
},
|
||||||
autoConnect: false,
|
autoConnect: false,
|
||||||
});
|
});
|
||||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||||
@@ -401,6 +431,13 @@ export class FabricInbound {
|
|||||||
socket.on('message.created', (m: FabricMessage) => {
|
socket.on('message.created', (m: FabricMessage) => {
|
||||||
const channelId = m.channelId ?? '';
|
const channelId = m.channelId ?? '';
|
||||||
if (!channelId) return;
|
if (!channelId) return;
|
||||||
|
// Record xType into the channel-meta cache before self-author
|
||||||
|
// / dedup gates — channel type doesn't depend on who sent the
|
||||||
|
// message, and recording it on observer-only triage messages
|
||||||
|
// is still useful (the next consumer asking
|
||||||
|
// __fabric.getChannelType wants the answer regardless of
|
||||||
|
// whether THIS message was delivered to an agent).
|
||||||
|
recordChannelType(channelId, m.xType);
|
||||||
if (m.authorUserId && m.authorUserId === selfUserId) return;
|
if (m.authorUserId && m.authorUserId === selfUserId) return;
|
||||||
const key = `${agentId}:${m.messageId}`;
|
const key = `${agentId}:${m.messageId}`;
|
||||||
if (this.seen.has(key)) return;
|
if (this.seen.has(key)) return;
|
||||||
@@ -496,11 +533,19 @@ export class FabricInbound {
|
|||||||
const core = this.core as Core & Record<string, unknown>;
|
const core = this.core as Core & Record<string, unknown>;
|
||||||
const cfg = this.cfg as { session?: { store?: unknown } };
|
const cfg = this.cfg as { session?: { store?: unknown } };
|
||||||
try {
|
try {
|
||||||
|
// Route by xType. DM channels need peer.kind='direct' so openclaw
|
||||||
|
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
|
||||||
|
// and ctx.ChatType='direct') rather than as a multi-party group.
|
||||||
|
// Without this, the agent's user-prompt metadata says
|
||||||
|
// 'is_group_chat: true' on a DM and downstream prompt logic
|
||||||
|
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
|
||||||
|
// misclassifies the turn.
|
||||||
|
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
|
||||||
const route = core.channel.routing.resolveAgentRoute({
|
const route = core.channel.routing.resolveAgentRoute({
|
||||||
cfg: this.cfg,
|
cfg: this.cfg,
|
||||||
channel: 'fabric',
|
channel: 'fabric',
|
||||||
accountId: agentId,
|
accountId: agentId,
|
||||||
peer: { kind: 'group', id: channelId },
|
peer: { kind: peerKind, id: channelId },
|
||||||
});
|
});
|
||||||
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
||||||
agentId: route.agentId,
|
agentId: route.agentId,
|
||||||
@@ -515,7 +560,7 @@ export class FabricInbound {
|
|||||||
To: `fabric:${channelId}`,
|
To: `fabric:${channelId}`,
|
||||||
SessionKey: route.sessionKey,
|
SessionKey: route.sessionKey,
|
||||||
AccountId: route.accountId ?? agentId,
|
AccountId: route.accountId ?? agentId,
|
||||||
ChatType: 'group',
|
ChatType: chatType,
|
||||||
ConversationLabel: `fabric:${guild.nodeId}`,
|
ConversationLabel: `fabric:${guild.nodeId}`,
|
||||||
SenderId: m.authorUserId ?? 'fabric',
|
SenderId: m.authorUserId ?? 'fabric',
|
||||||
Provider: 'fabric',
|
Provider: 'fabric',
|
||||||
|
|||||||
@@ -2,18 +2,26 @@
|
|||||||
* presence-sync — read each connected agent's HF status (via the
|
* presence-sync — read each connected agent's HF status (via the
|
||||||
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
|
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
|
||||||
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
|
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
|
||||||
* `PUT /agents/:userId/presence` so the backend can apply busy-discard
|
* `PUT /api/agents/:userId/presence` so the backend can apply
|
||||||
* on `announce`-type channel deliveries.
|
* busy-discard on `announce`-type channel deliveries.
|
||||||
*
|
*
|
||||||
* Push model: we only PUT when an agent's status actually changes
|
* Push model: we only PUT when an agent's status actually changes
|
||||||
* (since the last push). The HF-side accessor has its own TTL cache
|
* (since the last push). The HF-side accessor has its own TTL cache
|
||||||
* to absorb the every-30s polling.
|
* to absorb the every-30s polling.
|
||||||
*
|
*
|
||||||
|
* Auth: the endpoint sits behind ApiKeyGuard (global APP_GUARD per
|
||||||
|
* app.module.js) which expects `Authorization: Bearer <guild-token>`
|
||||||
|
* — NOT the agent's fabricApiKey directly. So before each PUT we do
|
||||||
|
* a fresh agent-login (or reuse a cached token if still within its
|
||||||
|
* 15-min JWT TTL) and pull the guildAccessToken matching the target
|
||||||
|
* guild. Status changes are rare enough that login overhead is fine.
|
||||||
|
*
|
||||||
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
|
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
|
||||||
* is a no-op — Fabric backend defaults presence to 'unknown' which is
|
* is a no-op — Fabric backend defaults presence to 'unknown' which is
|
||||||
* treated as not-busy. Announce-channel delivery still works; busy
|
* treated as not-busy. Announce-channel delivery still works; busy
|
||||||
* filtering simply doesn't kick in.
|
* filtering simply doesn't kick in.
|
||||||
*/
|
*/
|
||||||
|
import type { FabricClient } from './fabric-client.js';
|
||||||
|
|
||||||
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
|
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
|
||||||
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
|
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
|
||||||
@@ -23,15 +31,36 @@ export interface PresenceSyncAccount {
|
|||||||
agentId: string;
|
agentId: string;
|
||||||
fabricUserId: string; // the agent's Fabric Center user id (UUID)
|
fabricUserId: string; // the agent's Fabric Center user id (UUID)
|
||||||
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
|
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
|
||||||
fabricApiKey: string; // existing per-account key
|
guildNodeId: string; // which guildAccessTokens[].guildNodeId to pick
|
||||||
|
fabricApiKey: string; // existing per-account key (used for agent-login)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Guild access JWTs expire every 900s. Refresh ~2 min early to stay
|
||||||
|
// safely inside the window even if a tick runs late.
|
||||||
|
const TOKEN_TTL_MS = (15 - 2) * 60 * 1000;
|
||||||
|
|
||||||
|
interface CachedToken {
|
||||||
|
token: string;
|
||||||
|
expiresAt: number; // epoch ms
|
||||||
}
|
}
|
||||||
|
|
||||||
export class PresenceSync {
|
export class PresenceSync {
|
||||||
private timer: ReturnType<typeof setInterval> | null = null;
|
private timer: ReturnType<typeof setInterval> | null = null;
|
||||||
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
|
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
|
||||||
private readonly accounts = new Map<string, PresenceSyncAccount>();
|
private readonly accounts = new Map<string, PresenceSyncAccount>();
|
||||||
|
private readonly tokenCache = new Map<string, CachedToken>(); // by agentId
|
||||||
|
|
||||||
constructor(private readonly logger: Logger) {}
|
// Mutex flag: a tick iterates accounts serially with `await` on each
|
||||||
|
// agent-login + PUT round-trip, so a single tick can easily run 20+s
|
||||||
|
// when there are many accounts. setInterval(intervalMs) does NOT wait
|
||||||
|
// for the previous tick to finish — without this guard the next tick
|
||||||
|
// fires on top of a still-running one and two parallel iterations
|
||||||
|
// PUT the same agentId within milliseconds. That tipped the backend's
|
||||||
|
// first-time-insert race (separate fix in Fabric.Backend.Guild) into
|
||||||
|
// 500s on prod. Guarded ticks just skip a beat instead.
|
||||||
|
private inflight = false;
|
||||||
|
|
||||||
|
constructor(private readonly logger: Logger, private readonly client: FabricClient) {}
|
||||||
|
|
||||||
setAccounts(accounts: PresenceSyncAccount[]): void {
|
setAccounts(accounts: PresenceSyncAccount[]): void {
|
||||||
this.accounts.clear();
|
this.accounts.clear();
|
||||||
@@ -54,7 +83,50 @@ export class PresenceSync {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch a fresh guildAccessToken for `acct`, caching it under the
|
||||||
|
* agentId until just before its JWT expiry. Returns null on login
|
||||||
|
* failure or if the session has no matching guild — caller logs +
|
||||||
|
* skips the PUT.
|
||||||
|
*/
|
||||||
|
private async ensureGuildToken(acct: PresenceSyncAccount): Promise<string | null> {
|
||||||
|
const now = Date.now();
|
||||||
|
const cached = this.tokenCache.get(acct.agentId);
|
||||||
|
if (cached && cached.expiresAt > now) return cached.token;
|
||||||
|
|
||||||
|
let session;
|
||||||
|
try {
|
||||||
|
session = await this.client.agentLogin(acct.fabricApiKey);
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.warn(`fabric: presence-sync agent-login failed for ${acct.agentId}: ${String(err)}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
const entry = session.guildAccessTokens.find((g) => g.guildNodeId === acct.guildNodeId);
|
||||||
|
if (!entry?.token) {
|
||||||
|
this.logger.warn(
|
||||||
|
`fabric: presence-sync no guild token for ${acct.agentId} guild=${acct.guildNodeId}`,
|
||||||
|
);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.tokenCache.set(acct.agentId, { token: entry.token, expiresAt: now + TOKEN_TTL_MS });
|
||||||
|
return entry.token;
|
||||||
|
}
|
||||||
|
|
||||||
private async tick(): Promise<void> {
|
private async tick(): Promise<void> {
|
||||||
|
// Mutex: see the `inflight` field declaration for the why. Drop
|
||||||
|
// overlapping ticks rather than letting them run concurrently —
|
||||||
|
// status is gated by `lastStatus !== bridge.get`, so skipping a
|
||||||
|
// beat costs nothing the next beat won't catch.
|
||||||
|
if (this.inflight) return;
|
||||||
|
this.inflight = true;
|
||||||
|
try {
|
||||||
|
await this.tickInner();
|
||||||
|
} finally {
|
||||||
|
this.inflight = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async tickInner(): Promise<void> {
|
||||||
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
|
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
|
||||||
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
|
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
|
||||||
|
|
||||||
@@ -68,13 +140,22 @@ export class PresenceSync {
|
|||||||
if (!status) continue;
|
if (!status) continue;
|
||||||
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
|
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
|
||||||
|
|
||||||
|
const guildToken = await this.ensureGuildToken(acct);
|
||||||
|
if (!guildToken) continue;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
// Endpoint: PUT /api/agents/:userId/presence. ApiKeyGuard (global
|
||||||
|
// APP_GUARD) requires `Authorization: Bearer <guildAccessToken>`
|
||||||
|
// — NOT the agent's raw fabricApiKey. Pre-v1: this loop sent
|
||||||
|
// x-api-key and got 401 "missing bearer token" forever. The /api
|
||||||
|
// prefix is required because the guild backend sets a global
|
||||||
|
// 'api' prefix in main.ts setGlobalPrefix('api').
|
||||||
|
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/api/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||||
const res = await fetch(url, {
|
const res = await fetch(url, {
|
||||||
method: 'PUT',
|
method: 'PUT',
|
||||||
headers: {
|
headers: {
|
||||||
'content-type': 'application/json',
|
'content-type': 'application/json',
|
||||||
'x-api-key': acct.fabricApiKey,
|
authorization: `Bearer ${guildToken}`,
|
||||||
},
|
},
|
||||||
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
||||||
});
|
});
|
||||||
@@ -82,6 +163,10 @@ export class PresenceSync {
|
|||||||
this.lastStatus.set(agentId, status);
|
this.lastStatus.set(agentId, status);
|
||||||
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
||||||
} else {
|
} else {
|
||||||
|
// 401 here usually means the cached token went stale unexpectedly
|
||||||
|
// (server-side rotation or clock skew) — drop the cache so the
|
||||||
|
// next tick re-logs-in.
|
||||||
|
if (res.status === 401) this.tokenCache.delete(agentId);
|
||||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|||||||
Reference in New Issue
Block a user