From 209ab0d82e1896bd01958594a7b59ee8fa047002 Mon Sep 17 00:00:00 2001 From: hzhang Date: Tue, 2 Jun 2026 07:23:02 +0100 Subject: [PATCH] feat: trim-tool-result + list-tool-results for agent-driven session pruning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new openclaw tools that let an agent reclaim ctx tokens consumed by past tool results once it has extracted what it needs: list-tool-results — enumerates past toolResults in the agent's own session jsonl (size, tool name, turns-ago, args summary, already-trimmed flag); does NOT return content body trim-tool-result — replaces a past toolResult's content[].text with a short sentinel-tagged replacement, identified by tool_call_id The actual file rewrite is deferred to the `agent_end` hook (drained from an in-memory queue) because writing during tool execute() trips openclaw's session-file fence (EmbeddedAttemptSessionTakeoverError) — the fingerprint check around releaseForPrompt rejects third-party writes. By agent_end the lock window is closed and the next turn's fence baseline picks up our mutation cleanly. Queue-time validation rejects bad tool_call_ids up-front so weak models that confuse opaque call_function_*_N ids with topic/fact numeric ids get a clear error instead of a silent skip at drain time. install.mjs now auto-sets plugins.entries.padded-cell.hooks.allowConversationAccess=true (required for the agent_end hook on non-bundled plugins; without it the drain never fires and queued trims rot in memory). Sim-verified end-to-end: model dispatches trim, drain fires on agent_end, next turn's list shows already_trimmed=true. 
--- install.mjs | 9 + plugin/index.ts | 124 +++++++++++ plugin/openclaw.plugin.json | 4 +- plugin/tools/session-rewrite.ts | 375 ++++++++++++++++++++++++++++++++ 4 files changed, 511 insertions(+), 1 deletion(-) create mode 100644 plugin/tools/session-rewrite.ts diff --git a/install.mjs b/install.mjs index 3071ea2..b29122b 100755 --- a/install.mjs +++ b/install.mjs @@ -363,6 +363,15 @@ async function configure() { const existingProfile = getOpenclawConfig(`${cfgPath}.openclawProfilePath`, undefined); if (existingProfile === undefined) setOpenclawConfig(`${cfgPath}.openclawProfilePath`, openclawPath); + // hooks.allowConversationAccess gates `agent_end` (and similar + // conversation-scoped hooks) for non-bundled plugins. trim-tool-result's + // deferred drain runs in agent_end — without this flag openclaw logs + // "typed hook agent_end blocked" and the drain never fires, leaving + // trim requests queued in memory forever. + const hooksPath = `${entryPath}.hooks`; + const existingHookAccess = getOpenclawConfig(`${hooksPath}.allowConversationAccess`, undefined); + if (existingHookAccess === undefined) setOpenclawConfig(`${hooksPath}.allowConversationAccess`, true); + logOk('Plugin entry configured (set missing defaults only)'); } catch (err) { logWarn(`Config failed: ${err.message}`); diff --git a/plugin/index.ts b/plugin/index.ts index a1168ab..f9542b6 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -3,10 +3,16 @@ import os from 'node:os'; import path from 'node:path'; +import { existsSync } from 'node:fs'; import { definePluginEntry } from 'openclaw/plugin-sdk/plugin-entry'; import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core'; import { pcexec, pcexecSync } from './tools/pcexec.js'; +import { + queueTrimToolResult, + listToolResults, + drainTrimQueueForSession, +} from './tools/session-rewrite.js'; import { safeRestart, createSafeRestartTool, @@ -186,6 +192,82 @@ function register(api: OpenClawPluginApi): void { } as any; }); + // Register 
trim-tool-result — rewrite a past toolResult to free ctx tokens. + api.registerTool((ctx) => { + const agentDir = ctx.agentDir; + const agentIdInner = ctx.agentId; + const sessionId = ctx.sessionId; + return { + name: 'trim-tool-result', + description: + 'Replace a past tool result in your own session with a shorter version (or empty sentinel). Use after extracting what you need from a noisy tool output to free ctx tokens. Irreversible — re-run the original tool if you mis-trim.', + parameters: { + type: 'object', + properties: { + tool_call_id: { + type: 'string', + description: 'The id of the toolCall whose result you want to trim (from list-tool-results).', + }, + replacement: { + type: 'string', + description: 'Optional condensed text to keep. Omit to fully elide the result.', + }, + }, + required: ['tool_call_id'], + }, + async execute(_id: string, params: any) { + const r = queueTrimToolResult( + { agentDir, agentId: agentIdInner, sessionId }, + { + tool_call_id: String(params.tool_call_id ?? ''), + replacement: params.replacement != null ? String(params.replacement) : undefined, + }, + ); + return { content: [{ type: 'text', text: JSON.stringify(r) }] }; + }, + } as any; + }); + + // Register list-tool-results — enumerate past toolResults to pick trim candidates. + api.registerTool((ctx) => { + const agentDir = ctx.agentDir; + const agentIdInner = ctx.agentId; + const sessionId = ctx.sessionId; + return { + name: 'list-tool-results', + description: + 'List past toolResults in your own session ordered by size (largest first), with tool name, args summary, byte size, and turns-ago. Does NOT include the result content itself — use this to pick trim-tool-result targets without re-reading bulky outputs.', + parameters: { + type: 'object', + properties: { + min_bytes: { type: 'number', description: 'Only include results at least this many bytes.' 
}, + older_than_turns: { + type: 'number', + description: 'Only include results from at least this many assistant turns ago (default 0).', + }, + include_trimmed: { + type: 'boolean', + description: 'Include already-trimmed results (default false).', + }, + limit: { type: 'number', description: 'Max entries to return (default 50).' }, + }, + }, + async execute(_id: string, params: any) { + const r = await listToolResults( + { agentDir, agentId: agentIdInner, sessionId }, + { + min_bytes: typeof params.min_bytes === 'number' ? params.min_bytes : undefined, + older_than_turns: + typeof params.older_than_turns === 'number' ? params.older_than_turns : undefined, + include_trimmed: params.include_trimmed === true, + limit: typeof params.limit === 'number' ? params.limit : undefined, + }, + ); + return { content: [{ type: 'text', text: JSON.stringify(r) }] }; + }, + } as any; + }); + // Register /ego-mgr slash command if the host exposes the (non-standard) hook. // This API is not part of the current OpenClawPluginApi surface; the guard // makes the plugin a no-op for slash commands when the host doesn't support @@ -212,6 +294,48 @@ function register(api: OpenClawPluginApi): void { logger.info('Registered /ego-mgr slash command'); } + // agent_end hook drains the trim queue AFTER openclaw's session-file fence + // window closes. Writing during execute() would trip + // EmbeddedAttemptSessionTakeoverError because the lock-released fingerprint + // check sees our mutation as a third-party takeover. By agent_end the lock + // is in its between-turns state and the next turn's fence snapshot will + // include our trim as baseline. + const apiOn = (api as unknown as { on?: (ev: string, fn: (...args: unknown[]) => unknown) => void }).on; + if (typeof apiOn === 'function') { + apiOn.call(api, 'agent_end', async (_event: unknown, ctx: unknown) => { + const c = (ctx ?? 
{}) as { agentId?: string; agentDir?: string; sessionId?: string }; + if (!c.sessionId) return; + try { + // Resolve the same file the tool resolved (canonical path priority). + let sessionFile: string | null = null; + const tryPaths: string[] = []; + if (c.agentId) { + tryPaths.push(path.join(openclawPath, 'agents', c.agentId, 'sessions', `${c.sessionId}.jsonl`)); + } + if (c.agentDir) { + tryPaths.push(path.join(c.agentDir, 'sessions', `${c.sessionId}.jsonl`)); + const stripped = c.agentDir.replace(/\/agent$/, ''); + if (stripped !== c.agentDir) { + tryPaths.push(path.join(stripped, 'sessions', `${c.sessionId}.jsonl`)); + } + } + for (const p of tryPaths) { + if (existsSync(p)) { sessionFile = p; break; } + } + if (!sessionFile) return; + const r = await drainTrimQueueForSession(sessionFile); + if (r && (r.applied > 0 || r.skipped > 0 || r.errors.length > 0)) { + logger.info( + `trim-tool-result drain: session=${c.sessionId} applied=${r.applied} skipped=${r.skipped} trimmed_bytes=${r.trimmed_bytes}` + + (r.errors.length ? ` errors=${r.errors.length}` : ''), + ); + } + } catch (err) { + logger.warn(`trim-tool-result agent_end drain failed: ${err}`); + } + }); + } + logger.info('PaddedCell plugin initialized'); } diff --git a/plugin/openclaw.plugin.json b/plugin/openclaw.plugin.json index f9108d4..ef9ec5d 100644 --- a/plugin/openclaw.plugin.json +++ b/plugin/openclaw.plugin.json @@ -9,7 +9,9 @@ "tools": [ "pcexec", "proxy-pcexec", - "safe_restart" + "safe_restart", + "trim-tool-result", + "list-tool-results" ] }, "configSchema": { diff --git a/plugin/tools/session-rewrite.ts b/plugin/tools/session-rewrite.ts new file mode 100644 index 0000000..882eb8b --- /dev/null +++ b/plugin/tools/session-rewrite.ts @@ -0,0 +1,375 @@ +// Tools that operate on the calling agent's own session JSONL file. +// +// `trimToolResult` rewrites a past toolResult's content[].text by toolCallId, +// shrinking context that the agent has already extracted what it needs from. 
+// `listToolResults` enumerates past toolResults (size + tool + turns-ago) so +// the agent can pick trim candidates without re-reading their content. +// +// Both operate directly on the canonical session file at +// /sessions/.jsonl +// No backup is kept — if a trim turns out wrong, re-run the original tool. + +import * as fs from 'node:fs'; +import * as os from 'node:os'; +import * as path from 'node:path'; + +export interface SessionRewriteCtx { + agentDir?: string; + agentId?: string; + sessionId?: string; +} + +const TRIM_SENTINEL = '[trimmed by self]'; + +function openclawRoot(): string { + const env = process.env.OPENCLAW_PATH; + if (env) return env; + const home = process.env.HOME || os.homedir(); + return path.join(home, '.openclaw'); +} + +function resolveSessionFile(ctx: SessionRewriteCtx): string { + if (!ctx.sessionId) { + throw new Error('session context missing sessionId (no active session?)'); + } + const candidates: string[] = []; + // canonical: /agents//sessions/.jsonl + if (ctx.agentId) { + candidates.push( + path.join(openclawRoot(), 'agents', ctx.agentId, 'sessions', `${ctx.sessionId}.jsonl`), + ); + } + // fallback A: /sessions/.jsonl + if (ctx.agentDir) { + candidates.push(path.join(ctx.agentDir, 'sessions', `${ctx.sessionId}.jsonl`)); + // fallback B: strip trailing "/agent" if openclaw config put it there + const stripped = ctx.agentDir.replace(/\/agent$/, ''); + if (stripped !== ctx.agentDir) { + candidates.push(path.join(stripped, 'sessions', `${ctx.sessionId}.jsonl`)); + } + } + for (const f of candidates) { + if (fs.existsSync(f)) return f; + } + throw new Error( + `session file not found for sessionId=${ctx.sessionId}; tried: ${candidates.join(', ')}`, + ); +} + +interface MessageEvent { + type?: string; + message?: { + role?: string; + toolCallId?: string; + toolName?: string; + content?: Array<{ type?: string; text?: string }>; + }; +} + +export interface TrimToolResultParams { + tool_call_id: string; + replacement?: string; +} + 
+export interface TrimToolResultResult { + ok: true; + status: 'queued'; + tool_call_id: string; + applies_at: 'agent_end'; + note: string; +} + +interface QueuedTrim { + tool_call_id: string; + replacement?: string; +} +// Module-scope queue keyed by absolute session file path. Drained from the +// `agent_end` hook (see index.ts) — writing during tool execute trips +// openclaw's session-file fence (EmbeddedAttemptSessionTakeoverError) because +// the fingerprint check around releaseForPrompt rejects third-party writes. +const trimQueue = new Map(); + +export function queueTrimToolResult( + ctx: SessionRewriteCtx, + params: TrimToolResultParams, +): TrimToolResultResult { + if (!params.tool_call_id) { + throw new Error('missing required parameter: tool_call_id'); + } + const sessionFile = resolveSessionFile(ctx); + // Validate the toolCallId actually points at a real toolResult in this + // session. Without this check the drain would silently skip a bad id and + // the agent would not learn of the mistake until reading the gateway log + // — a frequent failure mode for weak models that confuse opaque + // `call_function_*_N` ids with topic/fact numeric ids. + if (!sessionHasToolResult(sessionFile, params.tool_call_id)) { + throw new Error( + `tool_call_id "${params.tool_call_id}" does not match any toolResult in this session. ` + + `Call list-tool-results first to copy a real id (long opaque string like "call_function_abc123_1"). ` + + `Topic ids, fact ids, and small integers will not work here.`, + ); + } + const list = trimQueue.get(sessionFile) ?? []; + list.push({ tool_call_id: params.tool_call_id, replacement: params.replacement }); + trimQueue.set(sessionFile, list); + return { + ok: true, + status: 'queued', + tool_call_id: params.tool_call_id, + applies_at: 'agent_end', + note: 'Trim will be applied to the session file after this turn ends. 
The smaller ctx takes effect on the next turn.', + }; +} + +function sessionHasToolResult(sessionFile: string, toolCallId: string): boolean { + // Cheap streaming check — scan lines for the toolCallId substring before + // committing to JSON.parse for confirmation. + const raw = fs.readFileSync(sessionFile, 'utf8'); + const needle = `"toolCallId":"${toolCallId}"`; + if (!raw.includes(needle)) return false; + // Defend against the id appearing inside arbitrary text (e.g. a prior + // assistant message quoting the id) by confirming a structural match. + for (const line of raw.split('\n')) { + if (!line.includes(needle)) continue; + try { + const obj = JSON.parse(line) as MessageEvent; + if ( + obj?.type === 'message' && + obj?.message?.role === 'toolResult' && + obj?.message?.toolCallId === toolCallId + ) { + return true; + } + } catch { + // skip malformed line + } + } + return false; +} + +export interface DrainResult { + session_file: string; + applied: number; + skipped: number; + trimmed_bytes: number; + errors: string[]; +} + +export async function drainTrimQueueForSession( + sessionFile: string, +): Promise { + const queued = trimQueue.get(sessionFile); + if (!queued || queued.length === 0) return null; + trimQueue.delete(sessionFile); + + if (!fs.existsSync(sessionFile)) { + return { + session_file: sessionFile, + applied: 0, + skipped: queued.length, + trimmed_bytes: 0, + errors: [`session file vanished: ${sessionFile}`], + }; + } + + const raw = await fs.promises.readFile(sessionFile, 'utf8'); + const lines = raw.split('\n'); + const targets = new Map(); + for (const q of queued) targets.set(q.tool_call_id, q); + + let applied = 0; + let skipped = 0; + let trimmedBytes = 0; + const errors: string[] = []; + + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + if (!line) continue; + let obj: MessageEvent; + try { + obj = JSON.parse(line) as MessageEvent; + } catch { + continue; + } + if (obj?.type !== 'message') continue; + if 
(obj.message?.role !== 'toolResult') continue; + const callId = obj.message.toolCallId; + if (!callId) continue; + const q = targets.get(callId); + if (!q) continue; + const content = obj.message.content; + if (!Array.isArray(content) || content.length === 0) { + errors.push(`toolResult ${callId} has no content[] to trim`); + skipped++; + targets.delete(callId); + continue; + } + const newText = q.replacement ? `${TRIM_SENTINEL} ${q.replacement}` : TRIM_SENTINEL; + let touched = false; + for (const c of content) { + if (c?.type === 'text' && typeof c.text === 'string') { + trimmedBytes += Buffer.byteLength(c.text, 'utf8') - Buffer.byteLength(newText, 'utf8'); + c.text = newText; + touched = true; + } + } + if (!touched) { + errors.push(`toolResult ${callId} has no text blocks`); + skipped++; + targets.delete(callId); + continue; + } + lines[i] = JSON.stringify(obj); + applied++; + targets.delete(callId); + } + for (const callId of targets.keys()) { + errors.push(`toolResult ${callId} not found in session`); + skipped++; + } + + const tmp = `${sessionFile}.trim.${process.pid}.${Date.now()}`; + await fs.promises.writeFile(tmp, lines.join('\n'), 'utf8'); + await fs.promises.rename(tmp, sessionFile); + + return { session_file: sessionFile, applied, skipped, trimmed_bytes: trimmedBytes, errors }; +} + +export function pendingTrimSessions(): string[] { + return Array.from(trimQueue.keys()); +} + +// Back-compat: synchronous adapter retained for direct callers / tests; not +// used by the registered tool (which routes through the queue). 
+export async function trimToolResult( + ctx: SessionRewriteCtx, + params: TrimToolResultParams, +): Promise { + return queueTrimToolResult(ctx, params); +} + +export interface ListToolResultsParams { + min_bytes?: number; + older_than_turns?: number; + include_trimmed?: boolean; + limit?: number; +} + +export interface ListToolResultsEntry { + tool_call_id: string; + tool_name: string; + bytes: number; + turns_ago: number; + arguments_summary: string; + already_trimmed: boolean; +} + +export interface ListToolResultsResult { + count: number; + results: ListToolResultsEntry[]; +} + +export async function listToolResults( + ctx: SessionRewriteCtx, + params: ListToolResultsParams, +): Promise { + const minBytes = params.min_bytes ?? 0; + const olderThanTurns = params.older_than_turns ?? 0; + const includeTrimmed = params.include_trimmed === true; + const limit = params.limit ?? 50; + + const sessionFile = resolveSessionFile(ctx); + const raw = await fs.promises.readFile(sessionFile, 'utf8'); + const lines = raw.split('\n'); + + interface AssistantToolCall { + id: string; + args: unknown; + } + const argsByCallId = new Map(); + interface Candidate { + toolCallId: string; + toolName: string; + bytes: number; + assistantCountAtTime: number; + alreadyTrimmed: boolean; + } + const candidates: Candidate[] = []; + let assistantCount = 0; + + for (const line of lines) { + if (!line) continue; + let obj: MessageEvent; + try { + obj = JSON.parse(line) as MessageEvent; + } catch { + continue; + } + if (obj?.type !== 'message') continue; + const role = obj.message?.role; + if (role === 'assistant') { + assistantCount++; + const content = obj.message?.content; + if (Array.isArray(content)) { + for (const c of content as Array<{ type?: string; id?: string; arguments?: unknown }>) { + if (c?.type === 'toolCall' && typeof c.id === 'string') { + argsByCallId.set(c.id, c.arguments); + } + } + } + } else if (role === 'toolResult') { + const callId = obj.message?.toolCallId; + if 
(!callId) continue; + const toolName = obj.message?.toolName ?? ''; + const content = obj.message?.content; + let bytes = 0; + let alreadyTrimmed = false; + if (Array.isArray(content)) { + for (const c of content) { + if (c?.type === 'text' && typeof c.text === 'string') { + bytes += Buffer.byteLength(c.text, 'utf8'); + if (c.text.startsWith(TRIM_SENTINEL)) alreadyTrimmed = true; + } + } + } + candidates.push({ + toolCallId: callId, + toolName, + bytes, + assistantCountAtTime: assistantCount, + alreadyTrimmed, + }); + } + } + + const totalAssistant = assistantCount; + const results: ListToolResultsEntry[] = []; + for (const c of candidates) { + if (!includeTrimmed && c.alreadyTrimmed) continue; + if (c.bytes < minBytes) continue; + const turnsAgo = totalAssistant - c.assistantCountAtTime; + if (turnsAgo < olderThanTurns) continue; + const argsRaw = argsByCallId.get(c.toolCallId); + let argsSummary = ''; + if (argsRaw !== undefined) { + try { + argsSummary = JSON.stringify(argsRaw); + } catch { + argsSummary = String(argsRaw); + } + if (argsSummary.length > 200) argsSummary = argsSummary.slice(0, 200) + '…'; + } + results.push({ + tool_call_id: c.toolCallId, + tool_name: c.toolName, + bytes: c.bytes, + turns_ago: turnsAgo, + arguments_summary: argsSummary, + already_trimmed: c.alreadyTrimmed, + }); + } + + results.sort((a, b) => b.bytes - a.bytes); + const trimmed = results.slice(0, limit); + return { count: trimmed.length, results: trimmed }; +}