import { promises as fs } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';

/**
 * Inbound side of the Fabric channel plugin.
 *
 * For every configured agent it logs in, opens one socket.io connection per
 * guild, keeps the socket's channel-room subscriptions in sync, and turns each
 * `message.created` event into either a model turn (`dispatch`) or a
 * history-only session record. Work for one channel is strictly serialised
 * through `enqueueChannelTask`.
 */
export class FabricInbound {
    core;
    cfg;
    client;
    identity;
    log;
    accounts;
    sockets = [];
    // Dedup keys (`<agentId>:<messageId>`) of messages already accepted.
    seen = new Set();
    // Upper bound on `seen`. Once exceeded, the oldest keys are evicted.
    static SEEN_MAX = 5000;
    // Timers that periodically re-sync channel membership per (agent, guild).
    // Without this, the agent's socket.io subscriptions are a snapshot taken
    // at connect time — any channel the agent joins later (e.g. a fresh DM
    // created by another user) is unreachable until the gateway restarts.
    channelSyncTimers = [];
    // Resync cadence. `channel.joined` / `channel.left` push events are
    // handled as they arrive (see connectAgent); polling is the safety net
    // for missed events. 60s keeps the lag bounded without hammering the
    // backend.
    static CHANNEL_SYNC_INTERVAL_MS = 60_000;
    // Guild access tokens are short-lived (~15 min). The socket survives via
    // socket.io reconnect, but the token captured at connect time goes stale,
    // so HTTP calls (attachment download, posting the reply) start 401ing.
    // Re-login per agent on a short TTL to keep a fresh token.
    tokenCache = new Map();
    static TOKEN_TTL_MS = 8 * 60 * 1000;
    // Per-channel serial work queue. Every inbound socket message for a
    // channel awaits the previous task for that same channel, so model
    // turns never interleave. Map key = channelId; value is the tail of
    // the chain (an in-flight promise the next task awaits).
    //
    // Why per-channel and not per-agent: a single agent may sit in
    // several triage / general channels; we want each channel to flow at
    // its own speed but the SAME channel's traffic to be strictly serial.
    // For dm and discuss the queue also serialises but those traditionally
    // had at-most-one-in-flight anyway via the turn engine.
    channelChains = new Map();
    // Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
    // agent/status round-trip off the hot path for back-to-back triage
    // messages. Short TTL because status flips are rare-but-meaningful.
    agentStatusCache = new Map();
    static AGENT_STATUS_TTL_MS = 5_000;
    // Triage messages that arrived while the on-duty agent wasn't on_call
    // — sit here until either (a) the agent becomes on_call and the next
    // triage arrival drains them, or (b) the gateway restarts (lost; ok
    // because the underlying Fabric messages are persisted and re-fetched
    // on agent reconnect's history sweep).
    pendingTriageGated = [];
    // Filled by connectAgent for each (agent, guild). Tracks ONLY the first
    // guild per agent (used as the presence-push target). Stores both
    // endpoint and nodeId — presence-sync needs both: endpoint to build
    // the URL, nodeId to pick the matching guildAccessToken from a fresh
    // agent-login response.
    firstGuildByAgent = new Map();

    /**
     * @param core PluginRuntime
     * @param cfg OpenClawConfig
     * @param client Fabric client; this class calls `agentLogin` and `postMessage` on it.
     * @param identity Identity store; this class calls `list`, `upsert` and `findByAgentId` on it.
     * @param log Logger exposing `info` and `warn`.
     * @param accounts Optional explicit account list (`{ agentId, fabricApiKey }`);
     *   when empty, `start()` falls back to `identity.list()`.
     */
    constructor(core, cfg, client, identity, log, accounts = []) {
        this.core = core;
        this.cfg = cfg;
        this.client = client;
        this.identity = identity;
        this.log = log;
        this.accounts = accounts;
    }

    /**
     * Schedule `task` to run after every previous task on the same channel
     * has completed. Returns the promise so callers can await their own
     * result if they need to; the chain itself is fire-and-forget from the
     * socket.on handler. A failing task is logged and does not break the
     * chain for later tasks.
     */
    enqueueChannelTask(channelId, task) {
        const prev = this.channelChains.get(channelId) ?? Promise.resolve();
        const next = prev.then(task).catch((err) => {
            this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
        });
        this.channelChains.set(channelId, next);
        // Best-effort cleanup so the Map doesn't grow without bound for
        // long-running gateways: drop the entry when the chain settles, but
        // only if it's still the latest reference (newer enqueue may have
        // overwritten it in the meantime).
        void next.finally(() => {
            if (this.channelChains.get(channelId) === next) {
                this.channelChains.delete(channelId);
            }
        });
        return next;
    }

    /**
     * Hit HF backend to check whether `agentId` is currently on_call.
     * Cached for 5s. Failures (network, 404, etc.) are treated as "not
     * on_call" — triage stays gated rather than risking a confused wake.
     *
     * @returns {Promise<boolean>} true only when the backend reports `on_call`.
     */
    async checkAgentOnCall(agentId) {
        const cached = this.agentStatusCache.get(agentId);
        if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
            return cached.onCall;
        }
        const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
        // CLAW_IDENTIFIER resolution priority:
        //   1. HF_CLAW_IDENTIFIER env (operator override)
        //   2. openclaw config `plugins.harbor-forge.identifier` (what the HF
        //      plugin itself uses — keeps the two in sync without an extra
        //      env per service unit)
        //   3. os.hostname() last-resort fallback (often wrong: e.g. sim
        //      container hostname is `server.t2` but HF agent row has
        //      `claw_identifier=sim-t2`; matching is mandatory for the HF
        //      backend's _require_agent() check)
        let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
        if (!claw) {
            try {
                // openclaw config shape (verified in sim):
                //   { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
                const cfg = this.cfg;
                const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
                if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
                    claw = fromCfg.trim();
                }
            }
            catch {
                /* fall through to hostname */
            }
        }
        if (!claw) {
            claw = (await import('os')).hostname();
        }
        let onCall = false;
        try {
            const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
            const res = await fetch(url, {
                headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
            });
            if (res.ok) {
                const data = (await res.json());
                onCall = (data.status ?? '').toLowerCase() === 'on_call';
            }
        }
        catch (err) {
            this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
        }
        // Negative results (including failures) are cached too, so a flapping
        // backend is not re-queried more often than once per TTL.
        this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
        return onCall;
    }

    /**
     * FIFO drain of all triage-gated messages for `agentId` (called when we
     * just learned they're on_call).
     *
     * @param agentId Agent whose parked messages should be released.
     * @param inlineChannelId Optional. Channel whose serial task the caller is
     *   currently running in. Parked messages for that channel are dispatched
     *   inline (awaited, in FIFO order) instead of being re-enqueued:
     *   re-enqueueing would place them BEHIND the running task on that
     *   channel's chain, so the newer message would be dispatched before the
     *   older gated ones. Messages for every other channel go through their
     *   own channel chain so per-channel serial order is kept. When omitted,
     *   every message is enqueued.
     */
    async drainGatedFor(agentId, inlineChannelId) {
        const keep = [];
        const drain = [];
        for (const item of this.pendingTriageGated) {
            if (item.agentId === agentId)
                drain.push(item);
            else
                keep.push(item);
        }
        if (drain.length === 0)
            return;
        this.pendingTriageGated = keep;
        const inline = [];
        // Pass 1: hand other channels to their own chains right away so they
        // are not held up behind the inline dispatches below.
        for (const item of drain) {
            this.log.info(`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`);
            if (inlineChannelId !== undefined && item.channelId === inlineChannelId) {
                inline.push(item);
                continue;
            }
            // Re-enqueue via the per-channel chain so ordering is preserved.
            this.enqueueChannelTask(item.channelId, async () => {
                await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
            });
        }
        // Pass 2: same-channel items run here, oldest first, before the
        // caller continues with its own (newer) message.
        for (const item of inline) {
            try {
                await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
            }
            catch (err) {
                // One failed drained message must not block the rest of the
                // drain or the caller's own message.
                this.log.warn(`fabric: triage drain dispatch failed agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}: ${String(err)}`);
            }
        }
    }

    /**
     * Return a fresh guild access token for the agent, re-authenticating with
     * the agent's Fabric API key when the cached session is stale. Falls back
     * to the connect-time session token if re-login fails.
     *
     * @param agentId Agent to fetch the token for.
     * @param guildNodeId Guild whose token is wanted.
     * @param fallback Connect-time session, used when no fresher token exists.
     * @returns {Promise<string|undefined>} the token, or undefined when neither
     *   session carries one for `guildNodeId`.
     */
    async freshGuildToken(agentId, guildNodeId, fallback) {
        const pick = (s) => s.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
        const now = Date.now();
        const cached = this.tokenCache.get(agentId);
        if (cached && now - cached.at < FabricInbound.TOKEN_TTL_MS) {
            return pick(cached.session) ?? pick(fallback);
        }
        const apiKey = this.identity.findByAgentId(agentId)?.fabricApiKey;
        if (apiKey) {
            try {
                const s = await this.client.agentLogin(apiKey);
                this.tokenCache.set(agentId, { session: s, at: now });
                return pick(s) ?? pick(fallback);
            }
            catch (err) {
                this.log.warn(`fabric: token refresh failed agent=${agentId}: ${String(err)}`);
            }
        }
        return pick(fallback);
    }

    /**
     * Log in and connect every configured agent. A failure for one agent is
     * logged and does not prevent the remaining agents from connecting.
     */
    async start() {
        const entries = this.accounts.length > 0
            ? this.accounts.map((a) => ({ agentId: a.agentId, fabricApiKey: a.fabricApiKey }))
            : this.identity.list();
        for (const entry of entries) {
            try {
                const session = await this.client.agentLogin(entry.fabricApiKey);
                this.identity.upsert({
                    agentId: entry.agentId,
                    fabricApiKey: entry.fabricApiKey,
                    fabricUserId: session.user.id,
                    displayName: session.user.name,
                });
                await this.connectAgent(entry.agentId, session);
                this.log.info(`fabric: agent ${entry.agentId} connected as ${session.user.email}`);
            }
            catch (err) {
                this.log.warn(`fabric: agent ${entry.agentId} connect failed: ${String(err)}`);
            }
        }
    }

    /** Stop all channel-resync timers and disconnect every socket. */
    stop() {
        for (const t of this.channelSyncTimers)
            clearInterval(t);
        this.channelSyncTimers = [];
        for (const s of this.sockets)
            s.disconnect();
        this.sockets = [];
    }

    /**
     * Per-account metadata harvested during `start()` — used by
     * PresenceSync to know where to push each agent's HF status.
     *
     * `fabricUserId` is filled from `session.user.id` after agent-login.
     * `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
     * guild presence push is a future concern; for sim/prod-v1 each agent
     * is in one guild).
     *
     * Returns ONLY agents that successfully connected — failed-login
     * agents have no fabricUserId yet and are excluded.
     */
    getPresenceAccounts() {
        const out = [];
        for (const entry of this.identity.list()) {
            if (!entry.fabricUserId)
                continue;
            const presenceGuild = this.firstGuildByAgent.get(entry.agentId);
            if (!presenceGuild)
                continue;
            out.push({
                agentId: entry.agentId,
                fabricUserId: entry.fabricUserId,
                guildBaseUrl: presenceGuild.endpoint,
                guildNodeId: presenceGuild.nodeId,
                fabricApiKey: entry.fabricApiKey,
            });
        }
        return out;
    }

    /**
     * Open one realtime socket per guild in `session` for `agentId`, keep its
     * channel-room subscriptions in sync (initial fetch on connect, push
     * events, periodic poll) and route every inbound `message.created`
     * through the per-channel serial queue.
     *
     * Guilds without a matching entry in `session.guildAccessTokens` are
     * skipped.
     */
    async connectAgent(agentId, session) {
        const selfUserId = session.user.id;
        // First-guild capture for presence-sync push target. session.guilds is
        // already in priority order from Center; we take the first one with a
        // valid endpoint and stop. Multi-guild presence is a future concern.
        if (!this.firstGuildByAgent.has(agentId)) {
            const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
            if (firstGuild)
                this.firstGuildByAgent.set(agentId, { endpoint: firstGuild.endpoint, nodeId: firstGuild.nodeId });
        }
        for (const g of session.guilds) {
            const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
            if (!tok)
                continue;
            // Use the *callback* form of `auth` so socket.io re-evaluates the JWT
            // on every (re)connect. The single-shot `auth: { token: tok }` shape
            // captured the token in closure: after socket.io's silent auto-reconnect
            // the backend got the same JWT that expired ~15 min into the session
            // (guildAccessToken TTL = 900s) and silently rejected the handshake at
            // the application layer. The client's `connect` event still fired (TCP
            // succeeded), so the plugin happily ran the channel-resync, emitted
            // `join_channel` into the void, and logged "joined N channel(s)" while
            // the backend was actually broadcasting message.created to a room with
            // zero subscribers. End user symptom: DMs to agents silently dropped.
            const socket = io(`${g.endpoint}/realtime`, {
                transports: ['websocket'],
                auth: (cb) => {
                    // Best-effort fresh token; on transient failure fall back to the
                    // last known good one. tokenCache also keeps HTTP calls (attachment
                    // download / reply post) from 401'ing in the same window.
                    this.freshGuildToken(agentId, g.nodeId, session)
                        .then((fresh) => cb({ token: fresh ?? tok }))
                        .catch(() => cb({ token: tok }));
                },
                autoConnect: false,
            });
            // Tracked socket.io rooms for this (agent, guild). The initial fetch
            // on `connect` seeds it; the periodic resync diffs against it so we
            // only emit `join_channel` for genuinely new channels (and
            // `leave_channel` for ones the agent is no longer in).
            const joined = new Set();
            const syncChannels = async (kind) => {
                let freshTok;
                try {
                    freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
                }
                catch {
                    freshTok = tok;
                }
                const authTok = freshTok ?? tok;
                try {
                    const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
                    if (!res.ok)
                        return;
                    const channels = (await res.json());
                    const current = new Set(channels.map((c) => c.id));
                    let added = 0;
                    let removed = 0;
                    for (const id of current) {
                        if (!joined.has(id)) {
                            socket.emit('join_channel', { channelId: id });
                            joined.add(id);
                            added++;
                        }
                    }
                    // Iterate over a copy: entries are deleted inside the loop.
                    for (const id of [...joined]) {
                        if (!current.has(id)) {
                            socket.emit('leave_channel', { channelId: id });
                            joined.delete(id);
                            removed++;
                        }
                    }
                    if (kind === 'initial') {
                        this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
                    }
                    else if (added > 0 || removed > 0) {
                        this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
                    }
                }
                catch {
                    /* best effort — next tick will retry */
                }
            };
            socket.on('connect', () => {
                // On every (re)connect the server forgets prior subscriptions, so
                // reset our local view and seed from a fresh fetch.
                joined.clear();
                void syncChannels('initial');
            });
            // Push-based membership events from the backend (companion to
            // Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
            // server tells us this user was added to / removed from a
            // channel, we sub/unsub the socket.io room immediately — no
            // 60s wait for the polling resync. Polling remains as a safety
            // net for missed events.
            socket.on('channel.joined', (evt) => {
                const id = evt?.channelId;
                if (!id || joined.has(id))
                    return;
                socket.emit('join_channel', { channelId: id });
                joined.add(id);
                this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
            });
            socket.on('channel.left', (evt) => {
                const id = evt?.channelId;
                if (!id || !joined.has(id))
                    return;
                socket.emit('leave_channel', { channelId: id });
                joined.delete(id);
                this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
            });
            const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
            this.channelSyncTimers.push(syncTimer);
            socket.on('message.created', (m) => {
                // `m?.` guards against a null/undefined payload: throwing inside
                // the socket handler would surface as an unhandled error.
                const channelId = m?.channelId ?? '';
                if (!channelId)
                    return;
                // Record xType into the channel-meta cache before self-author
                // / dedup gates — channel type doesn't depend on who sent the
                // message, and recording it on observer-only triage messages
                // is still useful (the next consumer asking
                // __fabric.getChannelType wants the answer regardless of
                // whether THIS message was delivered to an agent).
                recordChannelType(channelId, m.xType);
                if (m.authorUserId && m.authorUserId === selfUserId)
                    return;
                const key = `${agentId}:${m.messageId}`;
                if (this.seen.has(key))
                    return;
                this.seen.add(key);
                // Bound the dedup set by evicting the OLDEST keys (Set iterates
                // in insertion order). Wiping the whole set would also forget
                // the key just added, letting an immediate redelivery through.
                while (this.seen.size > FabricInbound.SEEN_MAX) {
                    this.seen.delete(this.seen.values().next().value);
                }
                // Per-channel serial queue. Prevents concurrent model turns for
                // the same channel — important for triage where a second wake
                // arriving mid-reply would interleave with the in-flight one.
                this.enqueueChannelTask(channelId, async () => {
                    // Triage on_call gate: if the on-duty agent isn't currently
                    // on_call per HF, don't dispatch yet — park the message in
                    // `pendingTriageGated`. Subsequent triage messages will
                    // recheck; when the agent becomes on_call, the next arrival
                    // drains.
                    //
                    // Also handles: triage + wake=true must verify status before
                    // committing to a model turn. Non-triage and triage observer
                    // (wake=false) skip the gate.
                    if (m.xType === 'triage' && m.wakeup === true) {
                        const onCall = await this.checkAgentOnCall(agentId);
                        if (!onCall) {
                            this.log.info(`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`);
                            this.pendingTriageGated.push({ agentId, g, channelId, m, session });
                            return;
                        }
                        // Drain any previously-gated messages (FIFO) before this one,
                        // now that we know the agent is on_call. Passing `channelId`
                        // makes same-channel items run inline here, i.e. strictly
                        // before the dispatch below.
                        await this.drainGatedFor(agentId, channelId);
                    }
                    await this.dispatch(agentId, g, channelId, m, session);
                });
            });
            socket.connect();
            this.sockets.push(socket);
        }
    }

    /**
     * Download a message's attachments to a temp dir using the agent's guild
     * token; returns local paths/types/urls for the inbound media context.
     *
     * Never throws: a failed download is logged and skipped, and an empty
     * result is returned when there are no attachments, no token, or the
     * temp directory cannot be created.
     *
     * @returns {Promise<{paths: string[], types: string[], urls: string[]}>}
     *   parallel arrays, one entry per successfully downloaded attachment.
     */
    async fetchAttachments(agentId, endpoint, token, m) {
        const out = { paths: [], types: [], urls: [] };
        const list = m.attachments ?? [];
        if (!list.length || !token)
            return out;
        const dir = join(tmpdir(), `fabric-media-${agentId}-${m.messageId}`.replace(/[^\w.-]/g, '_'));
        try {
            await fs.mkdir(dir, { recursive: true });
        }
        catch {
            return out;
        }
        // Counts successful downloads only; used as the filename prefix.
        let i = 0;
        for (const a of list) {
            try {
                // Relative attachment URLs are resolved against the guild endpoint.
                const abs = a.url.startsWith('http') ? a.url : `${endpoint}${a.url}`;
                const res = await fetch(abs, { headers: { authorization: `Bearer ${token}` } });
                if (!res.ok) {
                    this.log.warn(`fabric: attachment fetch ${res.status} ${abs}`);
                    continue;
                }
                const buf = Buffer.from(await res.arrayBuffer());
                // Sanitise the remote-supplied name before using it in a path.
                const safe = (a.name ?? `file-${i}`).replace(/[^\w.-]/g, '_').slice(0, 120) || `file-${i}`;
                const p = join(dir, `${i}-${safe}`);
                await fs.writeFile(p, buf);
                out.paths.push(p);
                out.types.push(a.mimeType || res.headers.get('content-type')?.split(';')[0] || 'application/octet-stream');
                out.urls.push(abs);
                i++;
            }
            catch (err) {
                this.log.warn(`fabric: attachment fetch failed agent=${agentId}: ${String(err)}`);
            }
        }
        if (out.paths.length)
            this.log.info(`fabric: fetched ${out.paths.length} attachment(s) agent=${agentId}`);
        return out;
    }

    /**
     * Handle one inbound Fabric message for `agentId`.
     *
     * - non-dm message without `wakeup === true` on a triage channel: skipped.
     * - non-dm message without `wakeup === true` elsewhere: recorded into the
     *   agent's session as history only; no model turn.
     * - everything else: attachments are downloaded and a model turn is run;
     *   reply segments are buffered per channel and flushed once the turn
     *   returns.
     *
     * Errors raised inside the turn are logged, not rethrown; the channel
     * buffer is flushed in every case.
     */
    async dispatch(agentId, guild, channelId, m, session) {
        const core = this.core;
        const cfg = this.cfg;
        try {
            // Route by xType. DM channels need peer.kind='direct' so openclaw
            // treats them as 1:1 (sessionKey 'agent::fabric:direct:'
            // and ctx.ChatType='direct') rather than as a multi-party group.
            // Without this, the agent's user-prompt metadata says
            // 'is_group_chat: true' on a DM and downstream prompt logic
            // (commands-handlers `isDirectMessage` checks ChatType==='direct')
            // misclassifies the turn.
            const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
            const route = core.channel.routing.resolveAgentRoute({
                cfg: this.cfg,
                channel: 'fabric',
                accountId: agentId,
                peer: { kind: peerKind, id: channelId },
            });
            const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
                agentId: route.agentId,
            });
            // Date.parse yields NaN for an unparseable string; fall back to
            // "now" rather than handing NaN downstream as the timestamp.
            const createdAtMs = m.createdAt ? Date.parse(m.createdAt) : Number.NaN;
            const baseCtx = {
                Body: m.content,
                BodyForAgent: m.content,
                RawBody: m.content,
                CommandBody: m.content,
                From: `fabric:channel:${channelId}`,
                To: `fabric:${channelId}`,
                SessionKey: route.sessionKey,
                AccountId: route.accountId ?? agentId,
                ChatType: chatType,
                ConversationLabel: `fabric:${guild.nodeId}`,
                SenderId: m.authorUserId ?? 'fabric',
                Provider: 'fabric',
                Surface: 'fabric',
                MessageSid: m.messageId,
                Timestamp: Number.isNaN(createdAtMs) ? Date.now() : createdAtMs,
                OriginatingChannel: 'fabric',
                OriginatingTo: `fabric:${channelId}`,
            };
            // Non-wakeup: Fabric has already decided this agent is NOT the speaker
            // this round. Do NOT run the model and do NOT send anything back — the
            // discuss/work turn engine expects silence from non-woken agents (only
            // the woken speaker emits a normal message or /no-reply). We still
            // record the message into the agent's session so it has the full
            // channel conversation as context whenever it IS later woken.
            //
            // Exception: dm channels are 1:1 — there is no turn/wakeup gating;
            // any message that isn't the agent's own (already filtered above) is
            // always delivered to the model.
            if (m.xType !== 'dm' && m.wakeup !== true) {
                // Triage exception: non-wake messages (admin observer) MUST NOT
                // enter the agent's session at all. The next time the agent
                // wakes for a triage message, their context should contain only
                // their own past wakeups + their own outgoing messages — never
                // the observer-only chatter from other agents. For non-triage
                // channels keep the legacy "record-as-history" so a later wake
                // sees the full channel conversation.
                if (m.xType === 'triage') {
                    this.log.info(`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`);
                    return;
                }
                const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
                await core.channel.session.recordInboundSession({
                    storePath,
                    sessionKey: route.sessionKey,
                    ctx: ctxPayload,
                    createIfMissing: true,
                    onRecordError: (err) => this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
                });
                this.log.info(`fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`);
                return;
            }
            this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
            const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
            // Fetch any uploaded files for the agent: download to a temp dir and
            // hand openclaw local MediaPaths (+types) so the model receives them.
            const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
            const ctxPayload = core.channel.reply.finalizeInboundContext({
                ...baseCtx,
                // Provide ONLY local paths. The guild file URL is on a private host
                // (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
                // passing MediaUrls is both redundant (we already downloaded the
                // bytes) and noisy. Local MediaPaths is the reliable delivery.
                ...(media.paths.length
                    ? {
                        MediaPaths: media.paths,
                        MediaTypes: media.types,
                        MediaPath: media.paths[0],
                        MediaType: media.types[0],
                    }
                    : {}),
            });
            await dispatchInboundReplyWithBase({
                cfg: this.cfg,
                channel: 'fabric',
                accountId: agentId,
                route,
                storePath,
                ctxPayload: ctxPayload,
                core: this.core,
                deliver: async (payload) => {
                    const text = (payload?.text ?? '').trim();
                    this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
                    if (!text || !gt)
                        return;
                    // Buffer segments; the merged message is posted right after
                    // dispatch returns (the deterministic turn boundary, see the
                    // finally below). Disable per channel: channels.fabric.coalesce.
                    await enqueueDelivery({
                        channelId,
                        text,
                        coalesce: resolveCoalesce(this.cfg),
                        post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id),
                        log: (line) => this.log.info(line),
                    });
                },
                onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
                onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
                // - disableBlockStreaming: Fabric has no length limit, deliver the
                //   whole reply as ONE message.
                // - sourceReplyDeliveryMode 'automatic': OpenClaw defaults group
                //   chats to "message_tool_only", which SUPPRESSES auto-delivery of
                //   the agent's text reply (it expects the agent to call a message
                //   tool). Fabric already gates *when* an agent speaks via the
                //   per-recipient wakeup flag, so once a turn is dispatched the
                //   reply must always flow back through `deliver`. Forcing
                //   'automatic' overrides the group default so the reply is
                //   delivered. (source-reply-delivery-mode: a truthy `requested`
                //   wins unless it's message_tool_only with no tool available.)
                replyOptions: {
                    disableBlockStreaming: true,
                    sourceReplyDeliveryMode: 'automatic',
                },
            });
            this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
        }
        catch (err) {
            this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
        }
        finally {
            // Deterministic per-turn boundary: dispatchInboundReplyWithBase only
            // resolves AFTER every deliver() call of this turn has run, so the
            // buffer now holds all segments — flush them as ONE Fabric message.
            // No hooks, no timers, no idle guessing.
            await flushFabricForChannel(channelId);
        }
    }
}