import { io, type Socket } from 'socket.io-client'; import type { FabricClient, FabricSession } from './fabric-client.js'; import type { IdentityRegistry } from './identity.js'; // OpenClaw plugin runtime — only the channel-turn kernel surface we use. // Typed loosely on purpose: the concrete shapes come from // openclaw/plugin-sdk/core at the host's SDK version. type PluginRuntime = { channel: { turn: { run(args: unknown): Promise; }; }; log?: { debug?: (m: string, x?: unknown) => void }; }; type Logger = { info: (m: string) => void; warn: (m: string) => void }; type FabricMessage = { messageId: string; seq: number; content: string; authorUserId?: string; createdAt?: string; // per-recipient metadata Fabric attaches at push time (this agent's view) wakeup?: boolean; }; // One live Fabric connection per agent identity (Phase 1 = B1). Lives in the // channel-plugin runtime (no separate sidecar). Firehose (B2) would replace // this class behind the same dispatch() call. export class FabricInbound { private sockets: Socket[] = []; private timers: NodeJS.Timeout[] = []; private seen = new Set(); constructor( private readonly runtime: PluginRuntime, private readonly client: FabricClient, private readonly identity: IdentityRegistry, private readonly log: Logger, ) {} async start(): Promise { for (const entry of this.identity.list()) { try { const session = await this.client.agentLogin(entry.fabricApiKey); this.identity.upsert({ agentId: entry.agentId, fabricApiKey: entry.fabricApiKey, fabricUserId: session.user.id, displayName: session.user.name, }); await this.connectAgent(entry.agentId, session); this.log.info(`fabric: agent ${entry.agentId} connected as ${session.user.email}`); } catch (err) { this.log.warn(`fabric: agent ${entry.agentId} connect failed: ${String(err)}`); } } } stop(): void { for (const t of this.timers) clearInterval(t); for (const s of this.sockets) s.disconnect(); this.sockets = []; this.timers = []; } private async connectAgent(agentId: string, session: FabricSession): Promise { const selfUserId = session.user.id; for (const g of session.guilds) { const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; if (!tok) continue; const socket = io(`${g.endpoint}/realtime`, { transports: ['websocket'], auth: { token: tok }, autoConnect: false, }); const joinAll = async () => { try { const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` }, }); const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : []; for (const c of channels) socket.emit('join_channel', { channelId: c.id }); } catch { /* best effort */ } }; socket.on('connect', () => void joinAll()); socket.on('message.created', (m: FabricMessage & { channelId?: string }) => { const channelId = m.channelId ?? ''; if (!channelId) return; // self-echo guard + dedupe if (m.authorUserId && m.authorUserId === selfUserId) return; const key = `${agentId}:${m.messageId}`; if (this.seen.has(key)) return; this.seen.add(key); if (this.seen.size > 5000) this.seen.clear(); void this.dispatch(agentId, g, channelId, m); }); socket.connect(); this.sockets.push(socket); } } // Hand the inbound Fabric message to OpenClaw's channel-turn kernel. // wakeup === true -> dispatch (agent runs, may reply) // wakeup !== true -> drop but keep as group history/context private async dispatch( agentId: string, guild: { nodeId: string; endpoint: string }, channelId: string, m: FabricMessage, ): Promise { const admit = m.wakeup === true; try { await this.runtime.channel.turn.run({ channel: 'fabric', accountId: agentId, raw: m, adapter: { ingest: (raw: FabricMessage) => ({ id: raw.messageId, timestamp: raw.createdAt ? Date.parse(raw.createdAt) : Date.now(), rawText: raw.content, textForAgent: raw.content, }), classify: () => ({ kind: 'message', canStartAgentTurn: admit }), preflight: () => admit ? {} : { admission: { kind: 'drop', reason: 'no-wakeup', recordHistory: true } }, resolveTurn: (input: { id: string }) => ({ route: { agentId, routeSessionKey: `agent:${agentId}:fabric:channel:${channelId}`, createIfMissing: true, }, conversation: { kind: 'channel', id: channelId, label: `fabric:${guild.nodeId}` }, reply: { to: channelId, nativeChannelId: channelId }, message: { body: m.content, rawBody: m.content, bodyForAgent: m.content, envelopeFrom: m.authorUserId ?? 'fabric', }, delivery: { deliver: async (payload: { text?: string }) => { const text = typeof payload?.text === 'string' ? payload.text : ''; if (!text.trim()) return { visibleReplySent: false }; const entry = this.identity.findByAgentId(agentId); const session = entry ? await this.client.agentLogin(entry.fabricApiKey) : null; const gt = session?.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token; if (!session || !gt) return { visibleReplySent: false }; await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id); return { visibleReplySent: true }; }, }, meta: { admission: admit ? { kind: 'dispatch' } : { kind: 'drop', recordHistory: true } }, }), }, log: (e: { stage?: string }) => this.runtime.log?.debug?.(`fabric.turn.${e?.stage}`), }); } catch (err) { this.log.warn(`fabric: turn.run failed agent=${agentId} channel=${channelId}: ${String(err)}`); } } }