Race observed on first turn: ClaudePlugin emits notifications/claude/channel before Claude Code finishes registering the channel handler internally. Claude logs "MCP server synthesis: Channel notifications registered" ~25ms AFTER hello_ack arrives. Notifications that arrive earlier are silently dropped — observed empty session, no turn, curl hangs until timeout. Fix: on first `inbound` frame after WS hello, wait 800ms before emitting the MCP notification. Subsequent inbounds skip the wait. End-to-end verified twice on laptop. Cold-start ~10s, hot path 2-3s.
183 lines
6.6 KiB
TypeScript
183 lines
6.6 KiB
TypeScript
#!/usr/bin/env bun
|
|
/**
 * SynthesisAgent.ClaudePlugin
 *
 * Spawned by SynthesisAgent.OpenclawPlugin inside every long-lived Claude
 * process. Acts as a Claude Code `--channels server:<channelName>` source.
 *
 *   bridge WS                this process                      Claude Code
 *   ──────────────────────   ───────────────────────────────   ──────────────
 *   inbound → → → → → → →    notifications/claude/channel → →  new turn
 *   reply   ← ← ← ← ← ← ←    tools/call: reply(text)  ← ← ← ←  assistant turn
 *
 * Env vars set by the spawning plugin:
 *   SYNTHESIS_WS_URL             ws://127.0.0.1:18901/bridge
 *   SYNTHESIS_OPENCLAW_SESSION   e.g. "developer::discord:channel:123"
 *   SYNTHESIS_CLAUDE_SESSION     UUID, for traceability
 *
 * Same-machine deployment — no auth handshake, no permission_request reverse
 * channel (both sides have full perms by config).
 */
|
|
|
|
import { Server } from '@modelcontextprotocol/sdk/server/index.js'
|
|
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
|
|
import {
|
|
ListToolsRequestSchema,
|
|
CallToolRequestSchema,
|
|
} from '@modelcontextprotocol/sdk/types.js'
|
|
import WebSocket from 'ws'
|
|
|
|
// Bridge endpoint and OpenClaw session identifier, both read from the environment.
const {
  SYNTHESIS_WS_URL: WS_URL,
  SYNTHESIS_OPENCLAW_SESSION: OPENCLAW_SESSION,
} = process.env
// Claude session identifier; falls back to '' when the variable is unset.
const CLAUDE_SESSION = process.env.SYNTHESIS_CLAUDE_SESSION ?? ''

// The bridge URL and the OpenClaw session are mandatory. When either is
// missing (or empty), print both values to stderr and exit with status 1.
if (!(WS_URL && OPENCLAW_SESSION)) {
  const report = [
    `synthesis-claude: required env vars missing`,
    ` SYNTHESIS_WS_URL=${WS_URL ?? '(unset)'}`,
    ` SYNTHESIS_OPENCLAW_SESSION=${OPENCLAW_SESSION ?? '(unset)'}`,
  ]
  process.stderr.write(report.map(line => `${line}\n`).join(''))
  process.exit(1)
}
|
|
|
|
/**
 * Writes one diagnostic line to stderr, prefixed with the `synthesis-claude`
 * tag and the current ISO-8601 timestamp.
 *
 * @param s Message text; a trailing newline is appended.
 * @returns Whatever `process.stderr.write` returns.
 */
const log = (s: string) => {
  const stamp = new Date().toISOString()
  return process.stderr.write('[synthesis-claude ' + stamp + '] ' + s + '\n')
}
|
|
|
|
/**
 * Renders an arbitrary thrown or rejected value for the log.
 *
 * Errors are rendered with their stack (falling back to `name: message`),
 * other objects as JSON, and everything else via `String()`. Plain template
 * interpolation would lose the stack and print `[object Object]` for
 * object-shaped rejection reasons.
 */
function describeThrown(reason: unknown): string {
  if (reason instanceof Error) {
    return reason.stack ?? `${reason.name}: ${reason.message}`
  }
  if (typeof reason === 'object' && reason !== null) {
    try {
      const json: string | undefined = JSON.stringify(reason)
      return json ?? String(reason)
    } catch {
      // Circular structures and the like: fall back to the default rendering.
      return String(reason)
    }
  }
  return String(reason)
}

// Last-resort handlers: log the failure to stderr instead of letting the
// default handling take over.
process.on('unhandledRejection', reason => log(`unhandled rejection: ${describeThrown(reason)}`))
process.on('uncaughtException', err => log(`uncaught exception: ${describeThrown(err)}`))
|
|
|
|
// ─── MCP server ──────────────────────────────────────────────────────────────
|
|
|
|
// Capabilities this server declares when the MCP `Server` is constructed.
const serverCapabilities = {
  tools: {},
  // The channel capability has to sit under `experimental`: Claude Code's
  // gating function `poH` (in pi-embedded-*.js) reads
  // `caps.experimental['claude/channel']`. A top-level entry is silently
  // ignored with: "server did not declare claude/channel capability".
  experimental: {
    'claude/channel': {},
  },
}

// The MCP server instance shared by the tool handlers and the channel
// notifications.
const mcp = new Server(
  { name: 'synthesis', version: '0.0.1' },
  { capabilities: serverCapabilities },
)
|
|
|
|
// tools/list: advertises the single `reply` tool together with its JSON
// schema (`text` is required, `final` is an optional boolean).
mcp.setRequestHandler(ListToolsRequestSchema, async () => {
  const description = [
    'Send a reply back to the OpenClaw user. Call this once per inbound channel message with your final answer. ',
    'Optional `final=false` lets you stream progress updates (each chunk is buffered server-side until a final call).',
  ].join('')

  const properties = {
    text: { type: 'string', description: 'The reply text to send back through OpenClaw.' },
    final: { type: 'boolean', description: 'Defaults to true. Set false to stream a progress chunk.' },
  }

  return {
    tools: [
      {
        name: 'reply',
        description,
        inputSchema: { type: 'object', properties, required: ['text'] },
      },
    ],
  }
})
|
|
|
|
// tools/call: only `reply` is supported. The text is forwarded to the bridge
// as a `reply` frame; `final` counts as true unless it is explicitly `false`,
// and a non-string `text` is sent as the empty string.
mcp.setRequestHandler(CallToolRequestSchema, async req => {
  const { name: toolName, arguments: rawArgs } = req.params
  if (toolName !== 'reply') {
    return { content: [{ type: 'text', text: `unknown tool: ${toolName}` }], isError: true }
  }

  const { text: rawText, final: rawFinal } = (rawArgs ?? {}) as { text?: string; final?: boolean }
  const isFinal = rawFinal !== false
  const content = typeof rawText === 'string' ? rawText : ''
  bridgeSend({ type: 'reply', content, final: isFinal })

  const ack = isFinal ? 'reply sent' : 'reply chunk sent'
  return { content: [{ type: 'text', text: ack }] }
})
|
|
|
|
// ─── Bridge WebSocket ────────────────────────────────────────────────────────
|
|
|
|
// Most recently created bridge socket; stays null until connect() has run.
let ws: null | WebSocket = null
// Delay in milliseconds before the next reconnect attempt.
let backoff = 1_000
// Frames that could not be sent because the socket was not open.
const outbox: Array<unknown> = []
// Becomes true once the start-up delay for the first inbound frame has passed.
let firstInboundDelivered = false
|
|
|
|
/**
 * Sends a frame to the bridge as JSON, or queues it in `outbox` when the
 * socket does not exist yet or is not in the OPEN state.
 *
 * @param frame Any JSON-serializable value.
 */
function bridgeSend(frame: unknown): void {
  const socket = ws
  if (socket === null || socket.readyState !== WebSocket.OPEN) {
    outbox.push(frame)
    return
  }
  socket.send(JSON.stringify(frame))
}
|
|
|
|
/**
 * Opens the bridge WebSocket and wires its lifecycle handlers.
 *
 * - `open`: resets the reconnect backoff to 1000 ms, sends the `hello` frame,
 *   then flushes frames queued in `outbox`.
 * - `message`: parses the frame as JSON and hands it to `handleBridge`;
 *   unparsable frames and handler failures are logged.
 * - `close`: schedules a reconnect after the current backoff, then doubles
 *   the backoff up to a 30 000 ms cap.
 * - `error`: logged only.
 *
 * Handlers are bound to the socket created by this call rather than to the
 * module-level `ws`, so they can never act on a socket created by a later
 * reconnect.
 */
function connect(): void {
  const socket = new WebSocket(WS_URL!)
  ws = socket

  socket.on('open', () => {
    log(`bridge connected: ${WS_URL}`)
    backoff = 1000
    socket.send(
      JSON.stringify({
        type: 'hello',
        openclaw_session: OPENCLAW_SESSION,
        claude_session: CLAUDE_SESSION,
        pid: process.pid,
      }),
    )

    // Flush queued frames. A frame leaves the queue only after send() has
    // accepted it, so a failed send keeps it for the next connection instead
    // of losing it. Stop as soon as the socket is no longer OPEN.
    while (outbox.length > 0 && socket.readyState === WebSocket.OPEN) {
      let payload: string | undefined
      try {
        payload = JSON.stringify(outbox[0])
      } catch (e) {
        payload = undefined
        log(`outbox frame not serializable: ${e}`)
      }
      if (typeof payload !== 'string') {
        // Retrying can never succeed for this frame; drop it so it does not
        // block everything queued behind it.
        log(`dropping unserializable outbox frame`)
        outbox.shift()
        continue
      }
      try {
        socket.send(payload)
      } catch (e) {
        log(`outbox flush interrupted (${outbox.length} frame(s) kept): ${e}`)
        break
      }
      outbox.shift()
    }
  })

  socket.on('message', raw => {
    let frame: any
    try { frame = JSON.parse(String(raw)) } catch { return log(`bad frame: ${raw}`) }
    handleBridge(frame).catch(e => log(`handler error: ${e}`))
  })

  socket.on('close', () => {
    log(`bridge disconnected, reconnect in ${backoff}ms`)
    setTimeout(connect, backoff)
    backoff = Math.min(backoff * 2, 30_000)
  })

  socket.on('error', err => log(`bridge error: ${(err as Error).message}`))
}
|
|
|
|
// How long the first inbound frame is held back before it is pushed to Claude.
const CHANNEL_REGISTRATION_GRACE_MS = 800

// Shared start-up gate: created by the first inbound frame, awaited by every
// inbound frame that arrives before it resolves.
let channelGate: Promise<void> | null = null

/** Returns the one start-up gate, starting its timer on first use. */
function channelReady(): Promise<void> {
  if (channelGate === null) {
    channelGate = new Promise<void>(resolve => setTimeout(resolve, CHANNEL_REGISTRATION_GRACE_MS))
      .then(() => { firstInboundDelivered = true })
  }
  return channelGate
}

/**
 * Dispatches one parsed frame received from the bridge.
 *
 * - `hello_ack`: logged.
 * - `inbound`: forwarded to Claude Code as a `notifications/claude/channel`
 *   MCP notification carrying `content` and `meta` (default `{}`).
 * - `ping`: answered with a `pong` frame.
 * - anything else (including a non-object frame): logged as unknown.
 *
 * @param frame Parsed JSON frame; its shape is not validated beyond `type`.
 */
async function handleBridge(frame: any): Promise<void> {
  switch (frame?.type) {
    case 'hello_ack':
      log(`hello_ack received`)
      return
    case 'inbound':
      // Race guard: Claude Code's internal "Channel notifications registered"
      // log line appears ~25ms AFTER hello_ack arrives — a notification pushed
      // before then is silently dropped. Hold back inbounds that arrive during
      // start-up. All of them wait on the SAME gate, so they are released in
      // arrival order; with one timer per frame, a frame arriving just after
      // the first timer fired would overtake earlier frames still waiting.
      // Later inbounds skip the wait (the channel stays registered for the
      // lifetime of the session).
      if (!firstInboundDelivered) {
        await channelReady()
      }
      await mcp.notification({
        method: 'notifications/claude/channel',
        params: {
          content: frame.content,
          meta: frame.meta ?? {},
        },
      })
      log(`inbound pushed to channel (content_len=${(frame.content ?? '').length})`)
      return
    case 'ping':
      bridgeSend({ type: 'pong' })
      return
    default:
      log(`unknown frame type=${frame?.type}`)
  }
}
|
|
|
|
// ─── Boot ────────────────────────────────────────────────────────────────────
|
|
|
|
// Attach the MCP server to stdio first, then dial the bridge.
const stdioTransport = new StdioServerTransport()
await mcp.connect(stdioTransport)
log(`MCP transport ready (session=${OPENCLAW_SESSION}, pid=${process.pid})`)
connect()
|