Compare commits
14 Commits
6ae795eea8
...
fix/bridge
| Author | SHA1 | Date | |
|---|---|---|---|
| 453dab3271 | |||
| 0b24330787 | |||
| 1b7cd6b215 | |||
| cce85a9be8 | |||
| 5fca8f5da1 | |||
| 2e64e9ce02 | |||
| b94e0d25f6 | |||
| 91acce9b32 | |||
| 4e015c677b | |||
| 992f4d8703 | |||
| 6be8d47982 | |||
| e73a7ea049 | |||
| 49af2129ae | |||
| 9cd90f7213 |
4
.gitignore
vendored
4
.gitignore
vendored
@@ -8,3 +8,7 @@ CLAUDE_CONTRACTOR_TEST_TOKEN
|
||||
|
||||
# IDE
|
||||
.idea/
|
||||
|
||||
# Local dependencies / build output
|
||||
node_modules/
|
||||
dist/
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core";
|
||||
import { runContractorAgentsAdd } from "./contractor-agents-add.js";
|
||||
|
||||
export function registerCli(api: OpenClawPluginApi): void {
|
||||
|
||||
@@ -8,7 +8,14 @@ import { fileURLToPath } from "node:url";
|
||||
export type ClaudeMessage =
|
||||
| { type: "text"; text: string }
|
||||
| { type: "done"; sessionId: string }
|
||||
| { type: "error"; message: string };
|
||||
| { type: "error"; message: string }
|
||||
/**
|
||||
* Terminal error from the CLI's `result` event (e.g. `is_error: true` with
|
||||
* `terminal_reason: "prompt_too_long"`). The bridge uses this signal to
|
||||
* drop the session-map entry so the next turn starts a fresh CLI session
|
||||
* instead of `--resume`-ing into the same poisoned context.
|
||||
*/
|
||||
| { type: "result_error"; sessionId: string; reason: string; message: string };
|
||||
|
||||
export type OpenAITool = {
|
||||
type: "function";
|
||||
@@ -17,7 +24,8 @@ export type OpenAITool = {
|
||||
|
||||
export type ClaudeDispatchOptions = {
|
||||
prompt: string;
|
||||
/** System prompt passed via --system-prompt on every invocation (stateless, not stored in session) */
|
||||
/** Appended to Claude Code's built-in system prompt via --append-system-prompt on every invocation.
|
||||
* Stateless: not persisted in session file, fully replaces any prior appended content on resume. */
|
||||
systemPrompt?: string;
|
||||
workspace: string;
|
||||
agentId?: string;
|
||||
@@ -29,6 +37,15 @@ export type ClaudeDispatchOptions = {
|
||||
bridgePort?: number;
|
||||
/** Bridge API key for MCP proxy callbacks */
|
||||
bridgeApiKey?: string;
|
||||
/**
|
||||
* Abort signal from the bridge. When fired (typically because the upstream
|
||||
* HTTP client closed the socket — OpenClaw's attempt-level retry / cancel),
|
||||
* we kill the claude subprocess group and break out of the iterator
|
||||
* promptly so a stale subprocess doesn't keep streaming into a dead socket
|
||||
* (or worse, get its output multiplexed with a fresh subprocess started by
|
||||
* a retry).
|
||||
*/
|
||||
signal?: AbortSignal;
|
||||
};
|
||||
|
||||
// Resolve the MCP server script path relative to this file.
|
||||
@@ -97,10 +114,11 @@ export async function* dispatchToClaude(
|
||||
workspace,
|
||||
agentId = "",
|
||||
resumeSessionId,
|
||||
permissionMode = "bypassPermissions",
|
||||
permissionMode = "default",
|
||||
openclawTools,
|
||||
bridgePort = 18800,
|
||||
bridgeApiKey = "",
|
||||
signal,
|
||||
} = opts;
|
||||
|
||||
// NOTE: put prompt right after -p, before --mcp-config.
|
||||
@@ -111,15 +129,15 @@ export async function* dispatchToClaude(
|
||||
prompt,
|
||||
"--output-format", "stream-json",
|
||||
"--verbose",
|
||||
"--permission-mode", permissionMode,
|
||||
"--dangerously-skip-permissions",
|
||||
"--allowedTools", "Bash Edit Write Read Glob Grep WebFetch WebSearch NotebookEdit Monitor TodoWrite mcp__openclaw__*",
|
||||
];
|
||||
|
||||
// --system-prompt is stateless (not persisted in session file) and fully
|
||||
// replaces any prior system prompt on each invocation, including resumes.
|
||||
// We pass it every turn so skills/persona stay current.
|
||||
// --append-system-prompt appends to Claude Code's built-in system prompt rather
|
||||
// than replacing it, preserving the full agent SDK instructions (tool use behavior,
|
||||
// memory management, etc.). The appended bootstrap (persona + skills) is stateless:
|
||||
// not persisted in the session file, takes effect every invocation including resumes.
|
||||
if (systemPrompt) {
|
||||
args.push("--system-prompt", systemPrompt);
|
||||
args.push("--append-system-prompt", systemPrompt);
|
||||
}
|
||||
|
||||
if (resumeSessionId) {
|
||||
@@ -137,10 +155,14 @@ export async function* dispatchToClaude(
|
||||
}
|
||||
}
|
||||
|
||||
// detached:true puts claude in its own process group. Claude's Bash tool
|
||||
// occasionally leaks shells/ssh that keep claude alive past end-of-turn; when
|
||||
// that happens we SIGKILL the whole group rather than wait forever.
|
||||
const child = spawn("claude", args, {
|
||||
cwd: workspace,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
env: { ...process.env },
|
||||
detached: true,
|
||||
});
|
||||
|
||||
const stderrLines: string[] = [];
|
||||
@@ -150,12 +172,58 @@ export async function* dispatchToClaude(
|
||||
|
||||
const rl = createInterface({ input: child.stdout!, crlfDelay: Infinity });
|
||||
|
||||
type CapturedResultError = { reason: string; message: string };
|
||||
let capturedSessionId = "";
|
||||
let capturedResultError = null as CapturedResultError | null;
|
||||
|
||||
const events: ClaudeMessage[] = [];
|
||||
let done = false;
|
||||
let resolveNext: (() => void) | null = null;
|
||||
|
||||
let cleanupScheduled = false;
|
||||
const scheduleCleanup = (): void => {
|
||||
if (cleanupScheduled) return;
|
||||
cleanupScheduled = true;
|
||||
|
||||
const killGroup = (sig: NodeJS.Signals): void => {
|
||||
if (child.pid == null || child.exitCode !== null) return;
|
||||
try { process.kill(-child.pid, sig); } catch { /* already gone */ }
|
||||
};
|
||||
const termTimer = setTimeout(() => killGroup("SIGTERM"), 3000);
|
||||
const killTimer = setTimeout(() => killGroup("SIGKILL"), 10000);
|
||||
|
||||
child.once("close", () => {
|
||||
clearTimeout(termTimer);
|
||||
clearTimeout(killTimer);
|
||||
if (mcpConfigPath) {
|
||||
try { fs.unlinkSync(mcpConfigPath); } catch { /* ignore */ }
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
const markDone = (): void => {
|
||||
if (done) return;
|
||||
done = true;
|
||||
scheduleCleanup();
|
||||
if (resolveNext) {
|
||||
const r = resolveNext;
|
||||
resolveNext = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
// Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
|
||||
// closes the socket, propagate that into our process tree by SIGTERM/SIGKILL
|
||||
// (via scheduleCleanup) and break out of the iterator (via markDone). This
|
||||
// prevents stale subprocesses from outliving the request that started them.
|
||||
if (signal) {
|
||||
if (signal.aborted) {
|
||||
markDone();
|
||||
} else {
|
||||
signal.addEventListener("abort", () => markDone(), { once: true });
|
||||
}
|
||||
}
|
||||
|
||||
rl.on("line", (line: string) => {
|
||||
if (!line.trim()) return;
|
||||
let event: Record<string, unknown>;
|
||||
@@ -179,6 +247,20 @@ export async function* dispatchToClaude(
|
||||
if (type === "result") {
|
||||
const sessionId = (event.session_id as string) ?? "";
|
||||
if (sessionId) capturedSessionId = sessionId;
|
||||
// CLI signals fatal-but-graceful errors (context overflow, refusal,
|
||||
// billing, etc.) via `is_error: true` on the result event. Capture the
|
||||
// reason so the bridge layer can decide whether to invalidate the
|
||||
// session-map entry (e.g. context overflow → drop, retry next turn).
|
||||
if (event.is_error === true) {
|
||||
const reason = (event.terminal_reason as string) ?? (event.subtype as string) ?? "error";
|
||||
const message = (event.result as string) ?? `claude result error (${reason})`;
|
||||
capturedResultError = { reason, message };
|
||||
}
|
||||
// `result` is the terminal stream-json event; commit the turn without
|
||||
// waiting for claude's process tree to fully exit (leaked Bash grandchildren
|
||||
// can otherwise hold stdout open indefinitely).
|
||||
markDone();
|
||||
return;
|
||||
}
|
||||
|
||||
if (resolveNext) {
|
||||
@@ -189,12 +271,8 @@ export async function* dispatchToClaude(
|
||||
});
|
||||
|
||||
rl.on("close", () => {
|
||||
done = true;
|
||||
if (resolveNext) {
|
||||
const r = resolveNext;
|
||||
resolveNext = null;
|
||||
r();
|
||||
}
|
||||
// Fallback: claude exited without emitting a result event.
|
||||
markDone();
|
||||
});
|
||||
|
||||
while (true) {
|
||||
@@ -212,18 +290,18 @@ export async function* dispatchToClaude(
|
||||
yield events.shift()!;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
child.on("close", resolve);
|
||||
if (child.exitCode !== null) resolve();
|
||||
});
|
||||
|
||||
// Clean up temp files
|
||||
if (mcpConfigPath) {
|
||||
try { fs.unlinkSync(mcpConfigPath); } catch { /* ignore */ }
|
||||
// tool defs file path is embedded in the config — leave it for now
|
||||
}
|
||||
|
||||
if (capturedSessionId) {
|
||||
// Pull into a local with explicit type so TS doesn't infer the inner field
|
||||
// accesses as `never` (the field is only ever assigned inside the readline
|
||||
// callback above, so closure-based narrowing can't see it from this scope).
|
||||
const resultErr: CapturedResultError | null = capturedResultError;
|
||||
if (resultErr && capturedSessionId) {
|
||||
yield {
|
||||
type: "result_error",
|
||||
sessionId: capturedSessionId,
|
||||
reason: resultErr.reason,
|
||||
message: resultErr.message,
|
||||
};
|
||||
} else if (capturedSessionId) {
|
||||
yield { type: "done", sessionId: capturedSessionId };
|
||||
} else {
|
||||
const stderrSummary = stderrLines.join(" ").slice(0, 200);
|
||||
|
||||
@@ -29,6 +29,13 @@ export type GeminiDispatchOptions = {
|
||||
openclawTools?: OpenAITool[];
|
||||
bridgePort?: number;
|
||||
bridgeApiKey?: string;
|
||||
/**
|
||||
* Abort signal from the bridge. Mirror of dispatchToClaude's `signal` —
|
||||
* see that file for rationale. When fired, we kill the gemini subprocess
|
||||
* and break the iterator promptly so a stale process doesn't outlive
|
||||
* the upstream request.
|
||||
*/
|
||||
signal?: AbortSignal;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -116,6 +123,7 @@ export async function* dispatchToGemini(
|
||||
openclawTools,
|
||||
bridgePort = 18800,
|
||||
bridgeApiKey = "",
|
||||
signal,
|
||||
} = opts;
|
||||
|
||||
// Write system-level instructions to workspace/GEMINI.md every turn.
|
||||
@@ -166,6 +174,43 @@ export async function* dispatchToGemini(
|
||||
let done = false;
|
||||
let resolveNext: (() => void) | null = null;
|
||||
|
||||
// Cleanup helper: SIGTERM the child first, then SIGKILL after a grace
|
||||
// period if it hasn't exited. Idempotent — safe to call multiple times.
|
||||
let cleanupScheduled = false;
|
||||
const scheduleCleanup = (): void => {
|
||||
if (cleanupScheduled) return;
|
||||
cleanupScheduled = true;
|
||||
if (child.exitCode !== null) return;
|
||||
try { child.kill("SIGTERM"); } catch { /* already gone */ }
|
||||
const killTimer = setTimeout(() => {
|
||||
try { child.kill("SIGKILL"); } catch { /* already gone */ }
|
||||
}, 5000);
|
||||
killTimer.unref?.();
|
||||
child.once("close", () => clearTimeout(killTimer));
|
||||
};
|
||||
|
||||
const markDone = (): void => {
|
||||
if (done) return;
|
||||
done = true;
|
||||
scheduleCleanup();
|
||||
if (resolveNext) {
|
||||
const r = resolveNext;
|
||||
resolveNext = null;
|
||||
r();
|
||||
}
|
||||
};
|
||||
|
||||
// Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
|
||||
// closes the socket, kill the gemini subprocess and break the iterator.
|
||||
// See dispatchToClaude for the full rationale.
|
||||
if (signal) {
|
||||
if (signal.aborted) {
|
||||
markDone();
|
||||
} else {
|
||||
signal.addEventListener("abort", () => markDone(), { once: true });
|
||||
}
|
||||
}
|
||||
|
||||
rl.on("line", (line: string) => {
|
||||
if (!line.trim()) return;
|
||||
let event: Record<string, unknown>;
|
||||
|
||||
101
plugin/index.ts
101
plugin/index.ts
@@ -1,22 +1,13 @@
|
||||
import fs from "node:fs";
|
||||
import net from "node:net";
|
||||
import path from "node:path";
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
||||
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core";
|
||||
import { normalizePluginConfig } from "./core/types/contractor.js";
|
||||
import { resolveContractorAgentMetadata } from "./core/contractor/metadata-resolver.js";
|
||||
import { createBridgeServer } from "./web/server.js";
|
||||
import { registerCli } from "./commands/register-cli.js";
|
||||
import type http from "node:http";
|
||||
|
||||
function isPortFree(port: number): Promise<boolean> {
|
||||
return new Promise((resolve) => {
|
||||
const tester = net.createServer();
|
||||
tester.once("error", () => resolve(false));
|
||||
tester.once("listening", () => tester.close(() => resolve(true)));
|
||||
tester.listen(port, "127.0.0.1");
|
||||
});
|
||||
}
|
||||
|
||||
// ── GlobalThis state ─────────────────────────────────────────────────────────
|
||||
// All persistent state lives on globalThis to survive OpenClaw hot-reloads.
|
||||
// See LESSONS_LEARNED.md items 1, 3, 11.
|
||||
@@ -27,12 +18,57 @@ const SERVER_KEY = "_contractorAgentBridgeServer";
|
||||
/** Key for the live OpenClaw config accessor (getter fn) shared via globalThis. */
|
||||
const OPENCLAW_CONFIG_KEY = "_contractorOpenClawConfig";
|
||||
|
||||
/** OpenClaw replaces secret-like config values with this before exposing
|
||||
* pluginConfig to plugins. */
|
||||
const OPENCLAW_REDACTION_SENTINEL = "__OPENCLAW_REDACTED__";
|
||||
|
||||
/**
|
||||
* OpenClaw redacts any secret-like key (anything containing "apiKey") before
|
||||
* handing `api.pluginConfig` to the plugin, so `pluginConfig.bridgeApiKey` is
|
||||
* the redaction sentinel — never the real value. The bridge server validates
|
||||
* inbound requests against this key, while the OpenClaw model provider sends
|
||||
* the real `models.providers.contractor-agent.apiKey`; using the redacted
|
||||
* value therefore guarantees a permanent HTTP 401.
|
||||
*
|
||||
* Resolve the real shared secret from the raw on-disk config instead (same
|
||||
* pattern `resolveAgent` already uses). If it is still missing or redacted,
|
||||
* return "" so the loopback-only bridge skips auth rather than hard-locking.
|
||||
*/
|
||||
function resolveBridgeApiKey(fallback: string): string {
|
||||
try {
|
||||
const configPath = path.join(
|
||||
process.env.HOME ?? "/root",
|
||||
".openclaw",
|
||||
"openclaw.json",
|
||||
);
|
||||
const raw = JSON.parse(fs.readFileSync(configPath, "utf8")) as {
|
||||
plugins?: {
|
||||
entries?: Record<string, { config?: { bridgeApiKey?: string } }>;
|
||||
};
|
||||
};
|
||||
const k = raw.plugins?.entries?.["contractor-agent"]?.config?.bridgeApiKey;
|
||||
if (typeof k === "string" && k && k !== OPENCLAW_REDACTION_SENTINEL) {
|
||||
return k;
|
||||
}
|
||||
} catch {
|
||||
/* fall through to fallback handling */
|
||||
}
|
||||
if (fallback && fallback !== OPENCLAW_REDACTION_SENTINEL) return fallback;
|
||||
return ""; // loopback-only bridge: skip auth instead of 401-locking
|
||||
}
|
||||
|
||||
// ── Plugin entry ─────────────────────────────────────────────────────────────
|
||||
|
||||
export default {
|
||||
export default definePluginEntry({
|
||||
id: "contractor-agent",
|
||||
name: "Contractor Agent",
|
||||
async register(api: OpenClawPluginApi) {
|
||||
description: "Turns Claude Code into an OpenClaw-managed contractor agent",
|
||||
// OpenClaw requires register() to be synchronous — returning a Promise
|
||||
// surfaces as `Error: plugin register must be synchronous` and the plugin
|
||||
// ends up in `error` state. We avoid `await` here and instead let the
|
||||
// bridge server bind asynchronously, handling EADDRINUSE via the server's
|
||||
// `error` event when another gateway/CLI process already owns the port.
|
||||
register(api: OpenClawPluginApi): void {
|
||||
const config = normalizePluginConfig(api.pluginConfig);
|
||||
|
||||
// Resolve agent metadata for the bridge server's resolveAgent callback.
|
||||
@@ -58,9 +94,6 @@ export default {
|
||||
}
|
||||
|
||||
// ── Gateway lifecycle (start bridge server once per gateway process) ──────
|
||||
// Guard with globalThis flag AND a port probe to handle the case where the
|
||||
// gateway is already running the server while a CLI subprocess is starting up.
|
||||
// (See LESSONS_LEARNED.md item 7 — lock file / port probe pattern)
|
||||
// Always update the config accessor so hot-reloads get fresh config.
|
||||
// server.ts reads this via globalThis to build tool execution context.
|
||||
_G[OPENCLAW_CONFIG_KEY] = api.config;
|
||||
@@ -68,23 +101,37 @@ export default {
|
||||
if (!_G[LIFECYCLE_KEY]) {
|
||||
_G[LIFECYCLE_KEY] = true;
|
||||
|
||||
// Only bind if port is not already in use (avoids EADDRINUSE in CLI mode)
|
||||
const portFree = await isPortFree(config.bridgePort);
|
||||
if (!portFree) {
|
||||
// Bind the bridge server only when the gateway boots, NOT eagerly at
|
||||
// register-time. register() also runs in one-shot CLI subprocesses
|
||||
// (e.g. `openclaw completion`, `openclaw doctor`); spawning a long-
|
||||
// lived listener there would prevent those commands from exiting.
|
||||
api.on("gateway_start", () => {
|
||||
const server = createBridgeServer({
|
||||
port: config.bridgePort,
|
||||
apiKey: resolveBridgeApiKey(config.bridgeApiKey),
|
||||
permissionMode: config.permissionMode,
|
||||
resolveAgent,
|
||||
logger: api.logger,
|
||||
});
|
||||
|
||||
// EADDRINUSE → another gateway already owns the port; fine, skip bind.
|
||||
server.on("error", (err: NodeJS.ErrnoException) => {
|
||||
if (err.code === "EADDRINUSE") {
|
||||
api.logger.info(
|
||||
`[contractor-agent] bridge already running on port ${config.bridgePort}, skipping bind`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const server = createBridgeServer({
|
||||
port: config.bridgePort,
|
||||
apiKey: config.bridgeApiKey,
|
||||
permissionMode: config.permissionMode,
|
||||
resolveAgent,
|
||||
logger: api.logger,
|
||||
api.logger.warn(`[contractor-agent] bridge server error: ${err.message ?? String(err)}`);
|
||||
});
|
||||
|
||||
// Defense in depth: even if this code path is somehow reached outside
|
||||
// the gateway, .unref() prevents the listener from pinning the host's
|
||||
// event loop and blocking process exit.
|
||||
server.unref();
|
||||
|
||||
_G[SERVER_KEY] = server;
|
||||
});
|
||||
|
||||
api.on("gateway_stop", () => {
|
||||
const s = _G[SERVER_KEY] as http.Server | undefined;
|
||||
@@ -98,4 +145,4 @@ export default {
|
||||
|
||||
api.logger.info(`[contractor-agent] plugin registered (bridge port: ${config.bridgePort})`);
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
{
|
||||
"id": "contractor-agent",
|
||||
"name": "Contractor Agent",
|
||||
"version": "0.1.0",
|
||||
"description": "Turns Claude Code into an OpenClaw-managed contractor agent",
|
||||
"main": "index.ts",
|
||||
"activation": {
|
||||
"onStartup": true
|
||||
},
|
||||
"commandAliases": [
|
||||
{ "name": "contractor-agents" }
|
||||
],
|
||||
"configSchema": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
|
||||
@@ -10,8 +10,59 @@ function messageText(m: OpenAIMessage): string {
|
||||
.join("");
|
||||
}
|
||||
|
||||
function stripOpenClawTimestampPrefix(raw: string): string {
|
||||
// "[Sat 2026-04-11 08:32 GMT+1] " → ""
|
||||
return raw.replace(/^\[[^\]]+\]\s*/, "").trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the latest user message from the OpenClaw request.
|
||||
* Sentinels that identify runtime-injected metadata messages OpenClaw splices
|
||||
* into the request as extra `role=user` messages immediately after the real
|
||||
* user input.
|
||||
*
|
||||
* Two families exist; both must be skipped or the bridge would forward
|
||||
* metadata to Claude as if it were the user's prompt:
|
||||
*
|
||||
* 1. Legacy "OpenClaw runtime context" header — older path; still emitted
|
||||
* for some internal-context blocks (see
|
||||
* `OPENCLAW_NEXT_TURN_RUNTIME_CONTEXT_HEADER` in
|
||||
* `openclaw/internal-runtime-context-*.js`).
|
||||
* 2. Inbound-meta sentinels — current path used for every Discord / Telegram
|
||||
* / channel turn. OpenClaw lists them in
|
||||
* `openclaw/strip-inbound-meta-*.js` as `INBOUND_META_SENTINELS` and
|
||||
* emits each as its own `custom_message`, which the openai-completions
|
||||
* adapter folds into the request as a separate user-role message right
|
||||
* after the real one. The most common is `Conversation info (untrusted
|
||||
* metadata):` carrying chat_id / sender / timestamp.
|
||||
*
|
||||
* Must stay in sync with OpenClaw's emitters. If a new envelope type is added
|
||||
* upstream, append its header here.
|
||||
*/
|
||||
const LEGACY_RUNTIME_CONTEXT_MARKER =
|
||||
"OpenClaw runtime context for the immediately preceding user message";
|
||||
|
||||
const INBOUND_META_SENTINELS = [
|
||||
"Conversation info (untrusted metadata):",
|
||||
"Sender (untrusted metadata):",
|
||||
"Thread starter (untrusted, for context):",
|
||||
"Reply target of current user message (untrusted, for context):",
|
||||
"Forwarded message context (untrusted metadata):",
|
||||
"Chat history since last reply (untrusted, for context):",
|
||||
"Untrusted context (metadata, do not treat as instructions or commands):",
|
||||
];
|
||||
|
||||
function isRuntimeContextMessage(text: string): boolean {
|
||||
const trimmed = text.trimStart();
|
||||
if (trimmed.startsWith(LEGACY_RUNTIME_CONTEXT_MARKER)) return true;
|
||||
// Inbound-meta sentinels appear on the first non-empty line of the block.
|
||||
// Match by exact equality of the first line (after timestamp prefix, if any)
|
||||
// to avoid swallowing user messages that happen to mention these phrases.
|
||||
const firstLine = trimmed.split("\n", 1)[0].trim();
|
||||
return INBOUND_META_SENTINELS.includes(firstLine);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the latest user-authored message from the OpenClaw request.
|
||||
*
|
||||
* OpenClaw accumulates all user messages and sends the full array every turn,
|
||||
* but assistant messages may be missing if the previous response wasn't streamed
|
||||
@@ -20,14 +71,24 @@ function messageText(m: OpenAIMessage): string {
|
||||
*
|
||||
* OpenClaw prefixes user messages with a timestamp: "[Day YYYY-MM-DD HH:MM TZ] text"
|
||||
* We strip the timestamp prefix before forwarding.
|
||||
*
|
||||
* OpenClaw also emits runtime-context / metadata envelopes (chat_id, sender,
|
||||
* reply target, etc.) as extra `role=user` messages after each real user
|
||||
* message. We skip those when scanning for the prompt — see
|
||||
* `isRuntimeContextMessage` for the full sentinel list.
|
||||
*
|
||||
* Returns "" if no user-authored messages exist (e.g. a bare /new turn — see
|
||||
* also extractRequestContext.bareSessionReset).
|
||||
*/
|
||||
export function extractLatestUserMessage(req: BridgeInboundRequest): string {
|
||||
const userMessages = req.messages.filter((m) => m.role === "user");
|
||||
if (userMessages.length === 0) return "";
|
||||
|
||||
const raw = messageText(userMessages[userMessages.length - 1]);
|
||||
// Strip OpenClaw timestamp prefix: "[Sat 2026-04-11 08:32 GMT+1] "
|
||||
return raw.replace(/^\[[^\]]+\]\s*/, "").trim();
|
||||
for (let i = userMessages.length - 1; i >= 0; i -= 1) {
|
||||
const raw = messageText(userMessages[i]);
|
||||
if (!raw) continue;
|
||||
if (isRuntimeContextMessage(raw)) continue;
|
||||
return stripOpenClawTimestampPrefix(raw);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
export type RequestContext = {
|
||||
@@ -37,26 +98,90 @@ export type RequestContext = {
|
||||
skillsBlock: string;
|
||||
/** OpenClaw context files present in the workspace (SOUL.md, IDENTITY.md, etc.) */
|
||||
workspaceContextFiles: string[];
|
||||
/**
|
||||
* OpenClaw conversation/chat identifier scraped from the "Conversation info"
|
||||
* untrusted-metadata JSON block that OpenClaw appends to user messages on
|
||||
* non-direct or non-webchat surfaces (Discord channels, Discord DMs,
|
||||
* Telegram, etc.).
|
||||
*
|
||||
* Format examples:
|
||||
* - DM: "user:561921120408698910"
|
||||
* - Channel: "channel:1498579994044010566"
|
||||
*
|
||||
* Empty when not parseable (typical for local TUI / webchat direct chats),
|
||||
* in which case we fall back to keying sessions by agentId only.
|
||||
*/
|
||||
chatId: string;
|
||||
/**
|
||||
* True when this turn was triggered by `/new` (or the equivalent bare
|
||||
* `/reset`) on the OpenClaw side. We detect it by looking for the literal
|
||||
* marker that OpenClaw injects into the runtime prompt:
|
||||
*
|
||||
* "A new session was started via /new or /reset."
|
||||
*
|
||||
* (See `BARE_SESSION_RESET_PROMPT_BASE` in OpenClaw's
|
||||
* startup-context module.)
|
||||
*
|
||||
* The bridge uses this to discard any prior `claudeSessionId` so we start
|
||||
* a fresh Claude CLI session instead of `--resume`-ing into an old one
|
||||
* that the user just asked to abandon.
|
||||
*/
|
||||
bareSessionReset: boolean;
|
||||
};
|
||||
|
||||
const BARE_SESSION_RESET_MARKER =
|
||||
"A new session was started via /new or /reset";
|
||||
|
||||
function extractChatIdFromText(text: string): string {
|
||||
// OpenClaw injects an untrusted-metadata block of the form:
|
||||
//
|
||||
// Conversation info (untrusted metadata):
|
||||
// ```json
|
||||
// {
|
||||
// "chat_id": "channel:1498579994044010566",
|
||||
// ...
|
||||
// }
|
||||
// ```
|
||||
//
|
||||
// It can appear inside a user message body, the runtime-context system
|
||||
// message, or both. A non-greedy regex on the JSON literal is enough — we
|
||||
// don't need to JSON.parse the whole block (and parsing would be brittle
|
||||
// against truncation / nested code fences).
|
||||
const match = text.match(/"chat_id"\s*:\s*"([^"\n]+)"/);
|
||||
return match ? match[1] : "";
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse agent ID and workspace path from the OpenClaw system prompt.
|
||||
* Parse agent ID, workspace path, chat id, and the bare-session-reset flag
|
||||
* out of the OpenClaw request.
|
||||
*
|
||||
* OpenClaw does NOT send agent ID / session key as HTTP headers — it's embedded
|
||||
* in the system prompt as a "## Runtime" line:
|
||||
* Runtime: agent=contractor-e2e | host=... | repo=/tmp/contractor-e2e-workspace | ...
|
||||
* OpenClaw does NOT send agent ID / session key as HTTP headers — agent and
|
||||
* workspace come from the system prompt's "## Runtime" line:
|
||||
*
|
||||
* We parse this line to extract `agent` (agent ID) and `repo` (workspace path).
|
||||
* Runtime: agent=<id> | host=... | repo=<workspace> | ...
|
||||
*
|
||||
* Conversation info (chat_id) is injected into the user message envelope
|
||||
* as untrusted metadata; we scrape it so the bridge can scope sessions per
|
||||
* Discord channel / DM / etc., instead of collapsing everything for an
|
||||
* agent into a single Claude CLI session.
|
||||
*/
|
||||
export function extractRequestContext(req: BridgeInboundRequest): RequestContext {
|
||||
const empty: RequestContext = {
|
||||
agentId: "",
|
||||
workspace: "",
|
||||
skillsBlock: "",
|
||||
workspaceContextFiles: [],
|
||||
chatId: "",
|
||||
bareSessionReset: false,
|
||||
};
|
||||
const systemMsg = req.messages.find((m) => m.role === "system");
|
||||
if (!systemMsg) return { agentId: "", workspace: "", skillsBlock: "", workspaceContextFiles: [] };
|
||||
if (!systemMsg) return empty;
|
||||
|
||||
const text = messageText(systemMsg);
|
||||
const systemText = messageText(systemMsg);
|
||||
|
||||
// Match "Runtime: agent=<id> | ... | repo=<path> | ..."
|
||||
const runtimeMatch = text.match(/Runtime:\s*([^\n]+)/);
|
||||
if (!runtimeMatch) return { agentId: "", workspace: "", skillsBlock: "", workspaceContextFiles: [] };
|
||||
const runtimeMatch = systemText.match(/Runtime:\s*([^\n]+)/);
|
||||
if (!runtimeMatch) return empty;
|
||||
|
||||
const runtimeLine = runtimeMatch[1];
|
||||
const agentMatch = runtimeLine.match(/\bagent=([^|\s]+)/);
|
||||
@@ -65,14 +190,13 @@ export function extractRequestContext(req: BridgeInboundRequest): RequestContext
|
||||
// Extract <available_skills>...</available_skills> XML block.
|
||||
// Expand leading "~/" in <location> paths to the actual home dir so Claude doesn't
|
||||
// try /root/.openclaw/... (which fails with EACCES).
|
||||
const skillsMatch = text.match(/<available_skills>[\s\S]*?<\/available_skills>/);
|
||||
const skillsMatch = systemText.match(/<available_skills>[\s\S]*?<\/available_skills>/);
|
||||
const home = process.env.HOME ?? "/root";
|
||||
const skillsBlock = skillsMatch
|
||||
? skillsMatch[0].replace(/~\//g, `${home}/`)
|
||||
: "";
|
||||
|
||||
// Detect which OpenClaw context files are present in the workspace.
|
||||
// These tell us what persona/memory files to surface to Claude.
|
||||
const workspace = repoMatch?.[1] ?? "";
|
||||
const CONTEXT_FILES = ["SOUL.md", "IDENTITY.md", "MEMORY.md", "AGENTS.md", "USER.md"];
|
||||
const workspaceContextFiles: string[] = [];
|
||||
@@ -80,16 +204,77 @@ export function extractRequestContext(req: BridgeInboundRequest): RequestContext
|
||||
for (const f of CONTEXT_FILES) {
|
||||
if (fs.existsSync(path.join(workspace, f))) workspaceContextFiles.push(f);
|
||||
}
|
||||
// Also check for memory/ directory
|
||||
if (fs.existsSync(path.join(workspace, "memory"))) {
|
||||
workspaceContextFiles.push("memory/");
|
||||
}
|
||||
}
|
||||
|
||||
// chat_id can appear in any message (user envelope or runtime-context
|
||||
// system block). Scan from newest to oldest and take the first hit.
|
||||
let chatId = "";
|
||||
for (let i = req.messages.length - 1; i >= 0; i -= 1) {
|
||||
const text = messageText(req.messages[i]);
|
||||
if (!text) continue;
|
||||
const found = extractChatIdFromText(text);
|
||||
if (found) {
|
||||
chatId = found;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Detect bare /new or /reset: OpenClaw injects the BARE_SESSION_RESET_PROMPT_BASE
|
||||
// marker into the prompt body when the user typed `/new` (or bare `/reset`)
|
||||
// with no trailing instruction.
|
||||
let bareSessionReset = false;
|
||||
for (const m of req.messages) {
|
||||
if (messageText(m).includes(BARE_SESSION_RESET_MARKER)) {
|
||||
bareSessionReset = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
agentId: agentMatch?.[1] ?? "",
|
||||
workspace,
|
||||
skillsBlock,
|
||||
workspaceContextFiles,
|
||||
chatId,
|
||||
bareSessionReset,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the per-CLI-session map key from the parsed request context.
|
||||
*
|
||||
* Each unique OpenClaw session (DM, channel, etc.) gets its own Claude CLI
|
||||
* session so contexts don't bleed across surfaces. Falls back to the agent
|
||||
* id alone when chat_id can't be parsed (e.g. local TUI direct chats), so
|
||||
* the historical "one session per agent" behavior remains as a backstop
|
||||
* rather than degrading to one session per *request*.
|
||||
*/
|
||||
export function buildSessionKey(agentId: string, chatId: string): string {
|
||||
if (!agentId) return "";
|
||||
if (!chatId) return agentId;
|
||||
return `${agentId}::${chatId}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick the prompt to forward to the Claude CLI for this turn.
|
||||
*
|
||||
* Normal turns: the latest user message (timestamp prefix stripped).
|
||||
*
|
||||
* Bare `/new` turns: OpenClaw sends an empty user message body alongside a
|
||||
* runtime-context system block that asks the agent to greet the user; the
|
||||
* provider rejects an empty user message so we synthesize a short prompt
|
||||
* from the bare-reset marker instead.
|
||||
*/
|
||||
export function resolveDispatchPrompt(
|
||||
latestMessage: string,
|
||||
ctx: Pick<RequestContext, "bareSessionReset">,
|
||||
): string {
|
||||
if (latestMessage) return latestMessage;
|
||||
if (ctx.bareSessionReset) {
|
||||
return "A new session was just started. Greet the user briefly in your configured persona and ask what they'd like to do.";
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
import http from "node:http";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import type { BridgeInboundRequest } from "../core/types/model.js";
|
||||
import { extractLatestUserMessage, extractRequestContext } from "./input-filter.js";
|
||||
import {
|
||||
buildSessionKey,
|
||||
extractLatestUserMessage,
|
||||
extractRequestContext,
|
||||
resolveDispatchPrompt,
|
||||
} from "./input-filter.js";
|
||||
import { buildBootstrap } from "./bootstrap.js";
|
||||
import { dispatchToClaude } from "../core/claude/sdk-adapter.js";
|
||||
import { dispatchToGemini } from "../core/gemini/sdk-adapter.js";
|
||||
@@ -10,6 +15,7 @@ import {
|
||||
getSession,
|
||||
putSession,
|
||||
markOrphaned,
|
||||
removeSession,
|
||||
} from "../core/contractor/session-map-store.js";
|
||||
|
||||
export type BridgeServerConfig = {
|
||||
@@ -82,6 +88,28 @@ function parseBody(req: http.IncomingMessage): Promise<BridgeInboundRequest> {
|
||||
return parseBodyRaw(req) as Promise<BridgeInboundRequest>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-sessionKey FIFO queue: same-session turns serialize so a user firing
|
||||
* multiple Discord messages back-to-back gets them processed in order rather
|
||||
* than spawning concurrent claude subprocesses. Cross-session requests bypass
|
||||
* each other's chains and run in parallel.
|
||||
*
|
||||
* The chain is `prev.then(() => mySlot)` per request — `mySlot` is released
|
||||
* in the request's finally block, so the next request waits until our work
|
||||
* completes (success or failure) before starting.
|
||||
*/
|
||||
const queueBySession = new Map<string, Promise<void>>();
|
||||
|
||||
/**
|
||||
* SSE heartbeat cadence. Bridge writes an empty-content `chat.completion.chunk`
|
||||
* (a no-op for OpenAI stream parsers, but a real model-progress event on the
|
||||
* canonical streaming channel) on this interval while a turn is in flight or
|
||||
* queued. This keeps OpenClaw's LLM idle watchdog (default 120s) from firing
|
||||
* during long quiet tool-call phases or while we're waiting our turn in the
|
||||
* per-session queue. See the heartbeat block in handleChatCompletions for
|
||||
* details on why an SSE comment frame is insufficient.
|
||||
*/
|
||||
const HEARTBEAT_MS = 30_000;
|
||||
|
||||
const _G = globalThis as Record<string, unknown>;
|
||||
|
||||
@@ -100,22 +128,39 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
return;
|
||||
}
|
||||
|
||||
// Extract agent ID and workspace from the system prompt's Runtime line.
|
||||
// OpenClaw does NOT send agent/session info as HTTP headers — it's in the system prompt.
|
||||
const { agentId: parsedAgentId, workspace: parsedWorkspace, skillsBlock, workspaceContextFiles } = extractRequestContext(body);
|
||||
// Extract agent ID, workspace, chat id, and bare-reset signal from the
|
||||
// request. OpenClaw does NOT send agent/session info as HTTP headers — it
|
||||
// lives in the system prompt's Runtime line and the user envelope's
|
||||
// "Conversation info" untrusted-metadata block.
|
||||
const {
|
||||
agentId: parsedAgentId,
|
||||
workspace: parsedWorkspace,
|
||||
skillsBlock,
|
||||
workspaceContextFiles,
|
||||
chatId,
|
||||
bareSessionReset,
|
||||
} = extractRequestContext(body);
|
||||
const latestMessage = extractLatestUserMessage(body);
|
||||
|
||||
if (!latestMessage) {
|
||||
// Pick the prompt to forward to the CLI. For bare /new turns OpenClaw
|
||||
// submits an empty user message — we synthesize a stub prompt instead so
|
||||
// the CLI has something to respond to.
|
||||
const dispatchPrompt = resolveDispatchPrompt(latestMessage, { bareSessionReset });
|
||||
|
||||
if (!dispatchPrompt) {
|
||||
sendJson(res, 400, { error: "no user message found" });
|
||||
return;
|
||||
}
|
||||
|
||||
// Use agentId as session key — one persistent Claude session per agent (v1).
|
||||
// Scope the CLI session by (agentId, chat_id) so different Discord
|
||||
// channels / DMs / etc. for the same agent don't pile into one Claude
|
||||
// session and bleed context across surfaces. Falls back to agentId-only
|
||||
// when chat_id can't be parsed (local TUI, etc.).
|
||||
const agentId = parsedAgentId;
|
||||
const sessionKey = agentId; // stable per-agent key
|
||||
const sessionKey = buildSessionKey(agentId, chatId);
|
||||
|
||||
logger.info(
|
||||
`[contractor-bridge] turn agentId=${agentId} workspace=${parsedWorkspace} msg=${latestMessage.substring(0, 80)}`,
|
||||
`[contractor-bridge] turn agentId=${agentId} sessionKey=${sessionKey} workspace=${parsedWorkspace} bareReset=${bareSessionReset} msg=${dispatchPrompt.substring(0, 80)}`,
|
||||
);
|
||||
|
||||
// Resolve workspace: prefer what we parsed from the system prompt (most accurate);
|
||||
@@ -133,12 +178,151 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
// Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude
|
||||
const isGemini = typeof body.model === "string" && body.model.includes("gemini");
|
||||
|
||||
// Look up existing session (shared structure for both Claude and Gemini)
|
||||
let existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;
|
||||
let resumeSessionId = existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;
|
||||
// ── Abort propagation ────────────────────────────────────────────────────
|
||||
// OpenClaw's LLM client cancels in-flight HTTP requests via AbortSignal
|
||||
// when an attempt fails or the user cancels. On the wire this manifests
|
||||
// as the client closing the socket, surfaced here as `req.on('close')`.
|
||||
// We mirror that signal into our own AbortController and propagate it
|
||||
// down to the dispatchTo{Claude,Gemini} adapter, which kills the
|
||||
// subprocess. Without this, OpenClaw silently drops the response while
|
||||
// the subprocess keeps running, and a retry spawns another subprocess
|
||||
// alongside the orphan — the root cause of the duplicate-Discord-message
|
||||
// bug we hit in May 2026.
|
||||
const abortController = new AbortController();
|
||||
let clientDisconnected = false;
|
||||
const onClose = (): void => {
|
||||
if (clientDisconnected) return;
|
||||
clientDisconnected = true;
|
||||
logger.info(
|
||||
`[contractor-bridge] client disconnected sessionKey=${sessionKey} — aborting in-flight work`,
|
||||
);
|
||||
abortController.abort();
|
||||
};
|
||||
req.on("close", onClose);
|
||||
|
||||
// Start SSE response immediately so OpenClaw sees the response status
|
||||
// quickly. We may still spend time waiting in the per-session queue
|
||||
// before any real chunk goes out — heartbeat (below) keeps that quiet
|
||||
// window from tripping the upstream idle watchdog.
|
||||
res.writeHead(200, {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"Transfer-Encoding": "chunked",
|
||||
});
|
||||
|
||||
const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;
|
||||
|
||||
// ── SSE heartbeat ────────────────────────────────────────────────────────
|
||||
// OpenClaw's LLM idle watchdog (default 120s) fires on lack of *model
|
||||
// progress*, not lack of bytes — concretely "no content delta through
|
||||
// SSE for 120s". An SSE comment frame (`: keepalive\n\n`) keeps the TCP
|
||||
// socket alive but does NOT register as model progress, so a long quiet
|
||||
// tool-call phase still idles out. When that happens OpenClaw falls back
|
||||
// to re-sending the prior turn's assistant text (see
|
||||
// pi-embedded-Bcz04p2i.js:1308 `fallbackAnswerText`), producing the
|
||||
// duplicate-Discord-message symptom observed 2026-05-14.
|
||||
//
|
||||
// We emit a real `chat.completion.chunk` with an empty content delta
|
||||
// every HEARTBEAT_MS. Clients drop empty deltas, but the upstream idle
|
||||
// watchdog should count it as model progress because it's a real event
|
||||
// on the canonical streaming channel. If empty content turns out to be
|
||||
// filtered, the next step is a zero-width-space "".
|
||||
const heartbeat = setInterval(() => {
|
||||
if (clientDisconnected || res.writableEnded) return;
|
||||
try {
|
||||
sseWrite(res, buildChunk(completionId, ""));
|
||||
} catch {
|
||||
/* socket dead, ignore */
|
||||
}
|
||||
}, HEARTBEAT_MS);
|
||||
heartbeat.unref?.();
|
||||
|
||||
// ── Per-sessionKey FIFO queue ────────────────────────────────────────────
|
||||
// Same-sessionKey turns serialize: if a user fires multiple Discord
|
||||
// messages quickly, we process them in order rather than starting
|
||||
// concurrent claude subprocesses (which would corrupt the shared session
|
||||
// file and produce overlapping replies). Cross-sessionKey requests run
|
||||
// in parallel — they live on independent chains.
|
||||
let releaseSlot: () => void = () => {};
|
||||
const mySlot = new Promise<void>((r) => {
|
||||
releaseSlot = r;
|
||||
});
|
||||
const prev = sessionKey
|
||||
? queueBySession.get(sessionKey) ?? Promise.resolve()
|
||||
: Promise.resolve();
|
||||
// `myChainTail` advances only after `releaseSlot()` (called in our
|
||||
// finally block). Tolerate prev rejecting so a crashed earlier turn
|
||||
// doesn't poison the chain forever.
|
||||
const myChainTail = prev.then(() => mySlot, () => mySlot);
|
||||
if (sessionKey) queueBySession.set(sessionKey, myChainTail);
|
||||
let newSessionId = "";
|
||||
let hasError = false;
|
||||
let resultErrorReason: string | null = null;
|
||||
let existingEntry: ReturnType<typeof getSession> = null;
|
||||
|
||||
try {
|
||||
// Block until the previous turn on this sessionKey finishes.
|
||||
// Cross-session requests skip this wait (their `prev` is a fresh
|
||||
// Promise.resolve()).
|
||||
await prev.catch(() => {});
|
||||
|
||||
// While queued, OpenClaw may have given up on this request (attempt
|
||||
// timeout, user cancel, etc.) and closed the socket. If so, exit
|
||||
// cleanly without dispatching — the client wouldn't see the output
|
||||
// anyway.
|
||||
if (clientDisconnected) {
|
||||
logger.info(
|
||||
`[contractor-bridge] dropped queued request sessionKey=${sessionKey} (client disconnected during queue wait)`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Look up the session-map entry NOW (at the head of the queue), not
|
||||
// earlier — the previous turn on the same sessionKey may have just
|
||||
// updated `claudeSessionId` and we want to resume into the latest
|
||||
// one rather than a stale snapshot from request-arrival time.
|
||||
existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;
|
||||
|
||||
// Detect a fresh OpenClaw session even when the bare-reset marker is
|
||||
// absent. The marker only arrives when `/new` is the body of *this*
|
||||
// user turn (see get-reply isBareSessionReset). When `/new` is sent
|
||||
// as a separate slash command (e.g. via Discord's slash UI), OpenClaw
|
||||
// processes the reset in a side lane that doesn't hit the bridge —
|
||||
// it just renames the prior session file aside. The follow-up real
|
||||
// message then arrives on a brand-new OpenClaw session, but as a
|
||||
// normal turn with no marker. Without this check, the bridge happily
|
||||
// resumes the long-stale claudeSessionId from before the reset.
|
||||
//
|
||||
// OpenClaw sends the full conversation history every turn (system +
|
||||
// user/assistant pairs + latest user). A request with zero assistant
|
||||
// turns is therefore a positive signal that the OpenClaw session is
|
||||
// brand-new and any prior claudeSessionId we hold is from a previous
|
||||
// OpenClaw session that the user already abandoned.
|
||||
const hasAssistantHistory = body.messages.some((m) => {
|
||||
if (m.role !== "assistant") return false;
|
||||
if (typeof m.content === "string") return m.content.trim().length > 0;
|
||||
return m.content.some(
|
||||
(c) => c.type === "text" && (c.text ?? "").trim().length > 0,
|
||||
);
|
||||
});
|
||||
const isFreshOpenClawSession = !hasAssistantHistory;
|
||||
|
||||
if ((bareSessionReset || isFreshOpenClawSession) && existingEntry && sessionKey) {
|
||||
const reason = bareSessionReset
|
||||
? "bare /new detected"
|
||||
: "fresh OpenClaw session (no assistant history in messages[])";
|
||||
logger.info(
|
||||
`[contractor-bridge] ${reason} — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`,
|
||||
);
|
||||
removeSession(workspace, sessionKey);
|
||||
existingEntry = null;
|
||||
}
|
||||
const resumeSessionId =
|
||||
existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;
|
||||
|
||||
// Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
|
||||
// For Claude: --system-prompt fully replaces any prior system prompt each invocation.
|
||||
// For Claude: --append-system-prompt appends to the built-in prompt each invocation.
|
||||
// For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
|
||||
// This keeps persona and skills current without needing to track first-turn state.
|
||||
const systemPrompt = buildBootstrap({
|
||||
@@ -149,24 +333,11 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
workspaceContextFiles,
|
||||
});
|
||||
|
||||
// Start SSE response
|
||||
res.writeHead(200, {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"Transfer-Encoding": "chunked",
|
||||
});
|
||||
|
||||
const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;
|
||||
let newSessionId = "";
|
||||
let hasError = false;
|
||||
|
||||
const openclawTools = body.tools ?? [];
|
||||
|
||||
try {
|
||||
const dispatchIter = isGemini
|
||||
? dispatchToGemini({
|
||||
prompt: latestMessage,
|
||||
prompt: dispatchPrompt,
|
||||
systemPrompt,
|
||||
workspace,
|
||||
agentId,
|
||||
@@ -174,9 +345,10 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
openclawTools,
|
||||
bridgePort: port,
|
||||
bridgeApiKey: apiKey,
|
||||
signal: abortController.signal,
|
||||
})
|
||||
: dispatchToClaude({
|
||||
prompt: latestMessage,
|
||||
prompt: dispatchPrompt,
|
||||
systemPrompt,
|
||||
workspace,
|
||||
agentId,
|
||||
@@ -185,31 +357,67 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
openclawTools,
|
||||
bridgePort: port,
|
||||
bridgeApiKey: apiKey,
|
||||
signal: abortController.signal,
|
||||
});
|
||||
|
||||
try {
|
||||
for await (const event of dispatchIter) {
|
||||
if (event.type === "text") {
|
||||
sseWrite(res, buildChunk(completionId, event.text));
|
||||
if (!clientDisconnected) sseWrite(res, buildChunk(completionId, event.text));
|
||||
} else if (event.type === "done") {
|
||||
newSessionId = event.sessionId;
|
||||
} else if (event.type === "result_error") {
|
||||
// CLI signaled a fatal-but-graceful error (context overflow,
|
||||
// refusal, billing, etc.) via `is_error: true` on the result
|
||||
// event. The text was already streamed via prior `text` events;
|
||||
// record the reason so the bridge can drop the session-map entry
|
||||
// below.
|
||||
logger.warn(
|
||||
`[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`,
|
||||
);
|
||||
resultErrorReason = event.reason;
|
||||
newSessionId = event.sessionId;
|
||||
} else if (event.type === "error") {
|
||||
logger.warn(`[contractor-bridge] ${isGemini ? "gemini" : "claude"} error: ${event.message}`);
|
||||
hasError = true;
|
||||
if (!clientDisconnected) {
|
||||
sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
|
||||
hasError = true;
|
||||
if (!clientDisconnected) {
|
||||
sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`));
|
||||
}
|
||||
}
|
||||
|
||||
if (!clientDisconnected && !res.writableEnded) {
|
||||
sseWrite(res, buildStopChunk(completionId));
|
||||
sseWrite(res, "[DONE]");
|
||||
res.end();
|
||||
}
|
||||
|
||||
// Persist session mapping (shared for both Claude and Gemini)
|
||||
if (newSessionId && sessionKey && !hasError) {
|
||||
// Session-map persistence:
|
||||
// - Successful turn → upsert with the latest claudeSessionId so the next
|
||||
// turn can `--resume` into it.
|
||||
// - Terminal CLI error (context overflow etc., reported via result_error)
|
||||
// → drop the entry so the next turn starts fresh instead of resuming
|
||||
// into the same poisoned session and re-erroring.
|
||||
// - Stream/transport error before any sessionId was captured → mark the
|
||||
// prior entry orphaned (existing behavior).
|
||||
// - Aborted (client disconnected) → don't touch the session-map; the
|
||||
// subprocess may have already updated its own session file on disk,
|
||||
// and the next turn will resume from whatever it left there.
|
||||
if (clientDisconnected) {
|
||||
// No-op: aborted turn; preserve whatever the prior entry was so the
|
||||
// next turn (likely an OpenClaw retry of the same prompt) can resume.
|
||||
} else if (resultErrorReason && sessionKey) {
|
||||
logger.info(
|
||||
`[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`,
|
||||
);
|
||||
removeSession(workspace, sessionKey);
|
||||
} else if (newSessionId && sessionKey && !hasError) {
|
||||
const now = new Date().toISOString();
|
||||
putSession(workspace, {
|
||||
openclawSessionKey: sessionKey,
|
||||
@@ -227,6 +435,29 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
} else if (hasError && sessionKey && existingEntry) {
|
||||
markOrphaned(workspace, sessionKey);
|
||||
}
|
||||
} finally {
|
||||
clearInterval(heartbeat);
|
||||
req.off("close", onClose);
|
||||
|
||||
// Release our slot so the next request on this sessionKey can proceed.
|
||||
// Must happen even on abort/throw, otherwise the chain stalls forever.
|
||||
releaseSlot();
|
||||
|
||||
// Garbage-collect the queue entry if no later request chained on us.
|
||||
// The Map identity check is the only safe way to tell — newer requests
|
||||
// overwrite the entry with their own chain tail.
|
||||
if (sessionKey && queueBySession.get(sessionKey) === myChainTail) {
|
||||
queueBySession.delete(sessionKey);
|
||||
}
|
||||
|
||||
if (!res.writableEnded) {
|
||||
try {
|
||||
res.end();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const server = http.createServer(async (req, res) => {
|
||||
@@ -321,21 +552,8 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
return;
|
||||
}
|
||||
|
||||
// Find the tool registration by name
|
||||
const toolReg = pluginRegistry.tools.find((t) => t.names.includes(toolName));
|
||||
if (!toolReg) {
|
||||
sendJson(res, 200, { error: `Tool '${toolName}' not registered in OpenClaw plugin registry` });
|
||||
return;
|
||||
}
|
||||
|
||||
// Build tool execution context.
|
||||
// config is required by memory tools (resolveMemoryToolContext checks it).
|
||||
// Read from globalThis where index.ts stores api.config on every registration.
|
||||
// sessionKey must be in OpenClaw's canonical format "agent:<agentId>:<rest>" so that
|
||||
// parseAgentSessionKey() can extract the agentId for memory tool context resolution.
|
||||
// Build tool execution context (needed before tool lookup for factory instantiation).
|
||||
const liveConfig = (_G["_contractorOpenClawConfig"] ?? null) as Record<string, unknown> | null;
|
||||
// Construct a canonical session key so memory tools can resolve the agentId from it.
|
||||
// Format: "agent:<agentId>:direct:bridge"
|
||||
const canonicalSessionKey = execAgentId ? `agent:${execAgentId}:direct:bridge` : undefined;
|
||||
const toolCtx = {
|
||||
config: liveConfig ?? undefined,
|
||||
@@ -345,6 +563,28 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
||||
sessionKey: canonicalSessionKey,
|
||||
};
|
||||
|
||||
// Find tool by name. Some plugins register tools via factory functions without
|
||||
// declaring names upfront (names array is empty). For those, instantiate the
|
||||
// factory and match by the returned tool's .name property.
|
||||
let toolReg = pluginRegistry.tools.find((t) => t.names.includes(toolName));
|
||||
if (!toolReg) {
|
||||
for (const candidate of pluginRegistry.tools) {
|
||||
if (candidate.names.length > 0) continue;
|
||||
try {
|
||||
const inst = candidate.factory(toolCtx);
|
||||
const items = Array.isArray(inst) ? inst : [inst];
|
||||
if (items.some((t) => (t as { name?: string }).name === toolName)) {
|
||||
toolReg = candidate;
|
||||
break;
|
||||
}
|
||||
} catch { /* skip */ }
|
||||
}
|
||||
}
|
||||
if (!toolReg) {
|
||||
sendJson(res, 200, { error: `Tool '${toolName}' not registered in OpenClaw plugin registry` });
|
||||
return;
|
||||
}
|
||||
|
||||
// Instantiate the tool via its factory
|
||||
const toolOrTools = toolReg.factory(toolCtx);
|
||||
const toolInstance = Array.isArray(toolOrTools)
|
||||
|
||||
@@ -67,13 +67,23 @@ function install() {
|
||||
// 3. Update openclaw.json
|
||||
const cfg = readConfig();
|
||||
|
||||
// Add provider
|
||||
// Add provider — spread existing first so user-added fields
|
||||
// (e.g. timeoutSeconds, extraHeaders) survive reinstall. Script-managed
|
||||
// fields (baseUrl/apiKey/api/models) are then overridden authoritatively
|
||||
// since they're tied to the constants and model catalog above.
|
||||
cfg.models = cfg.models ?? {};
|
||||
cfg.models.providers = cfg.models.providers ?? {};
|
||||
const existingProvider = cfg.models.providers[PLUGIN_ID] ?? {};
|
||||
cfg.models.providers[PLUGIN_ID] = {
|
||||
...existingProvider,
|
||||
baseUrl: `http://127.0.0.1:${BRIDGE_PORT}/v1`,
|
||||
apiKey: BRIDGE_API_KEY,
|
||||
api: "openai-completions",
|
||||
// The bridge wraps a full Claude/Gemini agent turn (tool use, multi-step),
|
||||
// which routinely takes far longer than OpenClaw's default model-fetch
|
||||
// timeout. Without a generous timeout OpenClaw aborts the request mid-turn
|
||||
// and no reply is ever delivered. Preserve a user override if set.
|
||||
timeoutSeconds: existingProvider.timeoutSeconds ?? 600,
|
||||
models: [
|
||||
{
|
||||
id: "contractor-claude-bridge",
|
||||
@@ -115,11 +125,16 @@ function install() {
|
||||
cfg.plugins.entries[PLUGIN_ID] = cfg.plugins.entries[PLUGIN_ID] ?? {};
|
||||
cfg.plugins.entries[PLUGIN_ID].enabled = true;
|
||||
|
||||
// Set default config — setIfMissing so user values are preserved
|
||||
// Set default config — setIfMissing so user values are preserved.
|
||||
const pluginCfg = cfg.plugins.entries[PLUGIN_ID].config ?? {};
|
||||
setIfMissing(pluginCfg, "bridgePort", BRIDGE_PORT);
|
||||
setIfMissing(pluginCfg, "bridgeApiKey", BRIDGE_API_KEY);
|
||||
setIfMissing(pluginCfg, "permissionMode", "bypassPermissions");
|
||||
// bridgeApiKey is the shared secret between the bridge server (this plugin)
|
||||
// and the model provider written above. The provider apiKey is set
|
||||
// authoritatively (= BRIDGE_API_KEY); the bridge side MUST stay in lockstep
|
||||
// or every request 401s. Set it authoritatively too — never setIfMissing
|
||||
// (a stale prior value would desync the pair).
|
||||
pluginCfg.bridgeApiKey = BRIDGE_API_KEY;
|
||||
cfg.plugins.entries[PLUGIN_ID].config = pluginCfg;
|
||||
|
||||
writeConfig(cfg);
|
||||
|
||||
Reference in New Issue
Block a user