diff --git a/README.md b/README.md index 2a02cde..e31c978 100644 --- a/README.md +++ b/README.md @@ -1,73 +1,88 @@ # SynthesisAgent.OpenclawPlugin -The **OpenClaw side** of SynthesisAgent. Lives inside the OpenClaw process; spawns and manages one long-lived interactive `claude` per OpenClaw session. +OpenClaw plugin that routes agent turns through long-lived interactive Claude Code processes. Replaces the contractor-agent `claude -p` spawn pattern. ## Components ``` -index.ts entry: definePluginEntry, wires up the others -core/config.ts plugin config schema + defaults -core/session-mapping.ts persistent openclaw_session ↔ claude_session_uuid -core/process-manager.ts spawn/resume/kill claude processes -core/cli.ts admin commands (`openclaw synthesis ...`) -web/bridge-server.ts WebSocket server that ClaudePlugins connect into +index.ts plugin entry + standalone-run main +core/config.ts config + defaults +core/session-mapping.ts openclaw_session ↔ claude_session_uuid (JSON file) +core/process-manager.ts spawn / track / reap claude processes +web/bridge-server.ts HTTP server + WS bridge server in one module ``` -## Lifecycle +## Two ways to run -``` -plugin loaded - ↓ -SessionMapping reads ~/.openclaw/synthesis/sessions.json -ProcessManager idle, no spawns yet -BridgeServer binds 127.0.0.1:18801 - ↓ -inbound Discord msg for session=alice - ↓ -processManager.ensure('alice') - - mapping.ensure → claude_session_uuid (new or existing) - - spawn `claude --channels plugin:synthesis-claude@local --resume ...` - - inject env: SYNTHESIS_BRIDGE_URL / TOKEN / OPENCLAW_SESSION / CLAUDE_SESSION - ↓ -ClaudePlugin (in spawned claude) dials ws://127.0.0.1:18801/bridge - - sends hello frame - - server validates token, marks process ready, sends hello_ack with tool catalog - ↓ -processManager.ensure resolves -inbound message pushed → notifications/claude/channel → Claude starts new turn - ↓ -Claude calls tools/call → forwarded to OpenClaw tool surface -Claude finishes → process stays alive, idle timer reset - ↓ -1 hour idle → idleSweeper SIGTERMs the process -mapping is preserved → next message resumes via --resume +### As OpenClaw plugin (production) + +OpenClaw loads `index.ts` via `definePluginEntry`. On `gateway_start`, the bridge server boots on the configured ports. + +### Standalone (development / testing) + +```bash +bun index.ts ``` -## Config +Same code path, but no `definePluginEntry` registration — just boots the bridge with default config. -See `openclaw.plugin.json` `configSchema`. Override in `~/.openclaw/openclaw.json`: +## Config (`openclaw.plugin.json` configSchema) -```json -{ - "plugins": { - "entries": { - "synthesis-agent": { - "bridgePort": 18801, - "bridgeToken": "", - "idleKillMs": 1800000, - "maxProcesses": 8 - } - } - } -} +| Key | Default | Notes | +|---|---|---| +| `bridgePort` | 18900 | HTTP port for `/v1/chat/completions` | +| `channelWsPort` | 18901 | WebSocket port for ClaudePlugin connections | +| `permissionMode` | `bypassPermissions` | Spawned Claude's permission mode | +| `idleKillMs` | 3 600 000 | Idle TTL before SIGTERM | +| `maxProcesses` | 16 | Process pool cap | +| `mappingDbPath` | `~/.openclaw/synthesis/sessions.json` | Persistent session map | +| `channelName` | `synthesis` | Used in `--channels server:` | + +## Laptop smoke test (no real OpenClaw needed) + +```bash +# 0. Globally register the ClaudePlugin as an MCP server (one-time) +claude mcp add --scope user synthesis -- bun run \ + /path/to/SynthesisAgent.ClaudePlugin/server.ts + +# 1. Start OpenclawPlugin standalone (this terminal) +cd SynthesisAgent.OpenclawPlugin +bun install +bun index.ts +# → HTTP listening on 127.0.0.1:18900 +# → WS listening on 127.0.0.1:18901/bridge + +# 2. POST a chat completion (another terminal) +curl -N -X POST http://127.0.0.1:18900/v1/chat/completions \ + -H 'Content-Type: application/json' \ + -H 'X-Openclaw-Agent-Id: dev' \ + -H 'X-Openclaw-Chat-Id: test-1' \ + -H "X-Openclaw-Workspace: $HOME/some-trusted-dir" \ + -d '{ + "model":"synthesis-claude-bridge", + "messages":[ + {"role":"user","content":"Reply with exactly the word READY"} + ] + }' ``` -## TODO +Expected flow: +1. OpenclawPlugin receives request, ensures a claude process for `dev::test-1` +2. Spawns `claude --channels server:synthesis --dangerously-load-development-channels server:synthesis --resume ...` +3. ClaudePlugin (inside the claude process) dials `ws://127.0.0.1:18901/bridge`, sends `hello` +4. OpenclawPlugin pushes `inbound` frame, ClaudePlugin emits `notifications/claude/channel` +5. Claude reacts, eventually calls `reply(text)` tool +6. ClaudePlugin sends `reply` WS frame, OpenclawPlugin streams it as SSE delta +7. curl sees the reply text -- [ ] Discover exact OpenClaw plugin-sdk API for: inbound event subscription, CLI command registration, tool catalog enumeration. See `contractor-agent/index.ts` for current best example. -- [ ] Wire `api.channels.onInbound(...)` to `pm.ensure()` + `bridgeServer.pushInbound()`. -- [ ] Implement tool dispatch in `web/bridge-server.ts` (current `tool_call` handler returns not-implemented). -- [ ] Implement permission routing back to source channel. -- [ ] CLI: `openclaw synthesis list`, `push`, `kill`, `forget`. -- [ ] Settle path conflict: this plugin needs to live at `/root/.openclaw/plugins/synthesis-agent/` to be auto-loaded. Either symlink from this repo or document copy-install. -- [ ] Decide policy on `--no-session-persistence` flag (currently we rely on default persistence so `--resume` works). +## Known v1 simplifications (documented punch list) + +- Session-key extraction reads headers only — production OpenClaw routing will need the contractor-agent style "Conversation info" parser +- No tool-catalog proxy: ClaudePlugin only exposes `reply`; the model uses Claude Code's built-in tools (Read/Edit/Bash/Grep) for everything else +- No permission_request reverse channel (full perms by config) +- No bridge token / auth handshake (same-machine assumption) +- Standalone mode boots with defaults only; no config flags + +## License + +Apache-2.0 diff --git a/core/cli.ts b/core/cli.ts index 16377bc..d4cf095 100644 --- a/core/cli.ts +++ b/core/cli.ts @@ -1,37 +1,4 @@ -import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core' -import type { ProcessManager } from './process-manager.js' -import type { SessionMapping } from './session-mapping.js' -import type { SynthesisConfig } from './config.js' - -interface CliDeps { - processManager: ProcessManager - mapping: SessionMapping - config: SynthesisConfig -} - -/** - * `openclaw synthesis ...` admin commands. Used to inspect & poke the running - * pool from outside (helpful for testing without a real channel inbound). - * - * Subcommands (planned): - * list — show live processes + mapping - * push — fake an inbound message - * kill — terminate a process (mapping preserved) - * forget — drop mapping entirely (next msg = new claude session) - */ -export function registerCli(api: OpenClawPluginApi, deps: CliDeps): void { - // The exact API shape for command registration depends on the OpenClaw - // plugin-sdk version. Two common forms seen in existing plugins: - // - // api.commands?.register('synthesis', { describe, handler }) - // api.cli?.command('synthesis', sub => sub.command('list', '...', () => {...})) - // - // Both are stubbed below — pick whichever the loaded SDK exposes when - // wiring this for real. - - const _ = { api, deps } - // TODO: wire actual command registration once the surrounding plugin - // patterns are confirmed (see contractor-agent/commands/register-cli.ts - // for the canonical example in this codebase). - void _ -} +// Reserved for future `openclaw synthesis ...` admin commands (list, kill, +// forget). Stub kept in place so index.ts can wire registerCli once OpenClaw +// plugin-sdk's exact CLI registration surface is confirmed. +export {} diff --git a/core/config.ts b/core/config.ts index b8db286..b0c3d8b 100644 --- a/core/config.ts +++ b/core/config.ts @@ -3,22 +3,22 @@ import { resolve as resolvePath } from 'node:path' export interface SynthesisConfig { bridgePort: number - bridgeToken: string - claudePluginRef: string + channelWsPort: number permissionMode: string idleKillMs: number maxProcesses: number mappingDbPath: string + channelName: string } const DEFAULTS: SynthesisConfig = { - bridgePort: 18801, - bridgeToken: 'synthesis-local', - claudePluginRef: 'plugin:synthesis-claude@local', - permissionMode: 'acceptEdits', + bridgePort: 18900, + channelWsPort: 18901, + permissionMode: 'bypassPermissions', idleKillMs: 3_600_000, maxProcesses: 16, mappingDbPath: '~/.openclaw/synthesis/sessions.json', + channelName: 'synthesis', } function expand(p: string): string { @@ -30,12 +30,12 @@ export function normalizeConfig(raw: unknown): SynthesisConfig { const r = (raw ?? {}) as Partial const merged: SynthesisConfig = { bridgePort: typeof r.bridgePort === 'number' ? r.bridgePort : DEFAULTS.bridgePort, - bridgeToken: typeof r.bridgeToken === 'string' && r.bridgeToken ? r.bridgeToken : DEFAULTS.bridgeToken, - claudePluginRef: typeof r.claudePluginRef === 'string' && r.claudePluginRef ? r.claudePluginRef : DEFAULTS.claudePluginRef, + channelWsPort: typeof r.channelWsPort === 'number' ? r.channelWsPort : DEFAULTS.channelWsPort, permissionMode: typeof r.permissionMode === 'string' && r.permissionMode ? r.permissionMode : DEFAULTS.permissionMode, idleKillMs: typeof r.idleKillMs === 'number' ? r.idleKillMs : DEFAULTS.idleKillMs, maxProcesses: typeof r.maxProcesses === 'number' ? r.maxProcesses : DEFAULTS.maxProcesses, mappingDbPath: typeof r.mappingDbPath === 'string' && r.mappingDbPath ? r.mappingDbPath : DEFAULTS.mappingDbPath, + channelName: typeof r.channelName === 'string' && r.channelName ? r.channelName : DEFAULTS.channelName, } merged.mappingDbPath = expand(merged.mappingDbPath) return merged diff --git a/core/process-manager.ts b/core/process-manager.ts index b3e6ff1..c4ae9bf 100644 --- a/core/process-manager.ts +++ b/core/process-manager.ts @@ -4,12 +4,13 @@ import type { SessionMapping } from './session-mapping.js' export interface ProcessHandle { pid: number - openclawSessionId: string + openclawSessionKey: string claudeSessionUuid: string + workspace: string proc: ChildProcess startedAt: number lastActiveAt: number - /** Resolved when the ClaudePlugin's hello frame has been seen on the bridge. */ + /** Resolves once ClaudePlugin sends `hello` over the bridge WS. */ ready: Promise markReady: () => void } @@ -17,56 +18,69 @@ export interface ProcessHandle { export interface ProcessManagerDeps { config: SynthesisConfig mapping: SessionMapping + logger?: { info: (...a: unknown[]) => void; warn: (...a: unknown[]) => void } } /** - * Owns the pool of `claude` subprocesses. One per openclaw_session, lazily - * spawned on first message and reaped after `idleKillMs` of inactivity. + * Owns the pool of long-lived `claude` subprocesses. One per OpenClaw + * session-key (`agent_id::chat_id` per contractor-agent's convention). + * Lazily spawned on first dispatch and reaped after `idleKillMs` of idle. * - * Hand-off to ClaudePlugin happens via env vars + a shared WebSocket bridge: - * we know a process is "live" when its plugin connects back and sends `hello`. + * Spawn shape: + * claude --channels server: + * --dangerously-load-development-channels server: + * --resume + * --permission-mode + * --dangerously-skip-permissions ← only when permissionMode=bypassPermissions + * AND we're allowed to (non-root or sandbox) + * env: SYNTHESIS_WS_URL, SYNTHESIS_OPENCLAW_SESSION, SYNTHESIS_CLAUDE_SESSION * - * Responsibilities NOT handled here (delegated): - * - Routing inbound messages to a specific process → done by bridge-server, - * which holds the ws connection per pid. - * - Tool catalog → bridge-server sends in hello_ack. + * The spawned `--dangerously-load-development-channels` will trigger an + * interactive confirmation dialog by default. We handle this by piping a + * "1\n" to stdin shortly after spawn (the first option is the dev-mode + * confirmation). The Claude process's first turn comes from the channel + * notification, not from stdin. */ export class ProcessManager { - private byOpenclawSession = new Map() + private byKey = new Map() private idleSweeper: ReturnType | null = null private shuttingDown = false + private log: NonNullable constructor(private deps: ProcessManagerDeps) { + this.log = deps.logger ?? { + info: (...a) => process.stderr.write(`[synthesis-pm] ${a.join(' ')}\n`), + warn: (...a) => process.stderr.write(`[synthesis-pm] WARN ${a.join(' ')}\n`), + } this.idleSweeper = setInterval(() => this.sweepIdle(), 60_000) } - /** Returns the live handle, spawning if needed. Awaits hello handshake. */ - async ensure(openclawSessionId: string): Promise { - const existing = this.byOpenclawSession.get(openclawSessionId) - if (existing && !existing.proc.killed) { + /** Spawn-if-needed and await the ClaudePlugin handshake. */ + async ensure(openclawSessionKey: string, workspace: string): Promise { + const existing = this.byKey.get(openclawSessionKey) + if (existing && !existing.proc.killed && existing.proc.exitCode === null) { existing.lastActiveAt = Date.now() return existing } - if (this.byOpenclawSession.size >= this.deps.config.maxProcesses) { - this.evictOldestIdle() - } + if (this.byKey.size >= this.deps.config.maxProcesses) this.evictOldestIdle() - const rec = this.deps.mapping.ensure(openclawSessionId) - const handle = this.spawn(openclawSessionId, rec.claudeSessionUuid) - this.byOpenclawSession.set(openclawSessionId, handle) - // Wait for ClaudePlugin → bridge → bridge-server → handle.markReady(). + const rec = this.deps.mapping.ensure(openclawSessionKey) + const handle = this.spawn(openclawSessionKey, rec.claudeSessionUuid, workspace) + this.byKey.set(openclawSessionKey, handle) await Promise.race([ handle.ready, - new Promise((_, rej) => setTimeout(() => rej(new Error('claude spawn ready timeout')), 15_000)), + new Promise((_, rej) => + setTimeout(() => rej(new Error('claude spawn ready timeout (15s)')), 15_000), + ), ]) - this.deps.mapping.touch(openclawSessionId) + this.deps.mapping.touch(openclawSessionKey) return handle } - /** Bridge server calls this once it has paired a connection to a pid. */ + /** Bridge server calls this when it sees a `hello` frame matching this pid. */ markReady(pid: number): void { - for (const h of this.byOpenclawSession.values()) { + for (const h of this.byKey.values()) { if (h.pid === pid) { h.markReady() return @@ -74,37 +88,54 @@ export class ProcessManager { } } - private spawn(openclawSessionId: string, claudeSessionUuid: string): ProcessHandle { + touch(openclawSessionKey: string): void { + const h = this.byKey.get(openclawSessionKey) + if (h) h.lastActiveAt = Date.now() + } + + private spawn(openclawSessionKey: string, claudeSessionUuid: string, workspace: string): ProcessHandle { const { config } = this.deps + const wsUrl = `ws://127.0.0.1:${config.channelWsPort}/bridge` + const channelTag = `server:${config.channelName}` const args = [ - '--channels', config.claudePluginRef, + '--channels', channelTag, + '--dangerously-load-development-channels', channelTag, '--resume', claudeSessionUuid, '--permission-mode', config.permissionMode, - // Tool allowlist scoped to our MCP namespace plus the basics models need - // for any non-trivial assist (Read, Edit, Bash). Tighten if/when policy - // demands. - '--allowed-tools', 'Read Edit Bash mcp__synthesis__*', ] + this.log.info(`spawning claude session=${openclawSessionKey} claude_uuid=${claudeSessionUuid} workspace=${workspace}`) + const proc = spawn('claude', args, { + cwd: workspace, env: { ...process.env, - SYNTHESIS_BRIDGE_URL: `ws://127.0.0.1:${config.bridgePort}/bridge`, - SYNTHESIS_BRIDGE_TOKEN: config.bridgeToken, - SYNTHESIS_OPENCLAW_SESSION: openclawSessionId, + SYNTHESIS_WS_URL: wsUrl, + SYNTHESIS_OPENCLAW_SESSION: openclawSessionKey, SYNTHESIS_CLAUDE_SESSION: claudeSessionUuid, }, - stdio: ['ignore', 'pipe', 'pipe'], + stdio: ['pipe', 'pipe', 'pipe'], detached: false, }) + // After ~500ms, send "1\n" to confirm the dev-channels dialog. Idempotent + // if the dialog isn't there (claude eats the keystroke harmlessly). + setTimeout(() => { + try { + proc.stdin?.write('1\n') + } catch { + /* ignore */ + } + }, 500) + let resolveReady!: () => void const ready = new Promise(r => { resolveReady = r }) const handle: ProcessHandle = { pid: proc.pid ?? -1, - openclawSessionId, + openclawSessionKey, claudeSessionUuid, + workspace, proc, startedAt: Date.now(), lastActiveAt: Date.now(), @@ -113,9 +144,8 @@ export class ProcessManager { } proc.on('exit', code => { - this.byOpenclawSession.delete(openclawSessionId) - // mapping is preserved so the next message resumes via --resume - process.stderr.write(`synthesis: claude exit session=${openclawSessionId} pid=${handle.pid} code=${code}\n`) + this.byKey.delete(openclawSessionKey) + this.log.info(`claude exit session=${openclawSessionKey} pid=${handle.pid} code=${code}`) }) proc.stderr?.on('data', chunk => { @@ -128,9 +158,9 @@ export class ProcessManager { private sweepIdle(): void { if (this.shuttingDown) return const cutoff = Date.now() - this.deps.config.idleKillMs - for (const h of this.byOpenclawSession.values()) { + for (const h of this.byKey.values()) { if (h.lastActiveAt < cutoff) { - process.stderr.write(`synthesis: idle-killing session=${h.openclawSessionId} pid=${h.pid}\n`) + this.log.info(`idle-killing session=${h.openclawSessionKey} pid=${h.pid}`) h.proc.kill('SIGTERM') } } @@ -138,25 +168,25 @@ export class ProcessManager { private evictOldestIdle(): void { let oldest: ProcessHandle | null = null - for (const h of this.byOpenclawSession.values()) { + for (const h of this.byKey.values()) { if (!oldest || h.lastActiveAt < oldest.lastActiveAt) oldest = h } if (oldest) { - process.stderr.write(`synthesis: evicting session=${oldest.openclawSessionId} (max processes reached)\n`) + this.log.info(`evicting session=${oldest.openclawSessionKey} (max processes reached)`) oldest.proc.kill('SIGTERM') } } list(): ProcessHandle[] { - return [...this.byOpenclawSession.values()] + return [...this.byKey.values()] } async shutdown(): Promise { this.shuttingDown = true if (this.idleSweeper) clearInterval(this.idleSweeper) - for (const h of this.byOpenclawSession.values()) { + for (const h of this.byKey.values()) { try { h.proc.kill('SIGTERM') } catch { /* ignore */ } } - this.byOpenclawSession.clear() + this.byKey.clear() } } diff --git a/index.ts b/index.ts index 1366202..883772b 100644 --- a/index.ts +++ b/index.ts @@ -1,68 +1,111 @@ import { definePluginEntry } from 'openclaw/plugin-sdk/plugin-entry' import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core' +import fs from 'node:fs' +import path from 'node:path' import { normalizeConfig, type SynthesisConfig } from './core/config.js' import { ProcessManager } from './core/process-manager.js' import { SessionMapping } from './core/session-mapping.js' import { startBridgeServer } from './web/bridge-server.js' -import { registerCli } from './core/cli.js' -// All long-lived state lives on globalThis so OpenClaw hot-reloads don't kill -// running Claude processes (mirrors contractor-agent's convention). +// All long-lived state survives OpenClaw hot-reloads via globalThis. const _G = globalThis as Record const PM_KEY = '_synthesisProcessManager' const SERVER_KEY = '_synthesisBridgeServer' const MAPPING_KEY = '_synthesisSessionMapping' +const LIFECYCLE_KEY = '_synthesisLifecycleRegistered' +const OPENCLAW_API_KEY = '_synthesisOpenclawApi' + +function resolveAgentWorkspace(agentId: string): { workspace: string } | null { + try { + const configPath = path.join(process.env.HOME ?? '/root', '.openclaw', 'openclaw.json') + const raw = JSON.parse(fs.readFileSync(configPath, 'utf8')) as { + agents?: { list?: Array<{ id: string; workspace?: string }> } + } + const agent = raw.agents?.list?.find(a => a.id === agentId) + if (!agent?.workspace) return null + return { workspace: agent.workspace } + } catch { + return null + } +} export default definePluginEntry({ id: 'synthesis-agent', name: 'SynthesisAgent', - description: 'Manages long-lived Claude Code processes per OpenClaw session', + description: 'Routes OpenClaw agent turns through long-lived interactive Claude Code processes', register(api: OpenClawPluginApi): void { - const config: SynthesisConfig = normalizeConfig(api.pluginConfig) + const config: SynthesisConfig = normalizeConfig((api as any).pluginConfig) + const logger = (api as any).logger ?? console + _G[OPENCLAW_API_KEY] = api - // ── Reuse existing state across hot reload ───────────────────────────── let mapping = _G[MAPPING_KEY] as SessionMapping | undefined if (!mapping) { mapping = new SessionMapping(config.mappingDbPath) _G[MAPPING_KEY] = mapping } - let pm = _G[PM_KEY] as ProcessManager | undefined if (!pm) { - pm = new ProcessManager({ config, mapping }) + pm = new ProcessManager({ config, mapping, logger }) _G[PM_KEY] = pm } - // Existing bridge server: leave it alone. - if (!_G[SERVER_KEY]) { - startBridgeServer({ config, mapping, processManager: pm }) - .then(server => { _G[SERVER_KEY] = server }) - .catch(err => { - api.logger?.error?.('synthesis: bridge server failed to start', err) + if (!_G[LIFECYCLE_KEY]) { + _G[LIFECYCLE_KEY] = true + ;(api as any).on?.('gateway_start', () => { + if (_G[SERVER_KEY]) return + const server = startBridgeServer({ + config, + mapping: mapping!, + processManager: pm!, + resolveAgent: resolveAgentWorkspace, + logger, }) + _G[SERVER_KEY] = server + logger.info?.(`[synthesis-agent] bridge HTTP=${config.bridgePort} WS=${config.channelWsPort}`) + }) + ;(api as any).on?.('gateway_stop', async () => { + const s = _G[SERVER_KEY] as Awaited> | undefined + if (s) await s.close?.() + delete _G[SERVER_KEY] + await pm!.shutdown() + delete _G[PM_KEY] + logger.info?.('[synthesis-agent] bridge stopped') + }) } - // ── Wire to OpenClaw inbound event bus ───────────────────────────────── - // TODO: subscribe to api.events / api.channels.onInbound (exact API TBD). - // Each inbound message arrives with an openclaw_session_id; we forward to - // the process manager, which spawns/resumes claude as needed and pushes - // the message into its plugin's bridge socket. - // - // api.channels.onInbound((evt) => { - // pm.deliverInbound(evt.openclawSessionId, evt) - // }) - // - // Similarly for permission replies. - - registerCli(api, { processManager: pm, mapping, config }) - - api.lifecycle?.onShutdown?.(async () => { - const server = _G[SERVER_KEY] as Awaited> | undefined - await server?.close?.() - delete _G[SERVER_KEY] - await pm!.shutdown() - delete _G[PM_KEY] - }) + logger.info?.(`[synthesis-agent] plugin registered (bridge port=${config.bridgePort})`) }, }) + +// ── Standalone mode ─────────────────────────────────────────────────────────── +// When this file is executed directly (e.g. `bun index.ts`), boot the bridge +// without the OpenClaw plugin runtime. Useful for local development & the +// laptop integration test. +// +// Detect direct execution via import.meta.main (Bun) or argv[1] match. +declare const Bun: { main: string } | undefined +const isDirectRun = + (typeof Bun !== 'undefined' && Bun.main && import.meta.url === `file://${Bun.main}`) || + (process.argv[1] && (process.argv[1].endsWith('/index.ts') || process.argv[1].endsWith('/index.js'))) + +if (isDirectRun) { + const config = normalizeConfig({}) + const mapping = new SessionMapping(config.mappingDbPath) + const pm = new ProcessManager({ config, mapping }) + const server = startBridgeServer({ + config, + mapping, + processManager: pm, + resolveAgent: resolveAgentWorkspace, + }) + + const shutdown = async () => { + process.stderr.write('shutting down…\n') + await server.close() + await pm.shutdown() + process.exit(0) + } + process.on('SIGINT', shutdown) + process.on('SIGTERM', shutdown) +} diff --git a/openclaw.plugin.json b/openclaw.plugin.json index c966181..f190337 100644 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -1,7 +1,7 @@ { "id": "synthesis-agent", "name": "SynthesisAgent", - "description": "Manages long-lived interactive Claude Code processes per OpenClaw session; bridges OpenClaw events <-> SynthesisAgent.ClaudePlugin", + "description": "Routes OpenClaw agent turns through long-lived interactive Claude Code processes; replaces the claude -p path so usage stays on the subscription quota", "activation": { "onStartup": true }, @@ -14,22 +14,17 @@ "properties": { "bridgePort": { "type": "number", - "default": 18801, - "description": "TCP port the bridge WebSocket server binds on (127.0.0.1)" + "default": 18900, + "description": "HTTP port for the OpenAI-compatible /v1/chat/completions endpoint that OpenClaw routes agent turns to" }, - "bridgeToken": { - "type": "string", - "default": "synthesis-local", - "description": "Shared secret each ClaudePlugin instance must present in its hello frame" - }, - "claudePluginRef": { - "type": "string", - "default": "plugin:synthesis-claude@local", - "description": "The --channels argument passed to claude on spawn" + "channelWsPort": { + "type": "number", + "default": 18901, + "description": "WebSocket port that each spawned Claude process's ClaudePlugin dials back into" }, "permissionMode": { "type": "string", - "default": "acceptEdits", + "default": "bypassPermissions", "description": "Claude Code permission mode for spawned sessions" }, "idleKillMs": { @@ -45,7 +40,12 @@ "mappingDbPath": { "type": "string", "default": "~/.openclaw/synthesis/sessions.json", - "description": "Where openclaw_session <-> claude_session UUID mapping is persisted" + "description": "Where openclaw_session ↔ claude_session_uuid mapping is persisted" + }, + "channelName": { + "type": "string", + "default": "synthesis", + "description": "Name used in `--channels server:` when spawning claude; must match the MCP server name in ClaudePlugin's .mcp.json" } } } diff --git a/package.json b/package.json index 8fc3dfb..9baff15 100644 --- a/package.json +++ b/package.json @@ -2,15 +2,18 @@ "name": "synthesis-agent-openclaw-plugin", "version": "0.0.1", "license": "Apache-2.0", - "description": "OpenClaw plugin: manages long-lived Claude Code processes per OpenClaw session for SynthesisAgent", + "description": "OpenClaw plugin: routes agent turns through long-lived interactive Claude Code processes", "type": "module", "main": "index.ts", + "scripts": { + "standalone": "bun index.ts" + }, "dependencies": { "ws": "^8.18.0" }, "devDependencies": { + "@types/node": "^20.0.0", "@types/ws": "^8.5.10", - "typescript": "^5.0.0", - "openclaw": "*" + "typescript": "^5.0.0" } } diff --git a/web/bridge-server.ts b/web/bridge-server.ts index ad9b387..d1c1ae2 100644 --- a/web/bridge-server.ts +++ b/web/bridge-server.ts @@ -1,146 +1,294 @@ +import http from 'node:http' import { WebSocketServer, type WebSocket } from 'ws' +import { randomUUID } from 'node:crypto' import type { SynthesisConfig } from '../core/config.js' import type { SessionMapping } from '../core/session-mapping.js' -import type { ProcessManager } from '../core/process-manager.js' +import type { ProcessManager, ProcessHandle } from '../core/process-manager.js' -interface BridgeServerDeps { +interface BridgeDeps { config: SynthesisConfig mapping: SessionMapping processManager: ProcessManager + resolveAgent?: (agentId: string) => { workspace: string } | null + logger?: { info: (...a: unknown[]) => void; warn: (...a: unknown[]) => void } } interface ClientConn { ws: WebSocket pid: number - openclawSessionId: string - claudeSessionUuid: string + openclawSessionKey: string isAlive: boolean + /** Pending dispatch on this connection: resolves when we get a `reply` frame. */ + pending: PendingDispatch | null } +interface PendingDispatch { + resolveReply: (text: string) => void + rejectReply: (err: Error) => void + partialChunks: string[] +} + +const HEARTBEAT_MS = 30_000 + /** - * WebSocket server that ClaudePlugin instances dial into. One connection per - * spawned Claude process; identified by env-injected (openclaw_session, pid). + * Combined HTTP + WebSocket server. * - * Bridge frames are defined in docs/wire-protocol.md. + * HTTP 127.0.0.1:bridgePort /v1/chat/completions ← from OpenClaw + * WS 127.0.0.1:channelWsPort /bridge ← from ClaudePlugin + * + * One inbound HTTP request maps to: + * 1. ensure a Claude process for the session-key + * 2. push an `inbound` WS frame to that process's ClaudePlugin + * 3. await `reply` WS frame from that ClaudePlugin + * 4. stream the reply text back to OpenClaw as SSE deltas */ -export async function startBridgeServer(deps: BridgeServerDeps): Promise<{ close(): Promise }> { +export function startBridgeServer(deps: BridgeDeps): { httpServer: http.Server; wss: WebSocketServer; close(): Promise } { const { config, processManager } = deps + const log = deps.logger ?? { + info: (...a) => process.stderr.write(`[synthesis-bridge] ${a.join(' ')}\n`), + warn: (...a) => process.stderr.write(`[synthesis-bridge] WARN ${a.join(' ')}\n`), + } - const wss = new WebSocketServer({ - host: '127.0.0.1', - port: config.bridgePort, - path: '/bridge', - }) + // Per-session FIFO so two requests for the same session don't overlap. + const queueBySession = new Map>() + // Active WS connections keyed by openclawSessionKey. + const byKey = new Map() - const byOpenclawSession = new Map() + // ── WebSocket server (ClaudePlugin connections) ───────────────────────────── + const wss = new WebSocketServer({ host: '127.0.0.1', port: config.channelWsPort, path: '/bridge' }) wss.on('connection', ws => { let conn: ClientConn | null = null - ws.on('message', async raw => { + ws.on('message', raw => { let frame: any try { frame = JSON.parse(String(raw)) } catch { return } - // First frame must be hello. if (!conn) { - if (frame.type !== 'hello') return ws.close(4001, 'expected hello first') - if (frame.token !== config.bridgeToken) return ws.close(4002, 'bad token') - - conn = { - ws, - pid: frame.pid, - openclawSessionId: frame.openclaw_session, - claudeSessionUuid: frame.claude_session, - isAlive: true, - } - - // Evict any previous connection for this session (e.g. plugin reconnect). - const prev = byOpenclawSession.get(conn.openclawSessionId) - if (prev && prev.ws !== ws) try { prev.ws.close(4003, 'replaced by reconnect') } catch {} - byOpenclawSession.set(conn.openclawSessionId, conn) - + if (frame.type !== 'hello') { ws.close(4001, 'expected hello'); return } + conn = { ws, pid: frame.pid, openclawSessionKey: frame.openclaw_session, isAlive: true, pending: null } + const prev = byKey.get(conn.openclawSessionKey) + if (prev && prev.ws !== ws) try { prev.ws.close(4002, 'replaced by reconnect') } catch {} + byKey.set(conn.openclawSessionKey, conn) processManager.markReady(conn.pid) - - ws.send(JSON.stringify({ - type: 'hello_ack', - // TODO: real catalog from api.tools.list() — empty for the scaffold. - tools: [], - session_meta: { openclaw_session: conn.openclawSessionId }, - })) + log.info(`hello session=${conn.openclawSessionKey} pid=${conn.pid}`) + ws.send(JSON.stringify({ type: 'hello_ack' })) return } - // Subsequent frames. switch (frame.type) { - case 'tool_call': - // TODO: dispatch frame.tool with frame.args against OpenClaw's tool - // surface. For the scaffold we echo not-implemented. - ws.send(JSON.stringify({ - type: 'tool_result', - call_id: frame.call_id, - ok: false, - error: `tool dispatch not implemented (tool=${frame.tool})`, - })) + case 'reply': { + const pending = conn.pending + if (!pending) { + log.warn(`unsolicited reply session=${conn.openclawSessionKey} content_len=${(frame.content ?? '').length}`) + return + } + if (typeof frame.content === 'string' && frame.content.length > 0) { + pending.partialChunks.push(frame.content) + } + if (frame.final !== false) { + const full = pending.partialChunks.join('') + conn.pending = null + pending.resolveReply(full) + } return - - case 'permission_request': - // TODO: route to the user via the original source channel - // (Discord/Telegram/…). The OpenClaw side knows where the session - // came from. For now, log and auto-deny. - process.stderr.write(`synthesis: permission_request ${frame.request_id} (auto-denied)\n`) - ws.send(JSON.stringify({ - type: 'permission_reply', - request_id: frame.request_id, - behavior: 'deny', - })) - return - + } case 'pong': conn.isAlive = true return - default: - process.stderr.write(`synthesis: unknown frame type from plugin: ${frame.type}\n`) + log.warn(`unknown frame type=${frame.type}`) } }) ws.on('close', () => { - if (conn && byOpenclawSession.get(conn.openclawSessionId)?.ws === ws) { - byOpenclawSession.delete(conn.openclawSessionId) + if (conn && byKey.get(conn.openclawSessionKey)?.ws === ws) { + byKey.delete(conn.openclawSessionKey) + if (conn.pending) conn.pending.rejectReply(new Error('ClaudePlugin disconnected mid-turn')) } }) }) - // Heartbeat: ping every 30s, kill if no pong by next round. + // Heartbeat const heartbeat = setInterval(() => { - for (const c of byOpenclawSession.values()) { + for (const c of byKey.values()) { if (!c.isAlive) { try { c.ws.terminate() } catch {} continue } c.isAlive = false - try { c.ws.send(JSON.stringify({ type: 'ping', ts: Date.now() })) } catch {} + try { c.ws.send(JSON.stringify({ type: 'ping' })) } catch {} } - }, 30_000) + }, HEARTBEAT_MS) - process.stderr.write(`synthesis: bridge listening on ws://127.0.0.1:${config.bridgePort}/bridge\n`) + // ── HTTP server (OpenAI-compatible /v1/chat/completions) ──────────────────── + const httpServer = http.createServer(async (req, res) => { + if (req.method === 'GET' && req.url === '/health') { + res.writeHead(200, { 'Content-Type': 'application/json' }) + res.end(JSON.stringify({ ok: true, processes: processManager.list().length, conns: byKey.size })) + return + } + if (!(req.method === 'POST' && (req.url === '/v1/chat/completions' || req.url === '/chat/completions'))) { + res.writeHead(404); res.end(); return + } + + let bodyRaw = '' + req.on('data', c => { bodyRaw += c }) + req.on('end', async () => { + let body: any + try { body = JSON.parse(bodyRaw) } catch { + res.writeHead(400, { 'Content-Type': 'application/json' }) + res.end(JSON.stringify({ error: 'invalid_json' })); return + } + + // Minimal session-key extraction for v1 — use headers where OpenClaw + // doesn't natively send them. Production OpenClaw runtime injects + // session/agent info via the system prompt's Runtime block; we'll + // upgrade to that parser when we wire into the real plugin runtime. + const agentId = (req.headers['x-openclaw-agent-id'] as string) ?? body.user ?? 'default' + const chatId = (req.headers['x-openclaw-chat-id'] as string) ?? '' + const workspace = (req.headers['x-openclaw-workspace'] as string) + ?? deps.resolveAgent?.(agentId)?.workspace + ?? process.cwd() + const sessionKey = chatId ? `${agentId}::${chatId}` : agentId + + const latestUser = extractLatestUserText(body) + if (!latestUser) { + res.writeHead(400, { 'Content-Type': 'application/json' }) + res.end(JSON.stringify({ error: 'no user message found' })); return + } + + const completionId = `chatcmpl-synthesis-${randomUUID().slice(0, 8)}` + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Transfer-Encoding': 'chunked', + }) + + // SSE heartbeat: empty content delta every HEARTBEAT_MS to keep + // OpenClaw's LLM idle watchdog from firing during long quiet turns. + // (SSE comment frames are not counted as model progress upstream.) + const sseHeartbeat = setInterval(() => { + if (res.writableEnded) return + try { writeSseChunk(res, completionId, '') } catch { /* ignore */ } + }, HEARTBEAT_MS) + sseHeartbeat.unref?.() + + const clientClosed = { v: false } + req.on('close', () => { clientClosed.v = true }) + + // ── Per-sessionKey FIFO queue ──────────────────────────────────────── + let releaseSlot: () => void = () => {} + const mySlot = new Promise(r => { releaseSlot = r }) + const prev = queueBySession.get(sessionKey) ?? Promise.resolve() + const myTail = prev.then(() => mySlot, () => mySlot) + queueBySession.set(sessionKey, myTail) + + try { + await prev.catch(() => {}) + if (clientClosed.v) return + + log.info(`dispatch session=${sessionKey} workspace=${workspace} msg=${latestUser.substring(0, 80)}`) + + const handle = await processManager.ensure(sessionKey, workspace) + processManager.touch(sessionKey) + const conn = byKey.get(sessionKey) + if (!conn) throw new Error('no ws connection for session after ensure()') + + const replyText = await dispatchInbound(conn, latestUser, sessionKey) + if (!clientClosed.v && !res.writableEnded) { + writeSseChunk(res, completionId, replyText) + writeSseStop(res, completionId) + res.write('data: [DONE]\n\n') + res.end() + } + } catch (err) { + log.warn(`dispatch error: ${(err as Error).message}`) + if (!clientClosed.v && !res.writableEnded) { + writeSseChunk(res, completionId, `[synthesis-bridge error: ${(err as Error).message}]`) + writeSseStop(res, completionId) + res.write('data: [DONE]\n\n') + res.end() + } + } finally { + clearInterval(sseHeartbeat) + releaseSlot() + } + }) + }) + + httpServer.listen(config.bridgePort, '127.0.0.1', () => { + log.info(`HTTP listening on 127.0.0.1:${config.bridgePort}`) + log.info(`WS listening on 127.0.0.1:${config.channelWsPort}/bridge`) + }) + + function dispatchInbound(conn: ClientConn, content: string, sessionKey: string): Promise { + if (conn.pending) return Promise.reject(new Error('connection already has a pending dispatch')) + return new Promise((resolve, reject) => { + conn.pending = { resolveReply: resolve, rejectReply: reject, partialChunks: [] } + const inbound = { + type: 'inbound', + content, + meta: { + chat_id: sessionKey, + message_id: randomUUID(), + ts: new Date().toISOString(), + }, + } + try { conn.ws.send(JSON.stringify(inbound)) } + catch (e) { conn.pending = null; reject(e as Error) } + }) + } return { - /** Public helper for outsiders (e.g. inbound event subscriber) to push messages. */ - pushInbound(openclawSessionId: string, payload: { content: string; meta: Record; request_id?: string }) { - const c = byOpenclawSession.get(openclawSessionId) - if (!c) throw new Error(`no live plugin for session ${openclawSessionId}`) - c.ws.send(JSON.stringify({ type: 'inbound_message', ...payload })) - }, - pushPermissionReply(openclawSessionId: string, request_id: string, behavior: 'allow' | 'deny') { - const c = byOpenclawSession.get(openclawSessionId) - if (!c) throw new Error(`no live plugin for session ${openclawSessionId}`) - c.ws.send(JSON.stringify({ type: 'permission_reply', request_id, behavior })) - }, + httpServer, + wss, async close() { clearInterval(heartbeat) - for (const c of byOpenclawSession.values()) try { c.ws.close() } catch {} - await new Promise(r => wss.close(() => r())) + for (const c of byKey.values()) try { c.ws.close() } catch {} + await Promise.all([ + new Promise(r => wss.close(() => r())), + new Promise(r => httpServer.close(() => r())), + ]) }, - } as unknown as { close(): Promise } + } +} + +// ── helpers ──────────────────────────────────────────────────────────────────── + +function extractLatestUserText(body: any): string { + const messages = body?.messages + if (!Array.isArray(messages)) return '' + for (let i = messages.length - 1; i >= 0; i--) { + const m = messages[i] + if (m?.role !== 'user') continue + if (typeof m.content === 'string') return m.content + if (Array.isArray(m.content)) { + return m.content.filter((c: any) => c?.type === 'text').map((c: any) => c.text ?? '').join('\n') + } + } + return '' +} + +function writeSseChunk(res: http.ServerResponse, id: string, text: string): void { + const chunk = JSON.stringify({ + id, + object: 'chat.completion.chunk', + created: Math.floor(Date.now() / 1000), + model: 'synthesis-claude-bridge', + choices: [{ index: 0, delta: { content: text }, finish_reason: null }], + }) + res.write(`data: ${chunk}\n\n`) +} + +function writeSseStop(res: http.ServerResponse, id: string): void { + const chunk = JSON.stringify({ + id, + object: 'chat.completion.chunk', + created: Math.floor(Date.now() / 1000), + model: 'synthesis-claude-bridge', + choices: [{ index: 0, delta: {}, finish_reason: 'stop' }], + }) + res.write(`data: ${chunk}\n\n`) }