import { WebSocketServer, type WebSocket } from 'ws' import type { SynthesisConfig } from '../core/config.js' import type { SessionMapping } from '../core/session-mapping.js' import type { ProcessManager } from '../core/process-manager.js' interface BridgeServerDeps { config: SynthesisConfig mapping: SessionMapping processManager: ProcessManager } interface ClientConn { ws: WebSocket pid: number openclawSessionId: string claudeSessionUuid: string isAlive: boolean } /** * WebSocket server that ClaudePlugin instances dial into. One connection per * spawned Claude process; identified by env-injected (openclaw_session, pid). * * Bridge frames are defined in docs/wire-protocol.md. */ export async function startBridgeServer(deps: BridgeServerDeps): Promise<{ close(): Promise }> { const { config, processManager } = deps const wss = new WebSocketServer({ host: '127.0.0.1', port: config.bridgePort, path: '/bridge', }) const byOpenclawSession = new Map() wss.on('connection', ws => { let conn: ClientConn | null = null ws.on('message', async raw => { let frame: any try { frame = JSON.parse(String(raw)) } catch { return } // First frame must be hello. if (!conn) { if (frame.type !== 'hello') return ws.close(4001, 'expected hello first') if (frame.token !== config.bridgeToken) return ws.close(4002, 'bad token') conn = { ws, pid: frame.pid, openclawSessionId: frame.openclaw_session, claudeSessionUuid: frame.claude_session, isAlive: true, } // Evict any previous connection for this session (e.g. plugin reconnect). const prev = byOpenclawSession.get(conn.openclawSessionId) if (prev && prev.ws !== ws) try { prev.ws.close(4003, 'replaced by reconnect') } catch {} byOpenclawSession.set(conn.openclawSessionId, conn) processManager.markReady(conn.pid) ws.send(JSON.stringify({ type: 'hello_ack', // TODO: real catalog from api.tools.list() — empty for the scaffold. tools: [], session_meta: { openclaw_session: conn.openclawSessionId }, })) return } // Subsequent frames. switch (frame.type) { case 'tool_call': // TODO: dispatch frame.tool with frame.args against OpenClaw's tool // surface. For the scaffold we echo not-implemented. ws.send(JSON.stringify({ type: 'tool_result', call_id: frame.call_id, ok: false, error: `tool dispatch not implemented (tool=${frame.tool})`, })) return case 'permission_request': // TODO: route to the user via the original source channel // (Discord/Telegram/…). The OpenClaw side knows where the session // came from. For now, log and auto-deny. process.stderr.write(`synthesis: permission_request ${frame.request_id} (auto-denied)\n`) ws.send(JSON.stringify({ type: 'permission_reply', request_id: frame.request_id, behavior: 'deny', })) return case 'pong': conn.isAlive = true return default: process.stderr.write(`synthesis: unknown frame type from plugin: ${frame.type}\n`) } }) ws.on('close', () => { if (conn && byOpenclawSession.get(conn.openclawSessionId)?.ws === ws) { byOpenclawSession.delete(conn.openclawSessionId) } }) }) // Heartbeat: ping every 30s, kill if no pong by next round. const heartbeat = setInterval(() => { for (const c of byOpenclawSession.values()) { if (!c.isAlive) { try { c.ws.terminate() } catch {} continue } c.isAlive = false try { c.ws.send(JSON.stringify({ type: 'ping', ts: Date.now() })) } catch {} } }, 30_000) process.stderr.write(`synthesis: bridge listening on ws://127.0.0.1:${config.bridgePort}/bridge\n`) return { /** Public helper for outsiders (e.g. inbound event subscriber) to push messages. */ pushInbound(openclawSessionId: string, payload: { content: string; meta: Record; request_id?: string }) { const c = byOpenclawSession.get(openclawSessionId) if (!c) throw new Error(`no live plugin for session ${openclawSessionId}`) c.ws.send(JSON.stringify({ type: 'inbound_message', ...payload })) }, pushPermissionReply(openclawSessionId: string, request_id: string, behavior: 'allow' | 'deny') { const c = byOpenclawSession.get(openclawSessionId) if (!c) throw new Error(`no live plugin for session ${openclawSessionId}`) c.ws.send(JSON.stringify({ type: 'permission_reply', request_id, behavior })) }, async close() { clearInterval(heartbeat) for (const c of byOpenclawSession.values()) try { c.ws.close() } catch {} await new Promise(r => wss.close(() => r())) }, } as unknown as { close(): Promise } }