OpenClaw's LLM idle watchdog (default 120s) fires on lack of *model
progress*, not lack of bytes — an SSE comment frame (": keepalive\n\n")
keeps the TCP socket alive but isn't recognized as progress, so a long
quiet tool-call phase still idles out. When that happens OpenClaw falls
back to re-sending the prior turn's assistant text (pi-embedded:1308
fallbackAnswerText), producing duplicate-Discord-message symptoms.
Heartbeat now emits a real chat.completion.chunk with an empty content
delta every 30s. Clients drop empty deltas; the upstream idle watchdog
should count it as model progress because it's a real event on the
canonical streaming channel.
scripts/install.mjs now spreads the existing provider entry before
overriding script-managed fields, so user-added fields like
timeoutSeconds survive reinstall.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
625 lines
25 KiB
TypeScript
625 lines
25 KiB
TypeScript
import http from "node:http";
|
||
import { randomUUID } from "node:crypto";
|
||
import type { BridgeInboundRequest } from "../core/types/model.js";
|
||
import {
|
||
buildSessionKey,
|
||
extractLatestUserMessage,
|
||
extractRequestContext,
|
||
resolveDispatchPrompt,
|
||
} from "./input-filter.js";
|
||
import { buildBootstrap } from "./bootstrap.js";
|
||
import { dispatchToClaude } from "../core/claude/sdk-adapter.js";
|
||
import { dispatchToGemini } from "../core/gemini/sdk-adapter.js";
|
||
import { getGlobalPluginRegistry } from "openclaw/plugin-sdk/plugin-runtime";
|
||
import {
|
||
getSession,
|
||
putSession,
|
||
markOrphaned,
|
||
removeSession,
|
||
} from "../core/contractor/session-map-store.js";
|
||
|
||
/** Options accepted by `createBridgeServer`. */
export type BridgeServerConfig = {
  /** TCP port the sidecar listens on (the server binds to 127.0.0.1). */
  port: number;
  /**
   * Bearer token required in the `Authorization` header of every request.
   * An empty string disables the auth check.
   */
  apiKey: string;
  /** Passed through unchanged to the Claude adapter as `permissionMode`. */
  permissionMode: string;
  /** Fallback: resolve workspace from agent id if not parseable from system prompt */
  resolveAgent?: (
    agentId: string,
    sessionKey: string,
  ) => { workspace: string } | null;
  /** Sink for the bridge's informational and warning log lines. */
  logger: { info: (msg: string) => void; warn: (msg: string) => void };
};
|
||
|
||
/**
 * Send `body` as a complete JSON response.
 *
 * Writes the status line with `Content-Type` and a byte-accurate
 * `Content-Length`, then ends the response with the serialized payload.
 *
 * @param res - Response to write to; headers must not have been sent yet.
 * @param status - HTTP status code.
 * @param body - Value to serialize with `JSON.stringify`.
 */
function sendJson(res: http.ServerResponse, status: number, body: unknown): void {
  // JSON.stringify returns undefined for inputs such as `undefined` or a
  // function; fall back to "null" so the length computation cannot throw.
  const payload: string = JSON.stringify(body) ?? "null";
  res.writeHead(status, {
    "Content-Type": "application/json; charset=utf-8",
    "Content-Length": Buffer.byteLength(payload),
  });
  res.end(payload);
}
|
||
|
||
/**
 * Write a single SSE event carrying `data`.
 *
 * Each line of the payload gets its own `data:` field, so a payload that
 * contains a line break cannot terminate the event early or inject extra
 * fields. Single-line payloads (the JSON chunks and `[DONE]` written by this
 * file) produce exactly `data: <payload>\n\n`.
 */
function sseWrite(res: http.ServerResponse, data: string): void {
  const fields = data
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`)
    .join("\n");
  res.write(`${fields}\n\n`);
}
|
||
|
||
/**
 * Build an OpenAI streaming chunk for a text delta.
 *
 * @param id - Completion id echoed in the chunk's `id` field.
 * @param text - Content delta; may be empty (the heartbeat sends "").
 * @returns The chunk serialized as a single-line JSON string.
 */
function buildChunk(id: string, text: string): string {
  return JSON.stringify({
    id,
    object: "chat.completion.chunk",
    // Unix time in whole seconds.
    created: Math.floor(Date.now() / 1000),
    model: "contractor-claude-bridge",
    choices: [{ index: 0, delta: { content: text }, finish_reason: null }],
  });
}
|
||
|
||
/**
 * Build the final stop chunk: an empty delta with `finish_reason: "stop"`.
 *
 * @param id - Completion id echoed in the chunk's `id` field.
 * @returns The chunk serialized as a single-line JSON string.
 */
function buildStopChunk(id: string): string {
  return JSON.stringify({
    id,
    object: "chat.completion.chunk",
    // Unix time in whole seconds.
    created: Math.floor(Date.now() / 1000),
    model: "contractor-claude-bridge",
    choices: [{ index: 0, delta: {}, finish_reason: "stop" }],
  });
}
|
||
|
||
/**
 * Read the whole request body and parse it as JSON.
 *
 * The raw bytes are buffered and decoded once at the end, so a multi-byte
 * UTF-8 character that straddles two network chunks is decoded correctly
 * (decoding each chunk on its own would turn both halves into U+FFFD).
 *
 * @param req - Incoming request whose body has not been consumed yet.
 * @returns The parsed JSON value; callers must validate its shape.
 * @throws Rejects with the `JSON.parse` error for malformed JSON, with
 *   `Error("body too large")` once more than 4,000,000 bytes arrive, or with
 *   whatever error the request stream emits.
 */
function parseBodyRaw(req: http.IncomingMessage): Promise<unknown> {
  const maxBodyBytes = 4_000_000;
  return new Promise((resolve, reject) => {
    const chunks: Buffer[] = [];
    let receivedBytes = 0;
    let tooLarge = false;

    req.on("data", (chunk: Buffer | string) => {
      if (tooLarge) return;
      // Chunks arrive as strings only if an encoding was set on the stream.
      const bytes = typeof chunk === "string" ? Buffer.from(chunk, "utf8") : chunk;
      receivedBytes += bytes.length;
      if (receivedBytes > maxBodyBytes) {
        tooLarge = true;
        chunks.length = 0; // release what was buffered so far
        const err = new Error("body too large");
        req.destroy(err);
        // destroy() also surfaces `err` via the "error" event; rejecting here
        // as well is harmless and guarantees the promise settles.
        reject(err);
        return;
      }
      chunks.push(bytes);
    });

    req.on("end", () => {
      if (tooLarge) return;
      try {
        resolve(JSON.parse(Buffer.concat(chunks).toString("utf8")));
      } catch (e) {
        reject(e);
      }
    });

    req.on("error", reject);
  });
}
|
||
|
||
/**
 * Read and parse a chat-completions request body.
 *
 * Performs a minimal shape check before handing the value out as a
 * `BridgeInboundRequest`: the body must be a JSON object with a `messages`
 * array, because the handler iterates `body.messages` without further checks.
 * The chat-completions handler answers any rejection from this function with
 * `400 invalid_json`.
 *
 * @throws Rejects with the underlying `parseBodyRaw` error, or with an
 *   `Error` when the parsed value does not have the expected shape.
 */
async function parseBody(req: http.IncomingMessage): Promise<BridgeInboundRequest> {
  const parsed = await parseBodyRaw(req);
  if (
    typeof parsed !== "object" ||
    parsed === null ||
    !Array.isArray((parsed as { messages?: unknown }).messages)
  ) {
    throw new Error("request body must be a JSON object with a messages array");
  }
  return parsed as BridgeInboundRequest;
}
|
||
|
||
/**
 * Per-sessionKey FIFO queue: same-session turns serialize so a user firing
 * multiple Discord messages back-to-back gets them processed in order rather
 * than spawning concurrent claude subprocesses. Cross-session requests bypass
 * each other's chains and run in parallel.
 *
 * The chain is `prev.then(() => mySlot)` per request — `mySlot` is released
 * in the request's finally block, so the next request waits until our work
 * completes (success or failure) before starting.
 *
 * Keys are session keys; each value is the tail promise of that session's
 * chain. An entry is removed by the request that owns the tail once no later
 * request has chained onto it.
 */
const queueBySession = new Map<string, Promise<void>>();
|
||
|
||
/**
 * SSE heartbeat cadence, in milliseconds. Bridge writes an empty-content
 * `chat.completion.chunk` (a no-op for OpenAI stream parsers, but a real
 * model-progress event on the canonical streaming channel) on this interval
 * while a turn is in flight or queued. This keeps OpenClaw's LLM idle watchdog
 * (default 120s) from firing during long quiet tool-call phases or while we're
 * waiting our turn in the per-session queue. See the heartbeat block in
 * handleChatCompletions for details on why an SSE comment frame is
 * insufficient.
 */
const HEARTBEAT_MS = 30_000;
|
||
|
||
// Untyped, string-keyed view of the global object. This file uses it to read
// the `_contractorOpenClawConfig` value, which is not set anywhere in this file.
const _G = globalThis as Record<string, unknown>;
|
||
|
||
export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||
const { port, apiKey, permissionMode, resolveAgent, logger } = config;
|
||
|
||
/**
 * Handle `POST /v1/chat/completions`.
 *
 * Parses the request, derives the session key and workspace, then streams the
 * backend's output (Gemini when `body.model` contains "gemini", otherwise
 * Claude) back as OpenAI-style SSE chunks. Turns that share a session key are
 * serialized through `queueBySession`; an empty-delta heartbeat chunk is
 * written every `HEARTBEAT_MS` while the turn is queued or running. After the
 * stream finishes, the session map is updated according to the outcome.
 *
 * Responds with a JSON 400 when the body cannot be parsed or carries no
 * dispatchable user message; every later outcome is reported inside the SSE
 * stream, because the 200 status has already been sent by then.
 */
async function handleChatCompletions(
  req: http.IncomingMessage,
  res: http.ServerResponse,
): Promise<void> {
  let body: BridgeInboundRequest;
  try {
    body = await parseBody(req);
  } catch {
    sendJson(res, 400, { error: "invalid_json" });
    return;
  }

  // Extract agent ID, workspace, chat id, and bare-reset signal from the
  // request. OpenClaw does NOT send agent/session info as HTTP headers — it
  // lives in the system prompt's Runtime line and the user envelope's
  // "Conversation info" untrusted-metadata block.
  const {
    agentId: parsedAgentId,
    workspace: parsedWorkspace,
    skillsBlock,
    workspaceContextFiles,
    chatId,
    bareSessionReset,
  } = extractRequestContext(body);
  const latestMessage = extractLatestUserMessage(body);

  // Pick the prompt to forward to the CLI. For bare /new turns OpenClaw
  // submits an empty user message — we synthesize a stub prompt instead so
  // the CLI has something to respond to.
  const dispatchPrompt = resolveDispatchPrompt(latestMessage, { bareSessionReset });

  if (!dispatchPrompt) {
    sendJson(res, 400, { error: "no user message found" });
    return;
  }

  // Scope the CLI session by (agentId, chat_id) so different Discord
  // channels / DMs / etc. for the same agent don't pile into one Claude
  // session and bleed context across surfaces. Falls back to agentId-only
  // when chat_id can't be parsed (local TUI, etc.).
  const agentId = parsedAgentId;
  const sessionKey = buildSessionKey(agentId, chatId);

  logger.info(
    `[contractor-bridge] turn agentId=${agentId} sessionKey=${sessionKey} workspace=${parsedWorkspace} bareReset=${bareSessionReset} msg=${dispatchPrompt.substring(0, 80)}`,
  );

  // Resolve workspace: prefer what we parsed from the system prompt (most accurate);
  // fall back to openclaw.json lookup for validation.
  let workspace = parsedWorkspace;
  if (!workspace && agentId) {
    const agentMeta = resolveAgent?.(agentId, sessionKey);
    if (agentMeta) workspace = agentMeta.workspace;
  }
  if (!workspace) {
    logger.warn(`[contractor-bridge] could not resolve workspace agentId=${agentId}`);
    workspace = "/tmp";
  }

  // Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude
  const isGemini = typeof body.model === "string" && body.model.includes("gemini");

  // ── Abort propagation ────────────────────────────────────────────────────
  // OpenClaw's LLM client cancels in-flight HTTP requests via AbortSignal
  // when an attempt fails or the user cancels. On the wire this manifests
  // as the client closing the socket, surfaced here as `req.on('close')`.
  // We mirror that signal into our own AbortController and propagate it
  // down to the dispatchTo{Claude,Gemini} adapter, which kills the
  // subprocess. Without this, OpenClaw silently drops the response while
  // the subprocess keeps running, and a retry spawns another subprocess
  // alongside the orphan — the root cause of the duplicate-Discord-message
  // bug we hit in May 2026.
  const abortController = new AbortController();
  let clientDisconnected = false;
  const onClose = (): void => {
    if (clientDisconnected) return;
    clientDisconnected = true;
    logger.info(
      `[contractor-bridge] client disconnected sessionKey=${sessionKey} — aborting in-flight work`,
    );
    abortController.abort();
  };
  req.on("close", onClose);

  // Start SSE response immediately so OpenClaw sees the response status
  // quickly. We may still spend time waiting in the per-session queue
  // before any real chunk goes out — heartbeat (below) keeps that quiet
  // window from tripping the upstream idle watchdog.
  res.writeHead(200, {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
    "Transfer-Encoding": "chunked",
  });

  const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;

  // ── SSE heartbeat ────────────────────────────────────────────────────────
  // OpenClaw's LLM idle watchdog (default 120s) fires on lack of *model
  // progress*, not lack of bytes — concretely "no content delta through
  // SSE for 120s". An SSE comment frame (`: keepalive\n\n`) keeps the TCP
  // socket alive but does NOT register as model progress, so a long quiet
  // tool-call phase still idles out. When that happens OpenClaw falls back
  // to re-sending the prior turn's assistant text (see
  // pi-embedded-Bcz04p2i.js:1308 `fallbackAnswerText`), producing the
  // duplicate-Discord-message symptom observed 2026-05-14.
  //
  // We emit a real `chat.completion.chunk` with an empty content delta
  // every HEARTBEAT_MS. Clients drop empty deltas, but the upstream idle
  // watchdog should count it as model progress because it's a real event
  // on the canonical streaming channel. If empty content turns out to be
  // filtered, the next step is a zero-width-space "\u200B".
  const heartbeat = setInterval(() => {
    if (clientDisconnected || res.writableEnded) return;
    try {
      sseWrite(res, buildChunk(completionId, ""));
    } catch {
      /* socket dead, ignore */
    }
  }, HEARTBEAT_MS);
  // Don't let a pending heartbeat timer keep the process alive on its own.
  heartbeat.unref?.();

  // ── Per-sessionKey FIFO queue ────────────────────────────────────────────
  // Same-sessionKey turns serialize: if a user fires multiple Discord
  // messages quickly, we process them in order rather than starting
  // concurrent claude subprocesses (which would corrupt the shared session
  // file and produce overlapping replies). Cross-sessionKey requests run
  // in parallel — they live on independent chains.
  let releaseSlot: () => void = () => {};
  const mySlot = new Promise<void>((r) => {
    releaseSlot = r;
  });
  const prev = sessionKey
    ? queueBySession.get(sessionKey) ?? Promise.resolve()
    : Promise.resolve();
  // `myChainTail` advances only after `releaseSlot()` (called in our
  // finally block). Tolerate prev rejecting so a crashed earlier turn
  // doesn't poison the chain forever.
  const myChainTail = prev.then(() => mySlot, () => mySlot);
  if (sessionKey) queueBySession.set(sessionKey, myChainTail);

  let newSessionId = "";
  let hasError = false;
  let resultErrorReason: string | null = null;
  let existingEntry: ReturnType<typeof getSession> = null;

  try {
    // Block until the previous turn on this sessionKey finishes.
    // Cross-session requests skip this wait (their `prev` is a fresh
    // Promise.resolve()).
    await prev.catch(() => {});

    // While queued, OpenClaw may have given up on this request (attempt
    // timeout, user cancel, etc.) and closed the socket. If so, exit
    // cleanly without dispatching — the client wouldn't see the output
    // anyway.
    if (clientDisconnected) {
      logger.info(
        `[contractor-bridge] dropped queued request sessionKey=${sessionKey} (client disconnected during queue wait)`,
      );
      return;
    }

    // Look up the session-map entry NOW (at the head of the queue), not
    // earlier — the previous turn on the same sessionKey may have just
    // updated `claudeSessionId` and we want to resume into the latest
    // one rather than a stale snapshot from request-arrival time.
    existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;

    // Detect a fresh OpenClaw session even when the bare-reset marker is
    // absent. The marker only arrives when `/new` is the body of *this*
    // user turn (see get-reply isBareSessionReset). When `/new` is sent
    // as a separate slash command (e.g. via Discord's slash UI), OpenClaw
    // processes the reset in a side lane that doesn't hit the bridge —
    // it just renames the prior session file aside. The follow-up real
    // message then arrives on a brand-new OpenClaw session, but as a
    // normal turn with no marker. Without this check, the bridge happily
    // resumes the long-stale claudeSessionId from before the reset.
    //
    // OpenClaw sends the full conversation history every turn (system +
    // user/assistant pairs + latest user). A request with zero assistant
    // turns is therefore a positive signal that the OpenClaw session is
    // brand-new and any prior claudeSessionId we hold is from a previous
    // OpenClaw session that the user already abandoned.
    const hasAssistantHistory = body.messages.some((m) => {
      if (m.role !== "assistant") return false;
      if (typeof m.content === "string") return m.content.trim().length > 0;
      // The body is unvalidated JSON, so `content` can be null or absent at
      // runtime (OpenAI-style assistant tool-call messages carry
      // `content: null`). Treat that as "no text" instead of throwing.
      if (!Array.isArray(m.content)) return false;
      return m.content.some(
        (c) => c.type === "text" && (c.text ?? "").trim().length > 0,
      );
    });
    const isFreshOpenClawSession = !hasAssistantHistory;

    if ((bareSessionReset || isFreshOpenClawSession) && existingEntry && sessionKey) {
      const reason = bareSessionReset
        ? "bare /new detected"
        : "fresh OpenClaw session (no assistant history in messages[])";
      logger.info(
        `[contractor-bridge] ${reason} — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`,
      );
      removeSession(workspace, sessionKey);
      existingEntry = null;
    }
    const resumeSessionId =
      existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;

    // Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
    // For Claude: --append-system-prompt appends to the built-in prompt each invocation.
    // For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
    // This keeps persona and skills current without needing to track first-turn state.
    const systemPrompt = buildBootstrap({
      agentId,
      openclawSessionKey: sessionKey,
      workspace,
      skillsBlock: skillsBlock || undefined,
      workspaceContextFiles,
    });

    const openclawTools = body.tools ?? [];

    const dispatchIter = isGemini
      ? dispatchToGemini({
          prompt: dispatchPrompt,
          systemPrompt,
          workspace,
          agentId,
          resumeSessionId: resumeSessionId ?? undefined,
          openclawTools,
          bridgePort: port,
          bridgeApiKey: apiKey,
          signal: abortController.signal,
        })
      : dispatchToClaude({
          prompt: dispatchPrompt,
          systemPrompt,
          workspace,
          agentId,
          resumeSessionId: resumeSessionId ?? undefined,
          permissionMode,
          openclawTools,
          bridgePort: port,
          bridgeApiKey: apiKey,
          signal: abortController.signal,
        });

    try {
      for await (const event of dispatchIter) {
        if (event.type === "text") {
          if (!clientDisconnected) sseWrite(res, buildChunk(completionId, event.text));
        } else if (event.type === "done") {
          newSessionId = event.sessionId;
        } else if (event.type === "result_error") {
          // CLI signaled a fatal-but-graceful error (context overflow,
          // refusal, billing, etc.) via `is_error: true` on the result
          // event. The text was already streamed via prior `text` events;
          // record the reason so the bridge can drop the session-map entry
          // below.
          logger.warn(
            `[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`,
          );
          resultErrorReason = event.reason;
          newSessionId = event.sessionId;
        } else if (event.type === "error") {
          logger.warn(`[contractor-bridge] ${isGemini ? "gemini" : "claude"} error: ${event.message}`);
          hasError = true;
          if (!clientDisconnected) {
            sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`));
          }
        }
      }
    } catch (err) {
      logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
      hasError = true;
      if (!clientDisconnected) {
        sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`));
      }
    }

    if (!clientDisconnected && !res.writableEnded) {
      sseWrite(res, buildStopChunk(completionId));
      sseWrite(res, "[DONE]");
    }

    // Session-map persistence:
    // - Successful turn → upsert with the latest claudeSessionId so the next
    //   turn can `--resume` into it.
    // - Terminal CLI error (context overflow etc., reported via result_error)
    //   → drop the entry so the next turn starts fresh instead of resuming
    //   into the same poisoned session and re-erroring.
    // - Stream/transport error before any sessionId was captured → mark the
    //   prior entry orphaned (existing behavior).
    // - Aborted (client disconnected) → don't touch the session-map; the
    //   subprocess may have already updated its own session file on disk,
    //   and the next turn will resume from whatever it left there.
    if (clientDisconnected) {
      // No-op: aborted turn; preserve whatever the prior entry was so the
      // next turn (likely an OpenClaw retry of the same prompt) can resume.
    } else if (resultErrorReason && sessionKey) {
      logger.info(
        `[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`,
      );
      removeSession(workspace, sessionKey);
    } else if (newSessionId && sessionKey && !hasError) {
      const now = new Date().toISOString();
      putSession(workspace, {
        openclawSessionKey: sessionKey,
        agentId,
        contractor: isGemini ? "gemini" : "claude",
        claudeSessionId: newSessionId,
        workspace,
        createdAt: existingEntry?.createdAt ?? now,
        lastActivityAt: now,
        state: "active",
      });
      logger.info(
        `[contractor-bridge] session mapped sessionKey=${sessionKey} contractor=${isGemini ? "gemini" : "claude"} sessionId=${newSessionId}`,
      );
    } else if (hasError && sessionKey && existingEntry) {
      markOrphaned(workspace, sessionKey);
    }
  } finally {
    clearInterval(heartbeat);
    // Detach before ending the response so our own completion is not
    // reported as a client disconnect.
    req.off("close", onClose);

    // Release our slot so the next request on this sessionKey can proceed.
    // Must happen even on abort/throw, otherwise the chain stalls forever.
    releaseSlot();

    // Garbage-collect the queue entry if no later request chained on us.
    // The Map identity check is the only safe way to tell — newer requests
    // overwrite the entry with their own chain tail.
    if (sessionKey && queueBySession.get(sessionKey) === myChainTail) {
      queueBySession.delete(sessionKey);
    }

    if (!res.writableEnded) {
      try {
        res.end();
      } catch {
        /* ignore */
      }
    }
  }
}
|
||
|
||
// HTTP front door of the bridge. Every request must pass the bearer-token
// check (when an apiKey is configured) and is then routed by method + path:
//   GET  /health               liveness probe
//   GET  /v1/models            advertises the two bridge model ids
//   POST /v1/chat/completions  streamed turn (see handleChatCompletions)
//   GET  /debug/tools          lists tools in the global plugin registry
//   POST /mcp/execute          runs one OpenClaw tool on behalf of the MCP proxy
// Anything else gets a JSON 404.
const server = http.createServer(async (req, res) => {
  // Route on the path only, so a query string (e.g. `/health?probe=1`) does
  // not turn a known route into a 404.
  const url = (req.url ?? "/").split("?", 1)[0] ?? "/";
  const method = req.method ?? "GET";

  // Auth check
  if (apiKey) {
    const auth = req.headers.authorization ?? "";
    if (auth !== `Bearer ${apiKey}`) {
      sendJson(res, 401, { error: "unauthorized" });
      return;
    }
  }

  if (method === "GET" && url === "/health") {
    sendJson(res, 200, { ok: true, service: "contractor-bridge" });
    return;
  }

  if (method === "GET" && url === "/v1/models") {
    sendJson(res, 200, {
      object: "list",
      data: [
        {
          id: "contractor-claude-bridge",
          object: "model",
          created: Math.floor(Date.now() / 1000),
          owned_by: "contractor-agent",
        },
        {
          id: "contractor-gemini-bridge",
          object: "model",
          created: Math.floor(Date.now() / 1000),
          owned_by: "contractor-agent",
        },
      ],
    });
    return;
  }

  if (method === "POST" && url === "/v1/chat/completions") {
    try {
      await handleChatCompletions(req, res);
    } catch (err) {
      logger.warn(`[contractor-bridge] unhandled error: ${String(err)}`);
      // Once the SSE headers are out, a JSON error can no longer be sent.
      if (!res.headersSent) {
        sendJson(res, 500, { error: "internal server error" });
      }
    }
    return;
  }

  // Debug: list registered tools in the global plugin registry
  if (method === "GET" && url === "/debug/tools") {
    const pluginRegistry = getGlobalPluginRegistry();
    const liveConfig = _G["_contractorOpenClawConfig"] ?? null;
    if (!pluginRegistry) {
      sendJson(res, 200, { error: "registry not available" });
    } else {
      sendJson(res, 200, {
        configAvailable: liveConfig !== null,
        tools: pluginRegistry.tools.map((t) => ({ names: t.names, pluginId: t.pluginId })),
      });
    }
    return;
  }

  // MCP proxy tool execution callback.
  // Called by services/openclaw-mcp-server.mjs when Claude Code invokes an MCP tool.
  // Relays the call to OpenClaw's actual tool implementation via the global plugin registry.
  // Failures are reported as `{ error }` with HTTP 200.
  if (method === "POST" && url === "/mcp/execute") {
    type McpExecuteBody = {
      tool?: string;
      args?: Record<string, unknown>;
      workspace?: string;
      agentId?: string;
    };
    type ExecutableTool = {
      execute: (
        id: string,
        args: unknown,
      ) => Promise<{ content?: Array<{ type: string; text?: string }> }>;
    };

    let body: McpExecuteBody;
    try {
      body = (await parseBodyRaw(req)) as McpExecuteBody;
    } catch {
      sendJson(res, 400, { error: "invalid_json" });
      return;
    }
    const toolName = body.tool ?? "";
    const toolArgs = body.args ?? {};
    const execWorkspace = body.workspace ?? "";
    const execAgentId = body.agentId ?? "";
    logger.info(`[contractor-bridge] mcp/execute tool=${toolName} workspace=${execWorkspace} agentId=${execAgentId}`);

    // Relay to OpenClaw's actual tool implementation via the global plugin registry.
    // This avoids reimplementing tool logic in the bridge — we call the real tool factory.
    try {
      const pluginRegistry = getGlobalPluginRegistry();
      if (!pluginRegistry) {
        sendJson(res, 200, { error: `[mcp/execute] plugin registry not available (tool=${toolName})` });
        return;
      }

      // Build tool execution context (needed before tool lookup for factory instantiation).
      const liveConfig = (_G["_contractorOpenClawConfig"] ?? null) as Record<string, unknown> | null;
      const canonicalSessionKey = execAgentId ? `agent:${execAgentId}:direct:bridge` : undefined;
      const toolCtx = {
        config: liveConfig ?? undefined,
        workspaceDir: execWorkspace || undefined,
        agentDir: execWorkspace || undefined,
        agentId: execAgentId || undefined,
        sessionKey: canonicalSessionKey,
      };

      const hasToolName = (t: unknown): boolean =>
        (t as { name?: string } | null | undefined)?.name === toolName;

      // Find tool by name. Some plugins register tools via factory functions without
      // declaring names upfront (names array is empty). For those, instantiate the
      // factory and match by the returned tool's .name property. The matching
      // instance is kept, so each factory runs at most once per request.
      let registered = false;
      let toolInstance: unknown;
      const declared = pluginRegistry.tools.find((t) => t.names.includes(toolName));
      if (declared) {
        registered = true;
        const made: unknown = declared.factory(toolCtx);
        toolInstance = Array.isArray(made) ? made.find(hasToolName) : made;
      } else {
        for (const candidate of pluginRegistry.tools) {
          if (candidate.names.length > 0) continue;
          try {
            const made: unknown = candidate.factory(toolCtx);
            const match: unknown = (Array.isArray(made) ? made : [made]).find(hasToolName);
            if (match !== undefined) {
              registered = true;
              toolInstance = match;
              break;
            }
          } catch {
            /* skip: a factory that throws simply doesn't provide this tool */
          }
        }
      }
      if (!registered) {
        sendJson(res, 200, { error: `Tool '${toolName}' not registered in OpenClaw plugin registry` });
        return;
      }

      const tool = toolInstance as Partial<ExecutableTool> | null | undefined;
      if (!tool || typeof tool.execute !== "function") {
        sendJson(res, 200, { error: `Tool '${toolName}' factory returned no executable tool` });
        return;
      }

      // Call `execute` as a method so tools that rely on `this` keep working.
      const toolResult = await tool.execute(randomUUID(), toolArgs);

      // Extract text content from AgentToolResult
      const text = (toolResult?.content ?? [])
        .filter((c) => c.type === "text" && c.text)
        .map((c) => c.text as string)
        .join("\n");

      sendJson(res, 200, { result: text || "(no result)" });
    } catch (err) {
      logger.warn(`[contractor-bridge] mcp/execute error tool=${toolName}: ${String(err)}`);
      sendJson(res, 200, { error: `Tool execution failed: ${String(err)}` });
    }
    return;
  }

  sendJson(res, 404, { error: "not_found" });
});
|
||
|
||
// Listen on the loopback interface only; the callback runs once the server
// is accepting connections.
server.listen(port, "127.0.0.1", () => {
  logger.info(`[contractor-bridge] sidecar listening on 127.0.0.1:${port}`);
});
|
||
|
||
return server;
|
||
}
|