feat: working v1 — full Fabric<->openclaw round-trip verified
Real channel-turn dispatch (resolveAgentRoute + finalizeInboundContext + dispatchInboundReplyWithBase), wakeup->drop/dispatch, messaging target grammar (fabric:<id>) + outbound.sendText, tools use execute/parameters. Verified live: human msg in Fabric -> wakeup -> openclaw agent runs -> reply posted back into the Fabric channel as the agent. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
158
src/inbound.ts
158
src/inbound.ts
@@ -1,20 +1,24 @@
|
||||
import { io, type Socket } from 'socket.io-client';
|
||||
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
|
||||
import type { FabricClient, FabricSession } from './fabric-client.js';
|
||||
import type { IdentityRegistry } from './identity.js';
|
||||
|
||||
// OpenClaw plugin runtime — only the channel-turn kernel surface we use.
|
||||
// Typed loosely on purpose: the concrete shapes come from
|
||||
// openclaw/plugin-sdk/core at the host's SDK version.
|
||||
type PluginRuntime = {
|
||||
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
|
||||
// channels (nextcloud-talk) drive the kernel:
|
||||
// core = PluginRuntime (from setRuntime)
|
||||
// route = core.channel.routing.resolveAgentRoute(...)
|
||||
// ctx = core.channel.reply.finalizeInboundContext(...) // has SessionKey
|
||||
// dispatch= dispatchInboundReplyWithBase({ cfg, route, ctxPayload, core, deliver })
|
||||
// `core.channel.*` is accessed loosely so unrelated SDK drift won't break us.
|
||||
type Core = {
|
||||
channel: {
|
||||
turn: {
|
||||
run(args: unknown): Promise<unknown>;
|
||||
};
|
||||
routing: { resolveAgentRoute: (p: unknown) => { agentId: string; sessionKey: string; accountId?: string } };
|
||||
session: { resolveStorePath: (store: unknown, o: { agentId: string }) => string };
|
||||
reply: { finalizeInboundContext: (p: Record<string, unknown>) => unknown };
|
||||
};
|
||||
log?: { debug?: (m: string, x?: unknown) => void };
|
||||
};
|
||||
|
||||
type Logger = { info: (m: string) => void; warn: (m: string) => void };
|
||||
type Logger = { info: (m: string) => void; warn: (m: string) => void; error?: (m: string) => void };
|
||||
|
||||
type FabricMessage = {
|
||||
messageId: string;
|
||||
@@ -22,20 +26,17 @@ type FabricMessage = {
|
||||
content: string;
|
||||
authorUserId?: string;
|
||||
createdAt?: string;
|
||||
// per-recipient metadata Fabric attaches at push time (this agent's view)
|
||||
channelId?: string;
|
||||
wakeup?: boolean;
|
||||
};
|
||||
|
||||
// One live Fabric connection per agent identity (Phase 1 = B1). Lives in the
|
||||
// channel-plugin runtime (no separate sidecar). Firehose (B2) would replace
|
||||
// this class behind the same dispatch() call.
|
||||
export class FabricInbound {
|
||||
private sockets: Socket[] = [];
|
||||
private timers: NodeJS.Timeout[] = [];
|
||||
private seen = new Set<string>();
|
||||
|
||||
constructor(
|
||||
private readonly runtime: PluginRuntime,
|
||||
private readonly core: unknown, // PluginRuntime
|
||||
private readonly cfg: unknown, // OpenClawConfig
|
||||
private readonly client: FabricClient,
|
||||
private readonly identity: IdentityRegistry,
|
||||
private readonly log: Logger,
|
||||
@@ -65,10 +66,8 @@ export class FabricInbound {
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
for (const t of this.timers) clearInterval(t);
|
||||
for (const s of this.sockets) s.disconnect();
|
||||
this.sockets = [];
|
||||
this.timers = [];
|
||||
}
|
||||
|
||||
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
|
||||
@@ -76,101 +75,112 @@ export class FabricInbound {
|
||||
for (const g of session.guilds) {
|
||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||
if (!tok) continue;
|
||||
|
||||
const socket = io(`${g.endpoint}/realtime`, {
|
||||
transports: ['websocket'],
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
|
||||
const joinAll = async () => {
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, {
|
||||
headers: { authorization: `Bearer ${tok}` },
|
||||
});
|
||||
const res = await fetch(
|
||||
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
|
||||
{ headers: { authorization: `Bearer ${tok}` } },
|
||||
);
|
||||
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
|
||||
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
} catch {
|
||||
/* best effort */
|
||||
}
|
||||
};
|
||||
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('message.created', (m: FabricMessage & { channelId?: string }) => {
|
||||
socket.on('message.created', (m: FabricMessage) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId) return;
|
||||
// self-echo guard + dedupe
|
||||
if (m.authorUserId && m.authorUserId === selfUserId) return;
|
||||
const key = `${agentId}:${m.messageId}`;
|
||||
if (this.seen.has(key)) return;
|
||||
this.seen.add(key);
|
||||
if (this.seen.size > 5000) this.seen.clear();
|
||||
void this.dispatch(agentId, g, channelId, m);
|
||||
void this.dispatch(agentId, g, channelId, m, session);
|
||||
});
|
||||
|
||||
socket.connect();
|
||||
this.sockets.push(socket);
|
||||
}
|
||||
}
|
||||
|
||||
// Hand the inbound Fabric message to OpenClaw's channel-turn kernel.
|
||||
// wakeup === true -> dispatch (agent runs, may reply)
|
||||
// wakeup !== true -> drop but keep as group history/context
|
||||
private async dispatch(
|
||||
agentId: string,
|
||||
guild: { nodeId: string; endpoint: string },
|
||||
channelId: string,
|
||||
m: FabricMessage,
|
||||
session: FabricSession,
|
||||
): Promise<void> {
|
||||
const admit = m.wakeup === true;
|
||||
// wakeup === false -> drop (Fabric already decided this agent is silent)
|
||||
if (m.wakeup !== true) {
|
||||
this.log.info(`fabric: drop (no wakeup) agent=${agentId} channel=${channelId}`);
|
||||
return;
|
||||
}
|
||||
this.log.info(`fabric: dispatch agent=${agentId} channel=${channelId}`);
|
||||
|
||||
const core = this.core as Core & Record<string, unknown>;
|
||||
const cfg = this.cfg as { session?: { store?: unknown } };
|
||||
try {
|
||||
await this.runtime.channel.turn.run({
|
||||
const route = core.channel.routing.resolveAgentRoute({
|
||||
cfg: this.cfg,
|
||||
channel: 'fabric',
|
||||
accountId: agentId,
|
||||
raw: m,
|
||||
adapter: {
|
||||
ingest: (raw: FabricMessage) => ({
|
||||
id: raw.messageId,
|
||||
timestamp: raw.createdAt ? Date.parse(raw.createdAt) : Date.now(),
|
||||
rawText: raw.content,
|
||||
textForAgent: raw.content,
|
||||
}),
|
||||
classify: () => ({ kind: 'message', canStartAgentTurn: admit }),
|
||||
preflight: () =>
|
||||
admit ? {} : { admission: { kind: 'drop', reason: 'no-wakeup', recordHistory: true } },
|
||||
resolveTurn: (input: { id: string }) => ({
|
||||
route: {
|
||||
agentId,
|
||||
routeSessionKey: `agent:${agentId}:fabric:channel:${channelId}`,
|
||||
createIfMissing: true,
|
||||
},
|
||||
conversation: { kind: 'channel', id: channelId, label: `fabric:${guild.nodeId}` },
|
||||
reply: { to: channelId, nativeChannelId: channelId },
|
||||
message: {
|
||||
body: m.content,
|
||||
rawBody: m.content,
|
||||
bodyForAgent: m.content,
|
||||
envelopeFrom: m.authorUserId ?? 'fabric',
|
||||
},
|
||||
delivery: {
|
||||
deliver: async (payload: { text?: string }) => {
|
||||
const text = typeof payload?.text === 'string' ? payload.text : '';
|
||||
if (!text.trim()) return { visibleReplySent: false };
|
||||
const entry = this.identity.findByAgentId(agentId);
|
||||
const session = entry ? await this.client.agentLogin(entry.fabricApiKey) : null;
|
||||
const gt = session?.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token;
|
||||
if (!session || !gt) return { visibleReplySent: false };
|
||||
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
|
||||
return { visibleReplySent: true };
|
||||
},
|
||||
},
|
||||
meta: { admission: admit ? { kind: 'dispatch' } : { kind: 'drop', recordHistory: true } },
|
||||
}),
|
||||
},
|
||||
log: (e: { stage?: string }) => this.runtime.log?.debug?.(`fabric.turn.${e?.stage}`),
|
||||
peer: { kind: 'group', id: channelId },
|
||||
});
|
||||
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
|
||||
agentId: route.agentId,
|
||||
});
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext({
|
||||
Body: m.content,
|
||||
BodyForAgent: m.content,
|
||||
RawBody: m.content,
|
||||
CommandBody: m.content,
|
||||
From: `fabric:channel:${channelId}`,
|
||||
To: `fabric:${channelId}`,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId ?? agentId,
|
||||
ChatType: 'group',
|
||||
ConversationLabel: `fabric:${guild.nodeId}`,
|
||||
SenderId: m.authorUserId ?? 'fabric',
|
||||
Provider: 'fabric',
|
||||
Surface: 'fabric',
|
||||
MessageSid: m.messageId,
|
||||
Timestamp: m.createdAt ? Date.parse(m.createdAt) : Date.now(),
|
||||
OriginatingChannel: 'fabric',
|
||||
OriginatingTo: `fabric:${channelId}`,
|
||||
});
|
||||
|
||||
const gt = session.guildAccessTokens.find((t) => t.guildNodeId === guild.nodeId)?.token;
|
||||
|
||||
await dispatchInboundReplyWithBase({
|
||||
cfg: this.cfg as never,
|
||||
channel: 'fabric',
|
||||
accountId: agentId,
|
||||
route,
|
||||
storePath,
|
||||
ctxPayload: ctxPayload as never,
|
||||
core: this.core as never,
|
||||
deliver: async (payload: { text?: string }) => {
|
||||
const text = (payload?.text ?? '').trim();
|
||||
this.log.info(`fabric: deliver agent=${agentId} channel=${channelId} len=${text.length}`);
|
||||
if (!text || !gt) return;
|
||||
await this.client.postMessage(guild.endpoint, gt, channelId, text, session.user.id);
|
||||
this.log.info(`fabric: posted reply agent=${agentId} channel=${channelId}`);
|
||||
},
|
||||
onRecordError: (err: unknown) =>
|
||||
this.log.warn(`fabric: session record failed agent=${agentId}: ${String(err)}`),
|
||||
onDispatchError: (err: unknown, info: { kind: string }) =>
|
||||
this.log.warn(`fabric: ${info.kind} dispatch failed agent=${agentId}: ${String(err)}`),
|
||||
replyOptions: {},
|
||||
});
|
||||
this.log.info(`fabric: dispatch returned agent=${agentId} channel=${channelId}`);
|
||||
} catch (err) {
|
||||
this.log.warn(`fabric: turn.run failed agent=${agentId} channel=${channelId}: ${String(err)}`);
|
||||
this.log.warn(`fabric: dispatch failed agent=${agentId} channel=${channelId}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user