Two new openclaw tools that let an agent reclaim ctx tokens consumed by
past tool results once it has extracted what it needs:
list-tool-results — enumerates past toolResults in the agent's own
session jsonl (size, tool name, turns-ago, args summary, already-
trimmed flag); does NOT return content body
trim-tool-result — replaces a past toolResult's content[].text with a
short sentinel-tagged replacement, identified by tool_call_id
The actual file rewrite is deferred to the `agent_end` hook (drained
from an in-memory queue) because writing during tool execute() trips
openclaw's session-file fence (EmbeddedAttemptSessionTakeoverError) —
the fingerprint check around releaseForPrompt rejects third-party
writes. By agent_end the lock window is closed and the next turn's
fence baseline picks up our mutation cleanly.
Queue-time validation rejects bad tool_call_ids up-front so weak
models that confuse opaque call_function_*_N ids with topic/fact
numeric ids get a clear error instead of a silent skip at drain time.
install.mjs now auto-sets plugins.entries.padded-cell.hooks.
allowConversationAccess=true (required for the agent_end hook on
non-bundled plugins; without it the drain never fires and queued
trims rot in memory).
Sim-verified end-to-end: model dispatches trim, drain fires on
agent_end, next turn's list shows already_trimmed=true.
376 lines
12 KiB
TypeScript
376 lines
12 KiB
TypeScript
// Tools that operate on the calling agent's own session JSONL file.
|
|
//
|
|
// `trimToolResult` rewrites a past toolResult's content[].text by toolCallId,
|
|
// shrinking context that the agent has already extracted what it needs from.
|
|
// `listToolResults` enumerates past toolResults (size + tool + turns-ago) so
|
|
// the agent can pick trim candidates without re-reading their content.
|
|
//
|
|
// Both operate directly on the canonical session file at
|
|
// <agentDir>/sessions/<sessionId>.jsonl
|
|
// No backup is kept — if a trim turns out wrong, re-run the original tool.
|
|
|
|
import * as fs from 'node:fs';
|
|
import * as os from 'node:os';
|
|
import * as path from 'node:path';
|
|
|
|
export interface SessionRewriteCtx {
|
|
agentDir?: string;
|
|
agentId?: string;
|
|
sessionId?: string;
|
|
}
|
|
|
|
const TRIM_SENTINEL = '[trimmed by self]';
|
|
|
|
function openclawRoot(): string {
|
|
const env = process.env.OPENCLAW_PATH;
|
|
if (env) return env;
|
|
const home = process.env.HOME || os.homedir();
|
|
return path.join(home, '.openclaw');
|
|
}
|
|
|
|
function resolveSessionFile(ctx: SessionRewriteCtx): string {
|
|
if (!ctx.sessionId) {
|
|
throw new Error('session context missing sessionId (no active session?)');
|
|
}
|
|
const candidates: string[] = [];
|
|
// canonical: <OPENCLAW>/agents/<agentId>/sessions/<sid>.jsonl
|
|
if (ctx.agentId) {
|
|
candidates.push(
|
|
path.join(openclawRoot(), 'agents', ctx.agentId, 'sessions', `${ctx.sessionId}.jsonl`),
|
|
);
|
|
}
|
|
// fallback A: <agentDir>/sessions/<sid>.jsonl
|
|
if (ctx.agentDir) {
|
|
candidates.push(path.join(ctx.agentDir, 'sessions', `${ctx.sessionId}.jsonl`));
|
|
// fallback B: strip trailing "/agent" if openclaw config put it there
|
|
const stripped = ctx.agentDir.replace(/\/agent$/, '');
|
|
if (stripped !== ctx.agentDir) {
|
|
candidates.push(path.join(stripped, 'sessions', `${ctx.sessionId}.jsonl`));
|
|
}
|
|
}
|
|
for (const f of candidates) {
|
|
if (fs.existsSync(f)) return f;
|
|
}
|
|
throw new Error(
|
|
`session file not found for sessionId=${ctx.sessionId}; tried: ${candidates.join(', ')}`,
|
|
);
|
|
}
|
|
|
|
interface MessageEvent {
|
|
type?: string;
|
|
message?: {
|
|
role?: string;
|
|
toolCallId?: string;
|
|
toolName?: string;
|
|
content?: Array<{ type?: string; text?: string }>;
|
|
};
|
|
}
|
|
|
|
export interface TrimToolResultParams {
|
|
tool_call_id: string;
|
|
replacement?: string;
|
|
}
|
|
|
|
export interface TrimToolResultResult {
|
|
ok: true;
|
|
status: 'queued';
|
|
tool_call_id: string;
|
|
applies_at: 'agent_end';
|
|
note: string;
|
|
}
|
|
|
|
interface QueuedTrim {
|
|
tool_call_id: string;
|
|
replacement?: string;
|
|
}
|
|
// Module-scope queue keyed by absolute session file path. Drained from the
|
|
// `agent_end` hook (see index.ts) — writing during tool execute trips
|
|
// openclaw's session-file fence (EmbeddedAttemptSessionTakeoverError) because
|
|
// the fingerprint check around releaseForPrompt rejects third-party writes.
|
|
const trimQueue = new Map<string, QueuedTrim[]>();
|
|
|
|
export function queueTrimToolResult(
|
|
ctx: SessionRewriteCtx,
|
|
params: TrimToolResultParams,
|
|
): TrimToolResultResult {
|
|
if (!params.tool_call_id) {
|
|
throw new Error('missing required parameter: tool_call_id');
|
|
}
|
|
const sessionFile = resolveSessionFile(ctx);
|
|
// Validate the toolCallId actually points at a real toolResult in this
|
|
// session. Without this check the drain would silently skip a bad id and
|
|
// the agent would not learn of the mistake until reading the gateway log
|
|
// — a frequent failure mode for weak models that confuse opaque
|
|
// `call_function_*_N` ids with topic/fact numeric ids.
|
|
if (!sessionHasToolResult(sessionFile, params.tool_call_id)) {
|
|
throw new Error(
|
|
`tool_call_id "${params.tool_call_id}" does not match any toolResult in this session. ` +
|
|
`Call list-tool-results first to copy a real id (long opaque string like "call_function_abc123_1"). ` +
|
|
`Topic ids, fact ids, and small integers will not work here.`,
|
|
);
|
|
}
|
|
const list = trimQueue.get(sessionFile) ?? [];
|
|
list.push({ tool_call_id: params.tool_call_id, replacement: params.replacement });
|
|
trimQueue.set(sessionFile, list);
|
|
return {
|
|
ok: true,
|
|
status: 'queued',
|
|
tool_call_id: params.tool_call_id,
|
|
applies_at: 'agent_end',
|
|
note: 'Trim will be applied to the session file after this turn ends. The smaller ctx takes effect on the next turn.',
|
|
};
|
|
}
|
|
|
|
function sessionHasToolResult(sessionFile: string, toolCallId: string): boolean {
|
|
// Cheap streaming check — scan lines for the toolCallId substring before
|
|
// committing to JSON.parse for confirmation.
|
|
const raw = fs.readFileSync(sessionFile, 'utf8');
|
|
const needle = `"toolCallId":"${toolCallId}"`;
|
|
if (!raw.includes(needle)) return false;
|
|
// Defend against the id appearing inside arbitrary text (e.g. a prior
|
|
// assistant message quoting the id) by confirming a structural match.
|
|
for (const line of raw.split('\n')) {
|
|
if (!line.includes(needle)) continue;
|
|
try {
|
|
const obj = JSON.parse(line) as MessageEvent;
|
|
if (
|
|
obj?.type === 'message' &&
|
|
obj?.message?.role === 'toolResult' &&
|
|
obj?.message?.toolCallId === toolCallId
|
|
) {
|
|
return true;
|
|
}
|
|
} catch {
|
|
// skip malformed line
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
export interface DrainResult {
|
|
session_file: string;
|
|
applied: number;
|
|
skipped: number;
|
|
trimmed_bytes: number;
|
|
errors: string[];
|
|
}
|
|
|
|
export async function drainTrimQueueForSession(
|
|
sessionFile: string,
|
|
): Promise<DrainResult | null> {
|
|
const queued = trimQueue.get(sessionFile);
|
|
if (!queued || queued.length === 0) return null;
|
|
trimQueue.delete(sessionFile);
|
|
|
|
if (!fs.existsSync(sessionFile)) {
|
|
return {
|
|
session_file: sessionFile,
|
|
applied: 0,
|
|
skipped: queued.length,
|
|
trimmed_bytes: 0,
|
|
errors: [`session file vanished: ${sessionFile}`],
|
|
};
|
|
}
|
|
|
|
const raw = await fs.promises.readFile(sessionFile, 'utf8');
|
|
const lines = raw.split('\n');
|
|
const targets = new Map<string, QueuedTrim>();
|
|
for (const q of queued) targets.set(q.tool_call_id, q);
|
|
|
|
let applied = 0;
|
|
let skipped = 0;
|
|
let trimmedBytes = 0;
|
|
const errors: string[] = [];
|
|
|
|
for (let i = 0; i < lines.length; i++) {
|
|
const line = lines[i];
|
|
if (!line) continue;
|
|
let obj: MessageEvent;
|
|
try {
|
|
obj = JSON.parse(line) as MessageEvent;
|
|
} catch {
|
|
continue;
|
|
}
|
|
if (obj?.type !== 'message') continue;
|
|
if (obj.message?.role !== 'toolResult') continue;
|
|
const callId = obj.message.toolCallId;
|
|
if (!callId) continue;
|
|
const q = targets.get(callId);
|
|
if (!q) continue;
|
|
const content = obj.message.content;
|
|
if (!Array.isArray(content) || content.length === 0) {
|
|
errors.push(`toolResult ${callId} has no content[] to trim`);
|
|
skipped++;
|
|
targets.delete(callId);
|
|
continue;
|
|
}
|
|
const newText = q.replacement ? `${TRIM_SENTINEL} ${q.replacement}` : TRIM_SENTINEL;
|
|
let touched = false;
|
|
for (const c of content) {
|
|
if (c?.type === 'text' && typeof c.text === 'string') {
|
|
trimmedBytes += Buffer.byteLength(c.text, 'utf8') - Buffer.byteLength(newText, 'utf8');
|
|
c.text = newText;
|
|
touched = true;
|
|
}
|
|
}
|
|
if (!touched) {
|
|
errors.push(`toolResult ${callId} has no text blocks`);
|
|
skipped++;
|
|
targets.delete(callId);
|
|
continue;
|
|
}
|
|
lines[i] = JSON.stringify(obj);
|
|
applied++;
|
|
targets.delete(callId);
|
|
}
|
|
for (const callId of targets.keys()) {
|
|
errors.push(`toolResult ${callId} not found in session`);
|
|
skipped++;
|
|
}
|
|
|
|
const tmp = `${sessionFile}.trim.${process.pid}.${Date.now()}`;
|
|
await fs.promises.writeFile(tmp, lines.join('\n'), 'utf8');
|
|
await fs.promises.rename(tmp, sessionFile);
|
|
|
|
return { session_file: sessionFile, applied, skipped, trimmed_bytes: trimmedBytes, errors };
|
|
}
|
|
|
|
export function pendingTrimSessions(): string[] {
|
|
return Array.from(trimQueue.keys());
|
|
}
|
|
|
|
// Back-compat: synchronous adapter retained for direct callers / tests; not
|
|
// used by the registered tool (which routes through the queue).
|
|
export async function trimToolResult(
|
|
ctx: SessionRewriteCtx,
|
|
params: TrimToolResultParams,
|
|
): Promise<TrimToolResultResult> {
|
|
return queueTrimToolResult(ctx, params);
|
|
}
|
|
|
|
export interface ListToolResultsParams {
|
|
min_bytes?: number;
|
|
older_than_turns?: number;
|
|
include_trimmed?: boolean;
|
|
limit?: number;
|
|
}
|
|
|
|
export interface ListToolResultsEntry {
|
|
tool_call_id: string;
|
|
tool_name: string;
|
|
bytes: number;
|
|
turns_ago: number;
|
|
arguments_summary: string;
|
|
already_trimmed: boolean;
|
|
}
|
|
|
|
export interface ListToolResultsResult {
|
|
count: number;
|
|
results: ListToolResultsEntry[];
|
|
}
|
|
|
|
export async function listToolResults(
|
|
ctx: SessionRewriteCtx,
|
|
params: ListToolResultsParams,
|
|
): Promise<ListToolResultsResult> {
|
|
const minBytes = params.min_bytes ?? 0;
|
|
const olderThanTurns = params.older_than_turns ?? 0;
|
|
const includeTrimmed = params.include_trimmed === true;
|
|
const limit = params.limit ?? 50;
|
|
|
|
const sessionFile = resolveSessionFile(ctx);
|
|
const raw = await fs.promises.readFile(sessionFile, 'utf8');
|
|
const lines = raw.split('\n');
|
|
|
|
interface AssistantToolCall {
|
|
id: string;
|
|
args: unknown;
|
|
}
|
|
const argsByCallId = new Map<string, unknown>();
|
|
interface Candidate {
|
|
toolCallId: string;
|
|
toolName: string;
|
|
bytes: number;
|
|
assistantCountAtTime: number;
|
|
alreadyTrimmed: boolean;
|
|
}
|
|
const candidates: Candidate[] = [];
|
|
let assistantCount = 0;
|
|
|
|
for (const line of lines) {
|
|
if (!line) continue;
|
|
let obj: MessageEvent;
|
|
try {
|
|
obj = JSON.parse(line) as MessageEvent;
|
|
} catch {
|
|
continue;
|
|
}
|
|
if (obj?.type !== 'message') continue;
|
|
const role = obj.message?.role;
|
|
if (role === 'assistant') {
|
|
assistantCount++;
|
|
const content = obj.message?.content;
|
|
if (Array.isArray(content)) {
|
|
for (const c of content as Array<{ type?: string; id?: string; arguments?: unknown }>) {
|
|
if (c?.type === 'toolCall' && typeof c.id === 'string') {
|
|
argsByCallId.set(c.id, c.arguments);
|
|
}
|
|
}
|
|
}
|
|
} else if (role === 'toolResult') {
|
|
const callId = obj.message?.toolCallId;
|
|
if (!callId) continue;
|
|
const toolName = obj.message?.toolName ?? '';
|
|
const content = obj.message?.content;
|
|
let bytes = 0;
|
|
let alreadyTrimmed = false;
|
|
if (Array.isArray(content)) {
|
|
for (const c of content) {
|
|
if (c?.type === 'text' && typeof c.text === 'string') {
|
|
bytes += Buffer.byteLength(c.text, 'utf8');
|
|
if (c.text.startsWith(TRIM_SENTINEL)) alreadyTrimmed = true;
|
|
}
|
|
}
|
|
}
|
|
candidates.push({
|
|
toolCallId: callId,
|
|
toolName,
|
|
bytes,
|
|
assistantCountAtTime: assistantCount,
|
|
alreadyTrimmed,
|
|
});
|
|
}
|
|
}
|
|
|
|
const totalAssistant = assistantCount;
|
|
const results: ListToolResultsEntry[] = [];
|
|
for (const c of candidates) {
|
|
if (!includeTrimmed && c.alreadyTrimmed) continue;
|
|
if (c.bytes < minBytes) continue;
|
|
const turnsAgo = totalAssistant - c.assistantCountAtTime;
|
|
if (turnsAgo < olderThanTurns) continue;
|
|
const argsRaw = argsByCallId.get(c.toolCallId);
|
|
let argsSummary = '';
|
|
if (argsRaw !== undefined) {
|
|
try {
|
|
argsSummary = JSON.stringify(argsRaw);
|
|
} catch {
|
|
argsSummary = String(argsRaw);
|
|
}
|
|
if (argsSummary.length > 200) argsSummary = argsSummary.slice(0, 200) + '…';
|
|
}
|
|
results.push({
|
|
tool_call_id: c.toolCallId,
|
|
tool_name: c.toolName,
|
|
bytes: c.bytes,
|
|
turns_ago: turnsAgo,
|
|
arguments_summary: argsSummary,
|
|
already_trimmed: c.alreadyTrimmed,
|
|
});
|
|
}
|
|
|
|
results.sort((a, b) => b.bytes - a.bytes);
|
|
const trimmed = results.slice(0, limit);
|
|
return { count: trimmed.length, results: trimmed };
|
|
}
|