Compare commits
9 Commits
4e015c677b
...
fix/inline
| Author | SHA1 | Date | |
|---|---|---|---|
| 0f49edf59c | |||
| 037e92b421 | |||
| 0b24330787 | |||
| 1b7cd6b215 | |||
| cce85a9be8 | |||
| 5fca8f5da1 | |||
| 2e64e9ce02 | |||
| b94e0d25f6 | |||
| 91acce9b32 |
@@ -1,4 +1,4 @@
|
|||||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core";
|
||||||
import { runContractorAgentsAdd } from "./contractor-agents-add.js";
|
import { runContractorAgentsAdd } from "./contractor-agents-add.js";
|
||||||
|
|
||||||
export function registerCli(api: OpenClawPluginApi): void {
|
export function registerCli(api: OpenClawPluginApi): void {
|
||||||
|
|||||||
@@ -37,6 +37,15 @@ export type ClaudeDispatchOptions = {
|
|||||||
bridgePort?: number;
|
bridgePort?: number;
|
||||||
/** Bridge API key for MCP proxy callbacks */
|
/** Bridge API key for MCP proxy callbacks */
|
||||||
bridgeApiKey?: string;
|
bridgeApiKey?: string;
|
||||||
|
/**
|
||||||
|
* Abort signal from the bridge. When fired (typically because the upstream
|
||||||
|
* HTTP client closed the socket — OpenClaw's attempt-level retry / cancel),
|
||||||
|
* we kill the claude subprocess group and break out of the iterator
|
||||||
|
* promptly so a stale subprocess doesn't keep streaming into a dead socket
|
||||||
|
* (or worse, get its output multiplexed with a fresh subprocess started by
|
||||||
|
* a retry).
|
||||||
|
*/
|
||||||
|
signal?: AbortSignal;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Resolve the MCP server script path relative to this file.
|
// Resolve the MCP server script path relative to this file.
|
||||||
@@ -109,6 +118,7 @@ export async function* dispatchToClaude(
|
|||||||
openclawTools,
|
openclawTools,
|
||||||
bridgePort = 18800,
|
bridgePort = 18800,
|
||||||
bridgeApiKey = "",
|
bridgeApiKey = "",
|
||||||
|
signal,
|
||||||
} = opts;
|
} = opts;
|
||||||
|
|
||||||
// NOTE: put prompt right after -p, before --mcp-config.
|
// NOTE: put prompt right after -p, before --mcp-config.
|
||||||
@@ -202,6 +212,18 @@ export async function* dispatchToClaude(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
|
||||||
|
// closes the socket, propagate that into our process tree by SIGTERM/SIGKILL
|
||||||
|
// (via scheduleCleanup) and break out of the iterator (via markDone). This
|
||||||
|
// prevents stale subprocesses from outliving the request that started them.
|
||||||
|
if (signal) {
|
||||||
|
if (signal.aborted) {
|
||||||
|
markDone();
|
||||||
|
} else {
|
||||||
|
signal.addEventListener("abort", () => markDone(), { once: true });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rl.on("line", (line: string) => {
|
rl.on("line", (line: string) => {
|
||||||
if (!line.trim()) return;
|
if (!line.trim()) return;
|
||||||
let event: Record<string, unknown>;
|
let event: Record<string, unknown>;
|
||||||
|
|||||||
@@ -29,6 +29,13 @@ export type GeminiDispatchOptions = {
|
|||||||
openclawTools?: OpenAITool[];
|
openclawTools?: OpenAITool[];
|
||||||
bridgePort?: number;
|
bridgePort?: number;
|
||||||
bridgeApiKey?: string;
|
bridgeApiKey?: string;
|
||||||
|
/**
|
||||||
|
* Abort signal from the bridge. Mirror of dispatchToClaude's `signal` —
|
||||||
|
* see that file for rationale. When fired, we kill the gemini subprocess
|
||||||
|
* and break the iterator promptly so a stale process doesn't outlive
|
||||||
|
* the upstream request.
|
||||||
|
*/
|
||||||
|
signal?: AbortSignal;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -116,6 +123,7 @@ export async function* dispatchToGemini(
|
|||||||
openclawTools,
|
openclawTools,
|
||||||
bridgePort = 18800,
|
bridgePort = 18800,
|
||||||
bridgeApiKey = "",
|
bridgeApiKey = "",
|
||||||
|
signal,
|
||||||
} = opts;
|
} = opts;
|
||||||
|
|
||||||
// Write system-level instructions to workspace/GEMINI.md every turn.
|
// Write system-level instructions to workspace/GEMINI.md every turn.
|
||||||
@@ -166,6 +174,43 @@ export async function* dispatchToGemini(
|
|||||||
let done = false;
|
let done = false;
|
||||||
let resolveNext: (() => void) | null = null;
|
let resolveNext: (() => void) | null = null;
|
||||||
|
|
||||||
|
// Cleanup helper: SIGTERM the child first, then SIGKILL after a grace
|
||||||
|
// period if it hasn't exited. Idempotent — safe to call multiple times.
|
||||||
|
let cleanupScheduled = false;
|
||||||
|
const scheduleCleanup = (): void => {
|
||||||
|
if (cleanupScheduled) return;
|
||||||
|
cleanupScheduled = true;
|
||||||
|
if (child.exitCode !== null) return;
|
||||||
|
try { child.kill("SIGTERM"); } catch { /* already gone */ }
|
||||||
|
const killTimer = setTimeout(() => {
|
||||||
|
try { child.kill("SIGKILL"); } catch { /* already gone */ }
|
||||||
|
}, 5000);
|
||||||
|
killTimer.unref?.();
|
||||||
|
child.once("close", () => clearTimeout(killTimer));
|
||||||
|
};
|
||||||
|
|
||||||
|
const markDone = (): void => {
|
||||||
|
if (done) return;
|
||||||
|
done = true;
|
||||||
|
scheduleCleanup();
|
||||||
|
if (resolveNext) {
|
||||||
|
const r = resolveNext;
|
||||||
|
resolveNext = null;
|
||||||
|
r();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Hook the upstream abort signal: when the bridge's HTTP client (OpenClaw)
|
||||||
|
// closes the socket, kill the gemini subprocess and break the iterator.
|
||||||
|
// See dispatchToClaude for the full rationale.
|
||||||
|
if (signal) {
|
||||||
|
if (signal.aborted) {
|
||||||
|
markDone();
|
||||||
|
} else {
|
||||||
|
signal.addEventListener("abort", () => markDone(), { once: true });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rl.on("line", (line: string) => {
|
rl.on("line", (line: string) => {
|
||||||
if (!line.trim()) return;
|
if (!line.trim()) return;
|
||||||
let event: Record<string, unknown>;
|
let event: Record<string, unknown>;
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import fs from "node:fs";
|
import fs from "node:fs";
|
||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
|
||||||
|
import type { OpenClawPluginApi } from "openclaw/plugin-sdk/core";
|
||||||
import { normalizePluginConfig } from "./core/types/contractor.js";
|
import { normalizePluginConfig } from "./core/types/contractor.js";
|
||||||
import { resolveContractorAgentMetadata } from "./core/contractor/metadata-resolver.js";
|
import { resolveContractorAgentMetadata } from "./core/contractor/metadata-resolver.js";
|
||||||
import { createBridgeServer } from "./web/server.js";
|
import { createBridgeServer } from "./web/server.js";
|
||||||
@@ -19,9 +20,10 @@ const OPENCLAW_CONFIG_KEY = "_contractorOpenClawConfig";
|
|||||||
|
|
||||||
// ── Plugin entry ─────────────────────────────────────────────────────────────
|
// ── Plugin entry ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
export default {
|
export default definePluginEntry({
|
||||||
id: "contractor-agent",
|
id: "contractor-agent",
|
||||||
name: "Contractor Agent",
|
name: "Contractor Agent",
|
||||||
|
description: "Turns Claude Code into an OpenClaw-managed contractor agent",
|
||||||
// OpenClaw requires register() to be synchronous — returning a Promise
|
// OpenClaw requires register() to be synchronous — returning a Promise
|
||||||
// surfaces as `Error: plugin register must be synchronous` and the plugin
|
// surfaces as `Error: plugin register must be synchronous` and the plugin
|
||||||
// ends up in `error` state. We avoid `await` here and instead let the
|
// ends up in `error` state. We avoid `await` here and instead let the
|
||||||
@@ -60,6 +62,11 @@ export default {
|
|||||||
if (!_G[LIFECYCLE_KEY]) {
|
if (!_G[LIFECYCLE_KEY]) {
|
||||||
_G[LIFECYCLE_KEY] = true;
|
_G[LIFECYCLE_KEY] = true;
|
||||||
|
|
||||||
|
// Bind the bridge server only when the gateway boots, NOT eagerly at
|
||||||
|
// register-time. register() also runs in one-shot CLI subprocesses
|
||||||
|
// (e.g. `openclaw completion`, `openclaw doctor`); spawning a long-
|
||||||
|
// lived listener there would prevent those commands from exiting.
|
||||||
|
api.on("gateway_start", () => {
|
||||||
const server = createBridgeServer({
|
const server = createBridgeServer({
|
||||||
port: config.bridgePort,
|
port: config.bridgePort,
|
||||||
apiKey: config.bridgeApiKey,
|
apiKey: config.bridgeApiKey,
|
||||||
@@ -68,9 +75,7 @@ export default {
|
|||||||
logger: api.logger,
|
logger: api.logger,
|
||||||
});
|
});
|
||||||
|
|
||||||
// EADDRINUSE → another gateway/CLI process already owns the port; that's
|
// EADDRINUSE → another gateway already owns the port; fine, skip bind.
|
||||||
// fine, we just don't double-bind. Any other error is logged but does
|
|
||||||
// not crash registration.
|
|
||||||
server.on("error", (err: NodeJS.ErrnoException) => {
|
server.on("error", (err: NodeJS.ErrnoException) => {
|
||||||
if (err.code === "EADDRINUSE") {
|
if (err.code === "EADDRINUSE") {
|
||||||
api.logger.info(
|
api.logger.info(
|
||||||
@@ -81,7 +86,13 @@ export default {
|
|||||||
api.logger.warn(`[contractor-agent] bridge server error: ${err.message ?? String(err)}`);
|
api.logger.warn(`[contractor-agent] bridge server error: ${err.message ?? String(err)}`);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Defense in depth: even if this code path is somehow reached outside
|
||||||
|
// the gateway, .unref() prevents the listener from pinning the host's
|
||||||
|
// event loop and blocking process exit.
|
||||||
|
server.unref();
|
||||||
|
|
||||||
_G[SERVER_KEY] = server;
|
_G[SERVER_KEY] = server;
|
||||||
|
});
|
||||||
|
|
||||||
api.on("gateway_stop", () => {
|
api.on("gateway_stop", () => {
|
||||||
const s = _G[SERVER_KEY] as http.Server | undefined;
|
const s = _G[SERVER_KEY] as http.Server | undefined;
|
||||||
@@ -95,4 +106,4 @@ export default {
|
|||||||
|
|
||||||
api.logger.info(`[contractor-agent] plugin registered (bridge port: ${config.bridgePort})`);
|
api.logger.info(`[contractor-agent] plugin registered (bridge port: ${config.bridgePort})`);
|
||||||
},
|
},
|
||||||
};
|
});
|
||||||
|
|||||||
@@ -1,9 +1,13 @@
|
|||||||
{
|
{
|
||||||
"id": "contractor-agent",
|
"id": "contractor-agent",
|
||||||
"name": "Contractor Agent",
|
"name": "Contractor Agent",
|
||||||
"version": "0.1.0",
|
|
||||||
"description": "Turns Claude Code into an OpenClaw-managed contractor agent",
|
"description": "Turns Claude Code into an OpenClaw-managed contractor agent",
|
||||||
"main": "index.ts",
|
"activation": {
|
||||||
|
"onStartup": true
|
||||||
|
},
|
||||||
|
"commandAliases": [
|
||||||
|
{ "name": "contractor-agents" }
|
||||||
|
],
|
||||||
"configSchema": {
|
"configSchema": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"additionalProperties": false,
|
"additionalProperties": false,
|
||||||
|
|||||||
@@ -16,7 +16,98 @@ function stripOpenClawTimestampPrefix(raw: string): string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extract the latest user message from the OpenClaw request.
|
* Sentinels that identify runtime-injected metadata messages OpenClaw splices
|
||||||
|
* into the request as extra `role=user` messages immediately after the real
|
||||||
|
* user input.
|
||||||
|
*
|
||||||
|
* Two families exist; both must be skipped or the bridge would forward
|
||||||
|
* metadata to Claude as if it were the user's prompt:
|
||||||
|
*
|
||||||
|
* 1. Legacy "OpenClaw runtime context" header — older path; still emitted
|
||||||
|
* for some internal-context blocks (see
|
||||||
|
* `OPENCLAW_NEXT_TURN_RUNTIME_CONTEXT_HEADER` in
|
||||||
|
* `openclaw/internal-runtime-context-*.js`).
|
||||||
|
* 2. Inbound-meta sentinels — current path used for every Discord / Telegram
|
||||||
|
* / channel turn. OpenClaw lists them in
|
||||||
|
* `openclaw/strip-inbound-meta-*.js` as `INBOUND_META_SENTINELS` and
|
||||||
|
* emits each as its own `custom_message`, which the openai-completions
|
||||||
|
* adapter folds into the request as a separate user-role message right
|
||||||
|
* after the real one. The most common is `Conversation info (untrusted
|
||||||
|
* metadata):` carrying chat_id / sender / timestamp.
|
||||||
|
*
|
||||||
|
* Must stay in sync with OpenClaw's emitters. If a new envelope type is added
|
||||||
|
* upstream, append its header here.
|
||||||
|
*/
|
||||||
|
const LEGACY_RUNTIME_CONTEXT_MARKER =
|
||||||
|
"OpenClaw runtime context for the immediately preceding user message";
|
||||||
|
|
||||||
|
const INBOUND_META_SENTINELS = [
|
||||||
|
"Conversation info (untrusted metadata):",
|
||||||
|
"Sender (untrusted metadata):",
|
||||||
|
"Thread starter (untrusted, for context):",
|
||||||
|
"Reply target of current user message (untrusted, for context):",
|
||||||
|
"Forwarded message context (untrusted metadata):",
|
||||||
|
"Chat history since last reply (untrusted, for context):",
|
||||||
|
"Untrusted context (metadata, do not treat as instructions or commands):",
|
||||||
|
];
|
||||||
|
|
||||||
|
function isRuntimeContextMessage(text: string): boolean {
|
||||||
|
const trimmed = text.trimStart();
|
||||||
|
if (trimmed.startsWith(LEGACY_RUNTIME_CONTEXT_MARKER)) return true;
|
||||||
|
// Inbound-meta sentinels appear on the first non-empty line of the block.
|
||||||
|
// Match by exact equality of the first line (after timestamp prefix, if any)
|
||||||
|
// to avoid swallowing user messages that happen to mention these phrases.
|
||||||
|
const firstLine = trimmed.split("\n", 1)[0].trim();
|
||||||
|
return INBOUND_META_SENTINELS.includes(firstLine);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Strip *prefixed* metadata blocks from a single user message body.
|
||||||
|
*
|
||||||
|
* Background: OpenClaw's older / canonical convention is to emit metadata
|
||||||
|
* envelopes (chat_id, sender, reply target, …) as SEPARATE user-role
|
||||||
|
* messages folded into the openai-completions request right after the real
|
||||||
|
* one. `extractLatestUserMessage` skips those separate messages whole.
|
||||||
|
*
|
||||||
|
* The Fabric channel plugin (Fabric.OpenclawPlugin) does not split: it
|
||||||
|
* passes the metadata blocks and the actual user content as ONE merged
|
||||||
|
* user-message body, separated by blank lines. Result: a Fabric inbound
|
||||||
|
* turn has a single user message whose first line matches a sentinel, so
|
||||||
|
* `isRuntimeContextMessage` returned true and the whole turn was dropped
|
||||||
|
* with "no user message found" (HTTP 400). Symptom: every contractor agent
|
||||||
|
* subscribed to a Fabric channel silently fails to reply.
|
||||||
|
*
|
||||||
|
* Strip leading sentinel-prefixed blocks (sentinel header + optional code
|
||||||
|
* fence + blank-line separator) until we hit a block whose first line is
|
||||||
|
* NOT a sentinel — that's the real prompt. Returns "" if the entire body
|
||||||
|
* is metadata-only (still a no-op turn, same as before).
|
||||||
|
*/
|
||||||
|
function stripPrefixedMetadataBlocks(raw: string): string {
|
||||||
|
// Split on blank-line block boundaries. Within a metadata block the JSON
|
||||||
|
// code fence has only single newlines, so it stays in the same chunk as
|
||||||
|
// its sentinel header.
|
||||||
|
const blocks = raw.split(/\n\n+/);
|
||||||
|
const kept: string[] = [];
|
||||||
|
let stillStripping = true;
|
||||||
|
for (const block of blocks) {
|
||||||
|
if (stillStripping) {
|
||||||
|
const trimmed = block.trimStart();
|
||||||
|
const firstLine = trimmed.split("\n", 1)[0].trim();
|
||||||
|
if (
|
||||||
|
INBOUND_META_SENTINELS.includes(firstLine) ||
|
||||||
|
trimmed.startsWith(LEGACY_RUNTIME_CONTEXT_MARKER)
|
||||||
|
) {
|
||||||
|
continue; // drop this prefix block
|
||||||
|
}
|
||||||
|
stillStripping = false;
|
||||||
|
}
|
||||||
|
kept.push(block);
|
||||||
|
}
|
||||||
|
return kept.join("\n\n").trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the latest user-authored message from the OpenClaw request.
|
||||||
*
|
*
|
||||||
* OpenClaw accumulates all user messages and sends the full array every turn,
|
* OpenClaw accumulates all user messages and sends the full array every turn,
|
||||||
* but assistant messages may be missing if the previous response wasn't streamed
|
* but assistant messages may be missing if the previous response wasn't streamed
|
||||||
@@ -26,16 +117,35 @@ function stripOpenClawTimestampPrefix(raw: string): string {
|
|||||||
* OpenClaw prefixes user messages with a timestamp: "[Day YYYY-MM-DD HH:MM TZ] text"
|
* OpenClaw prefixes user messages with a timestamp: "[Day YYYY-MM-DD HH:MM TZ] text"
|
||||||
* We strip the timestamp prefix before forwarding.
|
* We strip the timestamp prefix before forwarding.
|
||||||
*
|
*
|
||||||
* Returns "" if no user messages exist or the latest user message is empty
|
* OpenClaw also emits runtime-context / metadata envelopes (chat_id, sender,
|
||||||
* (e.g. a bare /new turn — see also extractRequestContext.bareSessionReset).
|
* reply target, etc.) as extra `role=user` messages after each real user
|
||||||
|
* message. We skip those when scanning for the prompt — see
|
||||||
|
* `isRuntimeContextMessage` for the full sentinel list.
|
||||||
|
*
|
||||||
|
* Returns "" if no user-authored messages exist (e.g. a bare /new turn — see
|
||||||
|
* also extractRequestContext.bareSessionReset).
|
||||||
*/
|
*/
|
||||||
export function extractLatestUserMessage(req: BridgeInboundRequest): string {
|
export function extractLatestUserMessage(req: BridgeInboundRequest): string {
|
||||||
const userMessages = req.messages.filter((m) => m.role === "user");
|
const userMessages = req.messages.filter((m) => m.role === "user");
|
||||||
if (userMessages.length === 0) return "";
|
for (let i = userMessages.length - 1; i >= 0; i -= 1) {
|
||||||
|
const raw = messageText(userMessages[i]);
|
||||||
const raw = messageText(userMessages[userMessages.length - 1]);
|
if (!raw) continue;
|
||||||
|
// First try the separate-message convention (Discord/Telegram path): a
|
||||||
|
// user message whose entire body is one metadata envelope — skip whole.
|
||||||
|
if (isRuntimeContextMessage(raw)) {
|
||||||
|
// ...but also try inline-prefixed-metadata recovery (Fabric path):
|
||||||
|
// some channels splice metadata + real content into one body. Walk
|
||||||
|
// past leading sentinel blocks; if any non-metadata block follows,
|
||||||
|
// that's the prompt. If nothing follows, this turn really is pure
|
||||||
|
// metadata and we keep skipping.
|
||||||
|
const stripped = stripPrefixedMetadataBlocks(raw);
|
||||||
|
if (stripped) return stripOpenClawTimestampPrefix(stripped);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
return stripOpenClawTimestampPrefix(raw);
|
return stripOpenClawTimestampPrefix(raw);
|
||||||
}
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
export type RequestContext = {
|
export type RequestContext = {
|
||||||
agentId: string;
|
agentId: string;
|
||||||
|
|||||||
@@ -88,6 +88,28 @@ function parseBody(req: http.IncomingMessage): Promise<BridgeInboundRequest> {
|
|||||||
return parseBodyRaw(req) as Promise<BridgeInboundRequest>;
|
return parseBodyRaw(req) as Promise<BridgeInboundRequest>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Per-sessionKey FIFO queue: same-session turns serialize so a user firing
|
||||||
|
* multiple Discord messages back-to-back gets them processed in order rather
|
||||||
|
* than spawning concurrent claude subprocesses. Cross-session requests bypass
|
||||||
|
* each other's chains and run in parallel.
|
||||||
|
*
|
||||||
|
* The chain is `prev.then(() => mySlot)` per request — `mySlot` is released
|
||||||
|
* in the request's finally block, so the next request waits until our work
|
||||||
|
* completes (success or failure) before starting.
|
||||||
|
*/
|
||||||
|
const queueBySession = new Map<string, Promise<void>>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* SSE heartbeat cadence. Bridge writes an empty-content `chat.completion.chunk`
|
||||||
|
* (a no-op for OpenAI stream parsers, but a real model-progress event on the
|
||||||
|
* canonical streaming channel) on this interval while a turn is in flight or
|
||||||
|
* queued. This keeps OpenClaw's LLM idle watchdog (default 120s) from firing
|
||||||
|
* during long quiet tool-call phases or while we're waiting our turn in the
|
||||||
|
* per-session queue. See the heartbeat block in handleChatCompletions for
|
||||||
|
* details on why an SSE comment frame is insufficient.
|
||||||
|
*/
|
||||||
|
const HEARTBEAT_MS = 30_000;
|
||||||
|
|
||||||
const _G = globalThis as Record<string, unknown>;
|
const _G = globalThis as Record<string, unknown>;
|
||||||
|
|
||||||
@@ -156,22 +178,151 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|||||||
// Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude
|
// Detect backend from body.model: "contractor-gemini-bridge" → Gemini, else → Claude
|
||||||
const isGemini = typeof body.model === "string" && body.model.includes("gemini");
|
const isGemini = typeof body.model === "string" && body.model.includes("gemini");
|
||||||
|
|
||||||
// Look up existing session (shared structure for both Claude and Gemini).
|
// ── Abort propagation ────────────────────────────────────────────────────
|
||||||
// On a bare /new or /reset turn we deliberately drop the existing entry so
|
// OpenClaw's LLM client cancels in-flight HTTP requests via AbortSignal
|
||||||
// the CLI starts a fresh session — otherwise --resume would bring back the
|
// when an attempt fails or the user cancels. On the wire this manifests
|
||||||
// very history the user just asked to abandon.
|
// as the client closing the socket, surfaced here as `req.on('close')`.
|
||||||
let existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;
|
// We mirror that signal into our own AbortController and propagate it
|
||||||
if (bareSessionReset && existingEntry && sessionKey) {
|
// down to the dispatchTo{Claude,Gemini} adapter, which kills the
|
||||||
|
// subprocess. Without this, OpenClaw silently drops the response while
|
||||||
|
// the subprocess keeps running, and a retry spawns another subprocess
|
||||||
|
// alongside the orphan — the root cause of the duplicate-Discord-message
|
||||||
|
// bug we hit in May 2026.
|
||||||
|
const abortController = new AbortController();
|
||||||
|
let clientDisconnected = false;
|
||||||
|
const onClose = (): void => {
|
||||||
|
if (clientDisconnected) return;
|
||||||
|
clientDisconnected = true;
|
||||||
logger.info(
|
logger.info(
|
||||||
`[contractor-bridge] bare /new detected — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`,
|
`[contractor-bridge] client disconnected sessionKey=${sessionKey} — aborting in-flight work`,
|
||||||
|
);
|
||||||
|
abortController.abort();
|
||||||
|
};
|
||||||
|
req.on("close", onClose);
|
||||||
|
|
||||||
|
// Start SSE response immediately so OpenClaw sees the response status
|
||||||
|
// quickly. We may still spend time waiting in the per-session queue
|
||||||
|
// before any real chunk goes out — heartbeat (below) keeps that quiet
|
||||||
|
// window from tripping the upstream idle watchdog.
|
||||||
|
res.writeHead(200, {
|
||||||
|
"Content-Type": "text/event-stream",
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
"Transfer-Encoding": "chunked",
|
||||||
|
});
|
||||||
|
|
||||||
|
const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;
|
||||||
|
|
||||||
|
// ── SSE heartbeat ────────────────────────────────────────────────────────
|
||||||
|
// OpenClaw's LLM idle watchdog (default 120s) fires on lack of *model
|
||||||
|
// progress*, not lack of bytes — concretely "no content delta through
|
||||||
|
// SSE for 120s". An SSE comment frame (`: keepalive\n\n`) keeps the TCP
|
||||||
|
// socket alive but does NOT register as model progress, so a long quiet
|
||||||
|
// tool-call phase still idles out. When that happens OpenClaw falls back
|
||||||
|
// to re-sending the prior turn's assistant text (see
|
||||||
|
// pi-embedded-Bcz04p2i.js:1308 `fallbackAnswerText`), producing the
|
||||||
|
// duplicate-Discord-message symptom observed 2026-05-14.
|
||||||
|
//
|
||||||
|
// We emit a real `chat.completion.chunk` with an empty content delta
|
||||||
|
// every HEARTBEAT_MS. Clients drop empty deltas, but the upstream idle
|
||||||
|
// watchdog should count it as model progress because it's a real event
|
||||||
|
// on the canonical streaming channel. If empty content turns out to be
|
||||||
|
// filtered, the next step is a zero-width-space "".
|
||||||
|
const heartbeat = setInterval(() => {
|
||||||
|
if (clientDisconnected || res.writableEnded) return;
|
||||||
|
try {
|
||||||
|
sseWrite(res, buildChunk(completionId, ""));
|
||||||
|
} catch {
|
||||||
|
/* socket dead, ignore */
|
||||||
|
}
|
||||||
|
}, HEARTBEAT_MS);
|
||||||
|
heartbeat.unref?.();
|
||||||
|
|
||||||
|
// ── Per-sessionKey FIFO queue ────────────────────────────────────────────
|
||||||
|
// Same-sessionKey turns serialize: if a user fires multiple Discord
|
||||||
|
// messages quickly, we process them in order rather than starting
|
||||||
|
// concurrent claude subprocesses (which would corrupt the shared session
|
||||||
|
// file and produce overlapping replies). Cross-sessionKey requests run
|
||||||
|
// in parallel — they live on independent chains.
|
||||||
|
let releaseSlot: () => void = () => {};
|
||||||
|
const mySlot = new Promise<void>((r) => {
|
||||||
|
releaseSlot = r;
|
||||||
|
});
|
||||||
|
const prev = sessionKey
|
||||||
|
? queueBySession.get(sessionKey) ?? Promise.resolve()
|
||||||
|
: Promise.resolve();
|
||||||
|
// `myChainTail` advances only after `releaseSlot()` (called in our
|
||||||
|
// finally block). Tolerate prev rejecting so a crashed earlier turn
|
||||||
|
// doesn't poison the chain forever.
|
||||||
|
const myChainTail = prev.then(() => mySlot, () => mySlot);
|
||||||
|
if (sessionKey) queueBySession.set(sessionKey, myChainTail);
|
||||||
|
let newSessionId = "";
|
||||||
|
let hasError = false;
|
||||||
|
let resultErrorReason: string | null = null;
|
||||||
|
let existingEntry: ReturnType<typeof getSession> = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Block until the previous turn on this sessionKey finishes.
|
||||||
|
// Cross-session requests skip this wait (their `prev` is a fresh
|
||||||
|
// Promise.resolve()).
|
||||||
|
await prev.catch(() => {});
|
||||||
|
|
||||||
|
// While queued, OpenClaw may have given up on this request (attempt
|
||||||
|
// timeout, user cancel, etc.) and closed the socket. If so, exit
|
||||||
|
// cleanly without dispatching — the client wouldn't see the output
|
||||||
|
// anyway.
|
||||||
|
if (clientDisconnected) {
|
||||||
|
logger.info(
|
||||||
|
`[contractor-bridge] dropped queued request sessionKey=${sessionKey} (client disconnected during queue wait)`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look up the session-map entry NOW (at the head of the queue), not
|
||||||
|
// earlier — the previous turn on the same sessionKey may have just
|
||||||
|
// updated `claudeSessionId` and we want to resume into the latest
|
||||||
|
// one rather than a stale snapshot from request-arrival time.
|
||||||
|
existingEntry = sessionKey ? getSession(workspace, sessionKey) : null;
|
||||||
|
|
||||||
|
// Detect a fresh OpenClaw session even when the bare-reset marker is
|
||||||
|
// absent. The marker only arrives when `/new` is the body of *this*
|
||||||
|
// user turn (see get-reply isBareSessionReset). When `/new` is sent
|
||||||
|
// as a separate slash command (e.g. via Discord's slash UI), OpenClaw
|
||||||
|
// processes the reset in a side lane that doesn't hit the bridge —
|
||||||
|
// it just renames the prior session file aside. The follow-up real
|
||||||
|
// message then arrives on a brand-new OpenClaw session, but as a
|
||||||
|
// normal turn with no marker. Without this check, the bridge happily
|
||||||
|
// resumes the long-stale claudeSessionId from before the reset.
|
||||||
|
//
|
||||||
|
// OpenClaw sends the full conversation history every turn (system +
|
||||||
|
// user/assistant pairs + latest user). A request with zero assistant
|
||||||
|
// turns is therefore a positive signal that the OpenClaw session is
|
||||||
|
// brand-new and any prior claudeSessionId we hold is from a previous
|
||||||
|
// OpenClaw session that the user already abandoned.
|
||||||
|
const hasAssistantHistory = body.messages.some((m) => {
|
||||||
|
if (m.role !== "assistant") return false;
|
||||||
|
if (typeof m.content === "string") return m.content.trim().length > 0;
|
||||||
|
return m.content.some(
|
||||||
|
(c) => c.type === "text" && (c.text ?? "").trim().length > 0,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
const isFreshOpenClawSession = !hasAssistantHistory;
|
||||||
|
|
||||||
|
if ((bareSessionReset || isFreshOpenClawSession) && existingEntry && sessionKey) {
|
||||||
|
const reason = bareSessionReset
|
||||||
|
? "bare /new detected"
|
||||||
|
: "fresh OpenClaw session (no assistant history in messages[])";
|
||||||
|
logger.info(
|
||||||
|
`[contractor-bridge] ${reason} — dropping prior CLI session sessionKey=${sessionKey} prevClaudeSessionId=${existingEntry.claudeSessionId}`,
|
||||||
);
|
);
|
||||||
removeSession(workspace, sessionKey);
|
removeSession(workspace, sessionKey);
|
||||||
existingEntry = null;
|
existingEntry = null;
|
||||||
}
|
}
|
||||||
let resumeSessionId = existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;
|
const resumeSessionId =
|
||||||
|
existingEntry?.state === "active" ? existingEntry.claudeSessionId : null;
|
||||||
|
|
||||||
// Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
|
// Bootstrap is passed as the system prompt on every turn (stateless — not persisted in session files).
|
||||||
// For Claude: --system-prompt fully replaces any prior system prompt each invocation.
|
// For Claude: --append-system-prompt appends to the built-in prompt each invocation.
|
||||||
// For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
|
// For Gemini: written to workspace/GEMINI.md, read dynamically by Gemini CLI each turn.
|
||||||
// This keeps persona and skills current without needing to track first-turn state.
|
// This keeps persona and skills current without needing to track first-turn state.
|
||||||
const systemPrompt = buildBootstrap({
|
const systemPrompt = buildBootstrap({
|
||||||
@@ -182,22 +333,8 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|||||||
workspaceContextFiles,
|
workspaceContextFiles,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Start SSE response
|
|
||||||
res.writeHead(200, {
|
|
||||||
"Content-Type": "text/event-stream",
|
|
||||||
"Cache-Control": "no-cache",
|
|
||||||
"Connection": "keep-alive",
|
|
||||||
"Transfer-Encoding": "chunked",
|
|
||||||
});
|
|
||||||
|
|
||||||
const completionId = `chatcmpl-bridge-${randomUUID().slice(0, 8)}`;
|
|
||||||
let newSessionId = "";
|
|
||||||
let hasError = false;
|
|
||||||
let resultErrorReason: string | null = null;
|
|
||||||
|
|
||||||
const openclawTools = body.tools ?? [];
|
const openclawTools = body.tools ?? [];
|
||||||
|
|
||||||
try {
|
|
||||||
const dispatchIter = isGemini
|
const dispatchIter = isGemini
|
||||||
? dispatchToGemini({
|
? dispatchToGemini({
|
||||||
prompt: dispatchPrompt,
|
prompt: dispatchPrompt,
|
||||||
@@ -208,6 +345,7 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|||||||
openclawTools,
|
openclawTools,
|
||||||
bridgePort: port,
|
bridgePort: port,
|
||||||
bridgeApiKey: apiKey,
|
bridgeApiKey: apiKey,
|
||||||
|
signal: abortController.signal,
|
||||||
})
|
})
|
||||||
: dispatchToClaude({
|
: dispatchToClaude({
|
||||||
prompt: dispatchPrompt,
|
prompt: dispatchPrompt,
|
||||||
@@ -219,17 +357,21 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|||||||
openclawTools,
|
openclawTools,
|
||||||
bridgePort: port,
|
bridgePort: port,
|
||||||
bridgeApiKey: apiKey,
|
bridgeApiKey: apiKey,
|
||||||
|
signal: abortController.signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
for await (const event of dispatchIter) {
|
for await (const event of dispatchIter) {
|
||||||
if (event.type === "text") {
|
if (event.type === "text") {
|
||||||
sseWrite(res, buildChunk(completionId, event.text));
|
if (!clientDisconnected) sseWrite(res, buildChunk(completionId, event.text));
|
||||||
} else if (event.type === "done") {
|
} else if (event.type === "done") {
|
||||||
newSessionId = event.sessionId;
|
newSessionId = event.sessionId;
|
||||||
} else if (event.type === "result_error") {
|
} else if (event.type === "result_error") {
|
||||||
// CLI returned a terminal error (typically context overflow). The
|
// CLI signaled a fatal-but-graceful error (context overflow,
|
||||||
// text was already streamed via prior `text` events; record the
|
// refusal, billing, etc.) via `is_error: true` on the result
|
||||||
// session so we can drop it below and log the reason.
|
// event. The text was already streamed via prior `text` events;
|
||||||
|
// record the reason so the bridge can drop the session-map entry
|
||||||
|
// below.
|
||||||
logger.warn(
|
logger.warn(
|
||||||
`[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`,
|
`[contractor-bridge] ${isGemini ? "gemini" : "claude"} result_error reason=${event.reason} sessionId=${event.sessionId} message=${event.message.substring(0, 200)}`,
|
||||||
);
|
);
|
||||||
@@ -238,18 +380,23 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|||||||
} else if (event.type === "error") {
|
} else if (event.type === "error") {
|
||||||
logger.warn(`[contractor-bridge] ${isGemini ? "gemini" : "claude"} error: ${event.message}`);
|
logger.warn(`[contractor-bridge] ${isGemini ? "gemini" : "claude"} error: ${event.message}`);
|
||||||
hasError = true;
|
hasError = true;
|
||||||
|
if (!clientDisconnected) {
|
||||||
sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`));
|
sseWrite(res, buildChunk(completionId, `[contractor-bridge error: ${event.message}]`));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
|
logger.warn(`[contractor-bridge] dispatch error: ${String(err)}`);
|
||||||
hasError = true;
|
hasError = true;
|
||||||
|
if (!clientDisconnected) {
|
||||||
sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`));
|
sseWrite(res, buildChunk(completionId, `[contractor-bridge dispatch failed: ${String(err)}]`));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!clientDisconnected && !res.writableEnded) {
|
||||||
sseWrite(res, buildStopChunk(completionId));
|
sseWrite(res, buildStopChunk(completionId));
|
||||||
sseWrite(res, "[DONE]");
|
sseWrite(res, "[DONE]");
|
||||||
res.end();
|
}
|
||||||
|
|
||||||
// Session-map persistence:
|
// Session-map persistence:
|
||||||
// - Successful turn → upsert with the latest claudeSessionId so the next
|
// - Successful turn → upsert with the latest claudeSessionId so the next
|
||||||
@@ -259,7 +406,13 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|||||||
// into the same poisoned session and re-erroring.
|
// into the same poisoned session and re-erroring.
|
||||||
// - Stream/transport error before any sessionId was captured → mark the
|
// - Stream/transport error before any sessionId was captured → mark the
|
||||||
// prior entry orphaned (existing behavior).
|
// prior entry orphaned (existing behavior).
|
||||||
if (resultErrorReason && sessionKey) {
|
// - Aborted (client disconnected) → don't touch the session-map; the
|
||||||
|
// subprocess may have already updated its own session file on disk,
|
||||||
|
// and the next turn will resume from whatever it left there.
|
||||||
|
if (clientDisconnected) {
|
||||||
|
// No-op: aborted turn; preserve whatever the prior entry was so the
|
||||||
|
// next turn (likely an OpenClaw retry of the same prompt) can resume.
|
||||||
|
} else if (resultErrorReason && sessionKey) {
|
||||||
logger.info(
|
logger.info(
|
||||||
`[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`,
|
`[contractor-bridge] dropping CLI session after terminal error sessionKey=${sessionKey} reason=${resultErrorReason}`,
|
||||||
);
|
);
|
||||||
@@ -282,6 +435,29 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|||||||
} else if (hasError && sessionKey && existingEntry) {
|
} else if (hasError && sessionKey && existingEntry) {
|
||||||
markOrphaned(workspace, sessionKey);
|
markOrphaned(workspace, sessionKey);
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
clearInterval(heartbeat);
|
||||||
|
req.off("close", onClose);
|
||||||
|
|
||||||
|
// Release our slot so the next request on this sessionKey can proceed.
|
||||||
|
// Must happen even on abort/throw, otherwise the chain stalls forever.
|
||||||
|
releaseSlot();
|
||||||
|
|
||||||
|
// Garbage-collect the queue entry if no later request chained on us.
|
||||||
|
// The Map identity check is the only safe way to tell — newer requests
|
||||||
|
// overwrite the entry with their own chain tail.
|
||||||
|
if (sessionKey && queueBySession.get(sessionKey) === myChainTail) {
|
||||||
|
queueBySession.delete(sessionKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!res.writableEnded) {
|
||||||
|
try {
|
||||||
|
res.end();
|
||||||
|
} catch {
|
||||||
|
/* ignore */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const server = http.createServer(async (req, res) => {
|
const server = http.createServer(async (req, res) => {
|
||||||
@@ -420,14 +596,47 @@ export function createBridgeServer(config: BridgeServerConfig): http.Server {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const execFn = (toolInstance as { execute: (id: string, args: unknown) => Promise<{ content?: Array<{ type: string; text?: string }> }> }).execute;
|
const execFn = (toolInstance as { execute: (id: string, args: unknown) => Promise<unknown> }).execute;
|
||||||
const toolResult = await execFn(randomUUID(), toolArgs);
|
const toolResultRaw = await execFn(randomUUID(), toolArgs);
|
||||||
|
|
||||||
// Extract text content from AgentToolResult
|
// Normalize the result shape. OpenClaw plugins return one of:
|
||||||
const text = (toolResult.content ?? [])
|
// (a) AgentToolResult — { content: [{type:'text', text:'...'}, ...] }
|
||||||
|
// used by tools that wrap via asContent() (e.g. all
|
||||||
|
// Dialectic.OpenclawPlugin tools).
|
||||||
|
// (b) raw JSON-able object — { ok:true, ...domain fields }
|
||||||
|
// used by tools that return data directly (e.g. every
|
||||||
|
// Fabric.OpenclawPlugin tool: fabric-channel-list,
|
||||||
|
// fabric-guild-list, fabric-send-message, etc).
|
||||||
|
// (c) null / undefined — fire-and-forget (rare).
|
||||||
|
//
|
||||||
|
// Pre-2026-05-24 the bridge ONLY handled shape (a); shape (b)
|
||||||
|
// silently returned "(no result)" to Claude Code (bug:
|
||||||
|
// fabric-channel-list and friends were unusable from contractor
|
||||||
|
// agents even though the tool actually ran successfully).
|
||||||
|
let text = "";
|
||||||
|
if (toolResultRaw == null) {
|
||||||
|
text = "";
|
||||||
|
} else if (
|
||||||
|
typeof toolResultRaw === "object" &&
|
||||||
|
Array.isArray((toolResultRaw as { content?: unknown }).content)
|
||||||
|
) {
|
||||||
|
// Shape (a)
|
||||||
|
const content = (toolResultRaw as { content: Array<{ type: string; text?: string }> }).content;
|
||||||
|
text = content
|
||||||
.filter((c) => c.type === "text" && c.text)
|
.filter((c) => c.type === "text" && c.text)
|
||||||
.map((c) => c.text as string)
|
.map((c) => c.text as string)
|
||||||
.join("\n");
|
.join("\n");
|
||||||
|
} else if (typeof toolResultRaw === "string") {
|
||||||
|
text = toolResultRaw;
|
||||||
|
} else {
|
||||||
|
// Shape (b) — serialize the whole object as JSON. Claude Code
|
||||||
|
// is happy to parse JSON tool results.
|
||||||
|
try {
|
||||||
|
text = JSON.stringify(toolResultRaw);
|
||||||
|
} catch {
|
||||||
|
text = String(toolResultRaw);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sendJson(res, 200, { result: text || "(no result)" });
|
sendJson(res, 200, { result: text || "(no result)" });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|||||||
@@ -67,10 +67,15 @@ function install() {
|
|||||||
// 3. Update openclaw.json
|
// 3. Update openclaw.json
|
||||||
const cfg = readConfig();
|
const cfg = readConfig();
|
||||||
|
|
||||||
// Add provider
|
// Add provider — spread existing first so user-added fields
|
||||||
|
// (e.g. timeoutSeconds, extraHeaders) survive reinstall. Script-managed
|
||||||
|
// fields (baseUrl/apiKey/api/models) are then overridden authoritatively
|
||||||
|
// since they're tied to the constants and model catalog above.
|
||||||
cfg.models = cfg.models ?? {};
|
cfg.models = cfg.models ?? {};
|
||||||
cfg.models.providers = cfg.models.providers ?? {};
|
cfg.models.providers = cfg.models.providers ?? {};
|
||||||
|
const existingProvider = cfg.models.providers[PLUGIN_ID] ?? {};
|
||||||
cfg.models.providers[PLUGIN_ID] = {
|
cfg.models.providers[PLUGIN_ID] = {
|
||||||
|
...existingProvider,
|
||||||
baseUrl: `http://127.0.0.1:${BRIDGE_PORT}/v1`,
|
baseUrl: `http://127.0.0.1:${BRIDGE_PORT}/v1`,
|
||||||
apiKey: BRIDGE_API_KEY,
|
apiKey: BRIDGE_API_KEY,
|
||||||
api: "openai-completions",
|
api: "openai-completions",
|
||||||
|
|||||||
Reference in New Issue
Block a user