Three coordinated fixes for the duplicate-Discord-message bug where the
same prompt would be answered by two different claude subprocesses
running in parallel.
Root cause: handleChatCompletions had no concurrency control and no
way to detect when OpenClaw closed the upstream HTTP connection. When
OpenClaw's idle watchdog tripped (default 120s of stream silence), it
would close the socket and retry the prompt — but the original claude
subprocess kept running, and the bridge spawned a second one alongside
it. Both eventually streamed back, both got delivered to Discord.
Native (non-bridge) flow doesn't hit this because OpenClaw's fetch is
abort-aware end-to-end: attempt timeout fires AbortSignal, fetch closes
the socket, the model provider sees it, work stops. Bridge broke the
chain at "spawn subprocess" — this restores it.
Changes:
* SSE heartbeat (server.ts): write a `: keepalive\n\n` SSE comment
every 30s while a turn is in flight. It counts as bytes on the wire, so
the upstream idle timer resets, but the SSE spec treats comment lines as
a no-op, so the OpenAI stream parser ignores it. This eliminates the
120s-silence trigger that was causing OpenClaw to give up on long
tool-call sequences in the first place.
* Abort propagation (server.ts + both adapters): hook req.on('close')
to an AbortController and pass signal: through to dispatchToClaude /
dispatchToGemini. Adapters listen on signal abort and call markDone
→ scheduleCleanup which SIGTERMs the child process group (3s grace
for claude, 5s for gemini) then SIGKILLs. Mirrors what native fetch
does when its caller aborts.
* Per-sessionKey FIFO queue (server.ts): same-session turns serialize
via a Map<sessionKey, Promise<void>> chain so a user firing multiple
Discord messages back-to-back gets them processed in order rather
than spawning concurrent subprocesses (which would corrupt the shared
--resume session file). Cross-session requests live on independent
chains and run in parallel.
Subtle correctness points:
* getSession() moved to head-of-queue so we resume into the latest
claudeSessionId from the just-finished prior turn instead of a stale
request-arrival snapshot.
* Aborted turns skip session-map persistence — the subprocess may have
already updated its own session file on disk, so the next retry
resumes from there.
* Queue chain GC uses Map identity check so we don't delete an entry
that a later request has already chained onto.
* prev.then(() => mySlot, () => mySlot) tolerates a crashed prior turn
so the chain doesn't poison forever.
* writeHead(200) before queue wait so OpenClaw sees response status
immediately; heartbeat covers the queue-wait quiet period.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
289 lines
8.8 KiB
TypeScript
289 lines
8.8 KiB
TypeScript
import fs from "node:fs";
|
|
import path from "node:path";
|
|
import os from "node:os";
|
|
import { spawn } from "node:child_process";
|
|
import { createInterface } from "node:readline";
|
|
import { fileURLToPath } from "node:url";
|
|
import type { ClaudeMessage, OpenAITool } from "../claude/sdk-adapter.js";
|
|
|
|
// Locate the OpenClaw MCP server script relative to this module's own location.
// Installed layout:
//   <plugin root>/core/gemini/sdk-adapter.ts        (this file)
//   <plugin root>/services/openclaw-mcp-server.mjs  (the script)
// so the script sits two directories up, under services/.
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const MCP_SERVER_SCRIPT = path.resolve(
  __dirname,
  "..",
  "..",
  "services",
  "openclaw-mcp-server.mjs",
);
|
|
|
|
/** Options accepted by `dispatchToGemini` for a single Gemini CLI turn. */
export type GeminiDispatchOptions = {
  // ── Required ────────────────────────────────────────────────────────────

  /** User prompt for this turn; passed to the CLI as the `-p` argument. */
  prompt: string;

  /** Directory used as the subprocess cwd; GEMINI.md and .gemini/settings.json are written here. */
  workspace: string;

  // ── Optional ────────────────────────────────────────────────────────────

  /**
   * System-level instructions, written to workspace/GEMINI.md ahead of every invocation.
   * Gemini CLI re-reads GEMINI.md from its cwd on each turn (resumes included) and does
   * NOT keep it in the session file, so a changed value applies from the very next turn.
   */
  systemPrompt?: string;

  /** Agent identifier handed to the MCP server as AGENT_ID (empty string when omitted). */
  agentId?: string;

  /** Session id of an earlier turn; when set it is passed to the CLI via `--resume`. */
  resumeSessionId?: string;

  /** Gemini model override (e.g. "gemini-2.5-flash"). Falls back to gemini-cli's configured default. */
  model?: string;

  /** OpenClaw tool definitions to expose to Gemini through MCP. */
  openclawTools?: OpenAITool[];

  /** Port of the local bridge that executes MCP tool calls (18800 when omitted). */
  bridgePort?: number;

  /** API key handed to the MCP server as BRIDGE_API_KEY (empty string when omitted). */
  bridgeApiKey?: string;

  /**
   * Abort signal from the bridge — the counterpart of dispatchToClaude's `signal`
   * (that file documents the rationale). Once it fires, the gemini subprocess is
   * killed and the iterator is ended promptly, so a stale process does not outlive
   * the upstream request.
   */
  signal?: AbortSignal;
};
|
|
|
|
/**
 * Write the OpenClaw MCP server config to the workspace's .gemini/settings.json.
 *
 * Unlike Claude's --mcp-config flag, Gemini CLI reads MCP configuration from the
 * project settings file (.gemini/settings.json in the cwd). We write it before
 * each invocation to keep the tool list in sync with the current turn's tools.
 *
 * Existing non-MCP settings in the file are preserved. The `mcpServers` key is
 * owned by this function: it is replaced with a single `openclaw` entry, or
 * removed entirely when there are no tools or the MCP server script is missing.
 *
 * @param tools        Tool definitions; serialized to a uniquely named temp file
 *                     whose path is handed to the MCP server via TOOL_DEFS_FILE.
 * @param bridgePort   Local bridge port used to build BRIDGE_EXECUTE_URL.
 * @param bridgeApiKey Passed to the MCP server as BRIDGE_API_KEY.
 * @param workspace    Directory that holds (or will hold) .gemini/settings.json.
 * @param agentId      Passed to the MCP server as AGENT_ID.
 * @throws Propagates filesystem errors from creating the directory or writing files.
 */
function setupGeminiMcpSettings(
  tools: OpenAITool[],
  bridgePort: number,
  bridgeApiKey: string,
  workspace: string,
  agentId: string,
): void {
  const geminiDir = path.join(workspace, ".gemini");
  const settingsPath = path.join(geminiDir, "settings.json");

  fs.mkdirSync(geminiDir, { recursive: true });

  // Preserve existing non-MCP settings — but only if the file really holds a
  // JSON object. `null`, arrays and scalars are valid JSON yet cannot carry
  // settings keys (and `delete null.mcpServers` would throw), so they are
  // treated exactly like a corrupt file.
  let existing: Record<string, unknown> = {};
  if (fs.existsSync(settingsPath)) {
    try {
      const parsed: unknown = JSON.parse(fs.readFileSync(settingsPath, "utf8"));
      if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
        existing = parsed as Record<string, unknown>;
      }
    } catch {
      // unreadable or corrupt settings — start fresh
    }
  }

  if (!tools.length || !fs.existsSync(MCP_SERVER_SCRIPT)) {
    delete existing.mcpServers;
  } else {
    // A timestamp alone is not unique: two turns dispatched within the same
    // millisecond (or from two bridge processes) would overwrite each other's
    // tool definitions. Add the pid and a random suffix; the
    // `oc-…-tools.json` naming shape is kept.
    const randomSuffix = Math.random().toString(36).slice(2, 10);
    const sessionId = `oc-${Date.now()}-${process.pid}-${randomSuffix}`;
    const toolDefsPath = path.join(os.tmpdir(), `${sessionId}-tools.json`);
    fs.writeFileSync(toolDefsPath, JSON.stringify(tools, null, 2), "utf8");

    // Gemini MCP server config — server alias must not contain underscores
    // (Gemini uses underscores as FQN separator: mcp_<alias>_<toolname>).
    // We use "openclaw" (no underscores) to keep names clean.
    existing.mcpServers = {
      openclaw: {
        command: process.execPath,
        args: [MCP_SERVER_SCRIPT],
        env: {
          TOOL_DEFS_FILE: toolDefsPath,
          BRIDGE_EXECUTE_URL: `http://127.0.0.1:${bridgePort}/mcp/execute`,
          BRIDGE_API_KEY: bridgeApiKey,
          WORKSPACE: workspace,
          AGENT_ID: agentId,
        },
        trust: true, // auto-approve MCP tool calls (equivalent to Claude's bypassPermissions)
      },
    };
  }

  fs.writeFileSync(settingsPath, JSON.stringify(existing, null, 2), "utf8");
}
|
|
|
|
/**
 * Dispatch a turn to Gemini CLI using `gemini -p --output-format stream-json`.
 * Returns an async iterable of ClaudeMessage events (same interface as Claude adapter).
 *
 * Stream-json event types from Gemini CLI:
 *   init        → { session_id, model }
 *   message     → { role: "assistant", content: string, delta: true }  (streaming text)
 *   tool_use    → informational (Gemini handles internally)
 *   tool_result → informational (Gemini handles internally)
 *   result      → { status, stats }
 *
 * Emitted events:
 *   { type: "text", text }        for every streamed assistant delta
 *   { type: "done", sessionId }   once the subprocess has closed and an init event was seen
 *   { type: "error", message }    when no session id was captured (includes spawn failures)
 *
 * Lifecycle:
 *   - An already-aborted `opts.signal` ends the turn before any workspace file is
 *     written or any process is spawned.
 *   - When `opts.signal` fires mid-turn the subprocess gets SIGTERM, then SIGKILL
 *     after a 5s grace period, and the stream ends.
 *   - The same cleanup runs when the consumer stops iterating early or throws.
 *   - A subprocess that fails to start is reported as an "error" event instead of
 *     surfacing as an unhandled 'error' event on the child.
 *
 * @param opts Turn options; see GeminiDispatchOptions.
 */
export async function* dispatchToGemini(
  opts: GeminiDispatchOptions,
): AsyncIterable<ClaudeMessage> {
  const {
    prompt,
    systemPrompt,
    workspace,
    agentId = "",
    resumeSessionId,
    model,
    openclawTools,
    bridgePort = 18800,
    bridgeApiKey = "",
    signal,
  } = opts;

  // The upstream request may already be gone (e.g. it was aborted while waiting
  // its turn). Don't rewrite workspace files or start a process just to kill it.
  if (signal?.aborted) {
    yield { type: "error", message: "gemini did not return a session_id" };
    return;
  }

  // Write system-level instructions to workspace/GEMINI.md every turn.
  // Gemini CLI reads GEMINI.md from cwd on every invocation (including resumes)
  // and does NOT store it in the session file, so this acts as a stateless system prompt.
  if (systemPrompt) {
    fs.writeFileSync(path.join(workspace, "GEMINI.md"), systemPrompt, "utf8");
  }

  // Write MCP config to workspace .gemini/settings.json every turn.
  // Gemini CLI restarts the MCP server process each invocation (like Claude),
  // so we must re-inject the config on every turn including resumes.
  if (openclawTools?.length) {
    setupGeminiMcpSettings(openclawTools, bridgePort, bridgeApiKey, workspace, agentId);
  }

  // NOTE: prompt goes right after -p before other flags to avoid ambiguity.
  const args: string[] = [
    "-p",
    prompt,
    "--output-format", "stream-json",
    "--approval-mode", "yolo",
  ];
  if (model) {
    args.push("-m", model);
  }
  if (resumeSessionId) {
    args.push("--resume", resumeSessionId);
  }

  const child = spawn("gemini", args, {
    cwd: workspace,
    stdio: ["ignore", "pipe", "pipe"],
    env: { ...process.env },
  });

  const stderrLines: string[] = [];
  child.stderr?.on("data", (chunk: Buffer) => {
    stderrLines.push(chunk.toString("utf8").trim());
  });

  let capturedSessionId = "";
  const events: ClaudeMessage[] = [];
  let done = false;
  let resolveNext: (() => void) | null = null;
  // Held in an object so the value assigned inside the 'error' listener is
  // visible to the type checker at the point where it is read.
  const childFailure: { error?: Error } = {};

  // Wake the generator loop if it is parked waiting for the next event.
  const wakeConsumer = (): void => {
    const pending = resolveNext;
    resolveNext = null;
    pending?.();
  };

  // Cleanup helper: SIGTERM the child first, then SIGKILL after a grace
  // period if it hasn't exited. Idempotent — safe to call multiple times.
  let cleanupScheduled = false;
  const scheduleCleanup = (): void => {
    if (cleanupScheduled) return;
    cleanupScheduled = true;
    // exitCode stays null when the child was terminated by a signal, so both
    // fields must be consulted to know whether it is already gone.
    if (child.exitCode !== null || child.signalCode !== null) return;
    try { child.kill("SIGTERM"); } catch { /* already gone */ }
    const killTimer = setTimeout(() => {
      try { child.kill("SIGKILL"); } catch { /* already gone */ }
    }, 5000);
    killTimer.unref?.();
    child.once("close", () => clearTimeout(killTimer));
  };

  const markDone = (): void => {
    if (done) return;
    done = true;
    scheduleCleanup();
    wakeConsumer();
  };

  // Resolves once the child has closed or failed. The listeners are attached
  // right after spawn so a 'close' that fires while the generator is suspended
  // at a `yield` is not missed. The 'error' listener is essential: without one,
  // a failed spawn (e.g. `gemini` not on PATH) is thrown as an uncaught exception.
  const childExited = new Promise<void>((resolve) => {
    child.once("close", () => resolve());
    child.on("error", (err: Error) => {
      if (!childFailure.error) childFailure.error = err;
      markDone();
      resolve();
    });
  });

  // Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
  // closes the socket, kill the gemini subprocess and break the iterator.
  // See dispatchToClaude for the full rationale.
  const onAbort = (): void => {
    // Kill explicitly: markDone() is a no-op once stdout has already closed,
    // but the process itself may still be running.
    scheduleCleanup();
    markDone();
  };
  if (signal) {
    if (signal.aborted) {
      onAbort();
    } else {
      signal.addEventListener("abort", onAbort, { once: true });
    }
  }

  // stdout is only missing when the spawn itself failed; the 'error' listener
  // above ends the turn in that case.
  const rl = child.stdout
    ? createInterface({ input: child.stdout, crlfDelay: Infinity })
    : null;

  rl?.on("line", (line: string) => {
    if (!line.trim()) return;
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      return; // ignore non-JSON lines (e.g. "YOLO mode is enabled." warning)
    }
    // `null`, numbers and strings are valid JSON too; only objects are events.
    if (typeof parsed !== "object" || parsed === null) return;
    const event = parsed as Record<string, unknown>;

    // init event carries the session_id for future --resume
    if (event.type === "init") {
      const sid = event.session_id;
      if (typeof sid === "string" && sid) capturedSessionId = sid;
    }

    // Streaming assistant text: message events with delta:true
    if (event.type === "message" && event.role === "assistant" && event.delta) {
      const content = event.content;
      if (typeof content === "string" && content) {
        events.push({ type: "text", text: content });
      }
    }

    wakeConsumer();
  });

  rl?.on("close", () => {
    done = true;
    wakeConsumer();
  });

  try {
    // Drain queued events; park until the next line arrives or the turn ends.
    while (true) {
      const next = events.shift();
      if (next !== undefined) {
        yield next;
        continue;
      }
      if (done) break;
      await new Promise<void>((resolve) => {
        resolveNext = resolve;
      });
    }

    await childExited;

    if (capturedSessionId) {
      yield { type: "done", sessionId: capturedSessionId };
    } else if (childFailure.error) {
      yield {
        type: "error",
        message: `failed to run gemini: ${childFailure.error.message}`,
      };
    } else {
      const stderrSummary = stderrLines
        .join(" ")
        .replace(/YOLO mode is enabled\./g, "")
        .trim()
        .slice(0, 200);
      yield {
        type: "error",
        message: `gemini did not return a session_id${stderrSummary ? `: ${stderrSummary}` : ""}`,
      };
    }
  } finally {
    // Runs on normal completion, on a consumer-side `break`/`return`, and when
    // the consumer throws: detach from the signal and make sure the subprocess
    // does not outlive the turn.
    signal?.removeEventListener("abort", onAbort);
    scheduleCleanup();
  }
}
|