Files
PaddedCell/plugin/tools/session-rewrite.ts
hzhang 0b7f18253d feat(dynamic-trim): rename trim-tool-result, add self-compact, drop list-tool-results
Align the openclaw side of the dynamic-* tool family with the new
Plexum design (decision #31, 2026-06-04 revision):

- trim-tool-result → dynamic-trim (same on-wire schema; same semantics)
- Drop list-tool-results entirely. Agents find the opaque tool_call_id
  by reading their own prior assistant message's toolCall block id
  instead of querying a separate "directory" tool. This removes a
  workflow-step prerequisite and matches how Anthropic-shaped APIs
  surface tool_use ids to the model anyway.
- On agent_end drain, ALSO self-compact the dynamic-trim's own
  tool_use.input: rewrite to {tool_call_id, _self_compacted: true}.
  Without this the bulky `replacement` text sits duplicated — once in
  the rewritten target tool_result, once in dynamic-trim's call input.
  Picks up the selfCallId from openclaw's execute(toolCallId, ...) first
  arg (was previously discarded as _id).

Cross-runtime contract: tool name, input schema, return shape, and
sentinel prefix ("[trimmed by self] ") match Plexum's dynamic-trim
in internal/dynmem/trim.go + internal/persistence/trim.go.

Sim e2e tested: dynamic-trim queues, agent_end drain rewrites both
the target tool_result content AND the trim call's tool_use input.
No takeover errors. trimmed_bytes positive on real workloads.
2026-06-04 07:47:30 +01:00

318 lines
9.9 KiB
TypeScript

// `dynamic-trim` — agent-driven tool_result rewrite tool.
//
// The agent picks a past toolResult block by toolCallId and supplies a
// short `replacement` string; on the next turn boundary (agent_end hook)
// the session jsonl is rewritten so that:
//
// 1. The target toolResult.content[].text becomes
// "[trimmed by self] <replacement>"
// 2. The dynamic-trim call's OWN tool_use input is rewritten to
// {"tool_call_id":"<id>","_self_compacted":true}
// so the bulky replacement text doesn't sit duplicated in two places.
//
// Operates directly on <openclaw>/agents/<agentId>/sessions/<sid>.jsonl.
// No faithful audit — irreversible by design (re-run the original tool
// if you trim wrong). See WORKFLOW-TOOLS-COMPLETION-REQUEST.md in Plexum
// docs/ for the cross-runtime contract.
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
/**
 * Identifies the session whose jsonl file should be rewritten.
 *
 * All fields are optional at the type level, but resolveSessionFile throws
 * when `sessionId` is missing and needs `agentId` and/or `agentDir` to have
 * any candidate path to probe.
 */
export interface SessionRewriteCtx {
// Probed as <agentDir>/sessions/<sessionId>.jsonl, and again with a trailing
// "/agent" segment removed from agentDir.
agentDir?: string;
// Probed as <openclaw root>/agents/<agentId>/sessions/<sessionId>.jsonl.
agentId?: string;
// Base name (without ".jsonl") of the session file.
sessionId?: string;
}
// Marker written into a trimmed toolResult text block. Used alone when no
// replacement was supplied, otherwise followed by a space and the replacement.
const TRIM_SENTINEL = '[trimmed by self]';
/**
 * Locate the openclaw data root.
 *
 * A non-empty `OPENCLAW_PATH` environment variable is returned as-is;
 * otherwise the root is `.openclaw` under `HOME` (or, when `HOME` is unset
 * or empty, under the home directory reported by the OS).
 */
function openclawRoot(): string {
  const override = process.env.OPENCLAW_PATH;
  if (!override) {
    const homeDir = process.env.HOME || os.homedir();
    return path.join(homeDir, '.openclaw');
  }
  return override;
}
/**
 * Find the on-disk jsonl file for the session described by `ctx`.
 *
 * Candidates are probed in this order and the first existing one wins:
 *  1. `<openclaw root>/agents/<agentId>/sessions/<sessionId>.jsonl` (when `agentId` is set)
 *  2. `<agentDir>/sessions/<sessionId>.jsonl` (when `agentDir` is set)
 *  3. the same under the parent of `agentDir`, when its last segment is `agent`
 *
 * @param ctx Session context; `sessionId` is required.
 * @returns Path of the first candidate that exists.
 * @throws Error when `sessionId` is missing or no candidate exists (the message
 *   lists every path that was tried).
 */
function resolveSessionFile(ctx: SessionRewriteCtx): string {
  const { sessionId, agentId, agentDir } = ctx;
  if (!sessionId) {
    throw new Error('session context missing sessionId (no active session?)');
  }
  const fileName = `${sessionId}.jsonl`;
  const candidates: string[] = [];
  if (agentId) {
    candidates.push(path.join(openclawRoot(), 'agents', agentId, 'sessions', fileName));
  }
  if (agentDir) {
    candidates.push(path.join(agentDir, 'sessions', fileName));
    // Use path helpers rather than a "/agent$" regex so the parent lookup also
    // works with a trailing separator and with platform-specific separators,
    // and never collapses to an empty (relative) base directory.
    if (path.basename(agentDir) === 'agent') {
      candidates.push(path.join(path.dirname(agentDir), 'sessions', fileName));
    }
  }
  const found = candidates.find((candidate) => fs.existsSync(candidate));
  if (found !== undefined) return found;
  throw new Error(
    `session file not found for sessionId=${sessionId}; tried: ${candidates.join(', ')}`,
  );
}
/**
 * The subset of a session jsonl line that this module reads or rewrites.
 * Every field is optional because lines are JSON.parse'd from disk and cast
 * to this type without validation.
 */
interface MessageEvent {
// Only lines whose type is 'message' are inspected.
type?: string;
message?: {
// 'toolResult' and 'assistant' are the two roles this module acts on.
role?: string;
// On toolResult messages: id matched against the queued target id.
toolCallId?: string;
toolName?: string;
content?: Array<{
// 'text' blocks are rewritten on toolResults; 'toolCall' blocks are self-compacted.
type?: string;
text?: string;
id?: string; // toolCall block id
name?: string; // toolCall block name
arguments?: unknown; // toolCall block arguments — what we self-compact
}>;
};
}
/** Agent-supplied input of a dynamic-trim call. */
export interface DynamicTrimParams {
// toolCallId of the toolResult message to rewrite; required (queueDynamicTrim throws if falsy).
tool_call_id: string;
// Text stored after the trim sentinel; when omitted or empty the text becomes the bare sentinel.
replacement?: string;
}
/**
 * Acknowledgement returned by queueDynamicTrim. Apart from `tool_call_id`
 * (echoed from the request) and the free-text `note`, every field is a fixed
 * literal: the request is only queued here, not yet applied.
 */
export interface DynamicTrimResult {
ok: true;
status: 'queued';
tool_call_id: string;
self_compacted: true;
applies_at: 'agent_end';
note: string;
}
/** One pending dynamic-trim mutation, held in memory until the session's queue is drained. */
interface QueuedTrim {
targetCallId: string; // the past tool_result we want to rewrite
replacement?: string; // agent-supplied summary
selfCallId: string; // the dynamic-trim's OWN tool_use id; gets self-compacted
}
// Module-scope queue keyed by absolute session file path. queueDynamicTrim
// appends entries; drainTrimQueueForSession removes a session's entry when it
// is drained from the `agent_end` hook (see index.ts). Writing during tool
// execute trips openclaw's session-file fence
// (EmbeddedAttemptSessionTakeoverError) because the fingerprint check around
// releaseForPrompt rejects third-party writes; agent_end fires AFTER that
// fence window closes.
const trimQueue = new Map<string, QueuedTrim[]>();
/**
 * Validate a dynamic-trim request and record it without touching the session file.
 *
 * The request is checked up front (both ids present, and a toolResult with the
 * target id exists in the resolved session file), then appended to the
 * in-memory queue for that file. The rewrite itself happens when the queue is
 * drained on agent_end.
 *
 * @param ctx Identifies the session whose jsonl file will be rewritten.
 * @param params Target `tool_call_id` plus the optional replacement text.
 * @param selfCallId Id of this dynamic-trim call's own tool_use block.
 * @returns A fixed "queued" acknowledgement echoing the target id.
 * @throws Error when either id is missing, the session file cannot be
 *   resolved, or no toolResult with the target id exists in it.
 */
export function queueDynamicTrim(
  ctx: SessionRewriteCtx,
  params: DynamicTrimParams,
  selfCallId: string,
): DynamicTrimResult {
  const targetCallId = params.tool_call_id;
  if (!targetCallId) {
    throw new Error('missing required parameter: tool_call_id');
  }
  if (!selfCallId) {
    throw new Error(
      'dynamic-trim: cannot identify self tool_use id (openclaw runtime should pass it to execute as first arg)',
    );
  }

  const sessionFile = resolveSessionFile(ctx);

  // Reject unknown ids right away so the agent sees the mistake in this tool
  // call's result instead of it surfacing only in the drain log. Weak models
  // sometimes pass topic ids, fact ids or small integers here.
  const targetExists = sessionHasToolResult(sessionFile, targetCallId);
  if (!targetExists) {
    const reason = [
      `tool_call_id "${targetCallId}" does not match any toolResult in this session. `,
      `Find the opaque "call_function_*_N" id in your own prior assistant messages — `,
      `topic ids, fact ids, and small integers will not work here.`,
    ].join('');
    throw new Error(reason);
  }

  const entry: QueuedTrim = {
    targetCallId,
    replacement: params.replacement,
    selfCallId,
  };
  const pending = trimQueue.get(sessionFile);
  if (pending) {
    pending.push(entry);
  } else {
    trimQueue.set(sessionFile, [entry]);
  }

  const note =
    'Trim queued. On this turn end the target tool_result text is shrunk AND this dynamic-trim ' +
    'call\'s own tool_use input is replaced with {tool_call_id, _self_compacted:true}. ' +
    'Effect visible in next turn\'s context.';
  return {
    ok: true,
    status: 'queued',
    tool_call_id: targetCallId,
    self_compacted: true,
    applies_at: 'agent_end',
    note,
  };
}
/**
 * Report whether the session file contains a toolResult message for `toolCallId`.
 *
 * The file is read synchronously. A cheap substring test on the JSON-encoded
 * id skips lines that cannot match; surviving lines are parsed and must be a
 * `type: "message"` event with `role: "toolResult"` and an identical
 * `toolCallId`. Lines that fail to parse are ignored.
 *
 * @param sessionFile Path of the session jsonl file.
 * @param toolCallId Id to look for.
 * @returns `true` when a matching toolResult line exists.
 * @throws Whatever `fs.readFileSync` throws when the file cannot be read.
 */
function sessionHasToolResult(sessionFile: string, toolCallId: string): boolean {
  const raw = fs.readFileSync(sessionFile, 'utf8');
  // Prefilter on the encoded VALUE only. Matching the whole `"toolCallId":"…"`
  // pair would miss lines written with whitespace after the colon, and a raw
  // interpolation would miss ids containing characters JSON escapes (`"`, `\`).
  const needle = JSON.stringify(toolCallId);
  if (!raw.includes(needle)) return false;
  return raw.split('\n').some((line) => {
    if (!line.includes(needle)) return false;
    try {
      const obj = JSON.parse(line) as MessageEvent;
      return (
        obj?.type === 'message' &&
        obj?.message?.role === 'toolResult' &&
        obj?.message?.toolCallId === toolCallId
      );
    } catch {
      // Malformed line: cannot be the toolResult we are looking for.
      return false;
    }
  });
}
/** Summary of one drainTrimQueueForSession run. */
export interface DrainResult {
// The session file path the drain was asked to process.
session_file: string;
// Number of toolResult lines whose text blocks were rewritten.
applied: number;
// Queued targets that were not applied: no content[], no text blocks, id not
// found in the file, or (all of them) the file no longer existing.
skipped: number;
// Sum over rewritten text blocks of (old UTF-8 byte length - new UTF-8 byte length).
trimmed_bytes: number;
self_compacted: number; // how many of the dynamic-trim tool_use inputs we shrank
// Human-readable description of every skipped target and every self tool_use id not found.
errors: string[];
}
/** True when `err` carries the `ENOENT` code Node attaches to missing-file errors. */
function isMissingFileError(err: unknown): boolean {
  return typeof err === 'object' && err !== null && (err as { code?: unknown }).code === 'ENOENT';
}

/**
 * Apply every trim queued for `sessionFile` and rewrite the session jsonl.
 *
 * For each queued entry two kinds of lines are edited:
 *  - the `toolResult` message whose `toolCallId` is the target: the `text` of
 *    every text block becomes the sentinel (plus the replacement, if any);
 *  - the assistant message holding the dynamic-trim `toolCall` block itself:
 *    its `arguments` shrink to `{tool_call_id, _self_compacted: true}`.
 *
 * The session's queue entry is removed before any I/O, so a drain that fails
 * is not retried. The file is replaced via a temp file + rename, and only
 * when at least one line actually changed.
 *
 * @param sessionFile Session file path; the same key used when queueing.
 * @returns `null` when nothing was queued for this file; otherwise counters
 *   and per-entry error messages.
 * @throws Read/stat/write/rename errors other than the file being missing.
 */
export async function drainTrimQueueForSession(
  sessionFile: string,
): Promise<DrainResult | null> {
  const queued = trimQueue.get(sessionFile);
  if (!queued || queued.length === 0) return null;
  trimQueue.delete(sessionFile);

  const queuedCount = queued.length;
  const vanished = (): DrainResult => ({
    session_file: sessionFile,
    applied: 0,
    skipped: queuedCount,
    trimmed_bytes: 0,
    self_compacted: 0,
    errors: [`session file vanished: ${sessionFile}`],
  });
  if (!fs.existsSync(sessionFile)) return vanished();

  let raw: string;
  let mode: number;
  try {
    raw = await fs.promises.readFile(sessionFile, 'utf8');
    // Remember the permission bits so the replacement file can be created
    // with them instead of the process default.
    mode = (await fs.promises.stat(sessionFile)).mode & 0o777;
  } catch (err: unknown) {
    // The file can disappear between the existence check and the read.
    if (isMissingFileError(err)) return vanished();
    throw err;
  }
  const lines = raw.split('\n');

  // Build lookup maps.
  //   targetsByCallId: tool_result id → queued mutation
  //   selfByCallId:    dynamic-trim tool_use id → queued mutation (for self-compact)
  // Entries are deleted as they are handled; whatever is left afterwards was
  // never found in the file.
  const targetsByCallId = new Map<string, QueuedTrim>();
  const selfByCallId = new Map<string, QueuedTrim>();
  for (const q of queued) {
    targetsByCallId.set(q.targetCallId, q);
    selfByCallId.set(q.selfCallId, q);
  }

  let applied = 0;
  let skipped = 0;
  let trimmedBytes = 0;
  let selfCompacted = 0;
  const errors: string[] = [];

  for (let i = 0; i < lines.length; i++) {
    const line = lines[i];
    if (!line) continue;
    let obj: MessageEvent;
    try {
      obj = JSON.parse(line) as MessageEvent;
    } catch {
      // Unparseable lines are left exactly as they are.
      continue;
    }
    if (obj?.type !== 'message') continue;
    const role = obj.message?.role;

    // (A) toolResult rewrite — target
    if (role === 'toolResult') {
      const callId = obj.message?.toolCallId;
      if (!callId) continue;
      const q = targetsByCallId.get(callId);
      if (!q) continue;
      const content = obj.message?.content;
      if (!Array.isArray(content) || content.length === 0) {
        errors.push(`toolResult ${callId} has no content[] to trim`);
        skipped++;
        targetsByCallId.delete(callId);
        continue;
      }
      const newText = q.replacement ? `${TRIM_SENTINEL} ${q.replacement}` : TRIM_SENTINEL;
      const newTextBytes = Buffer.byteLength(newText, 'utf8');
      let touched = false;
      for (const c of content) {
        if (c?.type === 'text' && typeof c.text === 'string') {
          trimmedBytes += Buffer.byteLength(c.text, 'utf8') - newTextBytes;
          c.text = newText;
          touched = true;
        }
      }
      if (!touched) {
        errors.push(`toolResult ${callId} has no text blocks`);
        skipped++;
        targetsByCallId.delete(callId);
        continue;
      }
      lines[i] = JSON.stringify(obj);
      applied++;
      targetsByCallId.delete(callId);
      continue;
    }

    // (B) assistant message containing the dynamic-trim's own toolCall — self-compact
    if (role === 'assistant') {
      const content = obj.message?.content;
      if (!Array.isArray(content)) continue;
      let lineChanged = false;
      for (const c of content) {
        if (c?.type !== 'toolCall') continue;
        if (typeof c.id !== 'string') continue;
        const q = selfByCallId.get(c.id);
        if (!q) continue;
        // Replace the bulky arguments with the minimal self-compacted form.
        // Preserves the target id for traceability; the _self_compacted flag
        // lets tooling recognise a compacted call.
        c.arguments = { tool_call_id: q.targetCallId, _self_compacted: true };
        selfByCallId.delete(c.id);
        lineChanged = true;
        selfCompacted++;
      }
      if (lineChanged) {
        lines[i] = JSON.stringify(obj);
      }
    }
  }

  for (const callId of targetsByCallId.keys()) {
    errors.push(`toolResult ${callId} not found in session`);
    skipped++;
  }
  for (const selfId of selfByCallId.keys()) {
    errors.push(`self tool_use ${selfId} not found in session (cannot self-compact)`);
  }

  // When no line changed the output would be byte-identical to what was read,
  // so skip the replace entirely rather than churn the session file.
  if (applied > 0 || selfCompacted > 0) {
    const tmp = `${sessionFile}.trim.${process.pid}.${Date.now()}`;
    try {
      // Create the temp file with the original's permission bits: the rename
      // below makes it THE session file, and the default creation mode could
      // be more permissive than what was there before.
      await fs.promises.writeFile(tmp, lines.join('\n'), { encoding: 'utf8', mode });
      await fs.promises.rename(tmp, sessionFile);
    } catch (err: unknown) {
      try {
        await fs.promises.unlink(tmp);
      } catch {
        // Best-effort cleanup; the original failure is what matters.
      }
      throw err;
    }
  }

  return {
    session_file: sessionFile,
    applied,
    skipped,
    trimmed_bytes: trimmedBytes,
    self_compacted: selfCompacted,
    errors,
  };
}
/**
 * List the session file paths that currently have an entry in the trim queue.
 *
 * @returns A fresh array of queue keys, in the queue's iteration order.
 */
export function pendingTrimSessions(): string[] {
  return [...trimQueue.keys()];
}