Files
ContractorAgent/plugin/core/claude/sdk-adapter.ts
zhi 6be8d47982 fix(claude-adapter): commit turn on result event, don't block on process exit
Previously dispatchToClaude awaited child.on('close') before yielding the done
event. The Claude CLI's Bash tool occasionally leaves ssh/bash grandchildren alive
(e.g. a GUI app that ignores SIGPIPE on the remote end of a piped ssh command);
that kept claude -p alive past end-of-turn, which kept the bridge SSE stream
open, which kept OpenClaw from committing the turn to its session jsonl.

Switch to emitting done as soon as the terminal result stream-json event
arrives. Spawn claude in its own process group (detached:true) and schedule
a best-effort SIGTERM/SIGKILL sweep of leaked descendants; temp-file cleanup
runs asynchronously on actual process close.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-21 08:52:57 +00:00

263 lines
7.9 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import os from "node:os";
import { spawn } from "node:child_process";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
/** Assistant text block surfaced from a stream-json `assistant` event. */
type ClaudeTextMessage = { type: "text"; text: string };
/** Terminal success event carrying the Claude Code session id (usable with `--resume`). */
type ClaudeDoneMessage = { type: "done"; sessionId: string };
/** Terminal failure event with a human-readable reason. */
type ClaudeErrorMessage = { type: "error"; message: string };
/** Events yielded by `dispatchToClaude`, discriminated on `type`. */
export type ClaudeMessage = ClaudeTextMessage | ClaudeDoneMessage | ClaudeErrorMessage;
/** Function portion of an OpenAI-style tool definition. */
type OpenAIFunctionSpec = {
  name: string;
  description?: string;
  /** Parameter schema, carried through opaquely (presumably JSON Schema — not inspected in this module). */
  parameters?: unknown;
};

/** OpenAI-style function tool; these are serialized into the MCP proxy's tool-definitions file. */
export type OpenAITool = {
  type: "function";
  function: OpenAIFunctionSpec;
};
/** Per-turn inputs for `dispatchToClaude`. */
export type ClaudeDispatchOptions = {
  /** User prompt for this turn; passed directly after `claude -p`. */
  prompt: string;
  /** Working directory for the spawned `claude` process; also handed to the MCP proxy as WORKSPACE. */
  workspace: string;
  /**
   * Appended to Claude Code's built-in system prompt via --append-system-prompt on every invocation.
   * Stateless: not persisted in the session file; fully replaces any prior appended content on resume.
   */
  systemPrompt?: string;
  /** Handed to the MCP proxy as AGENT_ID (`dispatchToClaude` substitutes "" when omitted). */
  agentId?: string;
  /** Existing Claude Code session id to continue; passed as `--resume`. */
  resumeSessionId?: string;
  /**
   * NOTE(review): accepted (defaulting to "default") but not currently forwarded to the
   * claude CLI by `dispatchToClaude` — confirm whether it should map to a CLI flag.
   */
  permissionMode?: string;
  /** OpenClaw tool definitions to expose to Claude as MCP tools. */
  openclawTools?: OpenAITool[];
  /** Bridge port for MCP proxy callbacks (`dispatchToClaude` defaults it to 18800). */
  bridgePort?: number;
  /** Bridge API key for MCP proxy callbacks. */
  bridgeApiKey?: string;
};
// The MCP proxy server ships alongside this adapter. Installed layout:
//   <plugin root>/core/claude/sdk-adapter.ts
//   <plugin root>/services/openclaw-mcp-server.mjs
// i.e. two directories up from this module, then into services/.
// __dirname is absolute (derived from this module's file URL), so path.join
// yields the same normalized absolute path that path.resolve would.
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const MCP_SERVER_SCRIPT = path.join(__dirname, "..", "..", "services", "openclaw-mcp-server.mjs");
/**
 * Write OpenClaw tool definitions to a temp file and create an --mcp-config JSON
 * so Claude Code can call them as `mcp__openclaw__<toolname>` tools.
 *
 * Two files sharing a unique prefix are written to os.tmpdir():
 *   `<prefix>-tools.json` — the tool definitions (handed to the MCP server as TOOL_DEFS_FILE)
 *   `<prefix>-mcp.json`   — the config passed to `claude --mcp-config`
 * Both are created exclusively (`wx`, so a pre-existing file or symlink is never
 * followed) with mode 0600, because the config embeds the bridge API key and the
 * temp directory is commonly shared between local users.
 *
 * @returns the path to the mcp-config JSON file, or null when there are no tools,
 *   the MCP server script is missing, or writing either file fails (a tools file
 *   already written by this call is removed in that case).
 */
function setupMcpConfig(
  tools: OpenAITool[],
  bridgePort: number,
  bridgeApiKey: string,
  workspace: string,
  agentId: string,
): string | null {
  if (!tools.length) return null;
  if (!fs.existsSync(MCP_SERVER_SCRIPT)) return null;

  // pid + timestamp + random suffix: a bare Date.now() collides when two turns
  // are dispatched within the same millisecond.
  const prefix = `oc-${process.pid}-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
  const tmpDir = os.tmpdir();
  const toolDefsPath = path.join(tmpDir, `${prefix}-tools.json`);
  const mcpConfigPath = path.join(tmpDir, `${prefix}-mcp.json`);
  const writeOptions = { encoding: "utf8", mode: 0o600, flag: "wx" } as const;
  const written: string[] = [];

  try {
    fs.writeFileSync(toolDefsPath, JSON.stringify(tools, null, 2), writeOptions);
    written.push(toolDefsPath);
    const mcpConfig = {
      mcpServers: {
        openclaw: {
          command: process.execPath,
          args: [MCP_SERVER_SCRIPT],
          env: {
            TOOL_DEFS_FILE: toolDefsPath,
            BRIDGE_EXECUTE_URL: `http://127.0.0.1:${bridgePort}/mcp/execute`,
            BRIDGE_API_KEY: bridgeApiKey,
            WORKSPACE: workspace,
            AGENT_ID: agentId,
          },
        },
      },
    };
    fs.writeFileSync(mcpConfigPath, JSON.stringify(mcpConfig, null, 2), writeOptions);
    return mcpConfigPath;
  } catch {
    // Best-effort: MCP is optional, so fall back to running claude without it,
    // but don't leave an orphaned tools file behind.
    for (const p of written) {
      try { fs.unlinkSync(p); } catch { /* already gone */ }
    }
    return null;
  }
}
/**
 * Dispatch a turn to Claude Code using `claude -p --output-format stream-json --verbose`.
 * Returns an async iterable of ClaudeMessage events.
 *
 * Yields zero or more `text` events (one per text block in each stream-json
 * `assistant` event), followed by exactly one terminal event:
 *   - `done` with the `session_id` taken from the stream-json `result` event, or
 *   - `error` when no session id was captured — the message carries the spawn
 *     error if `claude` could not be started, otherwise up to 200 chars of stderr.
 *
 * The turn is committed as soon as the `result` event arrives instead of waiting
 * for claude's process tree to exit (leaked Bash grandchildren can hold stdout
 * open indefinitely). claude is spawned in its own process group; once the turn
 * ends (or the consumer stops iterating early) the whole group is sent SIGTERM
 * after 3 s and SIGKILL after 10 s unless the child's stdio has closed by then.
 * MCP temp files are removed when the child process closes.
 */
export async function* dispatchToClaude(
  opts: ClaudeDispatchOptions,
): AsyncIterable<ClaudeMessage> {
  const {
    prompt,
    systemPrompt,
    workspace,
    agentId = "",
    resumeSessionId,
    // NOTE(review): permissionMode is accepted but never forwarded to the claude
    // CLI below, so setting it currently has no effect — confirm intent.
    permissionMode = "default",
    openclawTools,
    bridgePort = 18800,
    bridgeApiKey = "",
  } = opts;
  // Only this many leading characters of the joined stderr are ever reported.
  const stderrSummaryLimit = 200;

  // NOTE: put prompt right after -p, before --mcp-config.
  // --mcp-config takes <configs...> (multiple values) and would greedily
  // consume the prompt if it came after --mcp-config.
  const args: string[] = [
    "-p",
    prompt,
    "--output-format", "stream-json",
    "--verbose",
    "--allowedTools", "Bash Edit Write Read Glob Grep WebFetch WebSearch NotebookEdit Monitor TodoWrite mcp__openclaw__*",
  ];
  // --append-system-prompt appends to Claude Code's built-in system prompt rather
  // than replacing it, preserving the full agent SDK instructions (tool use behavior,
  // memory management, etc.). The appended bootstrap (persona + skills) is stateless:
  // not persisted in the session file, takes effect every invocation including resumes.
  if (systemPrompt) {
    args.push("--append-system-prompt", systemPrompt);
  }
  if (resumeSessionId) {
    args.push("--resume", resumeSessionId);
  }
  // Set up MCP proxy every turn — the MCP server process exits with each `claude -p`
  // invocation, so --resume sessions also need --mcp-config to restart it.
  // Put --mcp-config after the prompt so its <configs...> variadic doesn't consume the prompt.
  const mcpConfigPath = openclawTools?.length
    ? setupMcpConfig(openclawTools, bridgePort, bridgeApiKey, workspace, agentId)
    : null;
  if (mcpConfigPath) {
    args.push("--mcp-config", mcpConfigPath);
  }

  // detached:true puts claude in its own process group. Claude's Bash tool
  // occasionally leaks shells/ssh that keep claude alive past end-of-turn; when
  // that happens we signal the whole group rather than wait forever.
  const child = spawn("claude", args, {
    cwd: workspace,
    stdio: ["ignore", "pipe", "pipe"],
    env: { ...process.env },
    detached: true,
  });

  let capturedSessionId = "";
  let spawnErrorMessage = "";
  const events: ClaudeMessage[] = [];
  let done = false;
  let resolveNext: (() => void) | null = null;
  let childClosed = false;
  let reapScheduled = false;

  /** Resume the consumer loop if it is parked waiting for the next event. */
  const wake = (): void => {
    if (resolveNext) {
      const r = resolveNext;
      resolveNext = null;
      r();
    }
  };

  /** Arm the delayed SIGTERM/SIGKILL sweep of claude's process group (once). */
  const scheduleReap = (): void => {
    if (reapScheduled || childClosed) return;
    reapScheduled = true;
    const killGroup = (sig: NodeJS.Signals): void => {
      // No pid means the spawn itself failed; nothing to signal.
      if (child.pid == null || childClosed) return;
      // Negative pid targets the whole process group, so leaked descendants are
      // swept even when claude itself has already exited but they still hold
      // its stdio open. ESRCH (group already gone) is ignored.
      try { process.kill(-child.pid, sig); } catch { /* already gone */ }
    };
    const termTimer = setTimeout(() => killGroup("SIGTERM"), 3000);
    const killTimer = setTimeout(() => killGroup("SIGKILL"), 10000);
    child.once("close", () => {
      clearTimeout(termTimer);
      clearTimeout(killTimer);
    });
  };

  /** End the event stream (idempotent) and schedule process-group cleanup. */
  const markDone = (): void => {
    if (done) return;
    done = true;
    scheduleReap();
    wake();
  };

  // Registered up front so temp files are removed whenever the child closes,
  // regardless of how (or whether) the turn was marked done first.
  child.once("close", () => {
    childClosed = true;
    if (!mcpConfigPath) return;
    // setupMcpConfig writes `<prefix>-tools.json` beside `<prefix>-mcp.json`;
    // remove both so tool definitions don't accumulate in the temp dir.
    const toolDefsPath = mcpConfigPath.replace(/-mcp\.json$/, "-tools.json");
    const tempFiles = toolDefsPath === mcpConfigPath ? [mcpConfigPath] : [mcpConfigPath, toolDefsPath];
    for (const p of tempFiles) {
      try { fs.unlinkSync(p); } catch { /* ignore */ }
    }
  });

  // Without an 'error' listener a spawn failure (e.g. ENOENT when `claude` is
  // not on PATH) is rethrown as an uncaught exception and takes down the host.
  child.on("error", (err: Error) => {
    if (!spawnErrorMessage) spawnErrorMessage = err.message;
    markDone();
  });

  // setEncoding keeps multi-byte UTF-8 sequences intact across chunk boundaries.
  // Buffering stops once the joined text reaches the reported prefix length, which
  // leaves the final `join(" ").slice(0, stderrSummaryLimit)` unchanged.
  const stderrLines: string[] = [];
  let stderrJoinedLength = 0;
  child.stderr?.setEncoding("utf8");
  child.stderr?.on("data", (chunk: string) => {
    if (stderrJoinedLength >= stderrSummaryLimit) return;
    const piece = chunk.trim();
    stderrJoinedLength += (stderrLines.length > 0 ? 1 : 0) + piece.length;
    stderrLines.push(piece);
  });

  const rl = createInterface({ input: child.stdout!, crlfDelay: Infinity });
  rl.on("line", (line: string) => {
    if (!line.trim()) return;
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      return; // non-JSON noise on stdout
    }
    // Valid JSON that is not an object (e.g. `null`) would otherwise throw on
    // property access inside this listener and surface as an uncaught exception.
    if (typeof parsed !== "object" || parsed === null) return;
    const event = parsed as Record<string, unknown>;
    const type = event.type;
    if (type === "assistant") {
      const content = (event.message as { content?: unknown } | null | undefined)?.content;
      if (Array.isArray(content)) {
        for (const block of content as Array<{ type?: unknown; text?: unknown } | null>) {
          if (block?.type === "text" && typeof block.text === "string" && block.text) {
            events.push({ type: "text", text: block.text });
          }
        }
      }
    }
    if (type === "result") {
      if (typeof event.session_id === "string" && event.session_id) {
        capturedSessionId = event.session_id;
      }
      // `result` is the terminal stream-json event; commit the turn without
      // waiting for claude's process tree to fully exit (leaked Bash grandchildren
      // can otherwise hold stdout open indefinitely).
      markDone();
      return;
    }
    wake();
  });
  rl.on("close", () => {
    // Fallback: claude exited without emitting a result event.
    markDone();
  });

  try {
    while (true) {
      const next = events.shift();
      if (next) {
        yield next;
        continue;
      }
      if (done) break;
      await new Promise<void>((resolve) => {
        resolveNext = resolve;
      });
    }
    if (capturedSessionId) {
      yield { type: "done", sessionId: capturedSessionId };
    } else if (spawnErrorMessage) {
      yield { type: "error", message: `failed to spawn claude: ${spawnErrorMessage}` };
    } else {
      const stderrSummary = stderrLines.join(" ").slice(0, stderrSummaryLimit);
      yield {
        type: "error",
        message: `claude did not return a session_id${stderrSummary ? `: ${stderrSummary}` : ""}`,
      };
    }
  } finally {
    // If the consumer stops iterating early (break/return/throw), nobody reads the
    // rest of the turn: schedule the process-group reap. No-op after a normal end.
    markDone();
  }
}