The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.
The backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
emit + remove (best-effort; cleans subs if the agent is removed from
a channel)
Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Every `connect` (initial or reconnect) resets the
local `joined` set, since the server forgets prior room subscriptions
on reconnect. Timers are tracked in `channelSyncTimers` and cleared in
`stop()` alongside socket disconnect.
Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
414 lines
17 KiB
TypeScript
414 lines
17 KiB
TypeScript
import { promises as fs } from 'node:fs';
|
|
import { tmpdir } from 'node:os';
|
|
import { join } from 'node:path';
|
|
import { io, type Socket } from 'socket.io-client';
|
|
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
|
import type { FabricClient, FabricSession } from './fabric-client.js';
|
|
import type { IdentityRegistry } from './identity.js';
|
|
import { resolveCoalesce } from './accounts.js';
|
|
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
|
|
|
|
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
// channels (nextcloud-talk) drive the kernel:
//   core     = PluginRuntime (from setRuntime)
//   route    = core.channel.routing.resolveAgentRoute(...)
//   ctx      = core.channel.reply.finalizeInboundContext(...)  // has SessionKey
//   dispatch = dispatchInboundReplyWithBase({ cfg, route, ctxPayload, core, deliver })
// `core.channel.*` is accessed loosely so unrelated SDK drift won't break us.

/**
 * The slice of the plugin runtime that this file actually calls. Only the
 * members used by the inbound path are declared.
 */
type Core = {
  channel: {
    /** Resolves which agent/session an inbound message is routed to. */
    routing: {
      resolveAgentRoute: (p: unknown) => { agentId: string; sessionKey: string; accountId?: string };
    };
    session: {
      /** Maps the configured session store plus an agent id to a store path. */
      resolveStorePath: (store: unknown, o: { agentId: string }) => string;
      /** Records an inbound context into the session without running the model. */
      recordInboundSession: (p: {
        storePath: string;
        sessionKey: string;
        ctx: unknown;
        createIfMissing?: boolean;
        onRecordError: (e: unknown) => void;
      }) => Promise<unknown>;
    };
    /** Turns a loose context record into the payload the kernel consumes. */
    reply: { finalizeInboundContext: (p: Record<string, unknown>) => unknown };
  };
};
|
|
|
|
/** Minimal logging surface used by this file; `error` is optional. */
type Logger = {
  info: (message: string) => void;
  warn: (message: string) => void;
  error?: (message: string) => void;
};
|
|
|
|
/** A file attached to a Fabric message. `url` may be absolute or relative to the guild endpoint. */
type FabricAttachment = { url: string; name?: string; mimeType?: string };

/** Payload of the `message.created` realtime event, as consumed by this file. */
type FabricMessage = {
  messageId: string;
  seq: number;
  content: string;
  authorUserId?: string;
  /** Date string; parsed with `Date.parse` when building the inbound context. */
  createdAt?: string;
  channelId?: string;
  attachments?: FabricAttachment[];
  /** When `true`, this agent is the one expected to respond to the message. */
  wakeup?: boolean;
  // x-type of the channel (sent on message.created). 'dm' bypasses the
  // wakeup gate: any message that isn't the agent's own is delivered.
  xType?: string;
};
|
|
|
|
/**
 * Inbound half of the Fabric channel.
 *
 * For every configured agent it logs in, opens one socket.io connection per
 * guild, keeps the socket subscribed to the agent's channels (initial join on
 * `connect` plus a periodic resync), and forwards each `message.created`
 * event to the kernel via `dispatch`.
 */
export class FabricInbound {
  /** Open realtime sockets, one per (agent, guild). Disconnected in `stop()`. */
  private sockets: Socket[] = [];

  /** `${agentId}:${messageId}` keys that were already dispatched (dedupe). */
  private seen = new Set<string>();

  /** Upper bound on `seen`; beyond it the oldest key is evicted. */
  private static readonly SEEN_MAX = 5000;

  // Timers that periodically re-sync channel membership per (agent, guild).
  // Without this, the agent's socket.io subscriptions are a snapshot taken
  // at connect time — any channel the agent joins later (e.g. a fresh DM
  // created by another user) is unreachable until the gateway restarts.
  private channelSyncTimers: NodeJS.Timeout[] = [];

  // Resync cadence. Backend doesn't push a `channel.joined` event, so we
  // poll. 60s keeps the lag bounded without hammering the backend.
  private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;

  // Guild access tokens are short-lived (~15 min). The socket survives via
  // socket.io reconnect, but the token captured at connect time goes stale,
  // so HTTP calls (attachment download, posting the reply) start 401ing.
  // Re-login per agent on a short TTL to keep a fresh token.
  private tokenCache = new Map<string, { session: FabricSession; at: number }>();

  /** In-flight re-logins keyed by agent id, so concurrent callers share one login. */
  private tokenRefreshes = new Map<string, Promise<FabricSession>>();

  private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;

  constructor(
    private readonly core: unknown, // PluginRuntime
    private readonly cfg: unknown, // OpenClawConfig
    private readonly client: FabricClient,
    private readonly identity: IdentityRegistry,
    private readonly log: Logger,
    private readonly accounts: Array<{ agentId: string; fabricApiKey: string }> = [],
  ) {}

  /**
   * Return a fresh guild access token for the agent, re-authenticating with
   * the agent's Fabric API key when the cached session is stale.
   *
   * @param agentId     Agent whose session is cached/refreshed.
   * @param guildNodeId Guild whose token is picked out of the session.
   * @param fallback    Connect-time session, used when the refreshed/cached
   *                    session has no token for the guild or re-login fails.
   * @returns The token, or `undefined` when no session carries one for the guild.
   */
  private async freshGuildToken(
    agentId: string,
    guildNodeId: string,
    fallback: FabricSession,
  ): Promise<string | undefined> {
    const pick = (s: FabricSession) =>
      s.guildAccessTokens.find((t) => t.guildNodeId === guildNodeId)?.token;
    const now = Date.now();
    const cached = this.tokenCache.get(agentId);
    if (cached && now - cached.at < FabricInbound.TOKEN_TTL_MS) {
      return pick(cached.session) ?? pick(fallback);
    }
    const apiKey = this.identity.findByAgentId(agentId)?.fabricApiKey;
    if (apiKey) {
      try {
        // Share a single login between callers that hit the stale cache at the
        // same moment (e.g. several guild resync timers firing on one tick).
        let refresh = this.tokenRefreshes.get(agentId);
        if (!refresh) {
          refresh = Promise.resolve(this.client.agentLogin(apiKey)).finally(() => {
            this.tokenRefreshes.delete(agentId);
          });
          this.tokenRefreshes.set(agentId, refresh);
        }
        const s = await refresh;
        this.tokenCache.set(agentId, { session: s, at: now });
        return pick(s) ?? pick(fallback);
      } catch (err) {
        this.log.warn(`fabric: token refresh failed agent=${agentId}: ${String(err)}`);
      }
    }
    return pick(fallback);
  }

  /**
   * Log in and connect every configured agent. Uses the explicit `accounts`
   * list when non-empty, otherwise everything in the identity registry.
   * A failure for one agent is logged and does not stop the others.
   */
  async start(): Promise<void> {
    const entries =
      this.accounts.length > 0
        ? this.accounts.map((a) => ({ agentId: a.agentId, fabricApiKey: a.fabricApiKey }))
        : this.identity.list();
    for (const entry of entries) {
      try {
        const session = await this.client.agentLogin(entry.fabricApiKey);
        // Seed the token cache so the first handshake / channel sync does not
        // immediately trigger a second login for the same agent.
        this.tokenCache.set(entry.agentId, { session, at: Date.now() });
        this.identity.upsert({
          agentId: entry.agentId,
          fabricApiKey: entry.fabricApiKey,
          fabricUserId: session.user.id,
          displayName: session.user.name,
        });
        await this.connectAgent(entry.agentId, session);
        this.log.info(`fabric: agent ${entry.agentId} connected as ${session.user.email}`);
      } catch (err) {
        this.log.warn(`fabric: agent ${entry.agentId} connect failed: ${String(err)}`);
      }
    }
  }

  /** Stop all channel-resync timers and disconnect every socket. */
  stop(): void {
    for (const t of this.channelSyncTimers) clearInterval(t);
    this.channelSyncTimers = [];
    for (const s of this.sockets) s.disconnect();
    this.sockets = [];
  }

  /**
   * Open one realtime socket per guild of the session, keep its channel
   * subscriptions in sync, and route `message.created` events to `dispatch`.
   * Guilds without an access token in the session are skipped.
   */
  private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
    const selfUserId = session.user.id;
    for (const g of session.guilds) {
      const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
      if (!tok) continue;

      const socket = io(`${g.endpoint}/realtime`, {
        transports: ['websocket'],
        // Function form: evaluated on every (re)connect handshake, so a
        // reconnect that happens after the connect-time token expired still
        // authenticates with a fresh one.
        auth: (cb: (data: object) => void) => {
          void (async () => {
            let handshakeTok = tok;
            try {
              handshakeTok = (await this.freshGuildToken(agentId, g.nodeId, session)) ?? tok;
            } catch {
              /* keep the connect-time token */
            }
            cb({ token: handshakeTok });
          })();
        },
        autoConnect: false,
      });

      // Tracked socket.io rooms for this (agent, guild). The initial fetch
      // on `connect` seeds it; the periodic resync diffs against it so we
      // only emit `join_channel` for genuinely new channels (and
      // `leave_channel` for ones the agent is no longer in).
      const joined = new Set<string>();

      const syncChannels = async (kind: 'initial' | 'resync'): Promise<void> => {
        // While disconnected there is nothing to subscribe; the next
        // `connect` clears `joined` and reseeds from a fresh fetch anyway.
        if (kind === 'resync' && !socket.connected) return;

        let authTok = tok;
        try {
          authTok = (await this.freshGuildToken(agentId, g.nodeId, session)) ?? tok;
        } catch {
          /* keep the connect-time token */
        }

        try {
          const res = await fetch(
            `${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
            { headers: { authorization: `Bearer ${authTok}` } },
          );
          if (!res.ok) {
            this.log.warn(
              `fabric: channel ${kind} fetch ${res.status} agent=${agentId} guild=${g.nodeId}`,
            );
            return;
          }
          const body: unknown = await res.json();
          if (!Array.isArray(body)) {
            // Never diff against a malformed payload: that would emit
            // `leave_channel` for every tracked room.
            this.log.warn(
              `fabric: channel ${kind} returned a non-array body agent=${agentId} guild=${g.nodeId}`,
            );
            return;
          }
          const current = new Set<string>();
          for (const entry of body as unknown[]) {
            const id = (entry as { id?: unknown } | null | undefined)?.id;
            if (typeof id === 'string' && id.length > 0) current.add(id);
          }

          let added = 0;
          let removed = 0;
          for (const id of current) {
            if (!joined.has(id)) {
              socket.emit('join_channel', { channelId: id });
              joined.add(id);
              added++;
            }
          }
          for (const id of [...joined]) {
            if (!current.has(id)) {
              socket.emit('leave_channel', { channelId: id });
              joined.delete(id);
              removed++;
            }
          }

          if (kind === 'initial') {
            this.log.info(
              `fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
            );
          } else if (added > 0 || removed > 0) {
            this.log.info(
              `fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
            );
          }
        } catch (err) {
          // Best effort — the next tick will retry; log so failures are visible.
          this.log.warn(
            `fabric: channel ${kind} failed agent=${agentId} guild=${g.nodeId}: ${String(err)}`,
          );
        }
      };

      socket.on('connect', () => {
        // On every (re)connect the server forgets prior subscriptions, so
        // reset our local view and seed from a fresh fetch.
        joined.clear();
        void syncChannels('initial');
      });

      const syncTimer = setInterval(
        () => void syncChannels('resync'),
        FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
      );
      this.channelSyncTimers.push(syncTimer);

      socket.on('message.created', (m: FabricMessage) => {
        const channelId = m.channelId ?? '';
        if (!channelId) return;
        // Never react to the agent's own messages.
        if (m.authorUserId && m.authorUserId === selfUserId) return;
        const key = `${agentId}:${m.messageId}`;
        if (this.seen.has(key)) return;
        this.seen.add(key);
        if (this.seen.size > FabricInbound.SEEN_MAX) {
          // Set iteration follows insertion order: drop only the oldest key so
          // recent messages (including this one) stay deduplicated.
          const oldest = this.seen.values().next().value;
          if (oldest !== undefined) this.seen.delete(oldest);
        }
        void this.dispatch(agentId, g, channelId, m, session);
      });

      socket.connect();
      this.sockets.push(socket);
    }
  }

  /**
   * Download a message's attachments to a temp dir using the agent's guild
   * token; returns local paths/types/urls for the inbound media context.
   *
   * The bearer token is only attached when the resolved URL has the same
   * origin as the guild endpoint. Attachment URLs come from message content,
   * so sending the token to whatever host they name would leak it.
   * Other-origin URLs are still fetched, without credentials.
   *
   * @param agentId  Used for the temp directory name and log lines.
   * @param endpoint Guild base URL; relative attachment URLs are appended to it.
   * @param token    Guild access token; when missing nothing is downloaded.
   * @param m        Message whose `attachments` are fetched.
   * @returns Parallel arrays with one entry per successfully stored attachment.
   */
  private async fetchAttachments(
    agentId: string,
    endpoint: string,
    token: string | undefined,
    m: FabricMessage,
  ): Promise<{ paths: string[]; types: string[]; urls: string[] }> {
    const out = { paths: [] as string[], types: [] as string[], urls: [] as string[] };
    const list = m.attachments ?? [];
    if (!list.length || !token) return out;

    // NOTE(review): this directory is never removed here; cleanup (if any)
    // has to happen after the kernel consumed the media paths.
    const dir = join(tmpdir(), `fabric-media-${agentId}-${m.messageId}`.replace(/[^\w.-]/g, '_'));
    try {
      await fs.mkdir(dir, { recursive: true });
    } catch {
      return out;
    }

    let endpointOrigin: string | undefined;
    try {
      endpointOrigin = new URL(endpoint).origin;
    } catch {
      endpointOrigin = undefined;
    }
    const absoluteHttp = /^https?:\/\//i;

    let i = 0;
    for (const a of list) {
      try {
        const abs = absoluteHttp.test(a.url) ? a.url : `${endpoint}${a.url}`;
        // Compare the origin of the FINAL url: a "relative" value such as
        // "@host/path" appended to the endpoint would otherwise change host.
        const sameOrigin = endpointOrigin !== undefined && new URL(abs).origin === endpointOrigin;
        const res = await fetch(
          abs,
          sameOrigin ? { headers: { authorization: `Bearer ${token}` } } : {},
        );
        if (!res.ok) {
          this.log.warn(`fabric: attachment fetch ${res.status} ${abs}`);
          continue;
        }
        const buf = Buffer.from(await res.arrayBuffer());
        const safe = (a.name ?? `file-${i}`).replace(/[^\w.-]/g, '_').slice(0, 120) || `file-${i}`;
        const p = join(dir, `${i}-${safe}`);
        await fs.writeFile(p, buf);
        out.paths.push(p);
        out.types.push(
          a.mimeType ||
            res.headers.get('content-type')?.split(';')[0] ||
            'application/octet-stream',
        );
        out.urls.push(abs);
        i++;
      } catch (err) {
        this.log.warn(`fabric: attachment fetch failed agent=${agentId}: ${String(err)}`);
      }
    }
    if (out.paths.length)
      this.log.info(`fabric: fetched ${out.paths.length} attachment(s) agent=${agentId}`);
    return out;
  }

  /**
   * Hand one inbound Fabric message to the kernel.
   *
   * Non-wakeup messages outside DM channels are only recorded into the
   * agent's session. Everything else runs a full turn whose reply segments
   * are buffered through `enqueueDelivery` and flushed once the turn is over.
   * Never rejects: every failure is logged, because callers invoke this
   * fire-and-forget.
   */
  private async dispatch(
    agentId: string,
    guild: { nodeId: string; endpoint: string },
    channelId: string,
    m: FabricMessage,
    session: FabricSession,
  ): Promise<void> {
    const core = this.core as Core & Record<string, unknown>;
    const cfg = this.cfg as { session?: { store?: unknown } };
    try {
      const route = core.channel.routing.resolveAgentRoute({
        cfg: this.cfg,
        channel: 'fabric',
        accountId: agentId,
        peer: { kind: 'group', id: channelId },
      });
      const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
        agentId: route.agentId,
      });

      // An unparsable `createdAt` would yield NaN; fall back to "now".
      const parsedTs = m.createdAt ? Date.parse(m.createdAt) : Number.NaN;

      const baseCtx: Record<string, unknown> = {
        Body: m.content,
        BodyForAgent: m.content,
        RawBody: m.content,
        CommandBody: m.content,
        From: `fabric:channel:${channelId}`,
        To: `fabric:${channelId}`,
        SessionKey: route.sessionKey,
        AccountId: route.accountId ?? agentId,
        ChatType: 'group',
        ConversationLabel: `fabric:${guild.nodeId}`,
        SenderId: m.authorUserId ?? 'fabric',
        Provider: 'fabric',
        Surface: 'fabric',
        MessageSid: m.messageId,
        Timestamp: Number.isNaN(parsedTs) ? Date.now() : parsedTs,
        OriginatingChannel: 'fabric',
        OriginatingTo: `fabric:${channelId}`,
      };

      // Non-wakeup: Fabric has already decided this agent is NOT the speaker
      // this round. Do NOT run the model and do NOT send anything back — the
      // discuss/work turn engine expects silence from non-woken agents (only
      // the woken speaker emits a normal message or /no-reply). We still
      // record the message into the agent's session so it has the full
      // channel conversation as context whenever it IS later woken.
      //
      // Exception: dm channels are 1:1 — there is no turn/wakeup gating;
      // any message that isn't the agent's own (already filtered above) is
      // always delivered to the model.
      if (m.xType !== 'dm' && m.wakeup !== true) {
        const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
        await core.channel.session.recordInboundSession({
          storePath,
          sessionKey: route.sessionKey,
          ctx: ctxPayload,
          createIfMissing: true,
          onRecordError: (err: unknown) =>
            this.log.warn(`fabric: history record failed agent=${agentId}: ${String(err)}`),
        });
        this.log.info(
          `fabric: recorded (no wakeup, history only) agent=${agentId} channel=${channelId}`,
        );
        return;
      }

      this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
      const gt = await this.freshGuildToken(agentId, guild.nodeId, session);

      // Fetch any uploaded files for the agent: download to a temp dir and
      // hand openclaw local MediaPaths (+types) so the model receives them.
      const media = await this.fetchAttachments(agentId, guild.endpoint, gt, m);

      const ctxPayload = core.channel.reply.finalizeInboundContext({
        ...baseCtx,
        // Provide ONLY local paths. The guild file URL is on a private host
        // (e.g. localhost); openclaw's SSRF guard blocks re-fetching it, so
        // passing MediaUrls is both redundant (we already downloaded the
        // bytes) and noisy. Local MediaPaths is the reliable delivery.
        ...(media.paths.length
          ? {
              MediaPaths: media.paths,
              MediaTypes: media.types,
              MediaPath: media.paths[0],
              MediaType: media.types[0],
            }
          : {}),
      });

      await dispatchInboundReplyWithBase({
        cfg: this.cfg as never,
        channel: 'fabric',
        accountId: agentId,
        route,
        storePath,
        ctxPayload: ctxPayload as never,
        core: this.core as never,
        deliver: async (payload: { text?: string }) => {
          const text = (payload?.text ?? '').trim();
          this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
          if (!text || !gt) return;
          // Buffer segments; the merged message is posted right after
          // dispatch returns (the deterministic turn boundary, see the
          // finally below). Disable per channel: channels.fabric.coalesce.
          await enqueueDelivery({
            channelId,
            text,
            coalesce: resolveCoalesce(this.cfg as never),
            post: async (t) => {
              // The post happens after the model turn, which may outlive the
              // token fetched before it — ask for a fresh one (cache hit when
              // the turn was short).
              let postTok = gt;
              try {
                postTok = (await this.freshGuildToken(agentId, guild.nodeId, session)) ?? gt;
              } catch {
                /* keep the dispatch-time token */
              }
              await this.client.postMessage(guild.endpoint, postTok, channelId, t, session.user.id);
            },
            log: (line) => this.log.info(line),
          });
        },
        onRecordError: (err: unknown) =>
          this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
        onDispatchError: (err: unknown, info: { kind: string }) =>
          this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
        // - disableBlockStreaming: Fabric has no length limit, deliver the
        //   whole reply as ONE message.
        // - sourceReplyDeliveryMode 'automatic': OpenClaw defaults group
        //   chats to "message_tool_only", which SUPPRESSES auto-delivery of
        //   the agent's text reply (it expects the agent to call a message
        //   tool). Fabric already gates *when* an agent speaks via the
        //   per-recipient wakeup flag, so once a turn is dispatched the
        //   reply must always flow back through `deliver`. Forcing
        //   'automatic' overrides the group default so the reply is
        //   delivered. (source-reply-delivery-mode: a truthy `requested`
        //   wins unless it's message_tool_only with no tool available.)
        replyOptions: {
          disableBlockStreaming: true,
          sourceReplyDeliveryMode: 'automatic',
        } as never,
      });
      this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
    } catch (err) {
      this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
    } finally {
      // Deterministic per-turn boundary: dispatchInboundReplyWithBase only
      // resolves AFTER every deliver() call of this turn has run, so the
      // buffer now holds all segments — flush them as ONE Fabric message.
      // No hooks, no timers, no idle guessing.
      // NOTE(review): this also runs for history-only turns, which enqueue
      // nothing — confirm the flush cannot cut another turn's buffer short.
      try {
        await flushFabricForChannel(channelId);
      } catch (err) {
        // dispatch() is invoked with `void`, so a rejection here would
        // surface as an unhandled promise rejection.
        this.log.warn(`fabric: flush failed agent=${agentId} channel=${channelId}: ${String(err)}`);
      }
    }
  }
}
|