import http from "node:http";
import { randomUUID, timingSafeEqual } from "node:crypto";
import type { BridgeInboundRequest } from "../core/types/model.js";
import { extractLatestUserMessage, extractRequestContext } from "./input-filter.js";
import { buildBootstrap } from "./bootstrap.js";
import { dispatchToClaude } from "../core/claude/sdk-adapter.js";
import { dispatchToGemini } from "../core/gemini/sdk-adapter.js";
import { getGlobalPluginRegistry } from "openclaw/plugin-sdk/plugin-runtime";
import {
  getSession,
  putSession,
  markOrphaned,
} from "../core/contractor/session-map-store.js";

/** Configuration accepted by {@link createBridgeServer}. */
export type BridgeServerConfig = {
  /** TCP port the sidecar listens on (bound to 127.0.0.1). */
  port: number;
  /** Bearer token required on every request; an empty string disables the auth check. */
  apiKey: string;
  /** Forwarded unchanged to the Claude dispatcher. */
  permissionMode: string;
  /** Fallback: resolve workspace from agent id if not parseable from system prompt */
  resolveAgent?: (
    agentId: string,
    sessionKey: string,
  ) => { workspace: string } | null;
  logger: { info: (msg: string) => void; warn: (msg: string) => void };
};

/** Model id advertised for (and reported by) the Claude backend. */
const CLAUDE_BRIDGE_MODEL = "contractor-claude-bridge";
/** Model id advertised for (and reported by) the Gemini backend. */
const GEMINI_BRIDGE_MODEL = "contractor-gemini-bridge";
/** Maximum accepted request body size, measured in decoded characters. */
const MAX_BODY_CHARS = 4_000_000;

/** Shape of a tool instance that can be executed through /mcp/execute. */
type ExecutableTool = {
  execute: (
    id: string,
    args: unknown,
  ) => Promise<{ content?: Array<{ type: string; text?: string }> }>;
};

/** Serialize `body` as JSON and send it as the complete response with the given status. */
function sendJson(res: http.ServerResponse, status: number, body: unknown): void {
  const json = JSON.stringify(body);
  res.writeHead(status, {
    "Content-Type": "application/json; charset=utf-8",
    "Content-Length": Buffer.byteLength(json),
  });
  res.end(json);
}

/** Write a single SSE data line. */
function sseWrite(res: http.ServerResponse, data: string): void {
  res.write(`data: ${data}\n\n`);
}

/**
 * Build an OpenAI streaming chunk for a text delta.
 *
 * @param id - Completion id shared by every chunk of one response.
 * @param text - Text delta carried by this chunk.
 * @param model - Model id reported in the chunk; defaults to the Claude bridge id.
 */
function buildChunk(id: string, text: string, model: string = CLAUDE_BRIDGE_MODEL): string {
  return JSON.stringify({
    id,
    object: "chat.completion.chunk",
    created: Math.floor(Date.now() / 1000),
    model,
    choices: [{ index: 0, delta: { content: text }, finish_reason: null }],
  });
}

/**
 * Build the final stop chunk.
 *
 * @param id - Completion id shared by every chunk of one response.
 * @param model - Model id reported in the chunk; defaults to the Claude bridge id.
 */
function buildStopChunk(id: string, model: string = CLAUDE_BRIDGE_MODEL): string {
  return JSON.stringify({
    id,
    object: "chat.completion.chunk",
    created: Math.floor(Date.now() / 1000),
    model,
    choices: [{ index: 0, delta: {}, finish_reason: "stop" }],
  });
}

/** True when `value` is a non-null, non-array object (i.e. a JSON object). */
function isJsonObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/**
 * Read the whole request body and parse it as JSON.
 *
 * Rejects when the body is not valid JSON, when the stream errors, or when the
 * body grows past MAX_BODY_CHARS (the request is destroyed in that case).
 */
function parseBodyRaw(req: http.IncomingMessage): Promise<unknown> {
  return new Promise((resolve, reject) => {
    let body = "";
    // Let the stream decode UTF-8 itself so a multi-byte character that is split
    // across two network chunks is not corrupted by per-chunk decoding.
    req.setEncoding("utf8");
    req.on("data", (chunk: string) => {
      body += chunk;
      if (body.length > MAX_BODY_CHARS) req.destroy(new Error("body too large"));
    });
    req.on("end", () => {
      try {
        resolve(JSON.parse(body));
      } catch (e) {
        reject(e);
      }
    });
    req.on("error", reject);
  });
}

/**
 * Read and parse a chat-completions request body.
 *
 * Rejects for anything {@link parseBodyRaw} rejects, and additionally when the
 * parsed value is not a JSON object. The object's fields are NOT validated here.
 */
async function parseBody(req: http.IncomingMessage): Promise<BridgeInboundRequest> {
  const parsed = await parseBodyRaw(req);
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed)) {
    throw new Error("request body must be a JSON object");
  }
  return parsed as BridgeInboundRequest;
}

/**
 * Compare the Authorization header against the expected bearer token without
 * short-circuiting on the first differing byte.
 */
function hasValidBearerToken(header: string | undefined, apiKey: string): boolean {
  const expected = Buffer.from(`Bearer ${apiKey}`, "utf8");
  const actual = Buffer.from(header ?? "", "utf8");
  // timingSafeEqual throws on length mismatch, so the lengths are compared first.
  return actual.length === expected.length && timingSafeEqual(actual, expected);
}

/** globalThis viewed as a plain dictionary; used to read the live OpenClaw config. */
const _G = globalThis as Record<string, unknown>;

/**
 * Create and start the contractor bridge sidecar.
 *
 * Routes served (all behind the bearer-token check when `apiKey` is non-empty):
 *  - GET  /health               liveness probe
 *  - GET  /v1/models            the two bridge model ids
 *  - POST /v1/chat/completions  streams a Claude or Gemini turn as SSE chunks
 *  - GET  /debug/tools          lists tools in the global plugin registry
 *  - POST /mcp/execute          runs one registered tool and returns its text output
 *
 * The server starts listening on 127.0.0.1:`port` before this function returns it.
 */
export function createBridgeServer(config: BridgeServerConfig): http.Server {
  const { port, apiKey, permissionMode, resolveAgent, logger } = config;

  /** Handle one chat turn: dispatch to the backend, stream text, then persist the session mapping. */
  async function handleChatCompletions(
    req: http.IncomingMessage,
    res: http.ServerResponse,
  ): Promise<void> {
    let body: BridgeInboundRequest;
    try {
      body = await parseBody(req);
    } catch {
      sendJson(res, 400, { error: "invalid_json" });
      return;
    }

    // Extract agent ID and workspace from the system prompt's Runtime line.
    // OpenClaw does NOT send agent/session info as HTTP headers — it's in the system prompt.
    const {
      agentId: parsedAgentId,
      workspace: parsedWorkspace,
      skillsBlock,
      workspaceContextFiles,
    } = extractRequestContext(body);
    const latestMessage = extractLatestUserMessage(body);

    if (!latestMessage) {
      sendJson(res, 400, { error: "no user message found" });
      return;
    }

    // Use agentId as session key — one persistent Claude session per agent (v1).
    const agentId = parsedAgentId;
    const sessionKey = agentId; // stable per-agent key

    logger.info(
      `[contractor-bridge] turn agentId=${agentId} workspace=${parsedWorkspace} msg=${latestMessage.substring(0, 80)}`,
    );

    // Resolve workspace: prefer what we parsed from the system prompt (most accurate);
    // fall back to openclaw.json lookup for validation.
    let workspace = parsedWorkspace;
    if (!workspace && agentId) {
      const agentMeta = resolveAgent?.(agentId, sessionKey);
      if (agentMeta) workspace = agentMeta.workspace;
    }
    if (!workspace) {
      logger.warn(`[contractor-bridge] could not resolve workspace agentId=${agentId}`);
      workspace = "/tmp";
    }

    // Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude
    const isGemini = typeof body.model === "string" && body.model.includes("gemini");
    const backendName = isGemini ? "gemini" : "claude";
    // Report the model id that matches the backend actually serving this turn.
    const responseModel = isGemini ? GEMINI_BRIDGE_MODEL : CLAUDE_BRIDGE_MODEL;

    // Look up existing session (shared structure for both Claude and Gemini)
    const existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;
    const resumeSessionId =
      existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;

    // Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
    // For Claude: --system-prompt fully replaces any prior system prompt each invocation.
    // For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
    // This keeps persona and skills current without needing to track first-turn state.
    const systemPrompt = buildBootstrap({
      agentId,
      openclawSessionKey: sessionKey,
      workspace,
      skillsBlock: skillsBlock || undefined,
      workspaceContextFiles,
    });

    // Start SSE response
    res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
      "Transfer-Encoding": "chunked",
    });

    const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;
    let newSessionId = "";
    let hasError = false;

    const openclawTools = body.tools ?? [];

    try {
      const dispatchIter = isGemini
        ? dispatchToGemini({
            prompt: latestMessage,
            systemPrompt,
            workspace,
            agentId,
            resumeSessionId: resumeSessionId ?? undefined,
            openclawTools,
            bridgePort: port,
            bridgeApiKey: apiKey,
          })
        : dispatchToClaude({
            prompt: latestMessage,
            systemPrompt,
            workspace,
            agentId,
            resumeSessionId: resumeSessionId ?? undefined,
            permissionMode,
            openclawTools,
            bridgePort: port,
            bridgeApiKey: apiKey,
          });

      for await (const event of dispatchIter) {
        if (event.type === "text") {
          sseWrite(res, buildChunk(completionId, event.text, responseModel));
        } else if (event.type === "done") {
          newSessionId = event.sessionId;
        } else if (event.type === "error") {
          logger.warn(`[contractor-bridge] ${backendName} error: ${event.message}`);
          hasError = true;
          // Headers are already sent, so errors are surfaced in-band as a text delta.
          sseWrite(
            res,
            buildChunk(completionId, `[contractor-bridge error: ${event.message}]`, responseModel),
          );
        }
      }
    } catch (err) {
      logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
      hasError = true;
      sseWrite(
        res,
        buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`, responseModel),
      );
    }

    sseWrite(res, buildStopChunk(completionId, responseModel));
    sseWrite(res, "[DONE]");
    res.end();

    // Persist session mapping (shared for both Claude and Gemini)
    if (newSessionId && sessionKey && !hasError) {
      const now = new Date().toISOString();
      putSession(workspace, {
        openclawSessionKey: sessionKey,
        agentId,
        contractor: backendName,
        claudeSessionId: newSessionId,
        workspace,
        createdAt: existingEntry?.createdAt ?? now,
        lastActivityAt: now,
        state: "active",
      });
      logger.info(
        `[contractor-bridge] session mapped sessionKey=${sessionKey} contractor=${backendName} sessionId=${newSessionId}`,
      );
    } else if (hasError && sessionKey && existingEntry) {
      // A failed turn on a previously mapped session: stop resuming it.
      markOrphaned(workspace, sessionKey);
    }
  }

  /** Debug: list registered tools in the global plugin registry. */
  function handleDebugTools(res: http.ServerResponse): void {
    const pluginRegistry = getGlobalPluginRegistry();
    const liveConfig = _G["_contractorOpenClawConfig"] ?? null;
    if (!pluginRegistry) {
      sendJson(res, 200, { error: "registry not available" });
      return;
    }
    sendJson(res, 200, {
      configAvailable: liveConfig !== null,
      tools: pluginRegistry.tools.map((t) => ({ names: t.names, pluginId: t.pluginId })),
    });
  }

  /**
   * MCP proxy tool execution callback.
   * Called by services/openclaw-mcp-server.mjs when Claude Code invokes an MCP tool.
   * Relays the call to OpenClaw's actual tool implementation via the global plugin registry.
   *
   * Tool-level failures are reported as HTTP 200 with an `error` field; only an
   * unreadable or non-object body yields HTTP 400.
   */
  async function handleMcpExecute(
    req: http.IncomingMessage,
    res: http.ServerResponse,
  ): Promise<void> {
    let rawBody: unknown;
    try {
      rawBody = await parseBodyRaw(req);
    } catch {
      sendJson(res, 400, { error: "invalid_json" });
      return;
    }
    // Valid JSON that is not an object (null, a number, an array, ...) would
    // otherwise throw on property access below.
    if (!isJsonObject(rawBody)) {
      sendJson(res, 400, { error: "invalid_json" });
      return;
    }

    const toolName = typeof rawBody.tool === "string" ? rawBody.tool : "";
    const toolArgs = rawBody.args ?? {};
    const execWorkspace = typeof rawBody.workspace === "string" ? rawBody.workspace : "";
    const execAgentId = typeof rawBody.agentId === "string" ? rawBody.agentId : "";

    logger.info(
      `[contractor-bridge] mcp/execute tool=${toolName} workspace=${execWorkspace} agentId=${execAgentId}`,
    );

    // Relay to OpenClaw's actual tool implementation via the global plugin registry.
    // This avoids reimplementing tool logic in the bridge — we call the real tool factory.
    try {
      const pluginRegistry = getGlobalPluginRegistry();
      if (!pluginRegistry) {
        sendJson(res, 200, {
          error: `[mcp/execute] plugin registry not available (tool=${toolName})`,
        });
        return;
      }

      // Find the tool registration by name
      const toolReg = pluginRegistry.tools.find((t) => t.names.includes(toolName));
      if (!toolReg) {
        sendJson(res, 200, {
          error: `Tool '${toolName}' not registered in OpenClaw plugin registry`,
        });
        return;
      }

      // Build tool execution context.
      // config is required by memory tools (resolveMemoryToolContext checks it).
      // Read from globalThis where index.ts stores api.config on every registration.
      const liveConfig = (_G["_contractorOpenClawConfig"] ?? null) as Record<
        string,
        unknown
      > | null;

      // Construct a canonical session key so memory tools can resolve the agentId from it
      // (parseAgentSessionKey() extracts the agent id from this form).
      // Format: "agent:{agentId}:direct:bridge"
      const canonicalSessionKey = execAgentId
        ? `agent:${execAgentId}:direct:bridge`
        : undefined;

      const toolCtx = {
        config: liveConfig ?? undefined,
        workspaceDir: execWorkspace || undefined,
        agentDir: execWorkspace || undefined,
        agentId: execAgentId || undefined,
        sessionKey: canonicalSessionKey,
      };

      // Instantiate the tool via its factory; a factory may return one tool or a list.
      const toolOrTools = toolReg.factory(toolCtx);
      const toolInstance = Array.isArray(toolOrTools)
        ? toolOrTools.find((t) => (t as { name?: string }).name === toolName)
        : toolOrTools;

      if (
        !toolInstance ||
        typeof (toolInstance as { execute?: unknown }).execute !== "function"
      ) {
        sendJson(res, 200, { error: `Tool '${toolName}' factory returned no executable tool` });
        return;
      }

      // Invoke execute as a method of the instance so implementations that rely
      // on `this` keep their receiver.
      const executable = toolInstance as ExecutableTool;
      const toolResult = await executable.execute(randomUUID(), toolArgs);

      // Extract text content from AgentToolResult
      const text = (toolResult.content ?? [])
        .filter((c) => c.type === "text" && c.text)
        .map((c) => c.text as string)
        .join("\n");

      sendJson(res, 200, { result: text || "(no result)" });
    } catch (err) {
      logger.warn(`[contractor-bridge] mcp/execute error tool=${toolName}: ${String(err)}`);
      sendJson(res, 200, { error: `Tool execution failed: ${String(err)}` });
    }
  }

  /** Authenticate the request and dispatch it to the matching route handler. */
  async function routeRequest(
    req: http.IncomingMessage,
    res: http.ServerResponse,
  ): Promise<void> {
    const url = req.url ?? "/";
    const method = req.method ?? "GET";

    // Auth check (skipped entirely when no apiKey is configured)
    if (apiKey && !hasValidBearerToken(req.headers.authorization, apiKey)) {
      sendJson(res, 401, { error: "unauthorized" });
      return;
    }

    if (method === "GET" && url === "/health") {
      sendJson(res, 200, { ok: true, service: "contractor-bridge" });
      return;
    }

    if (method === "GET" && url === "/v1/models") {
      const created = Math.floor(Date.now() / 1000);
      sendJson(res, 200, {
        object: "list",
        data: [CLAUDE_BRIDGE_MODEL, GEMINI_BRIDGE_MODEL].map((id) => ({
          id,
          object: "model",
          created,
          owned_by: "contractor-agent",
        })),
      });
      return;
    }

    if (method === "POST" && url === "/v1/chat/completions") {
      await handleChatCompletions(req, res);
      return;
    }

    if (method === "GET" && url === "/debug/tools") {
      handleDebugTools(res);
      return;
    }

    if (method === "POST" && url === "/mcp/execute") {
      await handleMcpExecute(req, res);
      return;
    }

    sendJson(res, 404, { error: "not_found" });
  }

  const server = http.createServer(async (req, res) => {
    try {
      await routeRequest(req, res);
    } catch (err) {
      // Last line of defence: without this, a throw in any route would surface
      // as an unhandled promise rejection.
      logger.warn(`[contractor-bridge] unhandled error: ${String(err)}`);
      if (!res.headersSent) {
        sendJson(res, 500, { error: "internal server error" });
      } else if (!res.writableEnded) {
        // Headers already went out (e.g. mid-SSE); close the stream so the client is not left hanging.
        res.end();
      }
    }
  });

  server.listen(port, "127.0.0.1", () => {
    logger.info(`[contractor-bridge] sidecar listening on 127.0.0.1:${port}`);
  });

  return server;
}