The bridge was keying claudeSessionId by agentId alone, so every Discord
channel, DM, and cron run for a single agent shared one Claude CLI
session. Two consequences in the wild:
- Cross-channel context bleed: 8.7MB session for `developer` mixed
references from channels 1474327736242798612 and 1498579994044010566
plus the operator DM all in one --resume thread.
- `/new` had no effect on the CLI side. OpenClaw rotated its session
file but the bridge kept --resume-ing the same long-lived
claudeSessionId, eventually crossing the 1M model context (debug log
showed `prompt is too long: 1179616 tokens > 1000000 maximum`).
Changes:
* input-filter: extract `chat_id` from the Conversation-info
untrusted-metadata block (scanning all messages, since runtimeOnly
turns put it in the system prompt) and detect bare `/new`/`/reset`
via the BARE_SESSION_RESET_PROMPT_BASE marker. Add buildSessionKey
`${agentId}::${chatId}` and resolveDispatchPrompt fallback for the
empty user message that OpenClaw sends on bare resets.
* server: use the composite session key for getSession/putSession;
on bareSessionReset, removeSession before dispatching so the CLI
starts a fresh session; on a CLI result_error (typically
prompt_too_long) drop the entry too so the next turn doesn't
re-resume into the poisoned context.
* claude/sdk-adapter: surface CLI terminal errors via a new
`result_error` event (carries reason + sessionId) so the bridge
can react instead of just streaming the synthetic
"Prompt is too long" assistant text and silently re-using the
same session.
* index: convert register() to synchronous (OpenClaw rejects async
register with "plugin register must be synchronous"); replace the
pre-bind port probe with a server-level EADDRINUSE handler.
* .gitignore: ignore node_modules/ and dist/.
449 lines
17 KiB
TypeScript
449 lines
17 KiB
TypeScript
import http from "node:http";
|
|
import { randomUUID } from "node:crypto";
|
|
import type { BridgeInboundRequest } from "../core/types/model.js";
|
|
import {
|
|
buildSessionKey,
|
|
extractLatestUserMessage,
|
|
extractRequestContext,
|
|
resolveDispatchPrompt,
|
|
} from "./input-filter.js";
|
|
import { buildBootstrap } from "./bootstrap.js";
|
|
import { dispatchToClaude } from "../core/claude/sdk-adapter.js";
|
|
import { dispatchToGemini } from "../core/gemini/sdk-adapter.js";
|
|
import { getGlobalPluginRegistry } from "openclaw/plugin-sdk/plugin-runtime";
|
|
import {
|
|
getSession,
|
|
putSession,
|
|
markOrphaned,
|
|
removeSession,
|
|
} from "../core/contractor/session-map-store.js";
|
|
|
|
export type BridgeServerConfig = {
|
|
port: number;
|
|
apiKey: string;
|
|
permissionMode: string;
|
|
/** Fallback: resolve workspace from agent id if not parseable from system prompt */
|
|
resolveAgent?: (
|
|
agentId: string,
|
|
sessionKey: string,
|
|
) => { workspace: string } | null;
|
|
logger: { info: (msg: string) => void; warn: (msg: string) => void };
|
|
};
|
|
|
|
function sendJson(res: http.ServerResponse, status: number, body: unknown): void {
|
|
const json = JSON.stringify(body);
|
|
res.writeHead(status, {
|
|
"Content-Type": "application/json; charset=utf-8",
|
|
"Content-Length": Buffer.byteLength(json),
|
|
});
|
|
res.end(json);
|
|
}
|
|
|
|
/** Write a single SSE data line. */
|
|
function sseWrite(res: http.ServerResponse, data: string): void {
|
|
res.write(`data: ${data}\n\n`);
|
|
}
|
|
|
|
/** Build an OpenAI streaming chunk for a text delta. */
|
|
function buildChunk(id: string, text: string): string {
|
|
return JSON.stringify({
|
|
id,
|
|
object: "chat.completion.chunk",
|
|
created: Math.floor(Date.now() / 1000),
|
|
model: "contractor-claude-bridge",
|
|
choices: [{ index: 0, delta: { content: text }, finish_reason: null }],
|
|
});
|
|
}
|
|
|
|
/** Build the final stop chunk. */
|
|
function buildStopChunk(id: string): string {
|
|
return JSON.stringify({
|
|
id,
|
|
object: "chat.completion.chunk",
|
|
created: Math.floor(Date.now() / 1000),
|
|
model: "contractor-claude-bridge",
|
|
choices: [{ index: 0, delta: {}, finish_reason: "stop" }],
|
|
});
|
|
}
|
|
|
|
function parseBodyRaw(req: http.IncomingMessage): Promise<unknown> {
|
|
return new Promise((resolve, reject) => {
|
|
let body = "";
|
|
req.on("data", (chunk: Buffer) => {
|
|
body += chunk.toString("utf8");
|
|
if (body.length > 4_000_000) req.destroy(new Error("body too large"));
|
|
});
|
|
req.on("end", () => {
|
|
try {
|
|
resolve(JSON.parse(body));
|
|
} catch (e) {
|
|
reject(e);
|
|
}
|
|
});
|
|
req.on("error", reject);
|
|
});
|
|
}
|
|
|
|
function parseBody(req: http.IncomingMessage): Promise<BridgeInboundRequest> {
|
|
return parseBodyRaw(req) as Promise<BridgeInboundRequest>;
|
|
}
|
|
|
|
|
|
const _G = globalThis as Record<string, unknown>;
|
|
|
|
export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|
const { port, apiKey, permissionMode, resolveAgent, logger } = config;
|
|
|
|
async function handleChatCompletions(
|
|
req: http.IncomingMessage,
|
|
res: http.ServerResponse,
|
|
): Promise<void> {
|
|
let body: BridgeInboundRequest;
|
|
try {
|
|
body = await parseBody(req);
|
|
} catch {
|
|
sendJson(res, 400, { error: "invalid_json" });
|
|
return;
|
|
}
|
|
|
|
// Extract agent ID, workspace, chat id, and bare-reset signal from the
|
|
// request. OpenClaw does NOT send agent/session info as HTTP headers — it
|
|
// lives in the system prompt's Runtime line and the user envelope's
|
|
// "Conversation info" untrusted-metadata block.
|
|
const {
|
|
agentId: parsedAgentId,
|
|
workspace: parsedWorkspace,
|
|
skillsBlock,
|
|
workspaceContextFiles,
|
|
chatId,
|
|
bareSessionReset,
|
|
} = extractRequestContext(body);
|
|
const latestMessage = extractLatestUserMessage(body);
|
|
|
|
// Pick the prompt to forward to the CLI. For bare /new turns OpenClaw
|
|
// submits an empty user message — we synthesize a stub prompt instead so
|
|
// the CLI has something to respond to.
|
|
const dispatchPrompt = resolveDispatchPrompt(latestMessage, { bareSessionReset });
|
|
|
|
if (!dispatchPrompt) {
|
|
sendJson(res, 400, { error: "no user message found" });
|
|
return;
|
|
}
|
|
|
|
// Scope the CLI session by (agentId, chat_id) so different Discord
|
|
// channels / DMs / etc. for the same agent don't pile into one Claude
|
|
// session and bleed context across surfaces. Falls back to agentId-only
|
|
// when chat_id can't be parsed (local TUI, etc.).
|
|
const agentId = parsedAgentId;
|
|
const sessionKey = buildSessionKey(agentId, chatId);
|
|
|
|
logger.info(
|
|
`[contractor-bridge] turn agentId=${agentId} sessionKey=${sessionKey} workspace=${parsedWorkspace} bareReset=${bareSessionReset} msg=${dispatchPrompt.substring(0, 80)}`,
|
|
);
|
|
|
|
// Resolve workspace: prefer what we parsed from the system prompt (most accurate);
|
|
// fall back to openclaw.json lookup for validation.
|
|
let workspace = parsedWorkspace;
|
|
if (!workspace && agentId) {
|
|
const agentMeta = resolveAgent?.(agentId, sessionKey);
|
|
if (agentMeta) workspace = agentMeta.workspace;
|
|
}
|
|
if (!workspace) {
|
|
logger.warn(`[contractor-bridge] could not resolve workspace agentId=${agentId}`);
|
|
workspace = "/tmp";
|
|
}
|
|
|
|
// Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude
|
|
const isGemini = typeof body.model === "string" && body.model.includes("gemini");
|
|
|
|
// Look up existing session (shared structure for both Claude and Gemini).
|
|
// On a bare /new or /reset turn we deliberately drop the existing entry so
|
|
// the CLI starts a fresh session — otherwise --resume would bring back the
|
|
// very history the user just asked to abandon.
|
|
let existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;
|
|
if (bareSessionReset && existingEntry && sessionKey) {
|
|
logger.info(
|
|
`[contractor-bridge] bare /new detected — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`,
|
|
);
|
|
removeSession(workspace, sessionKey);
|
|
existingEntry = null;
|
|
}
|
|
let resumeSessionId = existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;
|
|
|
|
// Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
|
|
// For Claude: --system-prompt fully replaces any prior system prompt each invocation.
|
|
// For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
|
|
// This keeps persona and skills current without needing to track first-turn state.
|
|
const systemPrompt = buildBootstrap({
|
|
agentId,
|
|
openclawSessionKey: sessionKey,
|
|
workspace,
|
|
skillsBlock: skillsBlock || undefined,
|
|
workspaceContextFiles,
|
|
});
|
|
|
|
// Start SSE response
|
|
res.writeHead(200, {
|
|
"Content-Type": "text/event-stream",
|
|
"Cache-Control": "no-cache",
|
|
"Connection": "keep-alive",
|
|
"Transfer-Encoding": "chunked",
|
|
});
|
|
|
|
const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;
|
|
let newSessionId = "";
|
|
let hasError = false;
|
|
let resultErrorReason: string | null = null;
|
|
|
|
const openclawTools = body.tools ?? [];
|
|
|
|
try {
|
|
const dispatchIter = isGemini
|
|
? dispatchToGemini({
|
|
prompt: dispatchPrompt,
|
|
systemPrompt,
|
|
workspace,
|
|
agentId,
|
|
resumeSessionId: resumeSessionId ?? undefined,
|
|
openclawTools,
|
|
bridgePort: port,
|
|
bridgeApiKey: apiKey,
|
|
})
|
|
: dispatchToClaude({
|
|
prompt: dispatchPrompt,
|
|
systemPrompt,
|
|
workspace,
|
|
agentId,
|
|
resumeSessionId: resumeSessionId ?? undefined,
|
|
permissionMode,
|
|
openclawTools,
|
|
bridgePort: port,
|
|
bridgeApiKey: apiKey,
|
|
});
|
|
|
|
for await (const event of dispatchIter) {
|
|
if (event.type === "text") {
|
|
sseWrite(res, buildChunk(completionId, event.text));
|
|
} else if (event.type === "done") {
|
|
newSessionId = event.sessionId;
|
|
} else if (event.type === "result_error") {
|
|
// CLI returned a terminal error (typically context overflow). The
|
|
// text was already streamed via prior `text` events; record the
|
|
// session so we can drop it below and log the reason.
|
|
logger.warn(
|
|
`[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`,
|
|
);
|
|
resultErrorReason = event.reason;
|
|
newSessionId = event.sessionId;
|
|
} else if (event.type === "error") {
|
|
logger.warn(`[contractor-bridge] ${isGemini ? "gemini" : "claude"} error: ${event.message}`);
|
|
hasError = true;
|
|
sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`));
|
|
}
|
|
}
|
|
} catch (err) {
|
|
logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
|
|
hasError = true;
|
|
sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`));
|
|
}
|
|
|
|
sseWrite(res, buildStopChunk(completionId));
|
|
sseWrite(res, "[DONE]");
|
|
res.end();
|
|
|
|
// Session-map persistence:
|
|
// - Successful turn → upsert with the latest claudeSessionId so the next
|
|
// turn can `--resume` into it.
|
|
// - Terminal CLI error (context overflow etc., reported via result_error)
|
|
// → drop the entry so the next turn starts fresh instead of resuming
|
|
// into the same poisoned session and re-erroring.
|
|
// - Stream/transport error before any sessionId was captured → mark the
|
|
// prior entry orphaned (existing behavior).
|
|
if (resultErrorReason && sessionKey) {
|
|
logger.info(
|
|
`[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`,
|
|
);
|
|
removeSession(workspace, sessionKey);
|
|
} else if (newSessionId && sessionKey && !hasError) {
|
|
const now = new Date().toISOString();
|
|
putSession(workspace, {
|
|
openclawSessionKey: sessionKey,
|
|
agentId,
|
|
contractor: isGemini ? "gemini" : "claude",
|
|
claudeSessionId: newSessionId,
|
|
workspace,
|
|
createdAt: existingEntry?.createdAt ?? now,
|
|
lastActivityAt: now,
|
|
state: "active",
|
|
});
|
|
logger.info(
|
|
`[contractor-bridge] session mapped sessionKey=${sessionKey} contractor=${isGemini ? "gemini" : "claude"} sessionId=${newSessionId}`,
|
|
);
|
|
} else if (hasError && sessionKey && existingEntry) {
|
|
markOrphaned(workspace, sessionKey);
|
|
}
|
|
}
|
|
|
|
const server = http.createServer(async (req, res) => {
|
|
const url = req.url ?? "/";
|
|
const method = req.method ?? "GET";
|
|
|
|
// Auth check
|
|
if (apiKey) {
|
|
const auth = req.headers.authorization ?? "";
|
|
if (auth !== `Bearer ${apiKey}`) {
|
|
sendJson(res, 401, { error: "unauthorized" });
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (method === "GET" && url === "/health") {
|
|
sendJson(res, 200, { ok: true, service: "contractor-bridge" });
|
|
return;
|
|
}
|
|
|
|
if (method === "GET" && url === "/v1/models") {
|
|
sendJson(res, 200, {
|
|
object: "list",
|
|
data: [
|
|
{
|
|
id: "contractor-claude-bridge",
|
|
object: "model",
|
|
created: Math.floor(Date.now() / 1000),
|
|
owned_by: "contractor-agent",
|
|
},
|
|
{
|
|
id: "contractor-gemini-bridge",
|
|
object: "model",
|
|
created: Math.floor(Date.now() / 1000),
|
|
owned_by: "contractor-agent",
|
|
},
|
|
],
|
|
});
|
|
return;
|
|
}
|
|
|
|
if (method === "POST" && url === "/v1/chat/completions") {
|
|
try {
|
|
await handleChatCompletions(req, res);
|
|
} catch (err) {
|
|
logger.warn(`[contractor-bridge] unhandled error: ${String(err)}`);
|
|
if (!res.headersSent) {
|
|
sendJson(res, 500, { error: "internal server error" });
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Debug: list registered tools in the global plugin registry
|
|
if (method === "GET" && url === "/debug/tools") {
|
|
const pluginRegistry = getGlobalPluginRegistry();
|
|
const liveConfig = _G["_contractorOpenClawConfig"] ?? null;
|
|
if (!pluginRegistry) {
|
|
sendJson(res, 200, { error: "registry not available" });
|
|
} else {
|
|
sendJson(res, 200, {
|
|
configAvailable: liveConfig !== null,
|
|
tools: pluginRegistry.tools.map((t) => ({ names: t.names, pluginId: t.pluginId })),
|
|
});
|
|
}
|
|
return;
|
|
}
|
|
|
|
// MCP proxy tool execution callback.
|
|
// Called by services/openclaw-mcp-server.mjs when Claude Code invokes an MCP tool.
|
|
// Relays the call to OpenClaw's actual tool implementation via the global plugin registry.
|
|
if (method === "POST" && url === "/mcp/execute") {
|
|
let body: { tool?: string; args?: Record<string, unknown>; workspace?: string; agentId?: string };
|
|
try {
|
|
body = await parseBodyRaw(req) as { tool?: string; args?: Record<string, unknown>; workspace?: string; agentId?: string };
|
|
} catch {
|
|
sendJson(res, 400, { error: "invalid_json" });
|
|
return;
|
|
}
|
|
const toolName = body.tool ?? "";
|
|
const toolArgs = body.args ?? {};
|
|
const execWorkspace = body.workspace ?? "";
|
|
const execAgentId = body.agentId ?? "";
|
|
logger.info(`[contractor-bridge] mcp/execute tool=${toolName} workspace=${execWorkspace} agentId=${execAgentId}`);
|
|
|
|
// Relay to OpenClaw's actual tool implementation via the global plugin registry.
|
|
// This avoids reimplementing tool logic in the bridge — we call the real tool factory.
|
|
try {
|
|
const pluginRegistry = getGlobalPluginRegistry();
|
|
if (!pluginRegistry) {
|
|
sendJson(res, 200, { error: `[mcp/execute] plugin registry not available (tool=${toolName})` });
|
|
return;
|
|
}
|
|
|
|
// Build tool execution context (needed before tool lookup for factory instantiation).
|
|
const liveConfig = (_G["_contractorOpenClawConfig"] ?? null) as Record<string, unknown> | null;
|
|
const canonicalSessionKey = execAgentId ? `agent:${execAgentId}:direct:bridge` : undefined;
|
|
const toolCtx = {
|
|
config: liveConfig ?? undefined,
|
|
workspaceDir: execWorkspace || undefined,
|
|
agentDir: execWorkspace || undefined,
|
|
agentId: execAgentId || undefined,
|
|
sessionKey: canonicalSessionKey,
|
|
};
|
|
|
|
// Find tool by name. Some plugins register tools via factory functions without
|
|
// declaring names upfront (names array is empty). For those, instantiate the
|
|
// factory and match by the returned tool's .name property.
|
|
let toolReg = pluginRegistry.tools.find((t) => t.names.includes(toolName));
|
|
if (!toolReg) {
|
|
for (const candidate of pluginRegistry.tools) {
|
|
if (candidate.names.length > 0) continue;
|
|
try {
|
|
const inst = candidate.factory(toolCtx);
|
|
const items = Array.isArray(inst) ? inst : [inst];
|
|
if (items.some((t) => (t as { name?: string }).name === toolName)) {
|
|
toolReg = candidate;
|
|
break;
|
|
}
|
|
} catch { /* skip */ }
|
|
}
|
|
}
|
|
if (!toolReg) {
|
|
sendJson(res, 200, { error: `Tool '${toolName}' not registered in OpenClaw plugin registry` });
|
|
return;
|
|
}
|
|
|
|
// Instantiate the tool via its factory
|
|
const toolOrTools = toolReg.factory(toolCtx);
|
|
const toolInstance = Array.isArray(toolOrTools)
|
|
? toolOrTools.find((t) => (t as { name?: string }).name === toolName)
|
|
: toolOrTools;
|
|
|
|
if (!toolInstance || typeof (toolInstance as { execute?: unknown }).execute !== "function") {
|
|
sendJson(res, 200, { error: `Tool '${toolName}' factory returned no executable tool` });
|
|
return;
|
|
}
|
|
|
|
const execFn = (toolInstance as { execute: (id: string, args: unknown) => Promise<{ content?: Array<{ type: string; text?: string }> }> }).execute;
|
|
const toolResult = await execFn(randomUUID(), toolArgs);
|
|
|
|
// Extract text content from AgentToolResult
|
|
const text = (toolResult.content ?? [])
|
|
.filter((c) => c.type === "text" && c.text)
|
|
.map((c) => c.text as string)
|
|
.join("\n");
|
|
|
|
sendJson(res, 200, { result: text || "(no result)" });
|
|
} catch (err) {
|
|
logger.warn(`[contractor-bridge] mcp/execute error tool=${toolName}: ${String(err)}`);
|
|
sendJson(res, 200, { error: `Tool execution failed: ${String(err)}` });
|
|
}
|
|
return;
|
|
}
|
|
|
|
sendJson(res, 404, { error: "not_found" });
|
|
});
|
|
|
|
server.listen(port, "127.0.0.1", () => {
|
|
logger.info(`[contractor-bridge] sidecar listening on 127.0.0.1:${port}`);
|
|
});
|
|
|
|
return server;
|
|
}
|