Files
ContractorAgent/plugin/core/claude/sdk-adapter.ts
zhi 992f4d8703 fix(bridge): scope CLI sessions per OpenClaw session and reset on /new
The bridge was keying claudeSessionId by agentId alone, so every Discord
channel, DM, and cron run for a single agent shared one Claude CLI
session. Two consequences in the wild:

  - Cross-channel context bleed: 8.7MB session for `developer` mixed
    references from channels 1474327736242798612 and 1498579994044010566
    plus the operator DM all in one --resume thread.
  - `/new` had no effect on the CLI side. OpenClaw rotated its session
    file but the bridge kept --resume-ing the same long-lived
    claudeSessionId, eventually crossing the 1M model context (debug log
    showed `prompt is too long: 1179616 tokens > 1000000 maximum`).

Changes:

  * input-filter: extract `chat_id` from the Conversation-info
    untrusted-metadata block (scanning all messages, since runtimeOnly
    turns put it in the system prompt) and detect bare `/new`/`/reset`
    via the BARE_SESSION_RESET_PROMPT_BASE marker. Add buildSessionKey
    `${agentId}::${chatId}` and resolveDispatchPrompt fallback for the
    empty user message that OpenClaw sends on bare resets.

  * server: use the composite session key for getSession/putSession;
    on bareSessionReset, removeSession before dispatching so the CLI
    starts a fresh session; on a CLI result_error (typically
    prompt_too_long) drop the entry too so the next turn doesn't
    re-resume into the poisoned context.

  * claude/sdk-adapter: surface CLI terminal errors via a new
    `result_error` event (carries reason + sessionId) so the bridge
    can react instead of just streaming the synthetic
    "Prompt is too long" assistant text and silently re-using the
    same session.

  * index: convert register() to synchronous (OpenClaw rejects async
    register with "plugin register must be synchronous"); replace the
    pre-bind port probe with a server-level EADDRINUSE handler.

  * .gitignore: ignore node_modules/ and dist/.
2026-04-28 12:32:37 +00:00

292 lines
9.4 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import os from "node:os";
import { spawn } from "node:child_process";
import { createInterface } from "node:readline";
import { fileURLToPath } from "node:url";
/** One chunk of assistant output text. */
type ClaudeTextMessage = { type: "text"; text: string };

/** Successful end of a turn; `sessionId` identifies the CLI session that served it. */
type ClaudeDoneMessage = { type: "done"; sessionId: string };

/** The turn ended without a usable session id; `message` is a human-readable description. */
type ClaudeErrorMessage = { type: "error"; message: string };

/**
 * Terminal error from the CLI's `result` event (e.g. `is_error: true` with
 * `terminal_reason: "prompt_too_long"`). The bridge uses this signal to
 * drop the session-map entry so the next turn starts a fresh CLI session
 * instead of `--resume`-ing into the same poisoned context.
 */
type ClaudeResultErrorMessage = {
  type: "result_error";
  sessionId: string;
  reason: string;
  message: string;
};

/** Every event `dispatchToClaude` can yield, discriminated by `type`. */
export type ClaudeMessage =
  | ClaudeTextMessage
  | ClaudeDoneMessage
  | ClaudeErrorMessage
  | ClaudeResultErrorMessage;
/** Function descriptor nested inside an OpenAI-style tool definition. */
type OpenAIToolFunction = {
  name: string;
  description?: string;
  parameters?: unknown;
};

/** OpenAI-style tool definition; only the `"function"` tool kind is modelled. */
export type OpenAITool = {
  type: "function";
  function: OpenAIToolFunction;
};
/** Input for a single `dispatchToClaude` turn. */
export type ClaudeDispatchOptions = {
  /** User prompt for this turn; handed to the CLI as the value of `-p`. */
  prompt: string;

  /**
   * Appended to Claude Code's built-in system prompt via --append-system-prompt on every invocation.
   * Stateless: not persisted in session file, fully replaces any prior appended content on resume.
   */
  systemPrompt?: string;

  /** Working directory the CLI is spawned in; also forwarded to the MCP proxy as `WORKSPACE`. */
  workspace: string;

  /** Forwarded to the MCP proxy as `AGENT_ID`; treated as `""` when omitted. */
  agentId?: string;

  /** When set, the CLI is started with `--resume <id>` to continue that session. */
  resumeSessionId?: string;

  /**
   * Requested permission mode.
   * NOTE(review): `dispatchToClaude` reads this option but does not pass it on to the CLI.
   */
  permissionMode?: string;

  /** OpenClaw tool definitions to expose to Claude as MCP tools */
  openclawTools?: OpenAITool[];

  /** Bridge port for MCP proxy callbacks (18800 when omitted). */
  bridgePort?: number;

  /** Bridge API key for MCP proxy callbacks (`""` when omitted). */
  bridgeApiKey?: string;
};
// Resolve the MCP server script path relative to this file.
// ES modules have no built-in `__dirname`, so it is derived from `import.meta.url`.
// Installed layout: plugin root / core / claude / sdk-adapter.ts
//                   plugin root / services / openclaw-mcp-server.mjs
// i.e. the script sits two directories above this file, under `services/`.
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const MCP_SERVER_SCRIPT = path.resolve(__dirname, "../../services/openclaw-mcp-server.mjs");
/**
 * Write OpenClaw tool definitions to a temp file and create an --mcp-config JSON
 * so Claude Code can call them as `mcp__openclaw__<toolname>` tools.
 *
 * Two files are created in the OS temp directory, sharing one unique prefix:
 *   - `<prefix>-tools.json` — the tool definitions, read by the MCP server via `TOOL_DEFS_FILE`
 *   - `<prefix>-mcp.json`   — the MCP config handed to the CLI
 *
 * The prefix combines the timestamp, the process id and a random suffix, so
 * concurrent dispatches never share (and overwrite) each other's files. Both files
 * are created exclusively and owner-only, because the config embeds the bridge API key.
 *
 * @param tools        Tool definitions to expose; an empty list disables the proxy.
 * @param bridgePort   Local port of the bridge's `/mcp/execute` endpoint.
 * @param bridgeApiKey API key the MCP server presents to the bridge.
 * @param workspace    Passed through to the MCP server as `WORKSPACE`.
 * @param agentId      Passed through to the MCP server as `AGENT_ID`.
 * @returns the path to the mcp-config JSON file, or null if setup fails
 *          (no tools, MCP server script missing, or a file could not be written).
 */
function setupMcpConfig(
  tools: OpenAITool[],
  bridgePort: number,
  bridgeApiKey: string,
  workspace: string,
  agentId: string,
): string | null {
  if (!tools.length) return null;
  if (!fs.existsSync(MCP_SERVER_SCRIPT)) return null;

  // "wx" refuses to reuse an existing path (no clobbering, no following of a
  // pre-planted file); 0o600 keeps the embedded API key private to this user.
  const writeOptions = { encoding: "utf8", mode: 0o600, flag: "wx" } as const;

  try {
    const tmpDir = os.tmpdir();
    const uniqueSuffix = Math.random().toString(36).slice(2, 10);
    const sessionId = `oc-${Date.now()}-${process.pid}-${uniqueSuffix}`;
    const toolDefsPath = path.join(tmpDir, `${sessionId}-tools.json`);
    const mcpConfigPath = path.join(tmpDir, `${sessionId}-mcp.json`);

    fs.writeFileSync(toolDefsPath, JSON.stringify(tools, null, 2), writeOptions);

    const mcpConfig = {
      mcpServers: {
        openclaw: {
          command: process.execPath,
          args: [MCP_SERVER_SCRIPT],
          env: {
            TOOL_DEFS_FILE: toolDefsPath,
            BRIDGE_EXECUTE_URL: `http://127.0.0.1:${bridgePort}/mcp/execute`,
            BRIDGE_API_KEY: bridgeApiKey,
            WORKSPACE: workspace,
            AGENT_ID: agentId,
          },
        },
      },
    };

    try {
      fs.writeFileSync(mcpConfigPath, JSON.stringify(mcpConfig, null, 2), writeOptions);
    } catch (err) {
      // Don't strand the tool definitions when the config itself can't be written.
      try { fs.unlinkSync(toolDefsPath); } catch { /* ignore */ }
      throw err;
    }
    return mcpConfigPath;
  } catch {
    // Best effort: without an MCP config the turn simply runs without OpenClaw tools.
    return null;
  }
}
/**
 * Dispatch a turn to Claude Code using `claude -p --output-format stream-json --verbose`.
 * Returns an async iterable of ClaudeMessage events.
 *
 * The CLI's stdout is parsed line by line as JSON. The iterable yields:
 *   - one `text` event per non-empty text block of every `assistant` event, then
 *   - exactly one terminal event:
 *       `result_error` — the `result` event had `is_error: true` and a session id;
 *       `done`         — a session id was captured and no error was reported;
 *       `error`        — the CLI could not be started, or no session id was returned.
 *
 * The turn is committed as soon as the `result` event arrives (or stdout closes).
 * From that point the child's process group gets SIGTERM after 3 s and SIGKILL
 * after 10 s unless the child has closed; the same cleanup runs when the consumer
 * stops iterating early. The temp files created for `--mcp-config` are removed
 * once the child has closed (or immediately if it never started).
 *
 * Failures are reported as events; the generator itself does not throw for a
 * missing binary, an invalid working directory or an oversized argument list.
 *
 * @param opts Prompt, workspace and optional session / MCP settings for this turn.
 */
export async function* dispatchToClaude(
  opts: ClaudeDispatchOptions,
): AsyncIterable<ClaudeMessage> {
  // NOTE(review): `opts.permissionMode` has never been forwarded to the CLI by this
  // function; that behavior is kept, the unused local is simply no longer extracted.
  const {
    prompt,
    systemPrompt,
    workspace,
    agentId = "",
    resumeSessionId,
    openclawTools,
    bridgePort = 18800,
    bridgeApiKey = "",
  } = opts;

  // NOTE: put prompt right after -p, before --mcp-config.
  // --mcp-config takes <configs...> (multiple values) and would greedily
  // consume the prompt if it came after --mcp-config.
  const args: string[] = [
    "-p",
    prompt,
    "--output-format", "stream-json",
    "--verbose",
    "--allowedTools", "Bash Edit Write Read Glob Grep WebFetch WebSearch NotebookEdit Monitor TodoWrite mcp__openclaw__*",
  ];

  // --append-system-prompt appends to Claude Code's built-in system prompt rather
  // than replacing it, preserving the full agent SDK instructions (tool use behavior,
  // memory management, etc.). The appended bootstrap (persona + skills) is stateless:
  // not persisted in the session file, takes effect every invocation including resumes.
  if (systemPrompt) {
    args.push("--append-system-prompt", systemPrompt);
  }
  if (resumeSessionId) {
    args.push("--resume", resumeSessionId);
  }

  // Set up MCP proxy every turn — the MCP server process exits with each `claude -p`
  // invocation, so --resume sessions also need --mcp-config to restart it.
  // Put --mcp-config after the prompt so its <configs...> variadic doesn't consume the prompt.
  let mcpConfigPath: string | null = null;
  if (openclawTools?.length) {
    mcpConfigPath = setupMcpConfig(openclawTools, bridgePort, bridgeApiKey, workspace, agentId);
    if (mcpConfigPath) {
      args.push("--mcp-config", mcpConfigPath);
    }
  }

  // Deletes the MCP config and its sibling tool-definition file. setupMcpConfig
  // writes them as `<prefix>-mcp.json` / `<prefix>-tools.json`; the Set avoids a
  // double unlink should the config path not follow that naming.
  const removeTempFiles = (): void => {
    if (!mcpConfigPath) return;
    const toolDefsPath = mcpConfigPath.replace(/-mcp\.json$/, "-tools.json");
    for (const file of new Set([mcpConfigPath, toolDefsPath])) {
      try { fs.unlinkSync(file); } catch { /* ignore */ }
    }
    mcpConfigPath = null;
  };

  // detached:true puts claude in its own process group. Claude's Bash tool
  // occasionally leaks shells/ssh that keep claude alive past end-of-turn; when
  // that happens we SIGKILL the whole group rather than wait forever.
  let child: ReturnType<typeof spawn>;
  try {
    child = spawn("claude", args, {
      cwd: workspace,
      stdio: ["ignore", "pipe", "pipe"],
      env: { ...process.env },
      detached: true,
    });
  } catch (err) {
    // spawn() throws synchronously for some failures (e.g. an argument list the
    // OS rejects as too long); report them like any other start-up failure.
    removeTempFiles();
    const detail = err instanceof Error ? err.message : String(err);
    yield { type: "error", message: `failed to start claude: ${detail}` };
    return;
  }

  type CapturedResultError = { reason: string; message: string };

  // The `null as …` initialisers keep the declared union type: these are only
  // assigned inside callbacks, which control-flow narrowing cannot see.
  let capturedSessionId = "";
  let capturedResultError = null as CapturedResultError | null;
  let spawnError = null as Error | null;

  const events: ClaudeMessage[] = [];
  let done = false;
  let finished = false;
  let resolveNext: (() => void) | null = null;
  let cleanupScheduled = false;
  let childClosed = false;
  let termTimer = null as ReturnType<typeof setTimeout> | null;
  let killTimer = null as ReturnType<typeof setTimeout> | null;

  // Resume the consumer loop if it is currently parked on its wake-up promise.
  const wake = (): void => {
    const resume = resolveNext;
    resolveNext = null;
    if (resume) resume();
  };

  const killGroup = (sig: NodeJS.Signals): void => {
    if (childClosed || child.pid == null || child.exitCode !== null) return;
    // Negative pid targets the whole process group created by detached:true.
    try { process.kill(-child.pid, sig); } catch { /* already gone */ }
  };

  const scheduleCleanup = (): void => {
    if (cleanupScheduled) return;
    cleanupScheduled = true;
    if (childClosed || child.pid == null) {
      // Nothing left to wait for (already closed, or never started).
      removeTempFiles();
      return;
    }
    termTimer = setTimeout(() => killGroup("SIGTERM"), 3000);
    killTimer = setTimeout(() => killGroup("SIGKILL"), 10000);
  };

  const markDone = (): void => {
    if (done) return;
    done = true;
    scheduleCleanup();
    wake();
  };

  // Registered up front so a close that precedes end-of-turn is never missed.
  child.once("close", () => {
    childClosed = true;
    if (termTimer) clearTimeout(termTimer);
    if (killTimer) clearTimeout(killTimer);
    removeTempFiles();
  });

  // Without a listener, a failed spawn (missing `claude` binary, bad cwd) would
  // surface as an unhandled 'error' event and take the host process down.
  child.on("error", (err: Error) => {
    if (!spawnError) spawnError = err;
    markDone();
  });

  // Only the first ~200 characters of stderr are ever reported, so stop
  // buffering once that much has been collected.
  const stderrSummaryLimit = 200;
  const stderrLines: string[] = [];
  let stderrChars = 0;
  child.stderr?.on("data", (chunk: Buffer) => {
    if (stderrChars > stderrSummaryLimit) return;
    const text = chunk.toString("utf8").trim();
    if (!text) return;
    stderrLines.push(text);
    stderrChars += text.length + 1;
  });

  const handleLine = (line: string): void => {
    if (finished || !line.trim()) return;
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      return;
    }
    // Valid JSON that is not an object (e.g. `null`) carries no event.
    if (typeof parsed !== "object" || parsed === null) return;
    const event = parsed as Record<string, unknown>;

    if (event.type === "assistant") {
      const content = (event.message as { content?: unknown } | null | undefined)?.content;
      if (Array.isArray(content)) {
        for (const entry of content as unknown[]) {
          if (typeof entry !== "object" || entry === null) continue;
          const block = entry as { type?: unknown; text?: unknown };
          if (block.type === "text" && typeof block.text === "string" && block.text) {
            events.push({ type: "text", text: block.text });
          }
        }
      }
    }

    if (event.type === "result") {
      if (typeof event.session_id === "string" && event.session_id) {
        capturedSessionId = event.session_id;
      }
      // CLI signals fatal-but-graceful errors (context overflow, refusal,
      // billing, etc.) via `is_error: true` on the result event. Capture the
      // reason so the bridge layer can decide whether to invalidate the
      // session-map entry (e.g. context overflow → drop, retry next turn).
      if (event.is_error === true) {
        let reason = "error";
        if (typeof event.terminal_reason === "string") reason = event.terminal_reason;
        else if (typeof event.subtype === "string") reason = event.subtype;
        const message =
          typeof event.result === "string" ? event.result : `claude result error (${reason})`;
        capturedResultError = { reason, message };
      }
      // `result` is the terminal stream-json event; commit the turn without
      // waiting for claude's process tree to fully exit (leaked Bash grandchildren
      // can otherwise hold stdout open indefinitely).
      markDone();
      return;
    }

    wake();
  };

  if (child.stdout) {
    const rl = createInterface({ input: child.stdout, crlfDelay: Infinity });
    rl.on("line", handleLine);
    // Fallback: claude exited without emitting a result event.
    rl.on("close", markDone);
  } else {
    // No stdout to read from, so there is nothing to wait for.
    markDone();
  }

  try {
    while (true) {
      const next = events.shift();
      if (next) {
        yield next;
        continue;
      }
      if (done) break;
      await new Promise<void>((resolve) => {
        resolveNext = resolve;
      });
    }

    // Pull into a local with explicit type so TS doesn't infer the inner field
    // accesses as `never` (the field is only ever assigned inside the readline
    // callback above, so closure-based narrowing can't see it from this scope).
    const resultErr: CapturedResultError | null = capturedResultError;
    const startFailure: Error | null = spawnError;

    if (resultErr && capturedSessionId) {
      yield {
        type: "result_error",
        sessionId: capturedSessionId,
        reason: resultErr.reason,
        message: resultErr.message,
      };
    } else if (capturedSessionId) {
      yield { type: "done", sessionId: capturedSessionId };
    } else if (startFailure) {
      yield { type: "error", message: `failed to start claude: ${startFailure.message}` };
    } else if (resultErr) {
      // An error result without a session id: keep the CLI's own explanation
      // instead of masking it behind the generic message below.
      yield {
        type: "error",
        message: resultErr.message || `claude result error (${resultErr.reason})`,
      };
    } else {
      const stderrSummary = stderrLines.join(" ").slice(0, 200);
      yield {
        type: "error",
        message: `claude did not return a session_id${stderrSummary ? `: ${stderrSummary}` : ""}`,
      };
    }
  } finally {
    // Also reached when the consumer stops iterating early: stop queueing
    // output nobody will read and make sure the child gets reaped.
    finished = true;
    events.length = 0;
    markDone();
  }
}