chore: initial scaffolding for SynthesisAgent.OpenclawPlugin
OpenClaw plugin that manages long-lived interactive Claude Code processes per OpenClaw session. Process manager + session-mapping persistence + bridge WebSocket server skeleton. Will be rewritten to follow the contractor-agent HTTP model-provider pattern (register as `synthesis-claude-bridge` provider, receive /v1/chat/completions, dispatch to channel notification on the bound Claude process). See parent repo's STATUS.md for the punch list.
This commit is contained in:
146
web/bridge-server.ts
Normal file
146
web/bridge-server.ts
Normal file
@@ -0,0 +1,146 @@
|
||||
import { WebSocketServer, type WebSocket } from 'ws'
|
||||
import type { SynthesisConfig } from '../core/config.js'
|
||||
import type { SessionMapping } from '../core/session-mapping.js'
|
||||
import type { ProcessManager } from '../core/process-manager.js'
|
||||
|
||||
interface BridgeServerDeps {
|
||||
config: SynthesisConfig
|
||||
mapping: SessionMapping
|
||||
processManager: ProcessManager
|
||||
}
|
||||
|
||||
interface ClientConn {
|
||||
ws: WebSocket
|
||||
pid: number
|
||||
openclawSessionId: string
|
||||
claudeSessionUuid: string
|
||||
isAlive: boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* WebSocket server that ClaudePlugin instances dial into. One connection per
|
||||
* spawned Claude process; identified by env-injected (openclaw_session, pid).
|
||||
*
|
||||
* Bridge frames are defined in docs/wire-protocol.md.
|
||||
*/
|
||||
export async function startBridgeServer(deps: BridgeServerDeps): Promise<{ close(): Promise<void> }> {
|
||||
const { config, processManager } = deps
|
||||
|
||||
const wss = new WebSocketServer({
|
||||
host: '127.0.0.1',
|
||||
port: config.bridgePort,
|
||||
path: '/bridge',
|
||||
})
|
||||
|
||||
const byOpenclawSession = new Map<string, ClientConn>()
|
||||
|
||||
wss.on('connection', ws => {
|
||||
let conn: ClientConn | null = null
|
||||
|
||||
ws.on('message', async raw => {
|
||||
let frame: any
|
||||
try { frame = JSON.parse(String(raw)) } catch { return }
|
||||
|
||||
// First frame must be hello.
|
||||
if (!conn) {
|
||||
if (frame.type !== 'hello') return ws.close(4001, 'expected hello first')
|
||||
if (frame.token !== config.bridgeToken) return ws.close(4002, 'bad token')
|
||||
|
||||
conn = {
|
||||
ws,
|
||||
pid: frame.pid,
|
||||
openclawSessionId: frame.openclaw_session,
|
||||
claudeSessionUuid: frame.claude_session,
|
||||
isAlive: true,
|
||||
}
|
||||
|
||||
// Evict any previous connection for this session (e.g. plugin reconnect).
|
||||
const prev = byOpenclawSession.get(conn.openclawSessionId)
|
||||
if (prev && prev.ws !== ws) try { prev.ws.close(4003, 'replaced by reconnect') } catch {}
|
||||
byOpenclawSession.set(conn.openclawSessionId, conn)
|
||||
|
||||
processManager.markReady(conn.pid)
|
||||
|
||||
ws.send(JSON.stringify({
|
||||
type: 'hello_ack',
|
||||
// TODO: real catalog from api.tools.list() — empty for the scaffold.
|
||||
tools: [],
|
||||
session_meta: { openclaw_session: conn.openclawSessionId },
|
||||
}))
|
||||
return
|
||||
}
|
||||
|
||||
// Subsequent frames.
|
||||
switch (frame.type) {
|
||||
case 'tool_call':
|
||||
// TODO: dispatch frame.tool with frame.args against OpenClaw's tool
|
||||
// surface. For the scaffold we echo not-implemented.
|
||||
ws.send(JSON.stringify({
|
||||
type: 'tool_result',
|
||||
call_id: frame.call_id,
|
||||
ok: false,
|
||||
error: `tool dispatch not implemented (tool=${frame.tool})`,
|
||||
}))
|
||||
return
|
||||
|
||||
case 'permission_request':
|
||||
// TODO: route to the user via the original source channel
|
||||
// (Discord/Telegram/…). The OpenClaw side knows where the session
|
||||
// came from. For now, log and auto-deny.
|
||||
process.stderr.write(`synthesis: permission_request ${frame.request_id} (auto-denied)\n`)
|
||||
ws.send(JSON.stringify({
|
||||
type: 'permission_reply',
|
||||
request_id: frame.request_id,
|
||||
behavior: 'deny',
|
||||
}))
|
||||
return
|
||||
|
||||
case 'pong':
|
||||
conn.isAlive = true
|
||||
return
|
||||
|
||||
default:
|
||||
process.stderr.write(`synthesis: unknown frame type from plugin: ${frame.type}\n`)
|
||||
}
|
||||
})
|
||||
|
||||
ws.on('close', () => {
|
||||
if (conn && byOpenclawSession.get(conn.openclawSessionId)?.ws === ws) {
|
||||
byOpenclawSession.delete(conn.openclawSessionId)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// Heartbeat: ping every 30s, kill if no pong by next round.
|
||||
const heartbeat = setInterval(() => {
|
||||
for (const c of byOpenclawSession.values()) {
|
||||
if (!c.isAlive) {
|
||||
try { c.ws.terminate() } catch {}
|
||||
continue
|
||||
}
|
||||
c.isAlive = false
|
||||
try { c.ws.send(JSON.stringify({ type: 'ping', ts: Date.now() })) } catch {}
|
||||
}
|
||||
}, 30_000)
|
||||
|
||||
process.stderr.write(`synthesis: bridge listening on ws://127.0.0.1:${config.bridgePort}/bridge\n`)
|
||||
|
||||
return {
|
||||
/** Public helper for outsiders (e.g. inbound event subscriber) to push messages. */
|
||||
pushInbound(openclawSessionId: string, payload: { content: string; meta: Record<string, unknown>; request_id?: string }) {
|
||||
const c = byOpenclawSession.get(openclawSessionId)
|
||||
if (!c) throw new Error(`no live plugin for session ${openclawSessionId}`)
|
||||
c.ws.send(JSON.stringify({ type: 'inbound_message', ...payload }))
|
||||
},
|
||||
pushPermissionReply(openclawSessionId: string, request_id: string, behavior: 'allow' | 'deny') {
|
||||
const c = byOpenclawSession.get(openclawSessionId)
|
||||
if (!c) throw new Error(`no live plugin for session ${openclawSessionId}`)
|
||||
c.ws.send(JSON.stringify({ type: 'permission_reply', request_id, behavior }))
|
||||
},
|
||||
async close() {
|
||||
clearInterval(heartbeat)
|
||||
for (const c of byOpenclawSession.values()) try { c.ws.close() } catch {}
|
||||
await new Promise<void>(r => wss.close(() => r()))
|
||||
},
|
||||
} as unknown as { close(): Promise<void> }
|
||||
}
|
||||
Reference in New Issue
Block a user