Files
Fabric.OpenclawPlugin/dist/fabric/src/inbound.js
hzhang a060ff98a2 feat(inbound): listen for backend-pushed channel.joined/left events
Companion to nav/Fabric.Backend.Guild#<TBD> which adds the server-side
emitToUser broadcast on channel membership changes. Before, the inbound
only learned about new channels via the 60s polling resync (worst-case
60s lag). Now the backend tells us directly so sub/unsub is realtime.

socket.on('channel.joined', evt) → join the socket.io room for evt.channelId
                                    and add to the local 'joined' set.
socket.on('channel.left',   evt) → leave + remove from 'joined'.

Both handlers are idempotent (guarded by `if (joined.has(id))` /
`if (!joined.has(id))`), so duplicate emits from the server are safe. The
polling resync still runs every 60s as a safety net for events missed
during transient socket drops (between emit and reconnect), partial
server failures, etc.

When the backend lacks this support (older deployments), nothing breaks —
the events simply never fire and polling carries the load as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:08:33 +01:00

371 lines
18 KiB
JavaScript

import { promises as fs } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
/**
 * Inbound bridge from Fabric guilds to OpenClaw agents.
 *
 * For every configured agent it logs in to Fabric, opens one socket.io
 * connection per guild, keeps that socket's channel-room subscriptions in
 * sync with the agent's channel membership, and hands each incoming
 * `message.created` event to `dispatch`.
 */
export class FabricInbound {
    core;
    cfg;
    client;
    identity;
    log;
    accounts;
    // Every socket opened by connectAgent; disconnected in stop().
    sockets = [];
    // Dedup keys (`${agentId}:${messageId}`) of messages already dispatched.
    seen = new Set();
    // Upper bound on `seen`; once exceeded the oldest key is evicted.
    static SEEN_LIMIT = 5000;
    // Timers that periodically re-sync channel membership per (agent, guild).
    // Without this, the agent's socket.io subscriptions are a snapshot taken
    // at connect time — any channel the agent joins later (e.g. a fresh DM
    // created by another user) is unreachable until the gateway restarts.
    channelSyncTimers = [];
    // Resync cadence. Membership changes are also handled as they happen via
    // the `channel.joined` / `channel.left` socket events; polling stays as a
    // safety net for missed events and for backends that never emit them.
    // 60s keeps the lag bounded without hammering the backend.
    static CHANNEL_SYNC_INTERVAL_MS = 60_000;
    // Guild access tokens are short-lived (~15 min). The socket survives via
    // socket.io reconnect, but the token captured at connect time goes stale,
    // so HTTP calls (attachment download, posting the reply) start 401ing.
    // Re-login per agent on a short TTL to keep a fresh token.
    tokenCache = new Map();
    static TOKEN_TTL_MS = 8 * 60 * 1000;

    /**
     * Return a fresh guild access token for the agent, re-authenticating with
     * the agent's Fabric API key when the cached session is stale. Falls back
     * to the token in `fallback` if re-login fails or no API key is known.
     *
     * @param agentId     Agent whose token is wanted (also the cache key).
     * @param guildNodeId Guild node the token must belong to.
     * @param fallback    Session to read the token from as a last resort.
     * @returns The token string, or undefined when no session carries one
     *          for `guildNodeId`.
     */
    async freshGuildToken(agentId, guildNodeId, fallback) {
        const pick = (s) => s.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
        const now = Date.now();
        const cached = this.tokenCache.get(agentId);
        if (cached && now - cached.at < FabricInbound.TOKEN_TTL_MS) {
            return pick(cached.session) ?? pick(fallback);
        }
        const apiKey = this.identity.findByAgentId(agentId)?.fabricApiKey;
        if (apiKey) {
            try {
                const s = await this.client.agentLogin(apiKey);
                this.tokenCache.set(agentId, { session: s, at: now });
                return pick(s) ?? pick(fallback);
            }
            catch (err) {
                this.log.warn(`fabric: token refresh failed agent=${agentId}: ${String(err)}`);
            }
        }
        return pick(fallback);
    }

    /**
     * @param core     PluginRuntime.
     * @param cfg      OpenClawConfig.
     * @param client   Fabric client (used here for agentLogin / postMessage).
     * @param identity Agent identity store (list / upsert / findByAgentId).
     * @param log      Logger exposing info() and warn().
     * @param accounts Optional explicit accounts; when empty, start() falls
     *                 back to identity.list().
     */
    constructor(core, // PluginRuntime
    cfg, // OpenClawConfig
    client, identity, log, accounts = []) {
        this.core = core;
        this.cfg = cfg;
        this.client = client;
        this.identity = identity;
        this.log = log;
        this.accounts = accounts;
    }

    /**
     * Log in and connect every agent, one after another. A failure for one
     * agent is logged and does not prevent the remaining agents from
     * connecting.
     */
    async start() {
        const entries = this.accounts.length > 0
            ? this.accounts.map((a) => ({ agentId: a.agentId, fabricApiKey: a.fabricApiKey }))
            : this.identity.list();
        for (const entry of entries) {
            try {
                const session = await this.client.agentLogin(entry.fabricApiKey);
                this.identity.upsert({
                    agentId: entry.agentId,
                    fabricApiKey: entry.fabricApiKey,
                    fabricUserId: session.user.id,
                    displayName: session.user.name,
                });
                await this.connectAgent(entry.agentId, session);
                this.log.info(`fabric: agent ${entry.agentId} connected as ${session.user.email}`);
            }
            catch (err) {
                this.log.warn(`fabric: agent ${entry.agentId} connect failed: ${String(err)}`);
            }
        }
    }

    /** Cancel all channel resync timers and disconnect every socket. */
    stop() {
        for (const t of this.channelSyncTimers) {
            clearInterval(t);
        }
        this.channelSyncTimers = [];
        for (const s of this.sockets) {
            s.disconnect();
        }
        this.sockets = [];
    }

    /**
     * Open one realtime socket per guild of `session` (guilds without an
     * access token are skipped) and wire up channel subscription handling
     * and message dispatch for it.
     *
     * @param agentId Agent the sockets belong to.
     * @param session Login session; provides user, guilds and guild tokens.
     */
    async connectAgent(agentId, session) {
        const selfUserId = session.user.id;
        for (const g of session.guilds) {
            const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
            if (!tok) {
                continue;
            }
            const socket = io(`${g.endpoint}/realtime`, {
                transports: ['websocket'],
                auth: { token: tok },
                autoConnect: false,
            });
            // Tracked socket.io rooms for this (agent, guild). The initial fetch
            // on `connect` seeds it; the periodic resync diffs against it so we
            // only emit `join_channel` for genuinely new channels (and
            // `leave_channel` for ones the agent is no longer in).
            const joined = new Set();
            // Fetch the agent's current channel list over HTTP and reconcile
            // the socket's room subscriptions with it. `kind` only selects
            // which log line is written ('initial' vs. periodic resync).
            const syncChannels = async (kind) => {
                let freshTok;
                try {
                    freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
                }
                catch {
                    freshTok = tok;
                }
                const authTok = freshTok ?? tok;
                try {
                    const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
                    if (!res.ok) {
                        return;
                    }
                    const channels = (await res.json());
                    const current = new Set(channels.map((c) => c.id));
                    let added = 0;
                    let removed = 0;
                    for (const id of current) {
                        if (!joined.has(id)) {
                            socket.emit('join_channel', { channelId: id });
                            joined.add(id);
                            added++;
                        }
                    }
                    // Iterate over a copy: entries are deleted while looping.
                    for (const id of [...joined]) {
                        if (!current.has(id)) {
                            socket.emit('leave_channel', { channelId: id });
                            joined.delete(id);
                            removed++;
                        }
                    }
                    if (kind === 'initial') {
                        this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
                    }
                    else if (added > 0 || removed > 0) {
                        this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
                    }
                }
                catch {
                    /* best effort — next tick will retry */
                }
            };
            socket.on('connect', () => {
                // On every (re)connect the server forgets prior subscriptions, so
                // reset our local view and seed from a fresh fetch.
                joined.clear();
                void syncChannels('initial');
            });
            // Push-based membership events from the backend (companion to
            // Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
            // server tells us this user was added to / removed from a
            // channel, we sub/unsub the socket.io room immediately — no
            // 60s wait for the polling resync. Polling remains as a safety
            // net for missed events. Both handlers are no-ops for a channel
            // whose tracked state already matches, so duplicates are safe.
            socket.on('channel.joined', (evt) => {
                const id = evt?.channelId;
                if (!id || joined.has(id)) {
                    return;
                }
                socket.emit('join_channel', { channelId: id });
                joined.add(id);
                this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
            });
            socket.on('channel.left', (evt) => {
                const id = evt?.channelId;
                if (!id || !joined.has(id)) {
                    return;
                }
                socket.emit('leave_channel', { channelId: id });
                joined.delete(id);
                this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
            });
            const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
            this.channelSyncTimers.push(syncTimer);
            socket.on('message.created', (m) => {
                // `m?.`: a malformed (null/undefined) payload must not throw
                // inside the socket's event emitter.
                const channelId = m?.channelId ?? '';
                if (!channelId) {
                    return;
                }
                // Never react to the agent's own messages.
                if (m.authorUserId && m.authorUserId === selfUserId) {
                    return;
                }
                const key = `${agentId}:${m.messageId}`;
                if (this.seen.has(key)) {
                    return;
                }
                this.seen.add(key);
                if (this.seen.size > FabricInbound.SEEN_LIMIT) {
                    // A Set iterates in insertion order, so the first value is
                    // the oldest key. Evicting just that one keeps recent keys
                    // (including the one added above) available for dedup,
                    // instead of forgetting everything at once.
                    this.seen.delete(this.seen.values().next().value);
                }
                // Fire and forget: dispatch handles and logs its own errors.
                void this.dispatch(agentId, g, channelId, m, session);
            });
            socket.connect();
            this.sockets.push(socket);
        }
    }

    /**
     * Download a message's attachments to a temp dir using the agent's guild
     * token; returns local paths/types/urls for the inbound media context.
     *
     * The bearer token is only attached to requests whose origin matches the
     * guild endpoint. Attachment URLs come from message payloads, so an
     * absolute URL pointing at another host must not receive the token.
     *
     * @param agentId  Agent the download is for (temp dir name and logging).
     * @param endpoint Guild base URL; prefix for non-absolute attachment URLs.
     * @param token    Guild access token; nothing is fetched when it is falsy.
     * @param m        Message whose `attachments` are downloaded.
     * @returns `{ paths, types, urls }` — parallel arrays with one entry per
     *          successfully stored attachment. Failed ones are logged and
     *          skipped.
     */
    async fetchAttachments(agentId, endpoint, token, m) {
        const out = { paths: [], types: [], urls: [] };
        const list = m.attachments ?? [];
        if (!list.length || !token) {
            return out;
        }
        const dir = join(tmpdir(), `fabric-media-${agentId}-${m.messageId}`.replace(/[^\w.-]/g, '_'));
        try {
            await fs.mkdir(dir, { recursive: true });
        }
        catch {
            return out;
        }
        let guildOrigin;
        try {
            guildOrigin = new URL(endpoint).origin;
        }
        catch {
            // Unparseable endpoint: no request can be proven same-origin, so
            // the token is never attached below.
            guildOrigin = undefined;
        }
        let i = 0;
        for (const a of list) {
            try {
                const abs = a.url.startsWith('http') ? a.url : `${endpoint}${a.url}`;
                const sameOrigin = guildOrigin !== undefined && new URL(abs).origin === guildOrigin;
                const res = await fetch(abs, sameOrigin ? { headers: { authorization: `Bearer ${token}` } } : {});
                if (!res.ok) {
                    this.log.warn(`fabric: attachment fetch ${res.status} ${abs}`);
                    continue;
                }
                const buf = Buffer.from(await res.arrayBuffer());
                // Sanitize the remote file name before using it as a path segment.
                const safe = (a.name ?? `file-${i}`).replace(/[^\w.-]/g, '_').slice(0, 120) || `file-${i}`;
                const p = join(dir, `${i}-${safe}`);
                await fs.writeFile(p, buf);
                out.paths.push(p);
                out.types.push(a.mimeType ||
                    res.headers.get('content-type')?.split(';')[0] ||
                    'application/octet-stream');
                out.urls.push(abs);
                i++;
            }
            catch (err) {
                this.log.warn(`fabric: attachment fetch failed agent=${agentId}: ${String(err)}`);
            }
        }
        if (out.paths.length) {
            this.log.info(`fabric: fetched ${out.paths.length} attachment(s) agent=${agentId}`);
        }
        return out;
    }

    /**
     * Route one inbound Fabric message to the agent.
     *
     * Messages that are neither a DM nor flagged `wakeup` are only recorded
     * into the agent's session history. Everything else is run through
     * `dispatchInboundReplyWithBase`, and the reply segments are buffered and
     * flushed as one Fabric message when the turn ends.
     *
     * Never rejects: every failure is logged. Callers invoke it
     * fire-and-forget, so a rejection would surface as an unhandled promise
     * rejection.
     *
     * @param agentId   Agent receiving the message.
     * @param guild     Guild descriptor (`nodeId`, `endpoint`).
     * @param channelId Fabric channel the message was posted in.
     * @param m         The `message.created` payload.
     * @param session   Connect-time login session (fallback token, user id).
     */
    async dispatch(agentId, guild, channelId, m, session) {
        const core = this.core;
        const cfg = this.cfg;
        try {
            const route = core.channel.routing.resolveAgentRoute({
                cfg: this.cfg,
                channel: 'fabric',
                accountId: agentId,
                peer: { kind: 'group', id: channelId },
            });
            const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
                agentId: route.agentId,
            });
            const baseCtx = {
                Body: m.content,
                BodyForAgent: m.content,
                RawBody: m.content,
                CommandBody: m.content,
                From: `fabric:channel:${channelId}`,
                To: `fabric:${channelId}`,
                SessionKey: route.sessionKey,
                AccountId: route.accountId ?? agentId,
                ChatType: 'group',
                ConversationLabel: `fabric:${guild.nodeId}`,
                SenderId: m.authorUserId ?? 'fabric',
                Provider: 'fabric',
                Surface: 'fabric',
                MessageSid: m.messageId,
                Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(),
                OriginatingChannel: 'fabric',
                OriginatingTo: `fabric:${channelId}`,
            };
            // Non-wakeup: Fabric has already decided this agent is NOT the speaker
            // this round. Do NOT run the model and do NOT send anything back — the
            // discuss/work turn engine expects silence from non-woken agents (only
            // the woken speaker emits a normal message or /no-reply). We still
            // record the message into the agent's session so it has the full
            // channel conversation as context whenever it IS later woken.
            //
            // Exception: dm channels are 1:1 — there is no turn/wakeup gating;
            // any message that isn't the agent's own (filtered out before
            // dispatch is called) is always delivered to the model.
            if (m.xType !== 'dm' && m.wakeup !== true) {
                const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
                await core.channel.session.recordInboundSession({
                    storePath,
                    sessionKey: route.sessionKey,
                    ctx: ctxPayload,
                    createIfMissing: true,
                    onRecordError: (err) => this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
                });
                this.log.info(`fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`);
                return;
            }
            this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
            const gt = await this.freshGuildToken(agentId, guild.nodeId, session);
            // Fetch any uploaded files for the agent: download to a temp dir and
            // hand openclaw local MediaPaths (+types) so the model receives them.
            const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);
            const ctxPayload = core.channel.reply.finalizeInboundContext({
                ...baseCtx,
                // Provide ONLY local paths. The guild file URL is on a private host
                // (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
                // passing MediaUrls is both redundant (we already downloaded the
                // bytes) and noisy. Local MediaPaths is the reliable delivery.
                ...(media.paths.length
                    ? {
                        MediaPaths: media.paths,
                        MediaTypes: media.types,
                        MediaPath: media.paths[0],
                        MediaType: media.types[0],
                    }
                    : {}),
            });
            await dispatchInboundReplyWithBase({
                cfg: this.cfg,
                channel: 'fabric',
                accountId: agentId,
                route,
                storePath,
                ctxPayload: ctxPayload,
                core: this.core,
                deliver: async (payload) => {
                    const text = (payload?.text ?? '').trim();
                    this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
                    if (!text || !gt) {
                        return;
                    }
                    // Buffer segments; the merged message is posted right after
                    // dispatch returns (the deterministic turn boundary, see the
                    // finally below). Disable per channel: channels.fabric.coalesce.
                    await enqueueDelivery({
                        channelId,
                        text,
                        coalesce: resolveCoalesce(this.cfg),
                        post: (t) => this.client.postMessage(guild.endpoint, gt, channelId, t, session.user.id),
                        log: (line) => this.log.info(line),
                    });
                },
                onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
                onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
                // - disableBlockStreaming: Fabric has no length limit, deliver the
                //   whole reply as ONE message.
                // - sourceReplyDeliveryMode 'automatic': OpenClaw defaults group
                //   chats to "message_tool_only", which SUPPRESSES auto-delivery of
                //   the agent's text reply (it expects the agent to call a message
                //   tool). Fabric already gates *when* an agent speaks via the
                //   per-recipient wakeup flag, so once a turn is dispatched the
                //   reply must always flow back through `deliver`. Forcing
                //   'automatic' overrides the group default so the reply is
                //   delivered. (source-reply-delivery-mode: a truthy `requested`
                //   wins unless it's message_tool_only with no tool available.)
                replyOptions: {
                    disableBlockStreaming: true,
                    sourceReplyDeliveryMode: 'automatic',
                },
            });
            this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
        }
        catch (err) {
            this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
        }
        finally {
            // Deterministic per-turn boundary: dispatchInboundReplyWithBase only
            // resolves AFTER every deliver() call of this turn has run, so the
            // buffer now holds all segments — flush them as ONE Fabric message.
            // No hooks, no timers, no idle guessing.
            try {
                await flushFabricForChannel(channelId);
            }
            catch (err) {
                // Log instead of rethrowing: nothing awaits this method, so a
                // throw here would become an unhandled rejection.
                this.log.warn(`fabric: flush failed agent=${agentId} channel=${channelId}: ${String(err)}`);
            }
        }
    }
}