fix(bridge): abort propagation, SSE heartbeat, per-session FIFO queue

Three coordinated fixes for the duplicate-Discord-message bug where the
same prompt would be answered by two different claude subprocesses
running in parallel.

Root cause: handleChatCompletions had no concurrency control and no
way to detect when OpenClaw closed the upstream HTTP connection. When
OpenClaw's idle watchdog tripped (default 120s of stream silence), it
would close the socket and retry the prompt — but the original claude
subprocess kept running, and the bridge spawned a second one alongside
it. Both eventually streamed back, both got delivered to Discord.

Native (non-bridge) flow doesn't hit this because OpenClaw's fetch is
abort-aware end-to-end: attempt timeout fires AbortSignal, fetch closes
the socket, the model provider sees it, work stops. Bridge broke the
chain at "spawn subprocess" — this restores it.

Changes:

* SSE heartbeat (server.ts): write a `: keepalive\n\n` SSE comment
  every 30s while a turn is in flight. Counts as bytes on the wire so
  upstream idle timer resets, but is a spec-mandated no-op for the
  OpenAI stream parser. Eliminates the 120s-silence trigger that was
  causing OpenClaw to give up on long tool-call sequences in the first
  place.

* Abort propagation (server.ts + both adapters): hook req.on('close')
  to an AbortController and pass signal: through to dispatchToClaude /
  dispatchToGemini. Adapters listen on signal abort and call markDone
  → scheduleCleanup which SIGTERMs the child process group (3s grace
  for claude, 5s for gemini) then SIGKILLs. Mirrors what native fetch
  does when its caller aborts.

* Per-sessionKey FIFO queue (server.ts): same-session turns serialize
  via a Map<sessionKey, Promise<void>> chain so a user firing multiple
  Discord messages back-to-back gets them processed in order rather
  than spawning concurrent subprocesses (which would corrupt the shared
  --resume session file). Cross-session requests live on independent
  chains and run in parallel.

Subtle correctness points:

* getSession() moved to head-of-queue so we resume into the latest
  claudeSessionId from the just-finished prior turn instead of a stale
  request-arrival snapshot.
* Aborted turns skip session-map persistence — the subprocess may have
  already updated its own session file on disk, so the next retry
  resumes from there.
* Queue chain GC uses Map identity check so we don't delete an entry
  that a later request has already chained onto.
* prev.then(() => mySlot, () => mySlot) tolerates a crashed prior turn
  so the chain doesn't poison forever.
* writeHead(200) before queue wait so OpenClaw sees response status
  immediately; heartbeat covers the queue-wait quiet period.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
zhi
2026-05-07 23:58:17 +00:00
parent b94e0d25f6
commit 2e64e9ce02
3 changed files with 290 additions and 82 deletions

View File

@@ -37,6 +37,15 @@ export type ClaudeDispatchOptions = {
bridgePort?: number;
/** Bridge API key for MCP proxy callbacks */
bridgeApiKey?: string;
/**
* Abort signal from the bridge. When fired (typically because the upstream
* HTTP client closed the socket — OpenClaw's attempt-level retry / cancel),
* we kill the claude subprocess group and break out of the iterator
* promptly so a stale subprocess doesn't keep streaming into a dead socket
* (or worse, get its output multiplexed with a fresh subprocess started by
* a retry).
*/
signal?: AbortSignal;
};
// Resolve the MCP server script path relative to this file.
@@ -109,6 +118,7 @@ export async function* dispatchToClaude(
openclawTools,
bridgePort = 18800,
bridgeApiKey = "",
signal,
} = opts;
// NOTE: put prompt right after -p, before --mcp-config.
@@ -202,6 +212,18 @@ export async function* dispatchToClaude(
}
};
// Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
// closes the socket, propagate that into our process tree by SIGTERM/SIGKILL
// (via scheduleCleanup) and break out of the iterator (via markDone). This
// prevents stale subprocesses from outliving the request that started them.
if (signal) {
if (signal.aborted) {
markDone();
} else {
signal.addEventListener("abort", () => markDone(), { once: true });
}
}
rl.on("line", (line: string) => {
if (!line.trim()) return;
let event: Record<string, unknown>;

View File

@@ -29,6 +29,13 @@ export type GeminiDispatchOptions = {
openclawTools?: OpenAITool[];
bridgePort?: number;
bridgeApiKey?: string;
/**
* Abort signal from the bridge. Mirror of dispatchToClaude's `signal` —
* see that file for rationale. When fired, we kill the gemini subprocess
* and break the iterator promptly so a stale process doesn't outlive
* the upstream request.
*/
signal?: AbortSignal;
};
/**
@@ -116,6 +123,7 @@ export async function* dispatchToGemini(
openclawTools,
bridgePort = 18800,
bridgeApiKey = "",
signal,
} = opts;
// Write system-level instructions to workspace/GEMINI.md every turn.
@@ -166,6 +174,43 @@ export async function* dispatchToGemini(
let done = false;
let resolveNext: (() => void) | null = null;
// Cleanup helper: SIGTERM the child first, then SIGKILL after a grace
// period if it hasn't exited. Idempotent — safe to call multiple times.
let cleanupScheduled = false;
const scheduleCleanup = (): void => {
if (cleanupScheduled) return;
cleanupScheduled = true;
if (child.exitCode !== null) return;
try { child.kill("SIGTERM"); } catch { /* already gone */ }
const killTimer = setTimeout(() => {
try { child.kill("SIGKILL"); } catch { /* already gone */ }
}, 5000);
killTimer.unref?.();
child.once("close", () => clearTimeout(killTimer));
};
const markDone = (): void => {
if (done) return;
done = true;
scheduleCleanup();
if (resolveNext) {
const r = resolveNext;
resolveNext = null;
r();
}
};
// Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
// closes the socket, kill the gemini subprocess and break the iterator.
// See dispatchToClaude for the full rationale.
if (signal) {
if (signal.aborted) {
markDone();
} else {
signal.addEventListener("abort", () => markDone(), { once: true });
}
}
rl.on("line", (line: string) => {
if (!line.trim()) return;
let event: Record<string, unknown>;