import http from 'node:http' import { WebSocketServer, type WebSocket } from 'ws' import { randomUUID } from 'node:crypto' import type { SynthesisConfig } from '../core/config.js' import type { SessionMapping } from '../core/session-mapping.js' import type { ProcessManager, ProcessHandle } from '../core/process-manager.js' interface BridgeDeps { config: SynthesisConfig mapping: SessionMapping processManager: ProcessManager resolveAgent?: (agentId: string) => { workspace: string } | null logger?: { info: (...a: unknown[]) => void; warn: (...a: unknown[]) => void } } interface ClientConn { ws: WebSocket pid: number openclawSessionKey: string isAlive: boolean /** Pending dispatch on this connection: resolves when we get a `reply` frame. */ pending: PendingDispatch | null } interface PendingDispatch { resolveReply: (text: string) => void rejectReply: (err: Error) => void partialChunks: string[] } const HEARTBEAT_MS = 30_000 /** * Combined HTTP + WebSocket server. * * HTTP 127.0.0.1:bridgePort /v1/chat/completions ← from OpenClaw * WS 127.0.0.1:channelWsPort /bridge ← from ClaudePlugin * * One inbound HTTP request maps to: * 1. ensure a Claude process for the session-key * 2. push an `inbound` WS frame to that process's ClaudePlugin * 3. await `reply` WS frame from that ClaudePlugin * 4. stream the reply text back to OpenClaw as SSE deltas */ export function startBridgeServer(deps: BridgeDeps): { httpServer: http.Server; wss: WebSocketServer; close(): Promise } { const { config, processManager } = deps const log = deps.logger ?? { info: (...a) => process.stderr.write(`[synthesis-bridge] ${a.join(' ')}\n`), warn: (...a) => process.stderr.write(`[synthesis-bridge] WARN ${a.join(' ')}\n`), } // Per-session FIFO so two requests for the same session don't overlap. const queueBySession = new Map>() // Active WS connections keyed by openclawSessionKey. const byKey = new Map() // ── WebSocket server (ClaudePlugin connections) ───────────────────────────── const wss = new WebSocketServer({ host: '127.0.0.1', port: config.channelWsPort, path: '/bridge' }) wss.on('connection', ws => { let conn: ClientConn | null = null ws.on('message', raw => { let frame: any try { frame = JSON.parse(String(raw)) } catch { return } if (!conn) { if (frame.type !== 'hello') { ws.close(4001, 'expected hello'); return } conn = { ws, pid: frame.pid, openclawSessionKey: frame.openclaw_session, isAlive: true, pending: null } const prev = byKey.get(conn.openclawSessionKey) if (prev && prev.ws !== ws) try { prev.ws.close(4002, 'replaced by reconnect') } catch {} byKey.set(conn.openclawSessionKey, conn) processManager.markReady(conn.pid) log.info(`hello session=${conn.openclawSessionKey} pid=${conn.pid}`) ws.send(JSON.stringify({ type: 'hello_ack' })) return } switch (frame.type) { case 'reply': { const pending = conn.pending if (!pending) { log.warn(`unsolicited reply session=${conn.openclawSessionKey} content_len=${(frame.content ?? '').length}`) return } if (typeof frame.content === 'string' && frame.content.length > 0) { pending.partialChunks.push(frame.content) } if (frame.final !== false) { const full = pending.partialChunks.join('') conn.pending = null pending.resolveReply(full) } return } case 'pong': conn.isAlive = true return default: log.warn(`unknown frame type=${frame.type}`) } }) ws.on('close', () => { if (conn && byKey.get(conn.openclawSessionKey)?.ws === ws) { byKey.delete(conn.openclawSessionKey) if (conn.pending) conn.pending.rejectReply(new Error('ClaudePlugin disconnected mid-turn')) } }) }) // Heartbeat const heartbeat = setInterval(() => { for (const c of byKey.values()) { if (!c.isAlive) { try { c.ws.terminate() } catch {} continue } c.isAlive = false try { c.ws.send(JSON.stringify({ type: 'ping' })) } catch {} } }, HEARTBEAT_MS) // ── HTTP server (OpenAI-compatible /v1/chat/completions) ──────────────────── const httpServer = http.createServer(async (req, res) => { if (req.method === 'GET' && req.url === '/health') { res.writeHead(200, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ ok: true, processes: processManager.list().length, conns: byKey.size })) return } if (!(req.method === 'POST' && (req.url === '/v1/chat/completions' || req.url === '/chat/completions'))) { res.writeHead(404); res.end(); return } let bodyRaw = '' req.on('data', c => { bodyRaw += c }) req.on('end', async () => { let body: any try { body = JSON.parse(bodyRaw) } catch { res.writeHead(400, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ error: 'invalid_json' })); return } // Minimal session-key extraction for v1 — use headers where OpenClaw // doesn't natively send them. Production OpenClaw runtime injects // session/agent info via the system prompt's Runtime block; we'll // upgrade to that parser when we wire into the real plugin runtime. const agentId = (req.headers['x-openclaw-agent-id'] as string) ?? body.user ?? 'default' const chatId = (req.headers['x-openclaw-chat-id'] as string) ?? '' const workspace = (req.headers['x-openclaw-workspace'] as string) ?? deps.resolveAgent?.(agentId)?.workspace ?? process.cwd() const sessionKey = chatId ? `${agentId}::${chatId}` : agentId const latestUser = extractLatestUserText(body) if (!latestUser) { res.writeHead(400, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ error: 'no user message found' })); return } const completionId = `chatcmpl-synthesis-${randomUUID().slice(0, 8)}` res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Transfer-Encoding': 'chunked', }) // SSE heartbeat: empty content delta every HEARTBEAT_MS to keep // OpenClaw's LLM idle watchdog from firing during long quiet turns. // (SSE comment frames are not counted as model progress upstream.) const sseHeartbeat = setInterval(() => { if (res.writableEnded) return try { writeSseChunk(res, completionId, '') } catch { /* ignore */ } }, HEARTBEAT_MS) sseHeartbeat.unref?.() const clientClosed = { v: false } req.on('close', () => { clientClosed.v = true }) // ── Per-sessionKey FIFO queue ──────────────────────────────────────── let releaseSlot: () => void = () => {} const mySlot = new Promise(r => { releaseSlot = r }) const prev = queueBySession.get(sessionKey) ?? Promise.resolve() const myTail = prev.then(() => mySlot, () => mySlot) queueBySession.set(sessionKey, myTail) try { await prev.catch(() => {}) if (clientClosed.v) return log.info(`dispatch session=${sessionKey} workspace=${workspace} msg=${latestUser.substring(0, 80)}`) const handle = await processManager.ensure(sessionKey, workspace) processManager.touch(sessionKey) const conn = byKey.get(sessionKey) if (!conn) throw new Error('no ws connection for session after ensure()') const replyText = await dispatchInbound(conn, latestUser, sessionKey) if (!clientClosed.v && !res.writableEnded) { writeSseChunk(res, completionId, replyText) writeSseStop(res, completionId) res.write('data: [DONE]\n\n') res.end() } } catch (err) { log.warn(`dispatch error: ${(err as Error).message}`) if (!clientClosed.v && !res.writableEnded) { writeSseChunk(res, completionId, `[synthesis-bridge error: ${(err as Error).message}]`) writeSseStop(res, completionId) res.write('data: [DONE]\n\n') res.end() } } finally { clearInterval(sseHeartbeat) releaseSlot() } }) }) httpServer.listen(config.bridgePort, '127.0.0.1', () => { log.info(`HTTP listening on 127.0.0.1:${config.bridgePort}`) log.info(`WS listening on 127.0.0.1:${config.channelWsPort}/bridge`) }) function dispatchInbound(conn: ClientConn, content: string, sessionKey: string): Promise { if (conn.pending) return Promise.reject(new Error('connection already has a pending dispatch')) return new Promise((resolve, reject) => { conn.pending = { resolveReply: resolve, rejectReply: reject, partialChunks: [] } const inbound = { type: 'inbound', content, meta: { chat_id: sessionKey, message_id: randomUUID(), ts: new Date().toISOString(), }, } try { conn.ws.send(JSON.stringify(inbound)) } catch (e) { conn.pending = null; reject(e as Error) } }) } return { httpServer, wss, async close() { clearInterval(heartbeat) for (const c of byKey.values()) try { c.ws.close() } catch {} await Promise.all([ new Promise(r => wss.close(() => r())), new Promise(r => httpServer.close(() => r())), ]) }, } } // ── helpers ──────────────────────────────────────────────────────────────────── function extractLatestUserText(body: any): string { const messages = body?.messages if (!Array.isArray(messages)) return '' for (let i = messages.length - 1; i >= 0; i--) { const m = messages[i] if (m?.role !== 'user') continue if (typeof m.content === 'string') return m.content if (Array.isArray(m.content)) { return m.content.filter((c: any) => c?.type === 'text').map((c: any) => c.text ?? '').join('\n') } } return '' } function writeSseChunk(res: http.ServerResponse, id: string, text: string): void { const chunk = JSON.stringify({ id, object: 'chat.completion.chunk', created: Math.floor(Date.now() / 1000), model: 'synthesis-claude-bridge', choices: [{ index: 0, delta: { content: text }, finish_reason: null }], }) res.write(`data: ${chunk}\n\n`) } function writeSseStop(res: http.ServerResponse, id: string): void { const chunk = JSON.stringify({ id, object: 'chat.completion.chunk', created: Math.floor(Date.now() / 1000), model: 'synthesis-claude-bridge', choices: [{ index: 0, delta: {}, finish_reason: 'stop' }], }) res.write(`data: ${chunk}\n\n`) }