From a27f6bd0671324e87cb995e06f202fc9ce3ac9af Mon Sep 17 00:00:00 2001 From: hzhang Date: Mon, 1 Jun 2026 20:14:06 +0100 Subject: [PATCH] feat(codex): plexum-host MCP exposure + real consume mirror MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three-part wiring so codex agents get full dynamic-tool consume support: 1. mcp-host subcommand in the plugin binary (routes argv[1] to mcpbridge.Run). Reads PLEXUM_MCP_SOCKET / PLEXUM_MCP_AGENT_ID from env, baked into codex's `mcp add --env` registration. 2. EnsureCodexMCPRegistered registers a per-agent stable server name (plexum-host-) lazily on first turn. Stable per-agent socket path via StableSocketPath so codex's registration entry and the per-turn bridge listener agree. 3. Post-turn ParseRolloutToolCalls + EmitCodexToolCalls walks the matching rollout-*-.jsonl and emits canonical events for every function_call + function_call_output pair using codex's REAL call_X ids. dynamic.jsonl now has block_ids that match codex's session, so consume mirror has real targets. 4. MutateCodexSession rewrites those response_item entries on consume: function_call.arguments → "{}" (heavy), output → marker. E2E verified: codex resume sees "...(tool called)" instead of the original output. --- cmd/plexum-openai-provider-plugin/main.go | 80 ++++++++--- internal/runner/mcp_register.go | 76 ++++++++++ internal/runner/runner.go | 45 +++++- internal/runner/session_mutate.go | 160 ++++++++++++++++++++-- internal/runner/session_parse.go | 157 +++++++++++++++++++++ 5 files changed, 488 insertions(+), 30 deletions(-) create mode 100644 internal/runner/mcp_register.go create mode 100644 internal/runner/session_parse.go diff --git a/cmd/plexum-openai-provider-plugin/main.go b/cmd/plexum-openai-provider-plugin/main.go index 9efdcd7..55aad96 100644 --- a/cmd/plexum-openai-provider-plugin/main.go +++ b/cmd/plexum-openai-provider-plugin/main.go @@ -21,6 +21,7 @@ import ( "path/filepath" "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + "git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge" plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" "git.hangman-lab.top/hzhang/Plexum-openai-provider/internal/runner" @@ -140,29 +141,33 @@ func (p *openaiPlugin) StreamWithAgent(ctx context.Context, modelID string, agen } // MutateSession is the session-mirror hook (decision #29x). codex -// stores transcripts at ~/.codex/sessions///
/rollout-* -// -.jsonl as response_item lines whose payloads carry -// function_call / function_call_output entries keyed by `call_id` -// (not toolu_*) — so a Plexum tool_use_id like "toolu_X" won't ever -// match in there. +// rollout JSONL at ~/.codex/sessions/.../rollout-*-.jsonl +// is rewritten line-by-line: function_call_output entries matching +// the request's call_ids get their output replaced with the consume +// marker; heavy ops also stub function_call.arguments to "{}". // -// v1 ships as a logged no-op: we resolve the file path so operator -// log can confirm wiring, but we don't rewrite. When Plexum's tool -// dispatch eventually routes through codex's native tool surface -// (preserving call_id round-trip), the rewrite logic plugs in here. +// Tool-call round-trip is provided by the runner package's +// ParseRolloutToolCalls + EmitCodexToolCalls (which run at end of +// each turn) — they push the codex call_X ids into Plexum's +// dynamic.jsonl, so the BlockMutations the host fires here all +// resolve. func (p *openaiPlugin) MutateSession(ctx context.Context, req plugin.SessionMutateRequest) error { - path, err := runner.FindCodexSessionFile(req.Workspace) - if err != nil { - p.host.Log("debug", "codex session mutate: find failed", map[string]any{ - "agent": req.AgentID, "err": err.Error(), - }) + if len(req.Mutations) == 0 { return nil } - p.host.Log("info", "codex session mutate (no-op v1)", map[string]any{ - "agent": req.AgentID, - "session_path": path, - "requested": len(req.Mutations), - "reason": "codex uses call_* ids; tool-call round-trip not yet wired", + mu := make([]runner.CodexMutation, 0, len(req.Mutations)) + for _, m := range req.Mutations { + mu = append(mu, runner.CodexMutation{BlockID: m.BlockID, Op: m.Op}) + } + touched, err := runner.MutateCodexSession(req.Workspace, mu) + if err != nil { + p.host.Log("warn", "codex session mutate failed", map[string]any{ + "agent": req.AgentID, "err": err.Error(), + }) + return err + } + p.host.Log("info", "codex session mutate complete", map[string]any{ + "agent": req.AgentID, "touched": touched, "requested": len(req.Mutations), }) return nil } @@ -182,8 +187,45 @@ func (p *openaiPlugin) mapModel(plexumModel string) string { } func main() { + if len(os.Args) > 1 && os.Args[1] == "mcp-host" { + if err := runMCPHost(os.Args[2:]); err != nil { + fmt.Fprintf(os.Stderr, "mcp-host: %v\n", err) + os.Exit(1) + } + return + } if err := plugin.Serve(&openaiPlugin{}); err != nil { fmt.Fprintf(os.Stderr, "plexum-openai-provider-plugin: %v\n", err) os.Exit(1) } } + +// runMCPHost: codex registers our binary's mcp-host subcommand via +// `codex mcp add --env PLEXUM_MCP_SOCKET=...`. The mcp-host process +// reads those env vars + serves stdio MCP to codex while dialing the +// plugin-side unix socket. +func runMCPHost(argv []string) error { + opts := mcpbridge.Opts{ + SocketPath: os.Getenv("PLEXUM_MCP_SOCKET"), + AgentID: os.Getenv("PLEXUM_MCP_AGENT_ID"), + } + for i := 0; i < len(argv); i++ { + switch argv[i] { + case "--socket": + if i+1 >= len(argv) { + return errors.New("--socket needs value") + } + opts.SocketPath = argv[i+1] + i++ + case "--agent-id": + if i+1 >= len(argv) { + return errors.New("--agent-id needs value") + } + opts.AgentID = argv[i+1] + i++ + default: + return fmt.Errorf("unknown mcp-host flag: %s", argv[i]) + } + } + return mcpbridge.Run(context.Background(), opts) +} diff --git a/internal/runner/mcp_register.go b/internal/runner/mcp_register.go new file mode 100644 index 0000000..eb7bd96 --- /dev/null +++ b/internal/runner/mcp_register.go @@ -0,0 +1,76 @@ +// MCP server registration. codex's `mcp add` writes into the user's +// global config; we use a per-agent stable name so multiple Plexum +// agents using codex don't collide. Registration is idempotent and +// performed lazily before the first turn that needs it. + +package runner + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" +) + +var registered sync.Map // agentID → bool (registration done this process) + +// EnsureCodexMCPRegistered makes sure codex's user config has an MCP +// server entry for this agent named plexum-host-. Returns +// the entry's tool-name namespace prefix so the caller can build +// req.Tools wire shapes if needed. +// +// The socket path baked into the env is StableSocketPath(agentID) — +// the same value the per-turn bridge listens on. +func EnsureCodexMCPRegistered(ctx context.Context, codexBinary, pluginBinary, agentID string) (string, error) { + name := codexServerName(agentID) + if _, ok := registered.Load(name); ok { + return name, nil + } + // Remove first so we don't get "already exists"; ignore error. + _ = exec.CommandContext(ctx, codexBinary, "mcp", "remove", name).Run() + + args := []string{ + "mcp", "add", + "--env", "PLEXUM_MCP_SOCKET=" + StableSocketPath(agentID), + "--env", "PLEXUM_MCP_AGENT_ID=" + agentID, + name, "--", pluginBinary, "mcp-host", + } + cmd := exec.CommandContext(ctx, codexBinary, args...) + if out, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("codex mcp add: %w: %s", err, strings.TrimSpace(string(out))) + } + registered.Store(name, true) + return name, nil +} + +// StableSocketPath returns the per-agent unix socket path the bridge +// listens on AND codex's mcp-host subprocess dials into. Keeping it +// stable lets us register codex's MCP server once and reuse it. +func StableSocketPath(agentID string) string { + return filepath.Join(os.TempDir(), "plexum-codex-mcp-"+sanitize(agentID)+".sock") +} + +// codexServerName produces the per-agent MCP server name codex stores +// in its global config. Names need to be filesystem-safe; we just +// reuse the sanitised agent id. +func codexServerName(agentID string) string { + return "plexum-host-" + sanitize(agentID) +} + +func sanitize(s string) string { + out := make([]byte, 0, len(s)) + for i := 0; i < len(s); i++ { + c := s[i] + switch { + case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', + c >= '0' && c <= '9', c == '-', c == '_': + out = append(out, c) + default: + out = append(out, '_') + } + } + return string(out) +} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 9c376fd..5c838ca 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -32,6 +32,7 @@ import ( "path/filepath" "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + "git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge" plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" ) @@ -62,6 +63,28 @@ func Run( return nil, errors.New("codex: no user text in request messages") } + // Per-turn MCP bridge so codex can call Plexum host tools the + // host advertised in req.Tools. Best-effort: setup failure logs + // + continues; codex still runs with its built-in tools. + var br *mcpbridge.Bridge + if len(req.Tools) > 0 { + exe, err := os.Executable() + if err == nil { + if _, regErr := EnsureCodexMCPRegistered(ctx, binary, exe, agent.AgentID); regErr != nil { + host.Log("warn", "codex: mcp registration failed", map[string]any{"err": regErr.Error()}) + } else { + // Open the listener on the stable per-agent socket + // path; codex's mcp-host subprocess will dial it. + b, err := mcpbridge.SetupOnPath(ctx, host, agent.AgentID, req.Tools, StableSocketPath(agent.AgentID)) + if err != nil { + host.Log("warn", "codex: bridge setup failed", map[string]any{"err": err.Error()}) + } else { + br = b + } + } + } + } + resumeID := loadSessionID(workspace) // Build the argv. `codex exec [resume ]` then flags then prompt. @@ -105,7 +128,10 @@ func Run( } out := make(chan canonical.TurnEvent, 16) - go pumpEvents(ctx, host, cmd, stdout, stderr, workspace, out) + go func() { + pumpEvents(ctx, host, cmd, stdout, stderr, workspace, out) + br.Close() // nil-safe + }() return out, nil } @@ -249,6 +275,23 @@ func pumpEvents( if hadError || werr != nil { stopReason = canonical.StopReason("error") } + // Read the session rollout once codex has finished and replay + // every recorded function_call / function_call_output pair as + // tool_call_start/end + EventToolResult — with codex's REAL + // call_X ids. The agentic loop attaches these to the iteration + // so dynamic-* consume can target the same ids SessionMutator + // later mirrors into the session JSONL. + if sessionPath, perr := FindCodexSessionFile(workspace); perr == nil { + calls, err := ParseRolloutToolCalls(sessionPath) + if err != nil { + host.Log("warn", "codex: parse rollout failed", + map[string]any{"err": err.Error(), "path": sessionPath}) + } else { + EmitCodexToolCalls(calls, emit) + host.Log("debug", "codex: emitted tool calls from rollout", + map[string]any{"count": len(calls), "path": sessionPath}) + } + } emit(canonical.TurnEvent{ Type: canonical.EventMessageEnd, StopReason: stopReason, Usage: &usage, }) diff --git a/internal/runner/session_mutate.go b/internal/runner/session_mutate.go index c661c63..f3e7c6e 100644 --- a/internal/runner/session_mutate.go +++ b/internal/runner/session_mutate.go @@ -1,20 +1,31 @@ -// session_mutate.go — codex session file path resolution. The actual -// JSONL rewrite is deferred (see main.go MutateSession comment): codex -// keys tool calls by `call_`, not the `toolu_` Plexum uses, -// so a v1 mirror would never match anything. We expose the path -// finder here so the host-side wiring telemetry stays useful and the -// v2 rewriter has its entry point. +// session_mutate — real mirror implementation for codex. After +// Plexum's dynamic-* tools flush a consume batch on dynamic.jsonl, +// host calls SessionMutator with the applied []BlockMutation and we +// rewrite the matching response_item entries in codex's rollout +// JSONL so the next `codex exec resume ` sees the same view. // // Codex layout (observed): // // ~/.codex/sessions///
/rollout--.jsonl // -// Older layout (pre-0.5x) wrote into the top-level sessions dir with -// the same filename convention. We probe both. +// Each tool call lands as two response_item lines: +// +// {"timestamp":"...","type":"response_item","payload":{ +// "type":"function_call","name":"...","arguments":"", +// "call_id":"call_X","namespace":"mcp__"}} +// {"timestamp":"...","type":"response_item","payload":{ +// "type":"function_call_output","call_id":"call_X","output":"..."}} +// +// Mutation ops (mirror host persistence/mutation.go): +// +// consume-light → output → "...(consumed)" +// consume-heavy → output → "...(tool called)" AND arguments → "{}" package runner import ( + "bufio" + "encoding/json" "errors" "fmt" "os" @@ -22,9 +33,138 @@ import ( "strings" ) +const ( + consumedMarker = "...(consumed)" + toolCalledMarker = "...(tool called)" +) + +// MutateCodexSession edits codex's rollout file in lockstep with +// Plexum's dynamic.jsonl mutations. Returns the count of distinct +// call_ids actually touched; missing call_ids are silently skipped +// (the session may not include them — e.g. an old call from a +// previous resume). +func MutateCodexSession(workspace string, mutations []codexMutation) (int, error) { + if len(mutations) == 0 { + return 0, nil + } + sessionPath, err := FindCodexSessionFile(workspace) + if err != nil { + return 0, nil // no session yet — nothing to mirror + } + + byID := make(map[string]string, len(mutations)) + for _, m := range mutations { + byID[m.BlockID] = m.Op + } + + f, err := os.Open(sessionPath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return 0, nil + } + return 0, fmt.Errorf("open codex session: %w", err) + } + defer f.Close() + + var out strings.Builder + touched := map[string]struct{}{} + mutated := false + sc := bufio.NewScanner(f) + sc.Buffer(make([]byte, 64*1024), 16*1024*1024) + for sc.Scan() { + line := sc.Bytes() + if len(line) == 0 { + out.WriteByte('\n') + continue + } + rewritten, hit := rewriteCodexLine(line, byID) + out.Write(rewritten) + out.WriteByte('\n') + if hit != "" { + touched[hit] = struct{}{} + mutated = true + } + } + if err := sc.Err(); err != nil { + return 0, fmt.Errorf("scan codex session: %w", err) + } + if !mutated { + return 0, nil + } + if err := writeAtomic(sessionPath, []byte(out.String())); err != nil { + return len(touched), fmt.Errorf("rewrite codex session: %w", err) + } + return len(touched), nil +} + +// CodexMutation is the local view of plugin.BlockMutation — kept +// here so the cmd/ caller doesn't bleed SDK types into runner's +// public surface. +type CodexMutation struct { + BlockID string + Op string +} + +// codexMutation kept as an internal alias to avoid renaming inside +// the package; new callers use CodexMutation. +type codexMutation = CodexMutation + +// rewriteCodexLine decodes one response_item line, applies the +// mutation op when the payload's call_id is targeted, and re-encodes. +// Lines that aren't response_item / function_call / function_call_output +// pass through verbatim. +func rewriteCodexLine(line []byte, byID map[string]string) ([]byte, string) { + var obj map[string]any + if err := json.Unmarshal(line, &obj); err != nil { + return line, "" + } + if obj["type"] != "response_item" { + return line, "" + } + payload, ok := obj["payload"].(map[string]any) + if !ok { + return line, "" + } + callID, _ := payload["call_id"].(string) + op, ok := byID[callID] + if !ok || callID == "" { + return line, "" + } + switch payload["type"] { + case "function_call": + if op == "consume-heavy" { + payload["arguments"] = "{}" + } + case "function_call_output": + marker := consumedMarker + if op == "consume-heavy" { + marker = toolCalledMarker + } + payload["output"] = marker + default: + return line, "" + } + obj["payload"] = payload + rewritten, err := json.Marshal(obj) + if err != nil { + return line, "" + } + return rewritten, callID +} + +func writeAtomic(path string, data []byte) error { + tmp := path + ".tmp" + if err := os.WriteFile(tmp, data, 0o600); err != nil { + return err + } + return os.Rename(tmp, path) +} + // FindCodexSessionFile resolves the rollout JSONL for the thread_id -// captured into workspace/.plexum-codex-session. Returns ("", error) -// when no session id is recorded yet OR no matching file is on disk. +// captured into workspace/.plexum-codex-session. Walks the +// date-bucketed ~/.codex/sessions tree for any file containing the +// thread_id substring. Returns ("", error) when no session has been +// captured yet OR no matching file is on disk. func FindCodexSessionFile(workspace string) (string, error) { threadID := loadSessionID(workspace) if threadID == "" { diff --git a/internal/runner/session_parse.go b/internal/runner/session_parse.go new file mode 100644 index 0000000..5c3c3ff --- /dev/null +++ b/internal/runner/session_parse.go @@ -0,0 +1,157 @@ +// session_parse — after codex exits, scan its rollout JSONL session +// file for tool calls and tool results. Codex records both native +// (function_call/function_call_output) and MCP (function_call with +// `namespace = "mcp__"`) calls with the same shape and a +// canonical `call_id`. We use those ids when emitting EventToolCall* +// + EventToolResult so the agentic loop's iteration record matches +// what's actually on disk in codex's rollout — which is what +// SessionMutator rewrites on consume. + +package runner + +import ( + "bufio" + "encoding/json" + "errors" + "os" + + "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" +) + +// CodexToolCall is one (call_id, name, arguments, output) tuple +// extracted from a rollout JSONL. +type CodexToolCall struct { + CallID string + Name string + Namespace string // "mcp__" for MCP calls; "" for native + Arguments string // JSON string per codex's format + Output string // function_call_output.output; "" if no result yet +} + +// ParseRolloutToolCalls walks the rollout file once, returning +// every (call_id, name, args, output) tuple in input order. The +// caller emits TurnEvents from this slice. Pairs with no +// function_call_output yet (mid-turn truncation) still appear with +// Output == "". +func ParseRolloutToolCalls(path string) ([]CodexToolCall, error) { + f, err := os.Open(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, err + } + defer f.Close() + + // Index calls by call_id so output can attach. + calls := map[string]*CodexToolCall{} + var order []string + + sc := bufio.NewScanner(f) + sc.Buffer(make([]byte, 64*1024), 16*1024*1024) + for sc.Scan() { + line := sc.Bytes() + if len(line) == 0 { + continue + } + var rec struct { + Type string `json:"type"` + Payload struct { + Type string `json:"type"` + Name string `json:"name"` + Namespace string `json:"namespace"` + Arguments string `json:"arguments"` + CallID string `json:"call_id"` + Output any `json:"output"` + } `json:"payload"` + } + if err := json.Unmarshal(line, &rec); err != nil { + continue + } + if rec.Type != "response_item" || rec.Payload.CallID == "" { + continue + } + switch rec.Payload.Type { + case "function_call": + c, ok := calls[rec.Payload.CallID] + if !ok { + c = &CodexToolCall{CallID: rec.Payload.CallID} + calls[rec.Payload.CallID] = c + order = append(order, rec.Payload.CallID) + } + c.Name = rec.Payload.Name + c.Namespace = rec.Payload.Namespace + c.Arguments = rec.Payload.Arguments + case "function_call_output": + c, ok := calls[rec.Payload.CallID] + if !ok { + // Output without prior call_id seen — synthesize. + c = &CodexToolCall{CallID: rec.Payload.CallID} + calls[rec.Payload.CallID] = c + order = append(order, rec.Payload.CallID) + } + c.Output = outputAsString(rec.Payload.Output) + } + } + if err := sc.Err(); err != nil { + return nil, err + } + out := make([]CodexToolCall, 0, len(order)) + for _, id := range order { + out = append(out, *calls[id]) + } + return out, nil +} + +// outputAsString flattens whatever shape codex writes for function_call_output. +// Often a plain string; sometimes an object with content/text. +func outputAsString(raw any) string { + switch v := raw.(type) { + case string: + return v + case nil: + return "" + default: + b, err := json.Marshal(v) + if err != nil { + return "" + } + return string(b) + } +} + +// EmitCodexToolCalls translates the parsed slice into TurnEvents in +// input order. tool_use blocks land in the assistant message; +// EventToolResult lands on the iteration's ToolResults under the +// matching call_id. +func EmitCodexToolCalls(calls []CodexToolCall, emit func(canonical.TurnEvent)) { + for _, c := range calls { + // Tool name: keep codex's `function_call.name`. For MCP calls + // codex stores name without the mcp__ prefix and tracks the + // server in `namespace`; if you want the original tool name + // the agent saw (e.g. "mcp__plexum-host-alice__plexum_echo"), + // build it from namespace + name. We store the plain `.name` + // so downstream consume sync targets the call_id which is the + // source of truth. + emit(canonical.TurnEvent{ + Type: canonical.EventToolCallStart, + ToolCallID: c.CallID, + ToolName: c.Name, + PartialJSON: c.Arguments, + }) + emit(canonical.TurnEvent{ + Type: canonical.EventToolCallEnd, + ToolCallID: c.CallID, + }) + emit(canonical.TurnEvent{ + Type: canonical.EventToolResult, + ToolResult: &canonical.ToolResultBlock{ + Type: canonical.BlockTypeToolResult, + ToolUseID: c.CallID, + Content: []canonical.Block{ + canonical.NewTextBlock(c.Output), + }, + }, + }) + } +}