import { io } from 'socket.io-client'; import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch'; export class FabricInbound { core; cfg; client; identity; log; accounts; sockets = []; seen = new Set(); constructor(core, // PluginRuntime cfg, // OpenClawConfig client, identity, log, accounts = []) { this.core = core; this.cfg = cfg; this.client = client; this.identity = identity; this.log = log; this.accounts = accounts; } async start() { const entries = this.accounts.length > 0 ? this.accounts.map((a) => ({ agentId: a.agentId, fabricApiKey: a.fabricApiKey })) : this.identity.list(); for (const entry of entries) { try { const session = await this.client.agentLogin(entry.fabricApiKey); this.identity.upsert({ agentId: entry.agentId, fabricApiKey: entry.fabricApiKey, fabricUserId: session.user.id, displayName: session.user.name, }); await this.connectAgent(entry.agentId, session); this.log.info(`fabric: agent ${entry.agentId} connected as ${session.user.email}`); } catch (err) { this.log.warn(`fabric: agent ${entry.agentId} connect failed: ${String(err)}`); } } } stop() { for (const s of this.sockets) s.disconnect(); this.sockets = []; } async connectAgent(agentId, session) { const selfUserId = session.user.id; for (const g of session.guilds) { const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token; if (!tok) continue; const socket = io(`${g.endpoint}/realtime`, { transports: ['websocket'], auth: { token: tok }, autoConnect: false, }); const joinAll = async () => { try { const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } }); const channels = res.ok ? (await res.json()) : []; for (const c of channels) socket.emit('join_channel', { channelId: c.id }); this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`); } catch { /* best effort */ } }; socket.on('connect', () => void joinAll()); socket.on('message.created', (m) => { const channelId = m.channelId ?? ''; if (!channelId) return; if (m.authorUserId && m.authorUserId === selfUserId) return; const key = `${agentId}:${m.messageId}`; if (this.seen.has(key)) return; this.seen.add(key); if (this.seen.size > 5000) this.seen.clear(); void this.dispatch(agentId, g, channelId, m, session); }); socket.connect(); this.sockets.push(socket); } } async dispatch(agentId, guild, channelId, m, session) { // wakeup === false -> drop (Fabric already decided this agent is silent) if (m.wakeup !== true) { this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`); return; } this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`); const core = this.core; const cfg = this.cfg; try { const route = core.channel.routing.resolveAgentRoute({ cfg: this.cfg, channel: 'fabric', accountId: agentId, peer: { kind: 'group', id: channelId }, }); const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { agentId: route.agentId, }); const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: m.content, BodyForAgent: m.content, RawBody: m.content, CommandBody: m.content, From: `fabric:channel:${channelId}`, To: `fabric:${channelId}`, SessionKey: route.sessionKey, AccountId: route.accountId ?? agentId, ChatType: 'group', ConversationLabel: `fabric:${guild.nodeId}`, SenderId: m.authorUserId ?? 'fabric', Provider: 'fabric', Surface: 'fabric', MessageSid: m.messageId, Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(), OriginatingChannel: 'fabric', OriginatingTo: `fabric:${channelId}`, }); const gt = session.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token; await dispatchInboundReplyWithBase({ cfg: this.cfg, channel: 'fabric', accountId: agentId, route, storePath, ctxPayload: ctxPayload, core: this.core, deliver: async (payload) => { const text = (payload?.text ?? '').trim(); this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`); if (!text || !gt) return; await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id); this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`); }, onRecordError: (err) => this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`), onDispatchError: (err, info) => this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`), // Fabric has no length limit: deliver the whole reply as ONE message. replyOptions: { disableBlockStreaming: true }, }); this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`); } catch (err) { this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`); } } }