import http from "node:http";
import { randomUUID, timingSafeEqual } from "node:crypto";
import type { BridgeInboundRequest } from "../core/types/model.js";
import {
  buildSessionKey,
  extractLatestUserMessage,
  extractRequestContext,
  resolveDispatchPrompt,
} from "./input-filter.js";
import { buildBootstrap } from "./bootstrap.js";
import { dispatchToClaude } from "../core/claude/sdk-adapter.js";
import { dispatchToGemini } from "../core/gemini/sdk-adapter.js";
import { getGlobalPluginRegistry } from "openclaw/plugin-sdk/plugin-runtime";
import {
  getSession,
  putSession,
  markOrphaned,
  removeSession,
} from "../core/contractor/session-map-store.js";

/** Configuration accepted by {@link createBridgeServer}. */
export type BridgeServerConfig = {
  /** TCP port the sidecar binds on 127.0.0.1. */
  port: number;
  /** Bearer token required on every request; an empty string disables the auth check. */
  apiKey: string;
  /** Forwarded verbatim to the Claude adapter as `permissionMode`. */
  permissionMode: string;
  /** Fallback: resolve workspace from agent id if not parseable from system prompt */
  resolveAgent?: (
    agentId: string,
    sessionKey: string,
  ) => { workspace: string } | null;
  logger: { info: (msg: string) => void; warn: (msg: string) => void };
};

/** JSON body accepted by `POST /mcp/execute`. */
type McpExecuteBody = {
  tool?: string;
  args?: Record<string, unknown>;
  workspace?: string;
  agentId?: string;
};

/** Upper bound on an inbound request body, measured in bytes on the wire. */
const MAX_BODY_BYTES = 4_000_000;

/** Model id stamped on every streamed chunk. */
const CHUNK_MODEL_ID = "contractor-claude-bridge";

/** Serialize `body` as JSON and send it as a complete response with the given status. */
function sendJson(res: http.ServerResponse, status: number, body: unknown): void {
  const json = JSON.stringify(body);
  res.writeHead(status, {
    "Content-Type": "application/json; charset=utf-8",
    "Content-Length": Buffer.byteLength(json),
  });
  res.end(json);
}

/** Write a single SSE data line. */
function sseWrite(res: http.ServerResponse, data: string): void {
  res.write(`data: ${data}\n\n`);
}

/** Serialize one `chat.completion.chunk` carrying the given delta and finish reason. */
function buildCompletionChunk(
  id: string,
  delta: { content?: string },
  finishReason: "stop" | null,
): string {
  return JSON.stringify({
    id,
    object: "chat.completion.chunk",
    created: Math.floor(Date.now() / 1000),
    model: CHUNK_MODEL_ID,
    choices: [{ index: 0, delta, finish_reason: finishReason }],
  });
}

/** Build an OpenAI streaming chunk for a text delta. */
function buildChunk(id: string, text: string): string {
  return buildCompletionChunk(id, { content: text }, null);
}

/** Build the final stop chunk. */
function buildStopChunk(id: string): string {
  return buildCompletionChunk(id, {}, "stop");
}

/**
 * Read the whole request body and parse it as JSON.
 *
 * Chunks are accumulated as Buffers and decoded once at the end, so a
 * multi-byte UTF-8 character split across two chunks is decoded correctly.
 * The size cap is enforced in bytes; exceeding it destroys the request, which
 * rejects the returned promise through the `error` event.
 *
 * @returns The parsed JSON value (unvalidated).
 */
function parseBodyRaw(req: http.IncomingMessage): Promise<unknown> {
  return new Promise<unknown>((resolve, reject) => {
    const chunks: Buffer[] = [];
    let receivedBytes = 0;
    req.on("data", (chunk: Buffer) => {
      receivedBytes += chunk.length;
      if (receivedBytes > MAX_BODY_BYTES) {
        req.destroy(new Error("body too large"));
        return;
      }
      chunks.push(chunk);
    });
    req.on("end", () => {
      try {
        resolve(JSON.parse(Buffer.concat(chunks).toString("utf8")));
      } catch (e) {
        reject(e);
      }
    });
    req.on("error", reject);
  });
}

/** Parse the request body as a chat-completions request (shape is asserted, not validated). */
function parseBody(req: http.IncomingMessage): Promise<BridgeInboundRequest> {
  return parseBodyRaw(req) as Promise<BridgeInboundRequest>;
}

/**
 * Compare the presented Authorization header against the expected bearer
 * token without short-circuiting on the first differing byte.
 */
function isAuthorized(header: string | undefined, apiKey: string): boolean {
  const presented = Buffer.from(header ?? "", "utf8");
  const expected = Buffer.from(`Bearer ${apiKey}`, "utf8");
  // timingSafeEqual throws on length mismatch, so check length first.
  return presented.length === expected.length && timingSafeEqual(presented, expected);
}

/**
 * Per-sessionKey FIFO queue: same-session turns serialize so a user firing
 * multiple Discord messages back-to-back gets them processed in order rather
 * than spawning concurrent claude subprocesses. Cross-session requests bypass
 * each other's chains and run in parallel.
 *
 * The chain is `prev.then(() => mySlot)` per request — `mySlot` is released
 * in the request's finally block, so the next request waits until our work
 * completes (success or failure) before starting.
 */
const queueBySession = new Map<string, Promise<void>>();

/**
 * SSE keepalive cadence. Bridge writes `: keepalive\n\n` (an SSE comment
 * frame, no-op for the OpenAI stream parser) on this interval while a turn
 * is in flight or queued. This keeps OpenClaw's LLM idle watchdog (default
 * 120s of stream silence → attempt failure → retry) from firing when the
 * underlying claude subprocess is in a long quiet tool-call phase or while
 * we're waiting our turn in the per-session queue.
 */
const HEARTBEAT_MS = 30_000;

/** Untyped view of the global object, used to read the live OpenClaw config. */
const _G = globalThis as Record<string, unknown>;

/**
 * Create and start the bridge HTTP server on `127.0.0.1:<port>`.
 *
 * Routes:
 * - `GET  /health`               liveness probe
 * - `GET  /v1/models`            static model list
 * - `POST /v1/chat/completions`  SSE-streamed turn, dispatched to Claude or Gemini
 * - `GET  /debug/tools`          tools registered in the global plugin registry
 * - `POST /mcp/execute`          relay an MCP tool call to the plugin registry
 *
 * @param config Port, bearer token, permission mode, optional agent resolver and logger.
 * @returns The `http.Server`, already listening.
 */
export function createBridgeServer(config: BridgeServerConfig): http.Server {
  const { port, apiKey, permissionMode, resolveAgent, logger } = config;

  /** Handle one `POST /v1/chat/completions` request end to end. */
  async function handleChatCompletions(
    req: http.IncomingMessage,
    res: http.ServerResponse,
  ): Promise<void> {
    let body: BridgeInboundRequest;
    try {
      body = await parseBody(req);
    } catch {
      sendJson(res, 400, { error: "invalid_json" });
      return;
    }

    // Extract agent ID, workspace, chat id, and bare-reset signal from the
    // request. OpenClaw does NOT send agent/session info as HTTP headers — it
    // lives in the system prompt's Runtime line and the user envelope's
    // "Conversation info" untrusted-metadata block.
    const {
      agentId: parsedAgentId,
      workspace: parsedWorkspace,
      skillsBlock,
      workspaceContextFiles,
      chatId,
      bareSessionReset,
    } = extractRequestContext(body);
    const latestMessage = extractLatestUserMessage(body);

    // Pick the prompt to forward to the CLI. For bare /new turns OpenClaw
    // submits an empty user message — we synthesize a stub prompt instead so
    // the CLI has something to respond to.
    const dispatchPrompt = resolveDispatchPrompt(latestMessage, { bareSessionReset });
    if (!dispatchPrompt) {
      sendJson(res, 400, { error: "no user message found" });
      return;
    }

    // Scope the CLI session by (agentId, chat_id) so different Discord
    // channels / DMs / etc. for the same agent don't pile into one Claude
    // session and bleed context across surfaces. Falls back to agentId-only
    // when chat_id can't be parsed (local TUI, etc.).
    const agentId = parsedAgentId;
    const sessionKey = buildSessionKey(agentId, chatId);

    logger.info(
      `[contractor-bridge] turn agentId=${agentId} sessionKey=${sessionKey} workspace=${parsedWorkspace} bareReset=${bareSessionReset} msg=${dispatchPrompt.substring(0, 80)}`,
    );

    // Resolve workspace: prefer what we parsed from the system prompt (most accurate);
    // fall back to openclaw.json lookup for validation.
    let workspace = parsedWorkspace;
    if (!workspace && agentId) {
      const agentMeta = resolveAgent?.(agentId, sessionKey);
      if (agentMeta) workspace = agentMeta.workspace;
    }
    if (!workspace) {
      logger.warn(`[contractor-bridge] could not resolve workspace agentId=${agentId}`);
      workspace = "/tmp";
    }

    // Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude
    const isGemini = typeof body.model === "string" && body.model.includes("gemini");
    const backend = isGemini ? "gemini" : "claude";

    // ── Abort propagation ────────────────────────────────────────────────────
    // OpenClaw's LLM client cancels in-flight HTTP requests via AbortSignal
    // when an attempt fails or the user cancels. On the wire this manifests
    // as the client closing the socket. We mirror that into our own
    // AbortController and propagate it down to the dispatchTo{Claude,Gemini}
    // adapter, which kills the subprocess. Without this, OpenClaw silently
    // drops the response while the subprocess keeps running, and a retry
    // spawns another subprocess alongside the orphan — the root cause of the
    // duplicate-Discord-message bug we hit in May 2026.
    //
    // We listen on the *response* 'close' event. Since Node 16 the
    // IncomingMessage 'close' event is emitted once the request has been
    // completed rather than when the socket closes, and by this point the
    // body has already been fully read, so `req` is not a reliable signal.
    // The listener is removed in `finally` before `res.end()`, so a normal
    // completion never triggers it.
    const abortController = new AbortController();
    let clientDisconnected = false;
    const onClose = (): void => {
      if (clientDisconnected) return;
      clientDisconnected = true;
      logger.info(
        `[contractor-bridge] client disconnected sessionKey=${sessionKey} — aborting in-flight work`,
      );
      abortController.abort();
    };
    res.on("close", onClose);

    // Start SSE response immediately so OpenClaw sees the response status
    // quickly. We may still spend time waiting in the per-session queue
    // before any real chunk goes out — heartbeat (below) keeps that quiet
    // window from tripping the upstream idle watchdog.
    res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
      "Transfer-Encoding": "chunked",
    });

    // ── SSE heartbeat ────────────────────────────────────────────────────────
    // OpenClaw's idle timeout fires after 120s of total stream silence with
    // no model progress (`LLM idle timeout (120s): no response from model`).
    // claude -p can easily produce 120s+ of zero assistant-text output during
    // long Bash / file / MCP tool sequences, since this bridge only forwards
    // assistant text deltas as SSE chunks (tool_use blocks are not surfaced).
    // A periodic SSE comment frame counts as bytes on the wire and resets
    // the upstream idle timer, while being a no-op for the OpenAI stream
    // parser.
    const heartbeat = setInterval(() => {
      if (clientDisconnected || res.writableEnded) return;
      try {
        res.write(": keepalive\n\n");
      } catch {
        /* socket dead, ignore */
      }
    }, HEARTBEAT_MS);
    heartbeat.unref?.();

    // ── Per-sessionKey FIFO queue ────────────────────────────────────────────
    // Same-sessionKey turns serialize: if a user fires multiple Discord
    // messages quickly, we process them in order rather than starting
    // concurrent claude subprocesses (which would corrupt the shared session
    // file and produce overlapping replies). Cross-sessionKey requests run
    // in parallel — they live on independent chains.
    let releaseSlot: () => void = () => {};
    const mySlot = new Promise<void>((r) => {
      releaseSlot = r;
    });
    const prev: Promise<void> = sessionKey
      ? queueBySession.get(sessionKey) ?? Promise.resolve()
      : Promise.resolve();
    // `myChainTail` advances only after `releaseSlot()` (called in our
    // finally block). Tolerate prev rejecting so a crashed earlier turn
    // doesn't poison the chain forever.
    const myChainTail = prev.then(() => mySlot, () => mySlot);
    if (sessionKey) queueBySession.set(sessionKey, myChainTail);

    const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;
    let newSessionId = "";
    let hasError = false;
    let resultErrorReason: string | null = null;
    let existingEntry: ReturnType<typeof getSession> = null;

    try {
      // Block until the previous turn on this sessionKey finishes.
      // Cross-session requests skip this wait (their `prev` is a fresh
      // Promise.resolve()).
      await prev.catch(() => {});

      // While queued, OpenClaw may have given up on this request (attempt
      // timeout, user cancel, etc.) and closed the socket. If so, exit
      // cleanly without dispatching — the client wouldn't see the output
      // anyway.
      if (clientDisconnected) {
        logger.info(
          `[contractor-bridge] dropped queued request sessionKey=${sessionKey} (client disconnected during queue wait)`,
        );
        return;
      }

      // Look up the session-map entry NOW (at the head of the queue), not
      // earlier — the previous turn on the same sessionKey may have just
      // updated `claudeSessionId` and we want to resume into the latest
      // one rather than a stale snapshot from request-arrival time.
      existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;

      // Detect a fresh OpenClaw session even when the bare-reset marker is
      // absent. The marker only arrives when `/new` is the body of *this*
      // user turn (see get-reply isBareSessionReset). When `/new` is sent
      // as a separate slash command (e.g. via Discord's slash UI), OpenClaw
      // processes the reset in a side lane that doesn't hit the bridge —
      // it just renames the prior session file aside. The follow-up real
      // message then arrives on a brand-new OpenClaw session, but as a
      // normal turn with no marker. Without this check, the bridge happily
      // resumes the long-stale claudeSessionId from before the reset.
      //
      // OpenClaw sends the full conversation history every turn (system +
      // user/assistant pairs + latest user). A request with zero assistant
      // turns is therefore a positive signal that the OpenClaw session is
      // brand-new and any prior claudeSessionId we hold is from a previous
      // OpenClaw session that the user already abandoned.
      //
      // The body is unvalidated JSON, so guard both `messages` and each
      // `content`: an assistant message whose content is null (e.g. a
      // tool-call-only turn) must count as "no text", not throw after the
      // SSE headers have already been sent.
      const messages = Array.isArray(body.messages) ? body.messages : [];
      const hasAssistantHistory = messages.some((m) => {
        if (m.role !== "assistant") return false;
        if (typeof m.content === "string") return m.content.trim().length > 0;
        if (!Array.isArray(m.content)) return false;
        return m.content.some(
          (c) => c.type === "text" && (c.text ?? "").trim().length > 0,
        );
      });
      const isFreshOpenClawSession = !hasAssistantHistory;

      if ((bareSessionReset || isFreshOpenClawSession) && existingEntry && sessionKey) {
        const reason = bareSessionReset
          ? "bare /new detected"
          : "fresh OpenClaw session (no assistant history in messages[])";
        logger.info(
          `[contractor-bridge] ${reason} — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`,
        );
        removeSession(workspace, sessionKey);
        existingEntry = null;
      }

      const resumeSessionId =
        existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;

      // Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
      // For Claude: --append-system-prompt appends to the built-in prompt each invocation.
      // For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
      // This keeps persona and skills current without needing to track first-turn state.
      const systemPrompt = buildBootstrap({
        agentId,
        openclawSessionKey: sessionKey,
        workspace,
        skillsBlock: skillsBlock || undefined,
        workspaceContextFiles,
      });

      const openclawTools = body.tools ?? [];

      const dispatchIter = isGemini
        ? dispatchToGemini({
            prompt: dispatchPrompt,
            systemPrompt,
            workspace,
            agentId,
            resumeSessionId: resumeSessionId ?? undefined,
            openclawTools,
            bridgePort: port,
            bridgeApiKey: apiKey,
            signal: abortController.signal,
          })
        : dispatchToClaude({
            prompt: dispatchPrompt,
            systemPrompt,
            workspace,
            agentId,
            resumeSessionId: resumeSessionId ?? undefined,
            permissionMode,
            openclawTools,
            bridgePort: port,
            bridgeApiKey: apiKey,
            signal: abortController.signal,
          });

      try {
        for await (const event of dispatchIter) {
          if (event.type === "text") {
            if (!clientDisconnected) sseWrite(res, buildChunk(completionId, event.text));
          } else if (event.type === "done") {
            newSessionId = event.sessionId;
          } else if (event.type === "result_error") {
            // CLI signaled a fatal-but-graceful error (context overflow,
            // refusal, billing, etc.) via `is_error: true` on the result
            // event. The text was already streamed via prior `text` events;
            // record the reason so the bridge can drop the session-map entry
            // below.
            logger.warn(
              `[contractor-bridge] ${backend} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`,
            );
            resultErrorReason = event.reason;
            newSessionId = event.sessionId;
          } else if (event.type === "error") {
            logger.warn(`[contractor-bridge] ${backend} error: ${event.message}`);
            hasError = true;
            if (!clientDisconnected) {
              sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`));
            }
          }
        }
      } catch (err) {
        logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
        hasError = true;
        if (!clientDisconnected) {
          sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`));
        }
      }

      if (!clientDisconnected && !res.writableEnded) {
        sseWrite(res, buildStopChunk(completionId));
        sseWrite(res, "[DONE]");
      }

      // Session-map persistence:
      //   - Successful turn → upsert with the latest claudeSessionId so the next
      //     turn can `--resume` into it.
      //   - Terminal CLI error (context overflow etc., reported via result_error)
      //     → drop the entry so the next turn starts fresh instead of resuming
      //     into the same poisoned session and re-erroring.
      //   - Stream/transport error before any sessionId was captured → mark the
      //     prior entry orphaned (existing behavior).
      //   - Aborted (client disconnected) → don't touch the session-map; the
      //     subprocess may have already updated its own session file on disk,
      //     and the next turn will resume from whatever it left there.
      if (clientDisconnected) {
        // No-op: aborted turn; preserve whatever the prior entry was so the
        // next turn (likely an OpenClaw retry of the same prompt) can resume.
      } else if (resultErrorReason && sessionKey) {
        logger.info(
          `[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`,
        );
        removeSession(workspace, sessionKey);
      } else if (newSessionId && sessionKey && !hasError) {
        const now = new Date().toISOString();
        putSession(workspace, {
          openclawSessionKey: sessionKey,
          agentId,
          contractor: backend,
          claudeSessionId: newSessionId,
          workspace,
          createdAt: existingEntry?.createdAt ?? now,
          lastActivityAt: now,
          state: "active",
        });
        logger.info(
          `[contractor-bridge] session mapped sessionKey=${sessionKey} contractor=${backend} sessionId=${newSessionId}`,
        );
      } else if (hasError && sessionKey && existingEntry) {
        markOrphaned(workspace, sessionKey);
      }
    } finally {
      clearInterval(heartbeat);
      // Detach before ending the response so our own `res.end()` is never
      // mistaken for a client disconnect.
      res.off("close", onClose);
      // Release our slot so the next request on this sessionKey can proceed.
      // Must happen even on abort/throw, otherwise the chain stalls forever.
      releaseSlot();
      // Garbage-collect the queue entry if no later request chained on us.
      // The Map identity check is the only safe way to tell — newer requests
      // overwrite the entry with their own chain tail.
      if (sessionKey && queueBySession.get(sessionKey) === myChainTail) {
        queueBySession.delete(sessionKey);
      }
      if (!res.writableEnded) {
        try {
          res.end();
        } catch {
          /* ignore */
        }
      }
    }
  }

  const server = http.createServer(async (req, res) => {
    const url = req.url ?? "/";
    const method = req.method ?? "GET";

    // Auth check (skipped entirely when no apiKey is configured).
    if (apiKey && !isAuthorized(req.headers.authorization, apiKey)) {
      sendJson(res, 401, { error: "unauthorized" });
      return;
    }

    if (method === "GET" && url === "/health") {
      sendJson(res, 200, { ok: true, service: "contractor-bridge" });
      return;
    }

    if (method === "GET" && url === "/v1/models") {
      sendJson(res, 200, {
        object: "list",
        data: [
          {
            id: "contractor-claude-bridge",
            object: "model",
            created: Math.floor(Date.now() / 1000),
            owned_by: "contractor-agent",
          },
          {
            id: "contractor-gemini-bridge",
            object: "model",
            created: Math.floor(Date.now() / 1000),
            owned_by: "contractor-agent",
          },
        ],
      });
      return;
    }

    if (method === "POST" && url === "/v1/chat/completions") {
      try {
        await handleChatCompletions(req, res);
      } catch (err) {
        logger.warn(`[contractor-bridge] unhandled error: ${String(err)}`);
        // Once the SSE headers are out we cannot change the status; the
        // handler's finally block has already ended the stream.
        if (!res.headersSent) {
          sendJson(res, 500, { error: "internal server error" });
        }
      }
      return;
    }

    // Debug: list registered tools in the global plugin registry
    if (method === "GET" && url === "/debug/tools") {
      const pluginRegistry = getGlobalPluginRegistry();
      const liveConfig = _G["_contractorOpenClawConfig"] ?? null;
      if (!pluginRegistry) {
        sendJson(res, 200, { error: "registry not available" });
      } else {
        sendJson(res, 200, {
          configAvailable: liveConfig !== null,
          tools: pluginRegistry.tools.map((t) => ({ names: t.names, pluginId: t.pluginId })),
        });
      }
      return;
    }

    // MCP proxy tool execution callback.
    // Called by services/openclaw-mcp-server.mjs when Claude Code invokes an MCP tool.
    // Relays the call to OpenClaw's actual tool implementation via the global plugin registry.
    if (method === "POST" && url === "/mcp/execute") {
      let body: McpExecuteBody;
      try {
        body = (await parseBodyRaw(req)) as McpExecuteBody;
      } catch {
        sendJson(res, 400, { error: "invalid_json" });
        return;
      }

      const toolName = body.tool ?? "";
      const toolArgs = body.args ?? {};
      const execWorkspace = body.workspace ?? "";
      const execAgentId = body.agentId ?? "";

      logger.info(`[contractor-bridge] mcp/execute tool=${toolName} workspace=${execWorkspace} agentId=${execAgentId}`);

      // Relay to OpenClaw's actual tool implementation via the global plugin registry.
      // This avoids reimplementing tool logic in the bridge — we call the real tool factory.
      // Failures are reported as HTTP 200 with an `error` field.
      try {
        const pluginRegistry = getGlobalPluginRegistry();
        if (!pluginRegistry) {
          sendJson(res, 200, { error: `[mcp/execute] plugin registry not available (tool=${toolName})` });
          return;
        }

        // Build tool execution context (needed before tool lookup for factory instantiation).
        const liveConfig = (_G["_contractorOpenClawConfig"] ?? null) as Record<string, unknown> | null;
        const canonicalSessionKey = execAgentId ? `agent:${execAgentId}:direct:bridge` : undefined;
        const toolCtx = {
          config: liveConfig ?? undefined,
          workspaceDir: execWorkspace || undefined,
          agentDir: execWorkspace || undefined,
          agentId: execAgentId || undefined,
          sessionKey: canonicalSessionKey,
        };

        // Find tool by name. Some plugins register tools via factory functions without
        // declaring names upfront (names array is empty). For those, instantiate the
        // factory and match by the returned tool's .name property.
        let toolReg = pluginRegistry.tools.find((t) => t.names.includes(toolName));
        if (!toolReg) {
          for (const candidate of pluginRegistry.tools) {
            if (candidate.names.length > 0) continue;
            try {
              const inst = candidate.factory(toolCtx);
              const items = Array.isArray(inst) ? inst : [inst];
              if (items.some((t) => (t as { name?: string }).name === toolName)) {
                toolReg = candidate;
                break;
              }
            } catch {
              /* skip */
            }
          }
        }

        if (!toolReg) {
          sendJson(res, 200, { error: `Tool '${toolName}' not registered in OpenClaw plugin registry` });
          return;
        }

        // Instantiate the tool via its factory
        const toolOrTools = toolReg.factory(toolCtx);
        const toolInstance = Array.isArray(toolOrTools)
          ? toolOrTools.find((t) => (t as { name?: string }).name === toolName)
          : toolOrTools;

        if (!toolInstance || typeof (toolInstance as { execute?: unknown }).execute !== "function") {
          sendJson(res, 200, { error: `Tool '${toolName}' factory returned no executable tool` });
          return;
        }

        // Invoke `execute` as a method so a tool implemented with `this`
        // (e.g. a class instance) keeps its receiver.
        const executable = toolInstance as {
          execute: (id: string, args: unknown) => Promise<{ content?: Array<{ type: string; text?: string }> }>;
        };
        const toolResult = await executable.execute(randomUUID(), toolArgs);

        // Extract text content from AgentToolResult
        const text = (toolResult?.content ?? [])
          .filter((c) => c.type === "text" && c.text)
          .map((c) => c.text as string)
          .join("\n");

        sendJson(res, 200, { result: text || "(no result)" });
      } catch (err) {
        logger.warn(`[contractor-bridge] mcp/execute error tool=${toolName}: ${String(err)}`);
        sendJson(res, 200, { error: `Tool execution failed: ${String(err)}` });
      }
      return;
    }

    sendJson(res, 404, { error: "not_found" });
  });

  server.listen(port, "127.0.0.1", () => {
    logger.info(`[contractor-bridge] sidecar listening on 127.0.0.1:${port}`);
  });

  return server;
}