Four-part wiring so codex agents get full dynamic-tool consume
support:
1. mcp-host subcommand in the plugin binary (routes argv[1] to
mcpbridge.Run). Reads PLEXUM_MCP_SOCKET / PLEXUM_MCP_AGENT_ID
from env, baked into codex's `mcp add --env` registration.
2. EnsureCodexMCPRegistered registers a per-agent stable server
name (plexum-host-<sanitised>) lazily on first turn. Stable
per-agent socket path via StableSocketPath so codex's
registration entry and the per-turn bridge listener agree.
3. Post-turn ParseRolloutToolCalls + EmitCodexToolCalls walks the
matching rollout-*-<thread_id>.jsonl and emits canonical events
for every function_call + function_call_output pair using
codex's REAL call_X ids. dynamic.jsonl now has block_ids that
match codex's session, so consume mirror has real targets.
4. MutateCodexSession rewrites those response_item entries on
consume: function_call.arguments → "{}" (heavy), output → marker.
E2E verified: codex resume sees "...(tool called)" instead of
the original output.
204 lines
5.4 KiB
Go
204 lines
5.4 KiB
Go
// session_mutate — real mirror implementation for codex. After
|
|
// Plexum's dynamic-* tools flush a consume batch on dynamic.jsonl,
|
|
// host calls SessionMutator with the applied []BlockMutation and we
|
|
// rewrite the matching response_item entries in codex's rollout
|
|
// JSONL so the next `codex exec resume <thread>` sees the same view.
|
|
//
|
|
// Codex layout (observed):
|
|
//
|
|
// ~/.codex/sessions/<YYYY>/<MM>/<DD>/rollout-<ISO>-<thread_id>.jsonl
|
|
//
|
|
// Each tool call lands as two response_item lines:
|
|
//
|
|
// {"timestamp":"...","type":"response_item","payload":{
|
|
// "type":"function_call","name":"...","arguments":"<json>",
|
|
// "call_id":"call_X","namespace":"mcp__<server>"}}
|
|
// {"timestamp":"...","type":"response_item","payload":{
|
|
// "type":"function_call_output","call_id":"call_X","output":"..."}}
|
|
//
|
|
// Mutation ops (mirror host persistence/mutation.go):
|
|
//
|
|
// consume-light → output → "...(consumed)"
|
|
// consume-heavy → output → "...(tool called)" AND arguments → "{}"
|
|
|
|
package runner
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
)
|
|
|
|
// consumedMarker is the placeholder that replaces a function_call_output's
// output when the mutation op is not consume-heavy (i.e. consume-light).
const consumedMarker = "...(consumed)"

// toolCalledMarker is the placeholder that replaces a function_call_output's
// output when the mutation op is consume-heavy.
const toolCalledMarker = "...(tool called)"
|
|
|
|
// MutateCodexSession edits codex's rollout file in lockstep with
|
|
// Plexum's dynamic.jsonl mutations. Returns the count of distinct
|
|
// call_ids actually touched; missing call_ids are silently skipped
|
|
// (the session may not include them — e.g. an old call from a
|
|
// previous resume).
|
|
func MutateCodexSession(workspace string, mutations []codexMutation) (int, error) {
|
|
if len(mutations) == 0 {
|
|
return 0, nil
|
|
}
|
|
sessionPath, err := FindCodexSessionFile(workspace)
|
|
if err != nil {
|
|
return 0, nil // no session yet — nothing to mirror
|
|
}
|
|
|
|
byID := make(map[string]string, len(mutations))
|
|
for _, m := range mutations {
|
|
byID[m.BlockID] = m.Op
|
|
}
|
|
|
|
f, err := os.Open(sessionPath)
|
|
if err != nil {
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
return 0, nil
|
|
}
|
|
return 0, fmt.Errorf("open codex session: %w", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
var out strings.Builder
|
|
touched := map[string]struct{}{}
|
|
mutated := false
|
|
sc := bufio.NewScanner(f)
|
|
sc.Buffer(make([]byte, 64*1024), 16*1024*1024)
|
|
for sc.Scan() {
|
|
line := sc.Bytes()
|
|
if len(line) == 0 {
|
|
out.WriteByte('\n')
|
|
continue
|
|
}
|
|
rewritten, hit := rewriteCodexLine(line, byID)
|
|
out.Write(rewritten)
|
|
out.WriteByte('\n')
|
|
if hit != "" {
|
|
touched[hit] = struct{}{}
|
|
mutated = true
|
|
}
|
|
}
|
|
if err := sc.Err(); err != nil {
|
|
return 0, fmt.Errorf("scan codex session: %w", err)
|
|
}
|
|
if !mutated {
|
|
return 0, nil
|
|
}
|
|
if err := writeAtomic(sessionPath, []byte(out.String())); err != nil {
|
|
return len(touched), fmt.Errorf("rewrite codex session: %w", err)
|
|
}
|
|
return len(touched), nil
|
|
}
|
|
|
|
type (
	// CodexMutation is this package's own representation of
	// plugin.BlockMutation. Declaring it here lets the cmd/ caller hand
	// mutations to runner without SDK types leaking into runner's public
	// surface.
	CodexMutation struct {
		// BlockID is matched against the call_id of rollout entries.
		BlockID string
		// Op names the mutation to apply, e.g. "consume-heavy".
		Op string
	}

	// codexMutation is the package-internal alias retained so existing
	// code inside the package needs no rename; new callers should use
	// CodexMutation.
	codexMutation = CodexMutation
)
|
|
|
|
// rewriteCodexLine decodes one response_item line, applies the
|
|
// mutation op when the payload's call_id is targeted, and re-encodes.
|
|
// Lines that aren't response_item / function_call / function_call_output
|
|
// pass through verbatim.
|
|
func rewriteCodexLine(line []byte, byID map[string]string) ([]byte, string) {
|
|
var obj map[string]any
|
|
if err := json.Unmarshal(line, &obj); err != nil {
|
|
return line, ""
|
|
}
|
|
if obj["type"] != "response_item" {
|
|
return line, ""
|
|
}
|
|
payload, ok := obj["payload"].(map[string]any)
|
|
if !ok {
|
|
return line, ""
|
|
}
|
|
callID, _ := payload["call_id"].(string)
|
|
op, ok := byID[callID]
|
|
if !ok || callID == "" {
|
|
return line, ""
|
|
}
|
|
switch payload["type"] {
|
|
case "function_call":
|
|
if op == "consume-heavy" {
|
|
payload["arguments"] = "{}"
|
|
}
|
|
case "function_call_output":
|
|
marker := consumedMarker
|
|
if op == "consume-heavy" {
|
|
marker = toolCalledMarker
|
|
}
|
|
payload["output"] = marker
|
|
default:
|
|
return line, ""
|
|
}
|
|
obj["payload"] = payload
|
|
rewritten, err := json.Marshal(obj)
|
|
if err != nil {
|
|
return line, ""
|
|
}
|
|
return rewritten, callID
|
|
}
|
|
|
|
// writeAtomic replaces the file at path with data by writing a
// uniquely named temporary file in the same directory and renaming it
// over path, so readers see either the old or the new content.
//
// The temporary file is flushed to disk before the rename and removed
// again if any step fails. When path already exists its permission bits
// are carried over to the replacement (best effort); a file that did
// not exist before is created with mode 0600.
func writeAtomic(path string, data []byte) (err error) {
	// Same directory as the target so the rename stays on one filesystem;
	// a unique name so concurrent writers never share a temp file.
	tmp, err := os.CreateTemp(filepath.Dir(path), filepath.Base(path)+".*.tmp")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	defer func() {
		if err != nil {
			// Best-effort cleanup; the original error is what matters.
			_ = tmp.Close()
			_ = os.Remove(tmpName)
		}
	}()

	if _, err = tmp.Write(data); err != nil {
		return err
	}
	if info, statErr := os.Stat(path); statErr == nil {
		// Best effort: if the mode cannot be copied the file keeps the
		// temp file's 0600.
		_ = tmp.Chmod(info.Mode().Perm())
	}
	if err = tmp.Sync(); err != nil {
		return err
	}
	if err = tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmpName, path)
}
|
|
|
|
// FindCodexSessionFile resolves the rollout JSONL for the thread_id
|
|
// captured into workspace/.plexum-codex-session. Walks the
|
|
// date-bucketed ~/.codex/sessions tree for any file containing the
|
|
// thread_id substring. Returns ("", error) when no session has been
|
|
// captured yet OR no matching file is on disk.
|
|
func FindCodexSessionFile(workspace string) (string, error) {
|
|
threadID := loadSessionID(workspace)
|
|
if threadID == "" {
|
|
return "", errors.New("no thread_id captured yet")
|
|
}
|
|
home, err := os.UserHomeDir()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
root := filepath.Join(home, ".codex", "sessions")
|
|
var found string
|
|
err = filepath.WalkDir(root, func(p string, d os.DirEntry, err error) error {
|
|
if err != nil {
|
|
return nil // tolerate per-entry errors; keep walking
|
|
}
|
|
if d.IsDir() {
|
|
return nil
|
|
}
|
|
name := d.Name()
|
|
if !strings.HasSuffix(name, ".jsonl") && !strings.HasSuffix(name, ".json") {
|
|
return nil
|
|
}
|
|
if strings.Contains(name, threadID) {
|
|
found = p
|
|
return filepath.SkipAll
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if found == "" {
|
|
return "", fmt.Errorf("no rollout file for thread_id=%s under %s", threadID, root)
|
|
}
|
|
return found, nil
|
|
}
|