feat(codex): plexum-host MCP exposure + real consume mirror

Three-part wiring so codex agents get full dynamic-tool consume
support:

1. mcp-host subcommand in the plugin binary (routes argv[1] to
   mcpbridge.Run). Reads PLEXUM_MCP_SOCKET / PLEXUM_MCP_AGENT_ID
   from env, baked into codex's `mcp add --env` registration.

2. EnsureCodexMCPRegistered registers a per-agent stable server
   name (plexum-host-<sanitised>) lazily on first turn. Stable
   per-agent socket path via StableSocketPath so codex's
   registration entry and the per-turn bridge listener agree.

3. Post-turn ParseRolloutToolCalls + EmitCodexToolCalls walks the
   matching rollout-*-<thread_id>.jsonl and emits canonical events
   for every function_call + function_call_output pair using
   codex's REAL call_X ids. dynamic.jsonl now has block_ids that
   match codex's session, so consume mirror has real targets.

4. MutateCodexSession rewrites those response_item entries on
   consume: function_call.arguments → "{}" (heavy), output → marker.
   E2E verified: codex resume sees "...(tool called)" instead of
   the original output.
This commit is contained in:
h z
2026-06-01 20:14:06 +01:00
parent d075fc408a
commit a27f6bd067
5 changed files with 488 additions and 30 deletions

View File

@@ -0,0 +1,76 @@
// MCP server registration. codex's `mcp add` writes into the user's
// global config; we use a per-agent stable name so multiple Plexum
// agents using codex don't collide. Registration is idempotent and
// performed lazily before the first turn that needs it.
package runner
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
)
var registered sync.Map // agentID → bool (registration done this process)
// EnsureCodexMCPRegistered makes sure codex's user config has an MCP
// server entry for this agent named plexum-host-<sanitized>. Returns
// the entry's tool-name namespace prefix so the caller can build
// req.Tools wire shapes if needed.
//
// The socket path baked into the env is StableSocketPath(agentID) —
// the same value the per-turn bridge listens on.
func EnsureCodexMCPRegistered(ctx context.Context, codexBinary, pluginBinary, agentID string) (string, error) {
name := codexServerName(agentID)
if _, ok := registered.Load(name); ok {
return name, nil
}
// Remove first so we don't get "already exists"; ignore error.
_ = exec.CommandContext(ctx, codexBinary, "mcp", "remove", name).Run()
args := []string{
"mcp", "add",
"--env", "PLEXUM_MCP_SOCKET=" + StableSocketPath(agentID),
"--env", "PLEXUM_MCP_AGENT_ID=" + agentID,
name, "--", pluginBinary, "mcp-host",
}
cmd := exec.CommandContext(ctx, codexBinary, args...)
if out, err := cmd.CombinedOutput(); err != nil {
return "", fmt.Errorf("codex mcp add: %w: %s", err, strings.TrimSpace(string(out)))
}
registered.Store(name, true)
return name, nil
}
// StableSocketPath returns the per-agent unix socket path the bridge
// listens on AND codex's mcp-host subprocess dials into. Keeping it
// stable lets us register codex's MCP server once and reuse it.
func StableSocketPath(agentID string) string {
return filepath.Join(os.TempDir(), "plexum-codex-mcp-"+sanitize(agentID)+".sock")
}
// codexServerName produces the per-agent MCP server name codex stores
// in its global config. Names need to be filesystem-safe; we just
// reuse the sanitised agent id.
func codexServerName(agentID string) string {
return "plexum-host-" + sanitize(agentID)
}
func sanitize(s string) string {
out := make([]byte, 0, len(s))
for i := 0; i < len(s); i++ {
c := s[i]
switch {
case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z',
c >= '0' && c <= '9', c == '-', c == '_':
out = append(out, c)
default:
out = append(out, '_')
}
}
return string(out)
}

View File

@@ -32,6 +32,7 @@ import (
"path/filepath"
"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
"git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge"
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
)
@@ -62,6 +63,28 @@ func Run(
return nil, errors.New("codex: no user text in request messages")
}
// Per-turn MCP bridge so codex can call Plexum host tools the
// host advertised in req.Tools. Best-effort: setup failure logs
// + continues; codex still runs with its built-in tools.
var br *mcpbridge.Bridge
if len(req.Tools) > 0 {
exe, err := os.Executable()
if err == nil {
if _, regErr := EnsureCodexMCPRegistered(ctx, binary, exe, agent.AgentID); regErr != nil {
host.Log("warn", "codex: mcp registration failed", map[string]any{"err": regErr.Error()})
} else {
// Open the listener on the stable per-agent socket
// path; codex's mcp-host subprocess will dial it.
b, err := mcpbridge.SetupOnPath(ctx, host, agent.AgentID, req.Tools, StableSocketPath(agent.AgentID))
if err != nil {
host.Log("warn", "codex: bridge setup failed", map[string]any{"err": err.Error()})
} else {
br = b
}
}
}
}
resumeID := loadSessionID(workspace)
// Build the argv. `codex exec [resume <id>]` then flags then prompt.
@@ -105,7 +128,10 @@ func Run(
}
out := make(chan canonical.TurnEvent, 16)
go pumpEvents(ctx, host, cmd, stdout, stderr, workspace, out)
go func() {
pumpEvents(ctx, host, cmd, stdout, stderr, workspace, out)
br.Close() // nil-safe
}()
return out, nil
}
@@ -249,6 +275,23 @@ func pumpEvents(
if hadError || werr != nil {
stopReason = canonical.StopReason("error")
}
// Read the session rollout once codex has finished and replay
// every recorded function_call / function_call_output pair as
// tool_call_start/end + EventToolResult — with codex's REAL
// call_X ids. The agentic loop attaches these to the iteration
// so dynamic-* consume can target the same ids SessionMutator
// later mirrors into the session JSONL.
if sessionPath, perr := FindCodexSessionFile(workspace); perr == nil {
calls, err := ParseRolloutToolCalls(sessionPath)
if err != nil {
host.Log("warn", "codex: parse rollout failed",
map[string]any{"err": err.Error(), "path": sessionPath})
} else {
EmitCodexToolCalls(calls, emit)
host.Log("debug", "codex: emitted tool calls from rollout",
map[string]any{"count": len(calls), "path": sessionPath})
}
}
emit(canonical.TurnEvent{
Type: canonical.EventMessageEnd, StopReason: stopReason, Usage: &usage,
})

View File

@@ -1,20 +1,31 @@
// session_mutate.go — codex session file path resolution. The actual
// JSONL rewrite is deferred (see main.go MutateSession comment): codex
// keys tool calls by `call_<id>`, not the `toolu_<id>` Plexum uses,
// so a v1 mirror would never match anything. We expose the path
// finder here so the host-side wiring telemetry stays useful and the
// v2 rewriter has its entry point.
// session_mutate — real mirror implementation for codex. After
// Plexum's dynamic-* tools flush a consume batch on dynamic.jsonl,
// host calls SessionMutator with the applied []BlockMutation and we
// rewrite the matching response_item entries in codex's rollout
// JSONL so the next `codex exec resume <thread>` sees the same view.
//
// Codex layout (observed):
//
// ~/.codex/sessions/<YYYY>/<MM>/<DD>/rollout-<ISO>-<thread_id>.jsonl
//
// Older layout (pre-0.5x) wrote into the top-level sessions dir with
// the same filename convention. We probe both.
// Each tool call lands as two response_item lines:
//
// {"timestamp":"...","type":"response_item","payload":{
// "type":"function_call","name":"...","arguments":"<json>",
// "call_id":"call_X","namespace":"mcp__<server>"}}
// {"timestamp":"...","type":"response_item","payload":{
// "type":"function_call_output","call_id":"call_X","output":"..."}}
//
// Mutation ops (mirror host persistence/mutation.go):
//
// consume-light → output → "...(consumed)"
// consume-heavy → output → "...(tool called)" AND arguments → "{}"
package runner
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"os"
@@ -22,9 +33,138 @@ import (
"strings"
)
const (
consumedMarker = "...(consumed)"
toolCalledMarker = "...(tool called)"
)
// MutateCodexSession edits codex's rollout file in lockstep with
// Plexum's dynamic.jsonl mutations. Returns the count of distinct
// call_ids actually touched; missing call_ids are silently skipped
// (the session may not include them — e.g. an old call from a
// previous resume).
func MutateCodexSession(workspace string, mutations []codexMutation) (int, error) {
if len(mutations) == 0 {
return 0, nil
}
sessionPath, err := FindCodexSessionFile(workspace)
if err != nil {
return 0, nil // no session yet — nothing to mirror
}
byID := make(map[string]string, len(mutations))
for _, m := range mutations {
byID[m.BlockID] = m.Op
}
f, err := os.Open(sessionPath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return 0, nil
}
return 0, fmt.Errorf("open codex session: %w", err)
}
defer f.Close()
var out strings.Builder
touched := map[string]struct{}{}
mutated := false
sc := bufio.NewScanner(f)
sc.Buffer(make([]byte, 64*1024), 16*1024*1024)
for sc.Scan() {
line := sc.Bytes()
if len(line) == 0 {
out.WriteByte('\n')
continue
}
rewritten, hit := rewriteCodexLine(line, byID)
out.Write(rewritten)
out.WriteByte('\n')
if hit != "" {
touched[hit] = struct{}{}
mutated = true
}
}
if err := sc.Err(); err != nil {
return 0, fmt.Errorf("scan codex session: %w", err)
}
if !mutated {
return 0, nil
}
if err := writeAtomic(sessionPath, []byte(out.String())); err != nil {
return len(touched), fmt.Errorf("rewrite codex session: %w", err)
}
return len(touched), nil
}
// CodexMutation is the local view of plugin.BlockMutation — kept
// here so the cmd/ caller doesn't bleed SDK types into runner's
// public surface.
type CodexMutation struct {
BlockID string
Op string
}
// codexMutation kept as an internal alias to avoid renaming inside
// the package; new callers use CodexMutation.
type codexMutation = CodexMutation
// rewriteCodexLine decodes one response_item line, applies the
// mutation op when the payload's call_id is targeted, and re-encodes.
// Lines that aren't response_item / function_call / function_call_output
// pass through verbatim.
func rewriteCodexLine(line []byte, byID map[string]string) ([]byte, string) {
var obj map[string]any
if err := json.Unmarshal(line, &obj); err != nil {
return line, ""
}
if obj["type"] != "response_item" {
return line, ""
}
payload, ok := obj["payload"].(map[string]any)
if !ok {
return line, ""
}
callID, _ := payload["call_id"].(string)
op, ok := byID[callID]
if !ok || callID == "" {
return line, ""
}
switch payload["type"] {
case "function_call":
if op == "consume-heavy" {
payload["arguments"] = "{}"
}
case "function_call_output":
marker := consumedMarker
if op == "consume-heavy" {
marker = toolCalledMarker
}
payload["output"] = marker
default:
return line, ""
}
obj["payload"] = payload
rewritten, err := json.Marshal(obj)
if err != nil {
return line, ""
}
return rewritten, callID
}
func writeAtomic(path string, data []byte) error {
tmp := path + ".tmp"
if err := os.WriteFile(tmp, data, 0o600); err != nil {
return err
}
return os.Rename(tmp, path)
}
// FindCodexSessionFile resolves the rollout JSONL for the thread_id
// captured into workspace/.plexum-codex-session. Returns ("", error)
// when no session id is recorded yet OR no matching file is on disk.
// captured into workspace/.plexum-codex-session. Walks the
// date-bucketed ~/.codex/sessions tree for any file containing the
// thread_id substring. Returns ("", error) when no session has been
// captured yet OR no matching file is on disk.
func FindCodexSessionFile(workspace string) (string, error) {
threadID := loadSessionID(workspace)
if threadID == "" {

View File

@@ -0,0 +1,157 @@
// session_parse — after codex exits, scan its rollout JSONL session
// file for tool calls and tool results. Codex records both native
// (function_call/function_call_output) and MCP (function_call with
// `namespace = "mcp__<server>"`) calls with the same shape and a
// canonical `call_id`. We use those ids when emitting EventToolCall*
// + EventToolResult so the agentic loop's iteration record matches
// what's actually on disk in codex's rollout — which is what
// SessionMutator rewrites on consume.
package runner
import (
"bufio"
"encoding/json"
"errors"
"os"
"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
)
// CodexToolCall is one (call_id, name, arguments, output) tuple
// extracted from a rollout JSONL.
type CodexToolCall struct {
CallID string
Name string
Namespace string // "mcp__<server>" for MCP calls; "" for native
Arguments string // JSON string per codex's format
Output string // function_call_output.output; "" if no result yet
}
// ParseRolloutToolCalls walks the rollout file once, returning
// every (call_id, name, args, output) tuple in input order. The
// caller emits TurnEvents from this slice. Pairs with no
// function_call_output yet (mid-turn truncation) still appear with
// Output == "".
func ParseRolloutToolCalls(path string) ([]CodexToolCall, error) {
f, err := os.Open(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, nil
}
return nil, err
}
defer f.Close()
// Index calls by call_id so output can attach.
calls := map[string]*CodexToolCall{}
var order []string
sc := bufio.NewScanner(f)
sc.Buffer(make([]byte, 64*1024), 16*1024*1024)
for sc.Scan() {
line := sc.Bytes()
if len(line) == 0 {
continue
}
var rec struct {
Type string `json:"type"`
Payload struct {
Type string `json:"type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Arguments string `json:"arguments"`
CallID string `json:"call_id"`
Output any `json:"output"`
} `json:"payload"`
}
if err := json.Unmarshal(line, &rec); err != nil {
continue
}
if rec.Type != "response_item" || rec.Payload.CallID == "" {
continue
}
switch rec.Payload.Type {
case "function_call":
c, ok := calls[rec.Payload.CallID]
if !ok {
c = &CodexToolCall{CallID: rec.Payload.CallID}
calls[rec.Payload.CallID] = c
order = append(order, rec.Payload.CallID)
}
c.Name = rec.Payload.Name
c.Namespace = rec.Payload.Namespace
c.Arguments = rec.Payload.Arguments
case "function_call_output":
c, ok := calls[rec.Payload.CallID]
if !ok {
// Output without prior call_id seen — synthesize.
c = &CodexToolCall{CallID: rec.Payload.CallID}
calls[rec.Payload.CallID] = c
order = append(order, rec.Payload.CallID)
}
c.Output = outputAsString(rec.Payload.Output)
}
}
if err := sc.Err(); err != nil {
return nil, err
}
out := make([]CodexToolCall, 0, len(order))
for _, id := range order {
out = append(out, *calls[id])
}
return out, nil
}
// outputAsString flattens whatever shape codex writes for function_call_output.
// Often a plain string; sometimes an object with content/text.
func outputAsString(raw any) string {
switch v := raw.(type) {
case string:
return v
case nil:
return ""
default:
b, err := json.Marshal(v)
if err != nil {
return ""
}
return string(b)
}
}
// EmitCodexToolCalls translates the parsed slice into TurnEvents in
// input order. tool_use blocks land in the assistant message;
// EventToolResult lands on the iteration's ToolResults under the
// matching call_id.
func EmitCodexToolCalls(calls []CodexToolCall, emit func(canonical.TurnEvent)) {
for _, c := range calls {
// Tool name: keep codex's `function_call.name`. For MCP calls
// codex stores name without the mcp__ prefix and tracks the
// server in `namespace`; if you want the original tool name
// the agent saw (e.g. "mcp__plexum-host-alice__plexum_echo"),
// build it from namespace + name. We store the plain `.name`
// so downstream consume sync targets the call_id which is the
// source of truth.
emit(canonical.TurnEvent{
Type: canonical.EventToolCallStart,
ToolCallID: c.CallID,
ToolName: c.Name,
PartialJSON: c.Arguments,
})
emit(canonical.TurnEvent{
Type: canonical.EventToolCallEnd,
ToolCallID: c.CallID,
})
emit(canonical.TurnEvent{
Type: canonical.EventToolResult,
ToolResult: &canonical.ToolResultBlock{
Type: canonical.BlockTypeToolResult,
ToolUseID: c.CallID,
Content: []canonical.Block{
canonical.NewTextBlock(c.Output),
},
},
})
}
}