Compare commits
8 Commits
4e015c677b
...
fix/bridge
| Author | SHA1 | Date | |
|---|---|---|---|
| 453dab3271 | |||
| 0b24330787 | |||
| 1b7cd6b215 | |||
| cce85a9be8 | |||
| 5fca8f5da1 | |||
| 2e64e9ce02 | |||
| b94e0d25f6 | |||
| 91acce9b32 |
@@ -1,4 +1,4 @@
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core";
|
||||
import { runContractorAgentsAdd } from "./contractor-agents-add.js";
|
||||
|
||||
export function registerCli(api: OpenClawPluginApi): void {
|
||||
|
||||
@@ -37,6 +37,15 @@ export type ClaudeDispatchOptions = {
|
||||
bridgePort?: number;
|
||||
/** Bridge API key for MCP proxy callbacks */
|
||||
bridgeApiKey?: string;
|
||||
/**
|
||||
* Abort signal from the bridge. When fired (typically because the upstream
|
||||
* HTTP client closed the socket — OpenClaw's attempt-level retry / cancel),
|
||||
* we kill the claude subprocess group and break out of the iterator
|
||||
* promptly so a stale subprocess doesn't keep streaming into a dead socket
|
||||
* (or worse, get its output multiplexed with a fresh subprocess started by
|
||||
* a retry).
|
||||
*/
|
||||
signal?: AbortSignal;
|
||||
};
|
||||
|
||||
// Resolve the MCP server script path relative to this file.
|
||||
@@ -109,6 +118,7 @@ export async function* dispatchToClaude(
|
||||
openclawTools,
|
||||
bridgePort = 18800,
|
||||
bridgeApiKey = "",
|
||||
signal,
|
||||
} = opts;
|
||||
|
||||
// NOTE: put prompt right after -p, before --mcp-config.
|
||||
@@ -202,6 +212,18 @@ export async function* dispatchToClaude(
|
||||
}
|
||||
};
|
||||
|
||||
// Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
|
||||
// closes the socket, propagate that into our process tree by SIGTERM/SIGKILL
|
||||
// (via scheduleCleanup) and break out of the iterator (via markDone). This
|
||||
// prevents stale subprocesses from outliving the request that started them.
|
||||
if (signal) {
|
||||
if (signal.aborted) {
|
||||
markDone();
|
||||
} else {
|
||||
signal.addEventListener("abort", () => markDone(), { once: true });
|
||||
}
|
||||
}
|
||||
|
||||
rl.on("line", (line: string) => {
|
||||
if (!line.trim()) return;
|
||||
let event: Record<string, unknown>;
|
||||
|
||||
@@ -29,6 +29,13 @@ export type GeminiDispatchOptions = {
|
||||
openclawTools?: OpenAITool[];
|
||||
bridgePort?: number;
|
||||
bridgeApiKey?: string;
|
||||
/**
|
||||
* Abort signal from the bridge. Mirror of dispatchToClaude's `signal` —
|
||||
* see that file for rationale. When fired, we kill the gemini subprocess
|
||||
* and break the iterator promptly so a stale process doesn't outlive
|
||||
* the upstream request.
|
||||
*/
|
||||
signal?: AbortSignal;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -116,6 +123,7 @@ export async function* dispatchToGemini(
|
||||
openclawTools,
|
||||
bridgePort = 18800,
|
||||
bridgeApiKey = "",
|
||||
signal,
|
||||
} = opts;
|
||||
|
||||
// Write system-level instructions to workspace/GEMINI.md every turn.
|
||||
@@ -166,6 +174,43 @@ export async function* dispatchToGemini(
|
||||
let done = false;
|
||||
let resolveNext: (() => void) | null = null;
|
||||
|
||||
// Cleanup helper: SIGTERM the child first, then SIGKILL after a grace
|
||||
// period if it hasn't exited. Idempotent — safe to call multiple times.
|
||||
let cleanupScheduled = false;
|
||||
const scheduleCleanup = (): void => {
|
||||
if (cleanupScheduled) return;
|
||||
cleanupScheduled = true;
|
||||
if (child.exitCode !== null) return;
|
||||
try { child.kill("SIGTERM"); } catch { /* already gone */ }
|
||||
const killTimer = setTimeout(() => {
|
||||
try { child.kill("SIGKILL"); } catch { /* already gone */ }
|
||||
}, 5000);
|
||||
killTimer.unref?.();
|
||||
child.once("close", () => clearTimeout(killTimer));
|
||||
};
|
||||
|
||||
const markDone = (): void => {
|
||||
if (done) return;
|
||||
done = true;
|
||||
scheduleCleanup();
|
||||
if (resolveNext) {
|
||||
const r = resolveNext;
|
||||
resolveNext = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
// Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
|
||||
// closes the socket, kill the gemini subprocess and break the iterator.
|
||||
// See dispatchToClaude for the full rationale.
|
||||
if (signal) {
|
||||
if (signal.aborted) {
|
||||
markDone();
|
||||
} else {
|
||||
signal.addEventListener("abort", () => markDone(), { once: true });
|
||||
}
|
||||
}
|
||||
|
||||
rl.on("line", (line: string) => {
|
||||
if (!line.trim()) return;
|
||||
let event: Record<string, unknown>;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
||||
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core";
|
||||
import { normalizePluginConfig } from "./core/types/contractor.js";
|
||||
import { resolveContractorAgentMetadata } from "./core/contractor/metadata-resolver.js";
|
||||
import { createBridgeServer } from "./web/server.js";
|
||||
@@ -17,11 +18,51 @@ const SERVER_KEY = "_contractorAgentBridgeServer";
|
||||
/** Key for the live OpenClaw config accessor (getter fn) shared via globalThis. */
|
||||
const OPENCLAW_CONFIG_KEY = "_contractorOpenClawConfig";
|
||||
|
||||
/** OpenClaw replaces secret-like config values with this before exposing
|
||||
* pluginConfig to plugins. */
|
||||
const OPENCLAW_REDACTION_SENTINEL = "__OPENCLAW_REDACTED__";
|
||||
|
||||
/**
|
||||
* OpenClaw redacts any secret-like key (anything containing "apiKey") before
|
||||
* handing `api.pluginConfig` to the plugin, so `pluginConfig.bridgeApiKey` is
|
||||
* the redaction sentinel — never the real value. The bridge server validates
|
||||
* inbound requests against this key, while the OpenClaw model provider sends
|
||||
* the real `models.providers.contractor-agent.apiKey`; using the redacted
|
||||
* value therefore guarantees a permanent HTTP 401.
|
||||
*
|
||||
* Resolve the real shared secret from the raw on-disk config instead (same
|
||||
* pattern `resolveAgent` already uses). If it is still missing or redacted,
|
||||
* return "" so the loopback-only bridge skips auth rather than hard-locking.
|
||||
*/
|
||||
function resolveBridgeApiKey(fallback: string): string {
|
||||
try {
|
||||
const configPath = path.join(
|
||||
process.env.HOME ?? "/root",
|
||||
".openclaw",
|
||||
"openclaw.json",
|
||||
);
|
||||
const raw = JSON.parse(fs.readFileSync(configPath, "utf8")) as {
|
||||
plugins?: {
|
||||
entries?: Record<string, { config?: { bridgeApiKey?: string } }>;
|
||||
};
|
||||
};
|
||||
const k = raw.plugins?.entries?.["contractor-agent"]?.config?.bridgeApiKey;
|
||||
if (typeof k === "string" && k && k !== OPENCLAW_REDACTION_SENTINEL) {
|
||||
return k;
|
||||
}
|
||||
} catch {
|
||||
/* fall through to fallback handling */
|
||||
}
|
||||
if (fallback && fallback !== OPENCLAW_REDACTION_SENTINEL) return fallback;
|
||||
return ""; // loopback-only bridge: skip auth instead of 401-locking
|
||||
}
|
||||
|
||||
// ── Plugin entry ─────────────────────────────────────────────────────────────
|
||||
|
||||
export default {
|
||||
export default definePluginEntry({
|
||||
id: "contractor-agent",
|
||||
name: "Contractor Agent",
|
||||
description: "Turns Claude Code into an OpenClaw-managed contractor agent",
|
||||
// OpenClaw requires register() to be synchronous — returning a Promise
|
||||
// surfaces as `Error: plugin register must be synchronous` and the plugin
|
||||
// ends up in `error` state. We avoid `await` here and instead let the
|
||||
@@ -60,28 +101,37 @@ export default {
|
||||
if (!_G[LIFECYCLE_KEY]) {
|
||||
_G[LIFECYCLE_KEY] = true;
|
||||
|
||||
const server = createBridgeServer({
|
||||
port: config.bridgePort,
|
||||
apiKey: config.bridgeApiKey,
|
||||
permissionMode: config.permissionMode,
|
||||
resolveAgent,
|
||||
logger: api.logger,
|
||||
});
|
||||
// Bind the bridge server only when the gateway boots, NOT eagerly at
|
||||
// register-time. register() also runs in one-shot CLI subprocesses
|
||||
// (e.g. `openclaw completion`, `openclaw doctor`); spawning a long-
|
||||
// lived listener there would prevent those commands from exiting.
|
||||
api.on("gateway_start", () => {
|
||||
const server = createBridgeServer({
|
||||
port: config.bridgePort,
|
||||
apiKey: resolveBridgeApiKey(config.bridgeApiKey),
|
||||
permissionMode: config.permissionMode,
|
||||
resolveAgent,
|
||||
logger: api.logger,
|
||||
});
|
||||
|
||||
// EADDRINUSE → another gateway/CLI process already owns the port; that's
|
||||
// fine, we just don't double-bind. Any other error is logged but does
|
||||
// not crash registration.
|
||||
server.on("error", (err: NodeJS.ErrnoException) => {
|
||||
if (err.code === "EADDRINUSE") {
|
||||
api.logger.info(
|
||||
`[contractor-agent] bridge already running on port ${config.bridgePort}, skipping bind`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
api.logger.warn(`[contractor-agent] bridge server error: ${err.message ?? String(err)}`);
|
||||
});
|
||||
// EADDRINUSE → another gateway already owns the port; fine, skip bind.
|
||||
server.on("error", (err: NodeJS.ErrnoException) => {
|
||||
if (err.code === "EADDRINUSE") {
|
||||
api.logger.info(
|
||||
`[contractor-agent] bridge already running on port ${config.bridgePort}, skipping bind`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
api.logger.warn(`[contractor-agent] bridge server error: ${err.message ?? String(err)}`);
|
||||
});
|
||||
|
||||
_G[SERVER_KEY] = server;
|
||||
// Defense in depth: even if this code path is somehow reached outside
|
||||
// the gateway, .unref() prevents the listener from pinning the host's
|
||||
// event loop and blocking process exit.
|
||||
server.unref();
|
||||
|
||||
_G[SERVER_KEY] = server;
|
||||
});
|
||||
|
||||
api.on("gateway_stop", () => {
|
||||
const s = _G[SERVER_KEY] as http.Server | undefined;
|
||||
@@ -95,4 +145,4 @@ export default {
|
||||
|
||||
api.logger.info(`[contractor-agent] plugin registered (bridge port: ${config.bridgePort})`);
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
{
|
||||
"id": "contractor-agent",
|
||||
"name": "Contractor Agent",
|
||||
"version": "0.1.0",
|
||||
"description": "Turns Claude Code into an OpenClaw-managed contractor agent",
|
||||
"main": "index.ts",
|
||||
"activation": {
|
||||
"onStartup": true
|
||||
},
|
||||
"commandAliases": [
|
||||
{ "name": "contractor-agents" }
|
||||
],
|
||||
"configSchema": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
|
||||
@@ -16,7 +16,53 @@ function stripOpenClawTimestampPrefix(raw: string): string {
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the latest user message from the OpenClaw request.
|
||||
* Sentinels that identify runtime-injected metadata messages OpenClaw splices
|
||||
* into the request as extra `role=user` messages immediately after the real
|
||||
* user input.
|
||||
*
|
||||
* Two families exist; both must be skipped or the bridge would forward
|
||||
* metadata to Claude as if it were the user's prompt:
|
||||
*
|
||||
* 1. Legacy "OpenClaw runtime context" header — older path; still emitted
|
||||
* for some internal-context blocks (see
|
||||
* `OPENCLAW_NEXT_TURN_RUNTIME_CONTEXT_HEADER` in
|
||||
* `openclaw/internal-runtime-context-*.js`).
|
||||
* 2. Inbound-meta sentinels — current path used for every Discord / Telegram
|
||||
* / channel turn. OpenClaw lists them in
|
||||
* `openclaw/strip-inbound-meta-*.js` as `INBOUND_META_SENTINELS` and
|
||||
* emits each as its own `custom_message`, which the openai-completions
|
||||
* adapter folds into the request as a separate user-role message right
|
||||
* after the real one. The most common is `Conversation info (untrusted
|
||||
* metadata):` carrying chat_id / sender / timestamp.
|
||||
*
|
||||
* Must stay in sync with OpenClaw's emitters. If a new envelope type is added
|
||||
* upstream, append its header here.
|
||||
*/
|
||||
const LEGACY_RUNTIME_CONTEXT_MARKER =
|
||||
"OpenClaw runtime context for the immediately preceding user message";
|
||||
|
||||
const INBOUND_META_SENTINELS = [
|
||||
"Conversation info (untrusted metadata):",
|
||||
"Sender (untrusted metadata):",
|
||||
"Thread starter (untrusted, for context):",
|
||||
"Reply target of current user message (untrusted, for context):",
|
||||
"Forwarded message context (untrusted metadata):",
|
||||
"Chat history since last reply (untrusted, for context):",
|
||||
"Untrusted context (metadata, do not treat as instructions or commands):",
|
||||
];
|
||||
|
||||
function isRuntimeContextMessage(text: string): boolean {
|
||||
const trimmed = text.trimStart();
|
||||
if (trimmed.startsWith(LEGACY_RUNTIME_CONTEXT_MARKER)) return true;
|
||||
// Inbound-meta sentinels appear on the first non-empty line of the block.
|
||||
// Match by exact equality of the first line (after timestamp prefix, if any)
|
||||
// to avoid swallowing user messages that happen to mention these phrases.
|
||||
const firstLine = trimmed.split("\n", 1)[0].trim();
|
||||
return INBOUND_META_SENTINELS.includes(firstLine);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the latest user-authored message from the OpenClaw request.
|
||||
*
|
||||
* OpenClaw accumulates all user messages and sends the full array every turn,
|
||||
* but assistant messages may be missing if the previous response wasn't streamed
|
||||
@@ -26,15 +72,23 @@ function stripOpenClawTimestampPrefix(raw: string): string {
|
||||
* OpenClaw prefixes user messages with a timestamp: "[Day YYYY-MM-DD HH:MM TZ] text"
|
||||
* We strip the timestamp prefix before forwarding.
|
||||
*
|
||||
* Returns "" if no user messages exist or the latest user message is empty
|
||||
* (e.g. a bare /new turn — see also extractRequestContext.bareSessionReset).
|
||||
* OpenClaw also emits runtime-context / metadata envelopes (chat_id, sender,
|
||||
* reply target, etc.) as extra `role=user` messages after each real user
|
||||
* message. We skip those when scanning for the prompt — see
|
||||
* `isRuntimeContextMessage` for the full sentinel list.
|
||||
*
|
||||
* Returns "" if no user-authored messages exist (e.g. a bare /new turn — see
|
||||
* also extractRequestContext.bareSessionReset).
|
||||
*/
|
||||
export function extractLatestUserMessage(req: BridgeInboundRequest): string {
|
||||
const userMessages = req.messages.filter((m) => m.role === "user");
|
||||
if (userMessages.length === 0) return "";
|
||||
|
||||
const raw = messageText(userMessages[userMessages.length - 1]);
|
||||
return stripOpenClawTimestampPrefix(raw);
|
||||
for (let i = userMessages.length - 1; i >= 0; i -= 1) {
|
||||
const raw = messageText(userMessages[i]);
|
||||
if (!raw) continue;
|
||||
if (isRuntimeContextMessage(raw)) continue;
|
||||
return stripOpenClawTimestampPrefix(raw);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
export type RequestContext = {
|
||||
|
||||
@@ -88,6 +88,28 @@ function parseBody(req: http.IncomingMessage): Promise<BridgeInboundRequest> {
|
||||
return parseBodyRaw(req) as Promise<BridgeInboundRequest>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-sessionKey FIFO queue: same-session turns serialize so a user firing
|
||||
* multiple Discord messages back-to-back gets them processed in order rather
|
||||
* than spawning concurrent claude subprocesses. Cross-session requests bypass
|
||||
* each other's chains and run in parallel.
|
||||
*
|
||||
* The chain is `prev.then(() => mySlot)` per request — `mySlot` is released
|
||||
* in the request's finally block, so the next request waits until our work
|
||||
* completes (success or failure) before starting.
|
||||
*/
|
||||
const queueBySession = new Map<string, Promise<void>>();
|
||||
|
||||
/**
|
||||
* SSE heartbeat cadence. Bridge writes an empty-content `chat.completion.chunk`
|
||||
* (a no-op for OpenAI stream parsers, but a real model-progress event on the
|
||||
* canonical streaming channel) on this interval while a turn is in flight or
|
||||
* queued. This keeps OpenClaw's LLM idle watchdog (default 120s) from firing
|
||||
* during long quiet tool-call phases or while we're waiting our turn in the
|
||||
* per-session queue. See the heartbeat block in handleChatCompletions for
|
||||
* details on why an SSE comment frame is insufficient.
|
||||
*/
|
||||
const HEARTBEAT_MS = 30_000;
|
||||
|
||||
const _G = globalThis as Record<string, unknown>;
|
||||
|
||||
@@ -156,33 +178,32 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
// Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude
|
||||
const isGemini = typeof body.model === "string" && body.model.includes("gemini");
|
||||
|
||||
// Look up existing session (shared structure for both Claude and Gemini).
|
||||
// On a bare /new or /reset turn we deliberately drop the existing entry so
|
||||
// the CLI starts a fresh session — otherwise --resume would bring back the
|
||||
// very history the user just asked to abandon.
|
||||
let existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;
|
||||
if (bareSessionReset && existingEntry && sessionKey) {
|
||||
// ── Abort propagation ────────────────────────────────────────────────────
|
||||
// OpenClaw's LLM client cancels in-flight HTTP requests via AbortSignal
|
||||
// when an attempt fails or the user cancels. On the wire this manifests
|
||||
// as the client closing the socket, surfaced here as `req.on('close')`.
|
||||
// We mirror that signal into our own AbortController and propagate it
|
||||
// down to the dispatchTo{Claude,Gemini} adapter, which kills the
|
||||
// subprocess. Without this, OpenClaw silently drops the response while
|
||||
// the subprocess keeps running, and a retry spawns another subprocess
|
||||
// alongside the orphan — the root cause of the duplicate-Discord-message
|
||||
// bug we hit in May 2026.
|
||||
const abortController = new AbortController();
|
||||
let clientDisconnected = false;
|
||||
const onClose = (): void => {
|
||||
if (clientDisconnected) return;
|
||||
clientDisconnected = true;
|
||||
logger.info(
|
||||
`[contractor-bridge] bare /new detected — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`,
|
||||
`[contractor-bridge] client disconnected sessionKey=${sessionKey} — aborting in-flight work`,
|
||||
);
|
||||
removeSession(workspace, sessionKey);
|
||||
existingEntry = null;
|
||||
}
|
||||
let resumeSessionId = existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;
|
||||
abortController.abort();
|
||||
};
|
||||
req.on("close", onClose);
|
||||
|
||||
// Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
|
||||
// For Claude: --system-prompt fully replaces any prior system prompt each invocation.
|
||||
// For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
|
||||
// This keeps persona and skills current without needing to track first-turn state.
|
||||
const systemPrompt = buildBootstrap({
|
||||
agentId,
|
||||
openclawSessionKey: sessionKey,
|
||||
workspace,
|
||||
skillsBlock: skillsBlock || undefined,
|
||||
workspaceContextFiles,
|
||||
});
|
||||
|
||||
// Start SSE response
|
||||
// Start SSE response immediately so OpenClaw sees the response status
|
||||
// quickly. We may still spend time waiting in the per-session queue
|
||||
// before any real chunk goes out — heartbeat (below) keeps that quiet
|
||||
// window from tripping the upstream idle watchdog.
|
||||
res.writeHead(200, {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
@@ -191,13 +212,129 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
});
|
||||
|
||||
const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;
|
||||
|
||||
// ── SSE heartbeat ────────────────────────────────────────────────────────
|
||||
// OpenClaw's LLM idle watchdog (default 120s) fires on lack of *model
|
||||
// progress*, not lack of bytes — concretely "no content delta through
|
||||
// SSE for 120s". An SSE comment frame (`: keepalive\n\n`) keeps the TCP
|
||||
// socket alive but does NOT register as model progress, so a long quiet
|
||||
// tool-call phase still idles out. When that happens OpenClaw falls back
|
||||
// to re-sending the prior turn's assistant text (see
|
||||
// pi-embedded-Bcz04p2i.js:1308 `fallbackAnswerText`), producing the
|
||||
// duplicate-Discord-message symptom observed 2026-05-14.
|
||||
//
|
||||
// We emit a real `chat.completion.chunk` with an empty content delta
|
||||
// every HEARTBEAT_MS. Clients drop empty deltas, but the upstream idle
|
||||
// watchdog should count it as model progress because it's a real event
|
||||
// on the canonical streaming channel. If empty content turns out to be
|
||||
// filtered, the next step is a zero-width-space "".
|
||||
const heartbeat = setInterval(() => {
|
||||
if (clientDisconnected || res.writableEnded) return;
|
||||
try {
|
||||
sseWrite(res, buildChunk(completionId, ""));
|
||||
} catch {
|
||||
/* socket dead, ignore */
|
||||
}
|
||||
}, HEARTBEAT_MS);
|
||||
heartbeat.unref?.();
|
||||
|
||||
// ── Per-sessionKey FIFO queue ────────────────────────────────────────────
|
||||
// Same-sessionKey turns serialize: if a user fires multiple Discord
|
||||
// messages quickly, we process them in order rather than starting
|
||||
// concurrent claude subprocesses (which would corrupt the shared session
|
||||
// file and produce overlapping replies). Cross-sessionKey requests run
|
||||
// in parallel — they live on independent chains.
|
||||
let releaseSlot: () => void = () => {};
|
||||
const mySlot = new Promise<void>((r) => {
|
||||
releaseSlot = r;
|
||||
});
|
||||
const prev = sessionKey
|
||||
? queueBySession.get(sessionKey) ?? Promise.resolve()
|
||||
: Promise.resolve();
|
||||
// `myChainTail` advances only after `releaseSlot()` (called in our
|
||||
// finally block). Tolerate prev rejecting so a crashed earlier turn
|
||||
// doesn't poison the chain forever.
|
||||
const myChainTail = prev.then(() => mySlot, () => mySlot);
|
||||
if (sessionKey) queueBySession.set(sessionKey, myChainTail);
|
||||
let newSessionId = "";
|
||||
let hasError = false;
|
||||
let resultErrorReason: string | null = null;
|
||||
|
||||
const openclawTools = body.tools ?? [];
|
||||
let existingEntry: ReturnType<typeof getSession> = null;
|
||||
|
||||
try {
|
||||
// Block until the previous turn on this sessionKey finishes.
|
||||
// Cross-session requests skip this wait (their `prev` is a fresh
|
||||
// Promise.resolve()).
|
||||
await prev.catch(() => {});
|
||||
|
||||
// While queued, OpenClaw may have given up on this request (attempt
|
||||
// timeout, user cancel, etc.) and closed the socket. If so, exit
|
||||
// cleanly without dispatching — the client wouldn't see the output
|
||||
// anyway.
|
||||
if (clientDisconnected) {
|
||||
logger.info(
|
||||
`[contractor-bridge] dropped queued request sessionKey=${sessionKey} (client disconnected during queue wait)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Look up the session-map entry NOW (at the head of the queue), not
|
||||
// earlier — the previous turn on the same sessionKey may have just
|
||||
// updated `claudeSessionId` and we want to resume into the latest
|
||||
// one rather than a stale snapshot from request-arrival time.
|
||||
existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;
|
||||
|
||||
// Detect a fresh OpenClaw session even when the bare-reset marker is
|
||||
// absent. The marker only arrives when `/new` is the body of *this*
|
||||
// user turn (see get-reply isBareSessionReset). When `/new` is sent
|
||||
// as a separate slash command (e.g. via Discord's slash UI), OpenClaw
|
||||
// processes the reset in a side lane that doesn't hit the bridge —
|
||||
// it just renames the prior session file aside. The follow-up real
|
||||
// message then arrives on a brand-new OpenClaw session, but as a
|
||||
// normal turn with no marker. Without this check, the bridge happily
|
||||
// resumes the long-stale claudeSessionId from before the reset.
|
||||
//
|
||||
// OpenClaw sends the full conversation history every turn (system +
|
||||
// user/assistant pairs + latest user). A request with zero assistant
|
||||
// turns is therefore a positive signal that the OpenClaw session is
|
||||
// brand-new and any prior claudeSessionId we hold is from a previous
|
||||
// OpenClaw session that the user already abandoned.
|
||||
const hasAssistantHistory = body.messages.some((m) => {
|
||||
if (m.role !== "assistant") return false;
|
||||
if (typeof m.content === "string") return m.content.trim().length > 0;
|
||||
return m.content.some(
|
||||
(c) => c.type === "text" && (c.text ?? "").trim().length > 0,
|
||||
);
|
||||
});
|
||||
const isFreshOpenClawSession = !hasAssistantHistory;
|
||||
|
||||
if ((bareSessionReset || isFreshOpenClawSession) && existingEntry && sessionKey) {
|
||||
const reason = bareSessionReset
|
||||
? "bare /new detected"
|
||||
: "fresh OpenClaw session (no assistant history in messages[])";
|
||||
logger.info(
|
||||
`[contractor-bridge] ${reason} — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`,
|
||||
);
|
||||
removeSession(workspace, sessionKey);
|
||||
existingEntry = null;
|
||||
}
|
||||
const resumeSessionId =
|
||||
existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;
|
||||
|
||||
// Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
|
||||
// For Claude: --append-system-prompt appends to the built-in prompt each invocation.
|
||||
// For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
|
||||
// This keeps persona and skills current without needing to track first-turn state.
|
||||
const systemPrompt = buildBootstrap({
|
||||
agentId,
|
||||
openclawSessionKey: sessionKey,
|
||||
workspace,
|
||||
skillsBlock: skillsBlock || undefined,
|
||||
workspaceContextFiles,
|
||||
});
|
||||
|
||||
const openclawTools = body.tools ?? [];
|
||||
|
||||
const dispatchIter = isGemini
|
||||
? dispatchToGemini({
|
||||
prompt: dispatchPrompt,
|
||||
@@ -208,6 +345,7 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
openclawTools,
|
||||
bridgePort: port,
|
||||
bridgeApiKey: apiKey,
|
||||
signal: abortController.signal,
|
||||
})
|
||||
: dispatchToClaude({
|
||||
prompt: dispatchPrompt,
|
||||
@@ -219,68 +357,106 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
openclawTools,
|
||||
bridgePort: port,
|
||||
bridgeApiKey: apiKey,
|
||||
signal: abortController.signal,
|
||||
});
|
||||
|
||||
for await (const event of dispatchIter) {
|
||||
if (event.type === "text") {
|
||||
sseWrite(res, buildChunk(completionId, event.text));
|
||||
} else if (event.type === "done") {
|
||||
newSessionId = event.sessionId;
|
||||
} else if (event.type === "result_error") {
|
||||
// CLI returned a terminal error (typically context overflow). The
|
||||
// text was already streamed via prior `text` events; record the
|
||||
// session so we can drop it below and log the reason.
|
||||
logger.warn(
|
||||
`[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`,
|
||||
);
|
||||
resultErrorReason = event.reason;
|
||||
newSessionId = event.sessionId;
|
||||
} else if (event.type === "error") {
|
||||
logger.warn(`[contractor-bridge] ${isGemini ? "gemini" : "claude"} error: ${event.message}`);
|
||||
hasError = true;
|
||||
sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`));
|
||||
try {
|
||||
for await (const event of dispatchIter) {
|
||||
if (event.type === "text") {
|
||||
if (!clientDisconnected) sseWrite(res, buildChunk(completionId, event.text));
|
||||
} else if (event.type === "done") {
|
||||
newSessionId = event.sessionId;
|
||||
} else if (event.type === "result_error") {
|
||||
// CLI signaled a fatal-but-graceful error (context overflow,
|
||||
// refusal, billing, etc.) via `is_error: true` on the result
|
||||
// event. The text was already streamed via prior `text` events;
|
||||
// record the reason so the bridge can drop the session-map entry
|
||||
// below.
|
||||
logger.warn(
|
||||
`[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`,
|
||||
);
|
||||
resultErrorReason = event.reason;
|
||||
newSessionId = event.sessionId;
|
||||
} else if (event.type === "error") {
|
||||
logger.warn(`[contractor-bridge] ${isGemini ? "gemini" : "claude"} error: ${event.message}`);
|
||||
hasError = true;
|
||||
if (!clientDisconnected) {
|
||||
sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
|
||||
hasError = true;
|
||||
if (!clientDisconnected) {
|
||||
sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`));
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
|
||||
hasError = true;
|
||||
sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`));
|
||||
}
|
||||
|
||||
sseWrite(res, buildStopChunk(completionId));
|
||||
sseWrite(res, "[DONE]");
|
||||
res.end();
|
||||
if (!clientDisconnected && !res.writableEnded) {
|
||||
sseWrite(res, buildStopChunk(completionId));
|
||||
sseWrite(res, "[DONE]");
|
||||
}
|
||||
|
||||
// Session-map persistence:
|
||||
// - Successful turn → upsert with the latest claudeSessionId so the next
|
||||
// turn can `--resume` into it.
|
||||
// - Terminal CLI error (context overflow etc., reported via result_error)
|
||||
// → drop the entry so the next turn starts fresh instead of resuming
|
||||
// into the same poisoned session and re-erroring.
|
||||
// - Stream/transport error before any sessionId was captured → mark the
|
||||
// prior entry orphaned (existing behavior).
|
||||
if (resultErrorReason && sessionKey) {
|
||||
logger.info(
|
||||
`[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`,
|
||||
);
|
||||
removeSession(workspace, sessionKey);
|
||||
} else if (newSessionId && sessionKey && !hasError) {
|
||||
const now = new Date().toISOString();
|
||||
putSession(workspace, {
|
||||
openclawSessionKey: sessionKey,
|
||||
agentId,
|
||||
contractor: isGemini ? "gemini" : "claude",
|
||||
claudeSessionId: newSessionId,
|
||||
workspace,
|
||||
createdAt: existingEntry?.createdAt ?? now,
|
||||
lastActivityAt: now,
|
||||
state: "active",
|
||||
});
|
||||
logger.info(
|
||||
`[contractor-bridge] session mapped sessionKey=${sessionKey} contractor=${isGemini ? "gemini" : "claude"} sessionId=${newSessionId}`,
|
||||
);
|
||||
} else if (hasError && sessionKey && existingEntry) {
|
||||
markOrphaned(workspace, sessionKey);
|
||||
// Session-map persistence:
|
||||
// - Successful turn → upsert with the latest claudeSessionId so the next
|
||||
// turn can `--resume` into it.
|
||||
// - Terminal CLI error (context overflow etc., reported via result_error)
|
||||
// → drop the entry so the next turn starts fresh instead of resuming
|
||||
// into the same poisoned session and re-erroring.
|
||||
// - Stream/transport error before any sessionId was captured → mark the
|
||||
// prior entry orphaned (existing behavior).
|
||||
// - Aborted (client disconnected) → don't touch the session-map; the
|
||||
// subprocess may have already updated its own session file on disk,
|
||||
// and the next turn will resume from whatever it left there.
|
||||
if (clientDisconnected) {
|
||||
// No-op: aborted turn; preserve whatever the prior entry was so the
|
||||
// next turn (likely an OpenClaw retry of the same prompt) can resume.
|
||||
} else if (resultErrorReason && sessionKey) {
|
||||
logger.info(
|
||||
`[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`,
|
||||
);
|
||||
removeSession(workspace, sessionKey);
|
||||
} else if (newSessionId && sessionKey && !hasError) {
|
||||
const now = new Date().toISOString();
|
||||
putSession(workspace, {
|
||||
openclawSessionKey: sessionKey,
|
||||
agentId,
|
||||
contractor: isGemini ? "gemini" : "claude",
|
||||
claudeSessionId: newSessionId,
|
||||
workspace,
|
||||
createdAt: existingEntry?.createdAt ?? now,
|
||||
lastActivityAt: now,
|
||||
state: "active",
|
||||
});
|
||||
logger.info(
|
||||
`[contractor-bridge] session mapped sessionKey=${sessionKey} contractor=${isGemini ? "gemini" : "claude"} sessionId=${newSessionId}`,
|
||||
);
|
||||
} else if (hasError && sessionKey && existingEntry) {
|
||||
markOrphaned(workspace, sessionKey);
|
||||
}
|
||||
} finally {
|
||||
clearInterval(heartbeat);
|
||||
req.off("close", onClose);
|
||||
|
||||
// Release our slot so the next request on this sessionKey can proceed.
|
||||
// Must happen even on abort/throw, otherwise the chain stalls forever.
|
||||
releaseSlot();
|
||||
|
||||
// Garbage-collect the queue entry if no later request chained on us.
|
||||
// The Map identity check is the only safe way to tell — newer requests
|
||||
// overwrite the entry with their own chain tail.
|
||||
if (sessionKey && queueBySession.get(sessionKey) === myChainTail) {
|
||||
queueBySession.delete(sessionKey);
|
||||
}
|
||||
|
||||
if (!res.writableEnded) {
|
||||
try {
|
||||
res.end();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -67,13 +67,23 @@ function install() {
|
||||
// 3. Update openclaw.json
|
||||
const cfg = readConfig();
|
||||
|
||||
// Add provider
|
||||
// Add provider — spread existing first so user-added fields
|
||||
// (e.g. timeoutSeconds, extraHeaders) survive reinstall. Script-managed
|
||||
// fields (baseUrl/apiKey/api/models) are then overridden authoritatively
|
||||
// since they're tied to the constants and model catalog above.
|
||||
cfg.models = cfg.models ?? {};
|
||||
cfg.models.providers = cfg.models.providers ?? {};
|
||||
const existingProvider = cfg.models.providers[PLUGIN_ID] ?? {};
|
||||
cfg.models.providers[PLUGIN_ID] = {
|
||||
...existingProvider,
|
||||
baseUrl: `http://127.0.0.1:${BRIDGE_PORT}/v1`,
|
||||
apiKey: BRIDGE_API_KEY,
|
||||
api: "openai-completions",
|
||||
// The bridge wraps a full Claude/Gemini agent turn (tool use, multi-step),
|
||||
// which routinely takes far longer than OpenClaw's default model-fetch
|
||||
// timeout. Without a generous timeout OpenClaw aborts the request mid-turn
|
||||
// and no reply is ever delivered. Preserve a user override if set.
|
||||
timeoutSeconds: existingProvider.timeoutSeconds ?? 600,
|
||||
models: [
|
||||
{
|
||||
id: "contractor-claude-bridge",
|
||||
@@ -115,11 +125,16 @@ function install() {
|
||||
cfg.plugins.entries[PLUGIN_ID] = cfg.plugins.entries[PLUGIN_ID] ?? {};
|
||||
cfg.plugins.entries[PLUGIN_ID].enabled = true;
|
||||
|
||||
// Set default config — setIfMissing so user values are preserved
|
||||
// Set default config — setIfMissing so user values are preserved.
|
||||
const pluginCfg = cfg.plugins.entries[PLUGIN_ID].config ?? {};
|
||||
setIfMissing(pluginCfg, "bridgePort", BRIDGE_PORT);
|
||||
setIfMissing(pluginCfg, "bridgeApiKey", BRIDGE_API_KEY);
|
||||
setIfMissing(pluginCfg, "permissionMode", "bypassPermissions");
|
||||
// bridgeApiKey is the shared secret between the bridge server (this plugin)
|
||||
// and the model provider written above. The provider apiKey is set
|
||||
// authoritatively (= BRIDGE_API_KEY); the bridge side MUST stay in lockstep
|
||||
// or every request 401s. Set it authoritatively too — never setIfMissing
|
||||
// (a stale prior value would desync the pair).
|
||||
pluginCfg.bridgeApiKey = BRIDGE_API_KEY;
|
||||
cfg.plugins.entries[PLUGIN_ID].config = pluginCfg;
|
||||
|
||||
writeConfig(cfg);
|
||||
|
||||
Reference in New Issue
Block a user