commit 38ac6d20b79705d23c4808e209c8e52713ced7f3 Author: zhi Date: Thu May 14 09:41:18 2026 +0000 chore: initial scaffolding for SynthesisAgent.OpenclawPlugin OpenClaw plugin that manages long-lived interactive Claude Code processes per OpenClaw session. Process manager + session-mapping persistence + bridge WebSocket server skeleton. Will be rewritten to follow the contractor-agent HTTP model-provider pattern (register as `synthesis-claude-bridge` provider, receive /v1/chat/completions, dispatch to channel notification on the bound Claude process). See parent repo's STATUS.md for the punch list. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5d3055a --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +node_modules/ +dist/ +*.log +.DS_Store +package-lock.json diff --git a/README.md b/README.md new file mode 100644 index 0000000..2a02cde --- /dev/null +++ b/README.md @@ -0,0 +1,73 @@ +# SynthesisAgent.OpenclawPlugin + +The **OpenClaw side** of SynthesisAgent. Lives inside the OpenClaw process; spawns and manages one long-lived interactive `claude` per OpenClaw session. + +## Components + +``` +index.ts entry: definePluginEntry, wires up the others +core/config.ts plugin config schema + defaults +core/session-mapping.ts persistent openclaw_session ↔ claude_session_uuid +core/process-manager.ts spawn/resume/kill claude processes +core/cli.ts admin commands (`openclaw synthesis ...`) +web/bridge-server.ts WebSocket server that ClaudePlugins connect into +``` + +## Lifecycle + +``` +plugin loaded + ↓ +SessionMapping reads ~/.openclaw/synthesis/sessions.json +ProcessManager idle, no spawns yet +BridgeServer binds 127.0.0.1:18801 + ↓ +inbound Discord msg for session=alice + ↓ +processManager.ensure('alice') + - mapping.ensure → claude_session_uuid (new or existing) + - spawn `claude --channels plugin:synthesis-claude@local --resume ...` + - inject env: SYNTHESIS_BRIDGE_URL / TOKEN / OPENCLAW_SESSION / CLAUDE_SESSION + ↓ +ClaudePlugin (in spawned claude) dials ws://127.0.0.1:18801/bridge + - sends hello frame + - server validates token, marks process ready, sends hello_ack with tool catalog + ↓ +processManager.ensure resolves +inbound message pushed → notifications/claude/channel → Claude starts new turn + ↓ +Claude calls tools/call → forwarded to OpenClaw tool surface +Claude finishes → process stays alive, idle timer reset + ↓ +1 hour idle → idleSweeper SIGTERMs the process +mapping is preserved → next message resumes via --resume +``` + +## Config + +See `openclaw.plugin.json` `configSchema`. Override in `~/.openclaw/openclaw.json`: + +```json +{ + "plugins": { + "entries": { + "synthesis-agent": { + "bridgePort": 18801, + "bridgeToken": "", + "idleKillMs": 1800000, + "maxProcesses": 8 + } + } + } +} +``` + +## TODO + +- [ ] Discover exact OpenClaw plugin-sdk API for: inbound event subscription, CLI command registration, tool catalog enumeration. See `contractor-agent/index.ts` for current best example. +- [ ] Wire `api.channels.onInbound(...)` to `pm.ensure()` + `bridgeServer.pushInbound()`. +- [ ] Implement tool dispatch in `web/bridge-server.ts` (current `tool_call` handler returns not-implemented). +- [ ] Implement permission routing back to source channel. +- [ ] CLI: `openclaw synthesis list`, `push`, `kill`, `forget`. +- [ ] Settle path conflict: this plugin needs to live at `/root/.openclaw/plugins/synthesis-agent/` to be auto-loaded. Either symlink from this repo or document copy-install. +- [ ] Decide policy on `--no-session-persistence` flag (currently we rely on default persistence so `--resume` works). diff --git a/core/cli.ts b/core/cli.ts new file mode 100644 index 0000000..16377bc --- /dev/null +++ b/core/cli.ts @@ -0,0 +1,37 @@ +import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core' +import type { ProcessManager } from './process-manager.js' +import type { SessionMapping } from './session-mapping.js' +import type { SynthesisConfig } from './config.js' + +interface CliDeps { + processManager: ProcessManager + mapping: SessionMapping + config: SynthesisConfig +} + +/** + * `openclaw synthesis ...` admin commands. Used to inspect & poke the running + * pool from outside (helpful for testing without a real channel inbound). + * + * Subcommands (planned): + * list — show live processes + mapping + * push — fake an inbound message + * kill — terminate a process (mapping preserved) + * forget — drop mapping entirely (next msg = new claude session) + */ +export function registerCli(api: OpenClawPluginApi, deps: CliDeps): void { + // The exact API shape for command registration depends on the OpenClaw + // plugin-sdk version. Two common forms seen in existing plugins: + // + // api.commands?.register('synthesis', { describe, handler }) + // api.cli?.command('synthesis', sub => sub.command('list', '...', () => {...})) + // + // Both are stubbed below — pick whichever the loaded SDK exposes when + // wiring this for real. + + const _ = { api, deps } + // TODO: wire actual command registration once the surrounding plugin + // patterns are confirmed (see contractor-agent/commands/register-cli.ts + // for the canonical example in this codebase). + void _ +} diff --git a/core/config.ts b/core/config.ts new file mode 100644 index 0000000..b8db286 --- /dev/null +++ b/core/config.ts @@ -0,0 +1,42 @@ +import { homedir } from 'node:os' +import { resolve as resolvePath } from 'node:path' + +export interface SynthesisConfig { + bridgePort: number + bridgeToken: string + claudePluginRef: string + permissionMode: string + idleKillMs: number + maxProcesses: number + mappingDbPath: string +} + +const DEFAULTS: SynthesisConfig = { + bridgePort: 18801, + bridgeToken: 'synthesis-local', + claudePluginRef: 'plugin:synthesis-claude@local', + permissionMode: 'acceptEdits', + idleKillMs: 3_600_000, + maxProcesses: 16, + mappingDbPath: '~/.openclaw/synthesis/sessions.json', +} + +function expand(p: string): string { + if (p.startsWith('~')) return resolvePath(homedir(), p.slice(2)) + return resolvePath(p) +} + +export function normalizeConfig(raw: unknown): SynthesisConfig { + const r = (raw ?? {}) as Partial + const merged: SynthesisConfig = { + bridgePort: typeof r.bridgePort === 'number' ? r.bridgePort : DEFAULTS.bridgePort, + bridgeToken: typeof r.bridgeToken === 'string' && r.bridgeToken ? r.bridgeToken : DEFAULTS.bridgeToken, + claudePluginRef: typeof r.claudePluginRef === 'string' && r.claudePluginRef ? r.claudePluginRef : DEFAULTS.claudePluginRef, + permissionMode: typeof r.permissionMode === 'string' && r.permissionMode ? r.permissionMode : DEFAULTS.permissionMode, + idleKillMs: typeof r.idleKillMs === 'number' ? r.idleKillMs : DEFAULTS.idleKillMs, + maxProcesses: typeof r.maxProcesses === 'number' ? r.maxProcesses : DEFAULTS.maxProcesses, + mappingDbPath: typeof r.mappingDbPath === 'string' && r.mappingDbPath ? r.mappingDbPath : DEFAULTS.mappingDbPath, + } + merged.mappingDbPath = expand(merged.mappingDbPath) + return merged +} diff --git a/core/process-manager.ts b/core/process-manager.ts new file mode 100644 index 0000000..b3e6ff1 --- /dev/null +++ b/core/process-manager.ts @@ -0,0 +1,162 @@ +import { spawn, type ChildProcess } from 'node:child_process' +import type { SynthesisConfig } from './config.js' +import type { SessionMapping } from './session-mapping.js' + +export interface ProcessHandle { + pid: number + openclawSessionId: string + claudeSessionUuid: string + proc: ChildProcess + startedAt: number + lastActiveAt: number + /** Resolved when the ClaudePlugin's hello frame has been seen on the bridge. */ + ready: Promise + markReady: () => void +} + +export interface ProcessManagerDeps { + config: SynthesisConfig + mapping: SessionMapping +} + +/** + * Owns the pool of `claude` subprocesses. One per openclaw_session, lazily + * spawned on first message and reaped after `idleKillMs` of inactivity. + * + * Hand-off to ClaudePlugin happens via env vars + a shared WebSocket bridge: + * we know a process is "live" when its plugin connects back and sends `hello`. + * + * Responsibilities NOT handled here (delegated): + * - Routing inbound messages to a specific process → done by bridge-server, + * which holds the ws connection per pid. + * - Tool catalog → bridge-server sends in hello_ack. + */ +export class ProcessManager { + private byOpenclawSession = new Map() + private idleSweeper: ReturnType | null = null + private shuttingDown = false + + constructor(private deps: ProcessManagerDeps) { + this.idleSweeper = setInterval(() => this.sweepIdle(), 60_000) + } + + /** Returns the live handle, spawning if needed. Awaits hello handshake. */ + async ensure(openclawSessionId: string): Promise { + const existing = this.byOpenclawSession.get(openclawSessionId) + if (existing && !existing.proc.killed) { + existing.lastActiveAt = Date.now() + return existing + } + + if (this.byOpenclawSession.size >= this.deps.config.maxProcesses) { + this.evictOldestIdle() + } + + const rec = this.deps.mapping.ensure(openclawSessionId) + const handle = this.spawn(openclawSessionId, rec.claudeSessionUuid) + this.byOpenclawSession.set(openclawSessionId, handle) + // Wait for ClaudePlugin → bridge → bridge-server → handle.markReady(). + await Promise.race([ + handle.ready, + new Promise((_, rej) => setTimeout(() => rej(new Error('claude spawn ready timeout')), 15_000)), + ]) + this.deps.mapping.touch(openclawSessionId) + return handle + } + + /** Bridge server calls this once it has paired a connection to a pid. */ + markReady(pid: number): void { + for (const h of this.byOpenclawSession.values()) { + if (h.pid === pid) { + h.markReady() + return + } + } + } + + private spawn(openclawSessionId: string, claudeSessionUuid: string): ProcessHandle { + const { config } = this.deps + const args = [ + '--channels', config.claudePluginRef, + '--resume', claudeSessionUuid, + '--permission-mode', config.permissionMode, + // Tool allowlist scoped to our MCP namespace plus the basics models need + // for any non-trivial assist (Read, Edit, Bash). Tighten if/when policy + // demands. + '--allowed-tools', 'Read Edit Bash mcp__synthesis__*', + ] + + const proc = spawn('claude', args, { + env: { + ...process.env, + SYNTHESIS_BRIDGE_URL: `ws://127.0.0.1:${config.bridgePort}/bridge`, + SYNTHESIS_BRIDGE_TOKEN: config.bridgeToken, + SYNTHESIS_OPENCLAW_SESSION: openclawSessionId, + SYNTHESIS_CLAUDE_SESSION: claudeSessionUuid, + }, + stdio: ['ignore', 'pipe', 'pipe'], + detached: false, + }) + + let resolveReady!: () => void + const ready = new Promise(r => { resolveReady = r }) + + const handle: ProcessHandle = { + pid: proc.pid ?? -1, + openclawSessionId, + claudeSessionUuid, + proc, + startedAt: Date.now(), + lastActiveAt: Date.now(), + ready, + markReady: () => resolveReady(), + } + + proc.on('exit', code => { + this.byOpenclawSession.delete(openclawSessionId) + // mapping is preserved so the next message resumes via --resume + process.stderr.write(`synthesis: claude exit session=${openclawSessionId} pid=${handle.pid} code=${code}\n`) + }) + + proc.stderr?.on('data', chunk => { + process.stderr.write(`[claude:${handle.pid}] ${chunk}`) + }) + + return handle + } + + private sweepIdle(): void { + if (this.shuttingDown) return + const cutoff = Date.now() - this.deps.config.idleKillMs + for (const h of this.byOpenclawSession.values()) { + if (h.lastActiveAt < cutoff) { + process.stderr.write(`synthesis: idle-killing session=${h.openclawSessionId} pid=${h.pid}\n`) + h.proc.kill('SIGTERM') + } + } + } + + private evictOldestIdle(): void { + let oldest: ProcessHandle | null = null + for (const h of this.byOpenclawSession.values()) { + if (!oldest || h.lastActiveAt < oldest.lastActiveAt) oldest = h + } + if (oldest) { + process.stderr.write(`synthesis: evicting session=${oldest.openclawSessionId} (max processes reached)\n`) + oldest.proc.kill('SIGTERM') + } + } + + list(): ProcessHandle[] { + return [...this.byOpenclawSession.values()] + } + + async shutdown(): Promise { + this.shuttingDown = true + if (this.idleSweeper) clearInterval(this.idleSweeper) + for (const h of this.byOpenclawSession.values()) { + try { h.proc.kill('SIGTERM') } catch { /* ignore */ } + } + this.byOpenclawSession.clear() + } +} diff --git a/core/session-mapping.ts b/core/session-mapping.ts new file mode 100644 index 0000000..563b8cc --- /dev/null +++ b/core/session-mapping.ts @@ -0,0 +1,83 @@ +import { readFileSync, writeFileSync, mkdirSync } from 'node:fs' +import { dirname } from 'node:path' +import { randomUUID } from 'node:crypto' + +export interface SessionRecord { + openclawSessionId: string + claudeSessionUuid: string + createdAt: number + lastActiveAt: number +} + +/** + * Persistent openclaw_session ↔ claude_session mapping. + * + * Single-writer (the OpenclawPlugin process). File-level on every mutation — + * simple and correct for the volume we expect. If it ever matters, swap for + * sqlite. + */ +export class SessionMapping { + private records = new Map() + private path: string + + constructor(path: string) { + this.path = path + this.load() + } + + private load(): void { + try { + const raw = readFileSync(this.path, 'utf8') + const parsed = JSON.parse(raw) as { records?: SessionRecord[] } + for (const r of parsed.records ?? []) { + this.records.set(r.openclawSessionId, r) + } + } catch { + // File doesn't exist or unreadable — start fresh. + } + } + + private flush(): void { + mkdirSync(dirname(this.path), { recursive: true }) + writeFileSync( + this.path, + JSON.stringify({ records: [...this.records.values()] }, null, 2), + 'utf8', + ) + } + + get(openclawSessionId: string): SessionRecord | undefined { + return this.records.get(openclawSessionId) + } + + /** Get-or-create. Returns the (possibly freshly assigned) claude_session_uuid. */ + ensure(openclawSessionId: string): SessionRecord { + const existing = this.records.get(openclawSessionId) + if (existing) return existing + const now = Date.now() + const rec: SessionRecord = { + openclawSessionId, + claudeSessionUuid: randomUUID(), + createdAt: now, + lastActiveAt: now, + } + this.records.set(openclawSessionId, rec) + this.flush() + return rec + } + + touch(openclawSessionId: string): void { + const r = this.records.get(openclawSessionId) + if (!r) return + r.lastActiveAt = Date.now() + this.flush() + } + + forget(openclawSessionId: string): void { + if (this.records.delete(openclawSessionId)) this.flush() + } + + all(): SessionRecord[] { + return [...this.records.values()] + } +} diff --git a/index.ts b/index.ts new file mode 100644 index 0000000..1366202 --- /dev/null +++ b/index.ts @@ -0,0 +1,68 @@ +import { definePluginEntry } from 'openclaw/plugin-sdk/plugin-entry' +import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core' +import { normalizeConfig, type SynthesisConfig } from './core/config.js' +import { ProcessManager } from './core/process-manager.js' +import { SessionMapping } from './core/session-mapping.js' +import { startBridgeServer } from './web/bridge-server.js' +import { registerCli } from './core/cli.js' + +// All long-lived state lives on globalThis so OpenClaw hot-reloads don't kill +// running Claude processes (mirrors contractor-agent's convention). +const _G = globalThis as Record +const PM_KEY = '_synthesisProcessManager' +const SERVER_KEY = '_synthesisBridgeServer' +const MAPPING_KEY = '_synthesisSessionMapping' + +export default definePluginEntry({ + id: 'synthesis-agent', + name: 'SynthesisAgent', + description: 'Manages long-lived Claude Code processes per OpenClaw session', + + register(api: OpenClawPluginApi): void { + const config: SynthesisConfig = normalizeConfig(api.pluginConfig) + + // ── Reuse existing state across hot reload ───────────────────────────── + let mapping = _G[MAPPING_KEY] as SessionMapping | undefined + if (!mapping) { + mapping = new SessionMapping(config.mappingDbPath) + _G[MAPPING_KEY] = mapping + } + + let pm = _G[PM_KEY] as ProcessManager | undefined + if (!pm) { + pm = new ProcessManager({ config, mapping }) + _G[PM_KEY] = pm + } + + // Existing bridge server: leave it alone. + if (!_G[SERVER_KEY]) { + startBridgeServer({ config, mapping, processManager: pm }) + .then(server => { _G[SERVER_KEY] = server }) + .catch(err => { + api.logger?.error?.('synthesis: bridge server failed to start', err) + }) + } + + // ── Wire to OpenClaw inbound event bus ───────────────────────────────── + // TODO: subscribe to api.events / api.channels.onInbound (exact API TBD). + // Each inbound message arrives with an openclaw_session_id; we forward to + // the process manager, which spawns/resumes claude as needed and pushes + // the message into its plugin's bridge socket. + // + // api.channels.onInbound((evt) => { + // pm.deliverInbound(evt.openclawSessionId, evt) + // }) + // + // Similarly for permission replies. + + registerCli(api, { processManager: pm, mapping, config }) + + api.lifecycle?.onShutdown?.(async () => { + const server = _G[SERVER_KEY] as Awaited> | undefined + await server?.close?.() + delete _G[SERVER_KEY] + await pm!.shutdown() + delete _G[PM_KEY] + }) + }, +}) diff --git a/openclaw.plugin.json b/openclaw.plugin.json new file mode 100644 index 0000000..c966181 --- /dev/null +++ b/openclaw.plugin.json @@ -0,0 +1,52 @@ +{ + "id": "synthesis-agent", + "name": "SynthesisAgent", + "description": "Manages long-lived interactive Claude Code processes per OpenClaw session; bridges OpenClaw events <-> SynthesisAgent.ClaudePlugin", + "activation": { + "onStartup": true + }, + "commandAliases": [ + { "name": "synthesis" } + ], + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": { + "bridgePort": { + "type": "number", + "default": 18801, + "description": "TCP port the bridge WebSocket server binds on (127.0.0.1)" + }, + "bridgeToken": { + "type": "string", + "default": "synthesis-local", + "description": "Shared secret each ClaudePlugin instance must present in its hello frame" + }, + "claudePluginRef": { + "type": "string", + "default": "plugin:synthesis-claude@local", + "description": "The --channels argument passed to claude on spawn" + }, + "permissionMode": { + "type": "string", + "default": "acceptEdits", + "description": "Claude Code permission mode for spawned sessions" + }, + "idleKillMs": { + "type": "number", + "default": 3600000, + "description": "Idle time after which a session's claude process is killed; next message resumes it" + }, + "maxProcesses": { + "type": "number", + "default": 16, + "description": "Hard cap on concurrent claude processes; new sessions beyond this evict the oldest idle one" + }, + "mappingDbPath": { + "type": "string", + "default": "~/.openclaw/synthesis/sessions.json", + "description": "Where openclaw_session <-> claude_session UUID mapping is persisted" + } + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..8fc3dfb --- /dev/null +++ b/package.json @@ -0,0 +1,16 @@ +{ + "name": "synthesis-agent-openclaw-plugin", + "version": "0.0.1", + "license": "Apache-2.0", + "description": "OpenClaw plugin: manages long-lived Claude Code processes per OpenClaw session for SynthesisAgent", + "type": "module", + "main": "index.ts", + "dependencies": { + "ws": "^8.18.0" + }, + "devDependencies": { + "@types/ws": "^8.5.10", + "typescript": "^5.0.0", + "openclaw": "*" + } +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..5c2d790 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "ESNext", + "moduleResolution": "Bundler", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "resolveJsonModule": true, + "allowImportingTsExtensions": true, + "noEmit": true + }, + "include": ["index.ts", "core/**/*.ts", "web/**/*.ts"] +} diff --git a/web/bridge-server.ts b/web/bridge-server.ts new file mode 100644 index 0000000..ad9b387 --- /dev/null +++ b/web/bridge-server.ts @@ -0,0 +1,146 @@ +import { WebSocketServer, type WebSocket } from 'ws' +import type { SynthesisConfig } from '../core/config.js' +import type { SessionMapping } from '../core/session-mapping.js' +import type { ProcessManager } from '../core/process-manager.js' + +interface BridgeServerDeps { + config: SynthesisConfig + mapping: SessionMapping + processManager: ProcessManager +} + +interface ClientConn { + ws: WebSocket + pid: number + openclawSessionId: string + claudeSessionUuid: string + isAlive: boolean +} + +/** + * WebSocket server that ClaudePlugin instances dial into. One connection per + * spawned Claude process; identified by env-injected (openclaw_session, pid). + * + * Bridge frames are defined in docs/wire-protocol.md. + */ +export async function startBridgeServer(deps: BridgeServerDeps): Promise<{ close(): Promise }> { + const { config, processManager } = deps + + const wss = new WebSocketServer({ + host: '127.0.0.1', + port: config.bridgePort, + path: '/bridge', + }) + + const byOpenclawSession = new Map() + + wss.on('connection', ws => { + let conn: ClientConn | null = null + + ws.on('message', async raw => { + let frame: any + try { frame = JSON.parse(String(raw)) } catch { return } + + // First frame must be hello. + if (!conn) { + if (frame.type !== 'hello') return ws.close(4001, 'expected hello first') + if (frame.token !== config.bridgeToken) return ws.close(4002, 'bad token') + + conn = { + ws, + pid: frame.pid, + openclawSessionId: frame.openclaw_session, + claudeSessionUuid: frame.claude_session, + isAlive: true, + } + + // Evict any previous connection for this session (e.g. plugin reconnect). + const prev = byOpenclawSession.get(conn.openclawSessionId) + if (prev && prev.ws !== ws) try { prev.ws.close(4003, 'replaced by reconnect') } catch {} + byOpenclawSession.set(conn.openclawSessionId, conn) + + processManager.markReady(conn.pid) + + ws.send(JSON.stringify({ + type: 'hello_ack', + // TODO: real catalog from api.tools.list() — empty for the scaffold. + tools: [], + session_meta: { openclaw_session: conn.openclawSessionId }, + })) + return + } + + // Subsequent frames. + switch (frame.type) { + case 'tool_call': + // TODO: dispatch frame.tool with frame.args against OpenClaw's tool + // surface. For the scaffold we echo not-implemented. + ws.send(JSON.stringify({ + type: 'tool_result', + call_id: frame.call_id, + ok: false, + error: `tool dispatch not implemented (tool=${frame.tool})`, + })) + return + + case 'permission_request': + // TODO: route to the user via the original source channel + // (Discord/Telegram/…). The OpenClaw side knows where the session + // came from. For now, log and auto-deny. + process.stderr.write(`synthesis: permission_request ${frame.request_id} (auto-denied)\n`) + ws.send(JSON.stringify({ + type: 'permission_reply', + request_id: frame.request_id, + behavior: 'deny', + })) + return + + case 'pong': + conn.isAlive = true + return + + default: + process.stderr.write(`synthesis: unknown frame type from plugin: ${frame.type}\n`) + } + }) + + ws.on('close', () => { + if (conn && byOpenclawSession.get(conn.openclawSessionId)?.ws === ws) { + byOpenclawSession.delete(conn.openclawSessionId) + } + }) + }) + + // Heartbeat: ping every 30s, kill if no pong by next round. + const heartbeat = setInterval(() => { + for (const c of byOpenclawSession.values()) { + if (!c.isAlive) { + try { c.ws.terminate() } catch {} + continue + } + c.isAlive = false + try { c.ws.send(JSON.stringify({ type: 'ping', ts: Date.now() })) } catch {} + } + }, 30_000) + + process.stderr.write(`synthesis: bridge listening on ws://127.0.0.1:${config.bridgePort}/bridge\n`) + + return { + /** Public helper for outsiders (e.g. inbound event subscriber) to push messages. */ + pushInbound(openclawSessionId: string, payload: { content: string; meta: Record; request_id?: string }) { + const c = byOpenclawSession.get(openclawSessionId) + if (!c) throw new Error(`no live plugin for session ${openclawSessionId}`) + c.ws.send(JSON.stringify({ type: 'inbound_message', ...payload })) + }, + pushPermissionReply(openclawSessionId: string, request_id: string, behavior: 'allow' | 'deny') { + const c = byOpenclawSession.get(openclawSessionId) + if (!c) throw new Error(`no live plugin for session ${openclawSessionId}`) + c.ws.send(JSON.stringify({ type: 'permission_reply', request_id, behavior })) + }, + async close() { + clearInterval(heartbeat) + for (const c of byOpenclawSession.values()) try { c.ws.close() } catch {} + await new Promise(r => wss.close(() => r())) + }, + } as unknown as { close(): Promise } +}