Files
ContractorAgent/plugin/core/claude/sdk-adapter.ts
zhi 2e64e9ce02 fix(bridge): abort propagation, SSE heartbeat, per-session FIFO queue
Three coordinated fixes for the duplicate-Discord-message bug where the
same prompt would be answered by two different claude subprocesses
running in parallel.

Root cause: handleChatCompletions had no concurrency control and no
way to detect when OpenClaw closed the upstream HTTP connection. When
OpenClaw's idle watchdog tripped (default 120s of stream silence), it
would close the socket and retry the prompt — but the original claude
subprocess kept running, and the bridge spawned a second one alongside
it. Both eventually streamed back, both got delivered to Discord.

Native (non-bridge) flow doesn't hit this because OpenClaw's fetch is
abort-aware end-to-end: attempt timeout fires AbortSignal, fetch closes
the socket, the model provider sees it, work stops. Bridge broke the
chain at "spawn subprocess" — this restores it.

Changes:

* SSE heartbeat (server.ts): write a `: keepalive\n\n` SSE comment
  every 30s while a turn is in flight. Counts as bytes on the wire so
  upstream idle timer resets, but is a spec-mandated no-op for the
  OpenAI stream parser. Eliminates the 120s-silence trigger that was
  causing OpenClaw to give up on long tool-call sequences in the first
  place.

* Abort propagation (server.ts + both adapters): hook req.on('close')
  to an AbortController and pass its signal through to dispatchToClaude /
  dispatchToGemini. Adapters listen for the signal's abort event and call
  markDone → scheduleCleanup, which gives the child a short grace period
  to exit on its own (3s for claude, 5s for gemini), then SIGTERMs the
  child process group and finally SIGKILLs it if it is still alive.
  Mirrors what native fetch does when its caller aborts.

* Per-sessionKey FIFO queue (server.ts): same-session turns serialize
  via a Map<sessionKey, Promise<void>> chain so a user firing multiple
  Discord messages back-to-back gets them processed in order rather
  than spawning concurrent subprocesses (which would corrupt the shared
  --resume session file). Cross-session requests live on independent
  chains and run in parallel.

Subtle correctness points:

* getSession() moved to head-of-queue so we resume into the latest
  claudeSessionId from the just-finished prior turn instead of a stale
  request-arrival snapshot.
* Aborted turns skip session-map persistence — the subprocess may have
  already updated its own session file on disk, so the next retry
  resumes from there.
* Queue chain GC uses Map identity check so we don't delete an entry
  that a later request has already chained onto.
* prev.then(() => mySlot, () => mySlot) tolerates a crashed prior turn
  so the chain doesn't poison forever.
* writeHead(200) before queue wait so OpenClaw sees response status
  immediately; heartbeat covers the queue-wait quiet period.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 23:58:17 +00:00

314 lines
10 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import os from "node:os";
import { spawn } from "node:child_process";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
/**
 * Events produced while streaming one Claude Code turn.
 *
 * Discriminated on `type`; a turn emits zero or more `text` events followed by
 * exactly one terminal event (`done`, `error`, or `result_error`).
 */
export type ClaudeMessage =
  /** One text block taken from an `assistant` stream-json event. */
  | {
      type: "text";
      text: string;
    }
  /** The turn finished; `sessionId` is the CLI session id reported for it. */
  | {
      type: "done";
      sessionId: string;
    }
  /** The turn ended without a usable session id; `message` says why. */
  | {
      type: "error";
      message: string;
    }
  /**
   * Terminal error from the CLI's `result` event (e.g. `is_error: true` with
   * `terminal_reason: "prompt_too_long"`). The bridge uses this signal to
   * drop the session-map entry so the next turn starts a fresh CLI session
   * instead of `--resume`-ing into the same poisoned context.
   */
  | {
      type: "result_error";
      sessionId: string;
      reason: string;
      message: string;
    };
/**
 * A tool definition in the OpenAI function-calling shape
 * (`{ type: "function", function: { ... } }`).
 */
export type OpenAITool = {
  type: "function";
  function: {
    /** Tool name. */
    name: string;
    /** Optional human-readable description of the tool. */
    description?: string;
    /** Optional parameter schema; left opaque here. */
    parameters?: unknown;
  };
};
/** Options for a single Claude Code turn dispatched by the bridge. */
export type ClaudeDispatchOptions = {
  /** The user prompt for this turn (passed to the CLI right after `-p`). */
  prompt: string;

  /**
   * Appended to Claude Code's built-in system prompt via
   * `--append-system-prompt` on every invocation.
   * Stateless: not persisted in the session file, and fully replaces any
   * previously appended content on resume.
   */
  systemPrompt?: string;

  /** Working directory the `claude` subprocess is started in. */
  workspace: string;

  /** Agent identifier forwarded to the MCP proxy; defaults to an empty string. */
  agentId?: string;

  /** When set, the turn resumes this CLI session via `--resume`. */
  resumeSessionId?: string;

  /** Permission mode requested by the caller. */
  permissionMode?: string;

  /** OpenClaw tool definitions to expose to Claude as MCP tools. */
  openclawTools?: OpenAITool[];

  /** Bridge port for MCP proxy callbacks. */
  bridgePort?: number;

  /** Bridge API key for MCP proxy callbacks. */
  bridgeApiKey?: string;

  /**
   * Abort signal from the bridge. When fired (typically because the upstream
   * HTTP client closed the socket — OpenClaw's attempt-level retry / cancel),
   * we kill the claude subprocess group and break out of the iterator
   * promptly so a stale subprocess doesn't keep streaming into a dead socket
   * (or worse, get its output multiplexed with a fresh subprocess started by
   * a retry).
   */
  signal?: AbortSignal;
};
// Resolve the MCP server script path relative to this file (not the process
// working directory).
// Installed layout:
//   <plugin root>/core/claude/sdk-adapter.ts          (this module)
//   <plugin root>/services/openclaw-mcp-server.mjs    (the MCP server script)
// so the script sits two directories above this file, under services/.
// This is an ES module, so the directory is derived from import.meta.url.
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const MCP_SERVER_SCRIPT = path.resolve(__dirname, "../../services/openclaw-mcp-server.mjs");
/**
 * Write OpenClaw tool definitions to a temp file and create an --mcp-config JSON
 * so Claude Code can call them as `mcp__openclaw__<toolname>` tools.
 *
 * Two files are created in the OS temp directory, sharing a unique prefix:
 *   `<prefix>-tools.json`  the tool definitions, read by the MCP server script
 *   `<prefix>-mcp.json`    the MCP config that launches that script
 *
 * The prefix combines timestamp, pid and a random suffix so that turns
 * dispatched in the same millisecond never share files. Both files are created
 * exclusively (`wx`) with mode 0600 because the config embeds the bridge API key.
 *
 * @param tools        Tool definitions to expose; an empty list disables MCP setup.
 * @param bridgePort   Local bridge port the MCP server calls back on.
 * @param bridgeApiKey API key the MCP server presents to the bridge.
 * @param workspace    Workspace path forwarded to the MCP server.
 * @param agentId      Agent id forwarded to the MCP server.
 * @returns The path to the mcp-config JSON file, or null if there are no tools,
 *          the MCP server script is missing, or writing the files fails.
 */
function setupMcpConfig(
  tools: OpenAITool[],
  bridgePort: number,
  bridgeApiKey: string,
  workspace: string,
  agentId: string,
): string | null {
  if (!tools.length) return null;
  if (!fs.existsSync(MCP_SERVER_SCRIPT)) return null;

  const randomSuffix = Math.random().toString(36).slice(2, 10);
  const prefix = `oc-${Date.now()}-${process.pid}-${randomSuffix}`;
  const tmpDir = os.tmpdir();
  const toolDefsPath = path.join(tmpDir, `${prefix}-tools.json`);
  const mcpConfigPath = path.join(tmpDir, `${prefix}-mcp.json`);

  // Files this call actually created, so a failure only removes our own files.
  const created: string[] = [];

  // Create `file` exclusively with owner-only permissions and write `payload` as JSON.
  const writePrivateJson = (file: string, payload: unknown): void => {
    const fd = fs.openSync(file, "wx", 0o600);
    created.push(file);
    try {
      fs.writeFileSync(fd, JSON.stringify(payload, null, 2), "utf8");
    } finally {
      fs.closeSync(fd);
    }
  };

  try {
    writePrivateJson(toolDefsPath, tools);
    writePrivateJson(mcpConfigPath, {
      mcpServers: {
        openclaw: {
          command: process.execPath,
          args: [MCP_SERVER_SCRIPT],
          env: {
            TOOL_DEFS_FILE: toolDefsPath,
            BRIDGE_EXECUTE_URL: `http://127.0.0.1:${bridgePort}/mcp/execute`,
            BRIDGE_API_KEY: bridgeApiKey,
            WORKSPACE: workspace,
            AGENT_ID: agentId,
          },
        },
      },
    });
    return mcpConfigPath;
  } catch {
    // Best effort: MCP tools are optional for the turn, so report "no config"
    // rather than throwing, and do not leave half-written files behind.
    for (const file of created) {
      try {
        fs.unlinkSync(file);
      } catch {
        /* ignore */
      }
    }
    return null;
  }
}
/**
 * Dispatch a turn to Claude Code using `claude -p --output-format stream-json --verbose`.
 * Returns an async iterable of ClaudeMessage events.
 *
 * Event sequence: zero or more `text` events (one per assistant text block),
 * then exactly one terminal event:
 *   - `result_error` when the CLI's `result` event had `is_error: true` and a session id,
 *   - `done` when a session id was captured,
 *   - `error` otherwise (no session id, spawn failure, or already-aborted signal).
 *
 * Lifecycle:
 *   - The subprocess runs in its own process group. Once the turn is finished
 *     (result event, stdout closed, abort, spawn error, or the consumer stopped
 *     iterating), the group is sent SIGTERM after 3s and SIGKILL after 10s
 *     unless the child has closed by then.
 *   - If `opts.signal` is already aborted on entry, no subprocess is started.
 *   - Temp files created for `--mcp-config` are removed when the child closes
 *     (or, at the latest, when the SIGKILL timer fires).
 *
 * NOTE(review): `opts.permissionMode` is accepted but not forwarded to the CLI
 * here; tool access is governed by the `--allowedTools` list below.
 *
 * @param opts Turn options; see ClaudeDispatchOptions.
 */
export async function* dispatchToClaude(
  opts: ClaudeDispatchOptions,
): AsyncIterable<ClaudeMessage> {
  const {
    prompt,
    systemPrompt,
    workspace,
    agentId = "",
    resumeSessionId,
    openclawTools,
    bridgePort = 18800,
    bridgeApiKey = "",
    signal,
  } = opts;

  const TERM_DELAY_MS = 3000;
  const KILL_DELAY_MS = 10000;
  const STDERR_SUMMARY_MAX = 200;

  // The caller already gave up: do not start a subprocess (or write temp
  // files) just to kill it a few seconds later.
  if (signal?.aborted) {
    yield { type: "error", message: "claude dispatch aborted before the subprocess was started" };
    return;
  }

  // NOTE: put prompt right after -p, before --mcp-config.
  // --mcp-config takes <configs...> (multiple values) and would greedily
  // consume the prompt if it came after --mcp-config.
  const args: string[] = [
    "-p",
    prompt,
    "--output-format", "stream-json",
    "--verbose",
    "--allowedTools", "Bash Edit Write Read Glob Grep WebFetch WebSearch NotebookEdit Monitor TodoWrite mcp__openclaw__*",
  ];

  // --append-system-prompt appends to Claude Code's built-in system prompt rather
  // than replacing it, preserving the full agent SDK instructions (tool use behavior,
  // memory management, etc.). The appended bootstrap (persona + skills) is stateless:
  // not persisted in the session file, takes effect every invocation including resumes.
  if (systemPrompt) {
    args.push("--append-system-prompt", systemPrompt);
  }
  if (resumeSessionId) {
    args.push("--resume", resumeSessionId);
  }

  // Set up MCP proxy every turn — the MCP server process exits with each `claude -p`
  // invocation, so --resume sessions also need --mcp-config to restart it.
  // Put --mcp-config after the prompt so its <configs...> variadic doesn't consume the prompt.
  const mcpConfigPath = openclawTools?.length
    ? setupMcpConfig(openclawTools, bridgePort, bridgeApiKey, workspace, agentId)
    : null;
  if (mcpConfigPath) {
    args.push("--mcp-config", mcpConfigPath);
  }

  // Remove the per-turn MCP temp files. setupMcpConfig returns only the
  // `*-mcp.json` path; the tool-definition file is its `*-tools.json` sibling.
  const removeTempFiles = (): void => {
    if (!mcpConfigPath) return;
    const toolDefsPath = mcpConfigPath.replace(/-mcp\.json$/, "-tools.json");
    for (const file of new Set([mcpConfigPath, toolDefsPath])) {
      try {
        fs.unlinkSync(file);
      } catch {
        /* ignore: already removed or never created */
      }
    }
  };

  // detached:true puts claude in its own process group. Claude's Bash tool
  // occasionally leaks shells/ssh that keep claude alive past end-of-turn; when
  // that happens we SIGKILL the whole group rather than wait forever.
  const child = spawn("claude", args, {
    cwd: workspace,
    stdio: ["ignore", "pipe", "pipe"],
    env: { ...process.env },
    detached: true,
  });

  // Keep only as much stderr as the error summary can use; --verbose output
  // may be large and the rest would just sit in memory.
  const stderrLines: string[] = [];
  let stderrBudgetUsed = 0;
  child.stderr?.setEncoding("utf8");
  child.stderr?.on("data", (chunk: string) => {
    if (stderrBudgetUsed > STDERR_SUMMARY_MAX) return;
    const trimmed = chunk.trim();
    stderrLines.push(trimmed);
    stderrBudgetUsed += trimmed.length + 1;
  });

  const rl = createInterface({ input: child.stdout!, crlfDelay: Infinity });

  type CapturedResultError = { reason: string; message: string };
  let capturedSessionId = "";
  let capturedResultError = null as CapturedResultError | null;
  let childError = "";
  const events: ClaudeMessage[] = [];
  let done = false;
  let consumerGone = false;
  let resolveNext: (() => void) | null = null;

  let childClosed = false;
  let cleanupScheduled = false;
  let termTimer: ReturnType<typeof setTimeout> | undefined;
  let killTimer: ReturnType<typeof setTimeout> | undefined;

  const killGroup = (sig: NodeJS.Signals): void => {
    if (child.pid == null || child.exitCode !== null) return;
    try {
      // Negative pid targets the whole process group.
      process.kill(-child.pid, sig);
    } catch {
      /* already gone */
    }
  };

  // Registered up front so the close is observed even if it happens before
  // cleanup is scheduled.
  child.once("close", () => {
    childClosed = true;
    clearTimeout(termTimer);
    clearTimeout(killTimer);
    removeTempFiles();
  });

  const scheduleCleanup = (): void => {
    if (cleanupScheduled) return;
    cleanupScheduled = true;
    if (childClosed) return;
    termTimer = setTimeout(() => killGroup("SIGTERM"), TERM_DELAY_MS);
    killTimer = setTimeout(() => {
      killGroup("SIGKILL");
      // Last resort in case 'close' never arrives.
      removeTempFiles();
    }, KILL_DELAY_MS);
  };

  // Wake the generator if it is parked waiting for the next event.
  const wake = (): void => {
    const resume = resolveNext;
    resolveNext = null;
    resume?.();
  };

  const markDone = (): void => {
    if (done) return;
    done = true;
    scheduleCleanup();
    wake();
  };

  // Without a listener, a spawn failure (e.g. `claude` not on PATH) would be an
  // unhandled 'error' event and take the whole bridge process down.
  child.on("error", (err: Error) => {
    childError = `failed to run claude: ${err.message}`;
    markDone();
  });

  // Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
  // closes the socket, propagate that into our process tree by SIGTERM/SIGKILL
  // (via scheduleCleanup) and break out of the iterator (via markDone). This
  // prevents stale subprocesses from outliving the request that started them.
  // (An already-aborted signal was handled before spawning.)
  const onAbort = (): void => markDone();
  signal?.addEventListener("abort", onAbort, { once: true });

  const asString = (value: unknown): string | undefined =>
    typeof value === "string" ? value : undefined;

  rl.on("line", (line: string) => {
    // Keep reading so the child never blocks on a full pipe, but stop queueing
    // once nobody is consuming.
    if (consumerGone) return;
    if (!line.trim()) return;

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      return;
    }
    // Valid JSON that is not an object (e.g. `null`) carries no event.
    if (typeof parsed !== "object" || parsed === null) return;
    const event = parsed as Record<string, unknown>;

    if (event.type === "assistant") {
      const content = (event.message as { content?: unknown } | null | undefined)?.content;
      if (Array.isArray(content)) {
        for (const block of content as Array<{ type?: unknown; text?: unknown } | null>) {
          if (block?.type === "text" && typeof block.text === "string" && block.text) {
            events.push({ type: "text", text: block.text });
          }
        }
      }
    }

    if (event.type === "result") {
      const sessionId = asString(event.session_id) ?? "";
      if (sessionId) capturedSessionId = sessionId;
      // CLI signals fatal-but-graceful errors (context overflow, refusal,
      // billing, etc.) via `is_error: true` on the result event. Capture the
      // reason so the bridge layer can decide whether to invalidate the
      // session-map entry (e.g. context overflow → drop, retry next turn).
      if (event.is_error === true) {
        const reason = asString(event.terminal_reason) ?? asString(event.subtype) ?? "error";
        const message = asString(event.result) ?? `claude result error (${reason})`;
        capturedResultError = { reason, message };
      }
      // `result` is the terminal stream-json event; commit the turn without
      // waiting for claude's process tree to fully exit (leaked Bash grandchildren
      // can otherwise hold stdout open indefinitely).
      markDone();
      return;
    }

    wake();
  });

  rl.on("close", () => {
    // Fallback: claude exited without emitting a result event.
    markDone();
  });

  try {
    while (true) {
      const next = events.shift();
      if (next) {
        yield next;
        continue;
      }
      // The queue is empty here and nothing can run between the two checks, so
      // breaking on `done` never drops a queued event.
      if (done) break;
      await new Promise<void>((resolve) => {
        resolveNext = resolve;
      });
    }

    // Pull into a local with explicit type so TS doesn't infer the inner field
    // accesses as `never` (the field is only ever assigned inside the readline
    // callback above, so closure-based narrowing can't see it from this scope).
    const resultErr: CapturedResultError | null = capturedResultError;
    if (resultErr && capturedSessionId) {
      yield {
        type: "result_error",
        sessionId: capturedSessionId,
        reason: resultErr.reason,
        message: resultErr.message,
      };
    } else if (capturedSessionId) {
      yield { type: "done", sessionId: capturedSessionId };
    } else {
      // Prefer the most specific explanation available: the CLI's own error
      // message, then a spawn failure, then whatever was captured from stderr.
      const detail = (resultErr?.message || childError || stderrLines.join(" ")).slice(
        0,
        STDERR_SUMMARY_MAX,
      );
      yield {
        type: "error",
        message: `claude did not return a session_id${detail ? `: ${detail}` : ""}`,
      };
    }
  } finally {
    // Also reached when the consumer stops iterating early (break / return /
    // throw): make sure the subprocess is still cleaned up in that case.
    consumerGone = true;
    signal?.removeEventListener("abort", onAbort);
    markDone();
  }
}