diff --git a/plugin/core/claude/sdk-adapter.ts b/plugin/core/claude/sdk-adapter.ts index 2980ff4..96d1349 100644 --- a/plugin/core/claude/sdk-adapter.ts +++ b/plugin/core/claude/sdk-adapter.ts @@ -37,6 +37,15 @@ export type ClaudeDispatchOptions = { bridgePort?: number; /** Bridge API key for MCP proxy callbacks */ bridgeApiKey?: string; + /** + * Abort signal from the bridge. When fired (typically because the upstream + * HTTP client closed the socket — OpenClaw's attempt-level retry / cancel), + * we kill the claude subprocess group and break out of the iterator + * promptly so a stale subprocess doesn't keep streaming into a dead socket + * (or worse, get its output multiplexed with a fresh subprocess started by + * a retry). + */ + signal?: AbortSignal; }; // Resolve the MCP server script path relative to this file. @@ -109,6 +118,7 @@ export async function* dispatchToClaude( openclawTools, bridgePort = 18800, bridgeApiKey = "", + signal, } = opts; // NOTE: put prompt right after -p, before --mcp-config. @@ -202,6 +212,18 @@ export async function* dispatchToClaude( } }; + // Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw) + // closes the socket, propagate that into our process tree by SIGTERM/SIGKILL + // (via scheduleCleanup) and break out of the iterator (via markDone). This + // prevents stale subprocesses from outliving the request that started them. + if (signal) { + if (signal.aborted) { + markDone(); + } else { + signal.addEventListener("abort", () => markDone(), { once: true }); + } + } + rl.on("line", (line: string) => { if (!line.trim()) return; let event: Record; diff --git a/plugin/core/gemini/sdk-adapter.ts b/plugin/core/gemini/sdk-adapter.ts index 11cd6e2..d3d935e 100644 --- a/plugin/core/gemini/sdk-adapter.ts +++ b/plugin/core/gemini/sdk-adapter.ts @@ -29,6 +29,13 @@ export type GeminiDispatchOptions = { openclawTools?: OpenAITool[]; bridgePort?: number; bridgeApiKey?: string; + /** + * Abort signal from the bridge. 
Mirror of dispatchToClaude's `signal` — + * see that file for rationale. When fired, we kill the gemini subprocess + * and break the iterator promptly so a stale process doesn't outlive + * the upstream request. + */ + signal?: AbortSignal; }; /** @@ -116,6 +123,7 @@ export async function* dispatchToGemini( openclawTools, bridgePort = 18800, bridgeApiKey = "", + signal, } = opts; // Write system-level instructions to workspace/GEMINI.md every turn. @@ -166,6 +174,43 @@ export async function* dispatchToGemini( let done = false; let resolveNext: (() => void) | null = null; + // Cleanup helper: SIGTERM the child first, then SIGKILL after a grace + // period if it hasn't exited. Idempotent — safe to call multiple times. + let cleanupScheduled = false; + const scheduleCleanup = (): void => { + if (cleanupScheduled) return; + cleanupScheduled = true; + if (child.exitCode !== null) return; + try { child.kill("SIGTERM"); } catch { /* already gone */ } + const killTimer = setTimeout(() => { + try { child.kill("SIGKILL"); } catch { /* already gone */ } + }, 5000); + killTimer.unref?.(); + child.once("close", () => clearTimeout(killTimer)); + }; + + const markDone = (): void => { + if (done) return; + done = true; + scheduleCleanup(); + if (resolveNext) { + const r = resolveNext; + resolveNext = null; + r(); + } + }; + + // Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw) + // closes the socket, kill the gemini subprocess and break the iterator. + // See dispatchToClaude for the full rationale. 
+ if (signal) { + if (signal.aborted) { + markDone(); + } else { + signal.addEventListener("abort", () => markDone(), { once: true }); + } + } + rl.on("line", (line: string) => { if (!line.trim()) return; let event: Record; diff --git a/plugin/web/server.ts b/plugin/web/server.ts index 068a519..82aa2e8 100644 --- a/plugin/web/server.ts +++ b/plugin/web/server.ts @@ -88,6 +88,27 @@ function parseBody(req: http.IncomingMessage): Promise { return parseBodyRaw(req) as Promise; } +/** + * Per-sessionKey FIFO queue: same-session turns serialize so a user firing + * multiple Discord messages back-to-back gets them processed in order rather + * than spawning concurrent claude subprocesses. Cross-session requests bypass + * each other's chains and run in parallel. + * + * The chain is `prev.then(() => mySlot, () => mySlot)` per request — `mySlot` + * is released in the request's finally block, so the next request waits until + * our work completes (success or failure) before starting. + */ +const queueBySession = new Map>(); + +/** + * SSE keepalive cadence. Bridge writes `: keepalive\n\n` (an SSE comment + * frame, no-op for the OpenAI stream parser) on this interval while a turn + * is in flight or queued. This keeps OpenClaw's LLM idle watchdog (default + * 120s of stream silence → attempt failure → retry) from firing when the + * underlying claude subprocess is in a long quiet tool-call phase or while + * we're waiting our turn in the per-session queue. + */ +const HEARTBEAT_MS = 30_000; const _G = globalThis as Record; @@ -156,33 +177,32 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server { // Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude const isGemini = typeof body.model === "string" && body.model.includes("gemini"); - // Look up existing session (shared structure for both Claude and Gemini). 
- // On a bare /new or /reset turn we deliberately drop the existing entry so - // the CLI starts a fresh session — otherwise --resume would bring back the - // very history the user just asked to abandon. - let existingEntry = sessionKey ? getSession(workspace, sessionKey) : null; - if (bareSessionReset && existingEntry && sessionKey) { + // ── Abort propagation ──────────────────────────────────────────────────── + // OpenClaw's LLM client cancels in-flight HTTP requests via AbortSignal + // when an attempt fails or the user cancels. On the wire this manifests + // as the client closing the socket, surfaced here as `req.on('close')`. + // We mirror that signal into our own AbortController and propagate it + // down to the dispatchTo{Claude,Gemini} adapter, which kills the + // subprocess. Without this, OpenClaw silently drops the response while + // the subprocess keeps running, and a retry spawns another subprocess + // alongside the orphan — the root cause of the duplicate-Discord-message + // bug we hit in May 2026. + const abortController = new AbortController(); + let clientDisconnected = false; + const onClose = (): void => { + if (clientDisconnected) return; + clientDisconnected = true; logger.info( - `[contractor-bridge] bare /new detected — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`, + `[contractor-bridge] client disconnected sessionKey=${sessionKey} — aborting in-flight work`, ); - removeSession(workspace, sessionKey); - existingEntry = null; - } - let resumeSessionId = existingEntry?.state === "active" ? existingEntry.claudeSessionId : null; + abortController.abort(); + }; + req.on("close", onClose); - // Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files). - // For Claude: --system-prompt fully replaces any prior system prompt each invocation. - // For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn. 
- // This keeps persona and skills current without needing to track first-turn state. - const systemPrompt = buildBootstrap({ - agentId, - openclawSessionKey: sessionKey, - workspace, - skillsBlock: skillsBlock || undefined, - workspaceContextFiles, - }); - - // Start SSE response + // Start SSE response immediately so OpenClaw sees the response status + // quickly. We may still spend time waiting in the per-session queue + // before any real chunk goes out — heartbeat (below) keeps that quiet + // window from tripping the upstream idle watchdog. res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", @@ -190,14 +210,96 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server { "Transfer-Encoding": "chunked", }); + // ── SSE heartbeat ──────────────────────────────────────────────────────── + // OpenClaw's idle timeout fires after 120s of total stream silence with + // no model progress (`LLM idle timeout (120s): no response from model`). + // claude -p can easily produce 120s+ of zero assistant-text output during + // long Bash / file / MCP tool sequences, since this bridge only forwards + // assistant text deltas as SSE chunks (tool_use blocks are not surfaced). + // A periodic SSE comment frame counts as bytes on the wire and resets + // the upstream idle timer, while being a no-op for the OpenAI stream + // parser. + const heartbeat = setInterval(() => { + if (clientDisconnected || res.writableEnded) return; + try { + res.write(": keepalive\n\n"); + } catch { + /* socket dead, ignore */ + } + }, HEARTBEAT_MS); + heartbeat.unref?.(); + + // ── Per-sessionKey FIFO queue ──────────────────────────────────────────── + // Same-sessionKey turns serialize: if a user fires multiple Discord + // messages quickly, we process them in order rather than starting + // concurrent claude subprocesses (which would corrupt the shared session + // file and produce overlapping replies). 
Cross-sessionKey requests run + // in parallel — they live on independent chains. + let releaseSlot: () => void = () => {}; + const mySlot = new Promise((r) => { + releaseSlot = r; + }); + const prev = sessionKey + ? queueBySession.get(sessionKey) ?? Promise.resolve() + : Promise.resolve(); + // `myChainTail` advances only after `releaseSlot()` (called in our + // finally block). Tolerate prev rejecting so a crashed earlier turn + // doesn't poison the chain forever. + const myChainTail = prev.then(() => mySlot, () => mySlot); + if (sessionKey) queueBySession.set(sessionKey, myChainTail); + const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`; let newSessionId = ""; let hasError = false; let resultErrorReason: string | null = null; - - const openclawTools = body.tools ?? []; + let existingEntry: ReturnType = null; try { + // Block until the previous turn on this sessionKey finishes. + // Cross-session requests skip this wait (their `prev` is a fresh + // Promise.resolve()). + await prev.catch(() => {}); + + // While queued, OpenClaw may have given up on this request (attempt + // timeout, user cancel, etc.) and closed the socket. If so, exit + // cleanly without dispatching — the client wouldn't see the output + // anyway. + if (clientDisconnected) { + logger.info( + `[contractor-bridge] dropped queued request sessionKey=${sessionKey} (client disconnected during queue wait)`, + ); + return; + } + + // Look up the session-map entry NOW (at the head of the queue), not + // earlier — the previous turn on the same sessionKey may have just + // updated `claudeSessionId` and we want to resume into the latest + // one rather than a stale snapshot from request-arrival time. + existingEntry = sessionKey ? 
getSession(workspace, sessionKey) : null; + if (bareSessionReset && existingEntry && sessionKey) { + logger.info( + `[contractor-bridge] bare /new detected — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`, + ); + removeSession(workspace, sessionKey); + existingEntry = null; + } + const resumeSessionId = + existingEntry?.state === "active" ? existingEntry.claudeSessionId : null; + + // Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files). + // For Claude: --append-system-prompt appends to the built-in prompt each invocation. + // For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn. + // This keeps persona and skills current without needing to track first-turn state. + const systemPrompt = buildBootstrap({ + agentId, + openclawSessionKey: sessionKey, + workspace, + skillsBlock: skillsBlock || undefined, + workspaceContextFiles, + }); + + const openclawTools = body.tools ?? []; + const dispatchIter = isGemini ? dispatchToGemini({ prompt: dispatchPrompt, @@ -208,6 +310,7 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server { openclawTools, bridgePort: port, bridgeApiKey: apiKey, + signal: abortController.signal, }) : dispatchToClaude({ prompt: dispatchPrompt, @@ -219,68 +322,106 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server { openclawTools, bridgePort: port, bridgeApiKey: apiKey, + signal: abortController.signal, }); - for await (const event of dispatchIter) { - if (event.type === "text") { - sseWrite(res, buildChunk(completionId, event.text)); - } else if (event.type === "done") { - newSessionId = event.sessionId; - } else if (event.type === "result_error") { - // CLI returned a terminal error (typically context overflow). The - // text was already streamed via prior `text` events; record the - // session so we can drop it below and log the reason. 
- logger.warn( - `[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`, - ); - resultErrorReason = event.reason; - newSessionId = event.sessionId; - } else if (event.type === "error") { - logger.warn(`[contractor-bridge] ${isGemini ? "gemini" : "claude"} error: ${event.message}`); - hasError = true; - sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`)); + try { + for await (const event of dispatchIter) { + if (event.type === "text") { + if (!clientDisconnected) sseWrite(res, buildChunk(completionId, event.text)); + } else if (event.type === "done") { + newSessionId = event.sessionId; + } else if (event.type === "result_error") { + // CLI signaled a fatal-but-graceful error (context overflow, + // refusal, billing, etc.) via `is_error: true` on the result + // event. The text was already streamed via prior `text` events; + // record the reason so the bridge can drop the session-map entry + // below. + logger.warn( + `[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`, + ); + resultErrorReason = event.reason; + newSessionId = event.sessionId; + } else if (event.type === "error") { + logger.warn(`[contractor-bridge] ${isGemini ? 
"gemini" : "claude"} error: ${event.message}`); + hasError = true; + if (!clientDisconnected) { + sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`)); + } + } + } + } catch (err) { + logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`); + hasError = true; + if (!clientDisconnected) { + sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`)); } } - } catch (err) { - logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`); - hasError = true; - sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`)); - } - sseWrite(res, buildStopChunk(completionId)); - sseWrite(res, "[DONE]"); - res.end(); + if (!clientDisconnected && !res.writableEnded) { + sseWrite(res, buildStopChunk(completionId)); + sseWrite(res, "[DONE]"); + } - // Session-map persistence: - // - Successful turn → upsert with the latest claudeSessionId so the next - // turn can `--resume` into it. - // - Terminal CLI error (context overflow etc., reported via result_error) - // → drop the entry so the next turn starts fresh instead of resuming - // into the same poisoned session and re-erroring. - // - Stream/transport error before any sessionId was captured → mark the - // prior entry orphaned (existing behavior). - if (resultErrorReason && sessionKey) { - logger.info( - `[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`, - ); - removeSession(workspace, sessionKey); - } else if (newSessionId && sessionKey && !hasError) { - const now = new Date().toISOString(); - putSession(workspace, { - openclawSessionKey: sessionKey, - agentId, - contractor: isGemini ? "gemini" : "claude", - claudeSessionId: newSessionId, - workspace, - createdAt: existingEntry?.createdAt ?? now, - lastActivityAt: now, - state: "active", - }); - logger.info( - `[contractor-bridge] session mapped sessionKey=${sessionKey} contractor=${isGemini ? 
"gemini" : "claude"} sessionId=${newSessionId}`, - ); - } else if (hasError && sessionKey && existingEntry) { - markOrphaned(workspace, sessionKey); + // Session-map persistence: + // - Successful turn → upsert with the latest claudeSessionId so the next + // turn can `--resume` into it. + // - Terminal CLI error (context overflow etc., reported via result_error) + // → drop the entry so the next turn starts fresh instead of resuming + // into the same poisoned session and re-erroring. + // - Stream/transport error before any sessionId was captured → mark the + // prior entry orphaned (existing behavior). + // - Aborted (client disconnected) → don't touch the session-map; the + // subprocess may have already updated its own session file on disk, + // and the next turn will resume from whatever it left there. + if (clientDisconnected) { + // No-op: aborted turn; preserve whatever the prior entry was so the + // next turn (likely an OpenClaw retry of the same prompt) can resume. + } else if (resultErrorReason && sessionKey) { + logger.info( + `[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`, + ); + removeSession(workspace, sessionKey); + } else if (newSessionId && sessionKey && !hasError) { + const now = new Date().toISOString(); + putSession(workspace, { + openclawSessionKey: sessionKey, + agentId, + contractor: isGemini ? "gemini" : "claude", + claudeSessionId: newSessionId, + workspace, + createdAt: existingEntry?.createdAt ?? now, + lastActivityAt: now, + state: "active", + }); + logger.info( + `[contractor-bridge] session mapped sessionKey=${sessionKey} contractor=${isGemini ? "gemini" : "claude"} sessionId=${newSessionId}`, + ); + } else if (hasError && sessionKey && existingEntry) { + markOrphaned(workspace, sessionKey); + } + } finally { + clearInterval(heartbeat); + req.off("close", onClose); + + // Release our slot so the next request on this sessionKey can proceed. 
+ // Must happen even on abort/throw, otherwise the chain stalls forever. + releaseSlot(); + + // Garbage-collect the queue entry if no later request chained on us. + // The Map identity check is the only safe way to tell — newer requests + // overwrite the entry with their own chain tail. + if (sessionKey && queueBySession.get(sessionKey) === myChainTail) { + queueBySession.delete(sessionKey); + } + + if (!res.writableEnded) { + try { + res.end(); + } catch { + /* ignore */ + } + } } }