Four-part wiring so codex agents get full dynamic-tool consume
support:
1. mcp-host subcommand in the plugin binary (routes argv[1] to
mcpbridge.Run). Reads PLEXUM_MCP_SOCKET / PLEXUM_MCP_AGENT_ID
from env, baked into codex's `mcp add --env` registration.
2. EnsureCodexMCPRegistered registers a per-agent stable server
name (plexum-host-<sanitised>) lazily on first turn. Stable
per-agent socket path via StableSocketPath so codex's
registration entry and the per-turn bridge listener agree.
3. Post-turn, ParseRolloutToolCalls + EmitCodexToolCalls walk the
matching rollout-*-<thread_id>.jsonl and emit canonical events
for every function_call + function_call_output pair using
codex's REAL call_X ids. dynamic.jsonl now has block_ids that
match codex's session, so the consume mirror has real targets.
4. MutateCodexSession rewrites those response_item entries on
consume: function_call.arguments → "{}" (heavy), output → marker.
E2E verified: codex resume sees "...(tool called)" instead of
the original output.
361 lines
9.8 KiB
Go
361 lines
9.8 KiB
Go
// Package runner forks the OpenAI `codex` CLI per Plexum turn and
|
|
// translates its JSONL output into canonical TurnEvents.
|
|
//
|
|
// Each Stream call spawns one subprocess:
|
|
//
|
|
// codex exec --skip-git-repo-check --dangerously-bypass-approvals-and-sandbox --json [<prompt>]
|
|
// < /dev/null
|
|
//
|
|
// For multi-turn continuity:
|
|
//
|
|
// codex exec resume <thread_id> --skip-git-repo-check ... --json <prompt>
|
|
//
|
|
// in the agent's workspace dir. Codex emits a stream of JSONL events:
|
|
// {"type":"thread.started","thread_id":"..."}
|
|
// {"type":"turn.started"}
|
|
// {"type":"item.completed","item":{"id":"...","type":"agent_message","text":"..."}}
|
|
// {"type":"turn.completed","usage":{"input_tokens":..., ...}}
|
|
//
|
|
// Session continuity: per-workspace thread_id stored at
|
|
// <workspace>/.plexum-codex-session, used as the resume arg.
|
|
package runner
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
|
|
"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
|
|
"git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge"
|
|
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
|
)
|
|
|
|
// SessionFile names the file, relative to an agent's workspace, in which
// the codex thread_id from the most recent turn is kept so the next turn
// can hand it to `codex exec resume`.
const (
	SessionFile = ".plexum-codex-session"
)
|
|
|
|
// Run is the ProviderPlugin Stream implementation.
|
|
func Run(
|
|
ctx context.Context,
|
|
host plugin.HostAPI,
|
|
agent plugin.AgentContext,
|
|
model string,
|
|
binary string,
|
|
extraArgs []string,
|
|
req canonical.TurnRequest,
|
|
) (<-chan canonical.TurnEvent, error) {
|
|
workspace := agent.Workspace
|
|
if workspace == "" {
|
|
return nil, errors.New("codex: agent workspace required (host must supply AgentContext)")
|
|
}
|
|
if err := os.MkdirAll(workspace, 0o755); err != nil {
|
|
return nil, fmt.Errorf("mkdir workspace: %w", err)
|
|
}
|
|
|
|
prompt := extractLastUserText(req.Messages)
|
|
if prompt == "" {
|
|
return nil, errors.New("codex: no user text in request messages")
|
|
}
|
|
|
|
// Per-turn MCP bridge so codex can call Plexum host tools the
|
|
// host advertised in req.Tools. Best-effort: setup failure logs
|
|
// + continues; codex still runs with its built-in tools.
|
|
var br *mcpbridge.Bridge
|
|
if len(req.Tools) > 0 {
|
|
exe, err := os.Executable()
|
|
if err == nil {
|
|
if _, regErr := EnsureCodexMCPRegistered(ctx, binary, exe, agent.AgentID); regErr != nil {
|
|
host.Log("warn", "codex: mcp registration failed", map[string]any{"err": regErr.Error()})
|
|
} else {
|
|
// Open the listener on the stable per-agent socket
|
|
// path; codex's mcp-host subprocess will dial it.
|
|
b, err := mcpbridge.SetupOnPath(ctx, host, agent.AgentID, req.Tools, StableSocketPath(agent.AgentID))
|
|
if err != nil {
|
|
host.Log("warn", "codex: bridge setup failed", map[string]any{"err": err.Error()})
|
|
} else {
|
|
br = b
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
resumeID := loadSessionID(workspace)
|
|
|
|
// Build the argv. `codex exec [resume <id>]` then flags then prompt.
|
|
args := []string{"exec"}
|
|
if resumeID != "" {
|
|
args = append(args, "resume", resumeID)
|
|
}
|
|
args = append(args,
|
|
"--skip-git-repo-check",
|
|
"--dangerously-bypass-approvals-and-sandbox",
|
|
"--json",
|
|
)
|
|
if model != "" {
|
|
args = append(args, "--model", model)
|
|
}
|
|
args = append(args, extraArgs...)
|
|
// Prompt is the last positional. Codex's exec/resume both accept
|
|
// it positionally; we pass via arg to avoid the stdin path.
|
|
args = append(args, prompt)
|
|
|
|
host.Log("info", "codex: spawning CLI", map[string]any{
|
|
"workspace": workspace, "model": model, "resume": resumeID,
|
|
"prompt_len": len(prompt),
|
|
})
|
|
|
|
cmd := exec.CommandContext(ctx, binary, args...)
|
|
cmd.Dir = workspace
|
|
cmd.Env = os.Environ()
|
|
cmd.Stdin = nil // detach from inherited stdin (codex would otherwise wait)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("stdout pipe: %w", err)
|
|
}
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("stderr pipe: %w", err)
|
|
}
|
|
if err := cmd.Start(); err != nil {
|
|
return nil, fmt.Errorf("codex: start: %w", err)
|
|
}
|
|
|
|
out := make(chan canonical.TurnEvent, 16)
|
|
go func() {
|
|
pumpEvents(ctx, host, cmd, stdout, stderr, workspace, out)
|
|
br.Close() // nil-safe
|
|
}()
|
|
return out, nil
|
|
}
|
|
|
|
// pumpEvents drains stdout (JSONL), translates each line to a
|
|
// canonical TurnEvent, and closes the channel on subprocess exit.
|
|
func pumpEvents(
|
|
ctx context.Context,
|
|
host plugin.HostAPI,
|
|
cmd *exec.Cmd,
|
|
stdout io.Reader,
|
|
stderr io.Reader,
|
|
workspace string,
|
|
out chan<- canonical.TurnEvent,
|
|
) {
|
|
defer close(out)
|
|
|
|
// Drain stderr in a side goroutine — codex chatters informational
|
|
// stuff there ("Reading additional input from stdin..." etc.).
|
|
go func() {
|
|
sc := bufio.NewScanner(stderr)
|
|
for sc.Scan() {
|
|
host.Log("debug", "codex stderr: "+sc.Text(), nil)
|
|
}
|
|
}()
|
|
|
|
emit := func(ev canonical.TurnEvent) {
|
|
select {
|
|
case out <- ev:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
emit(canonical.TurnEvent{Type: canonical.EventMessageStart})
|
|
|
|
scanner := bufio.NewScanner(stdout)
|
|
scanner.Buffer(make([]byte, 64*1024), 4*1024*1024)
|
|
|
|
var (
|
|
usage canonical.Usage
|
|
hadError bool
|
|
errMsg string
|
|
)
|
|
|
|
for scanner.Scan() {
|
|
line := scanner.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
var head struct {
|
|
Type string `json:"type"`
|
|
}
|
|
if err := json.Unmarshal(line, &head); err != nil {
|
|
host.Log("warn", "codex: unparseable JSONL line",
|
|
map[string]any{"err": err.Error(), "line": string(line)})
|
|
continue
|
|
}
|
|
|
|
switch head.Type {
|
|
case "thread.started":
|
|
var ev struct {
|
|
ThreadID string `json:"thread_id"`
|
|
}
|
|
_ = json.Unmarshal(line, &ev)
|
|
if ev.ThreadID != "" {
|
|
saveSessionID(workspace, ev.ThreadID, host)
|
|
}
|
|
|
|
case "turn.started":
|
|
// nothing user-facing
|
|
|
|
case "item.started":
|
|
// codex announces an item; we wait for completed.
|
|
|
|
case "item.completed":
|
|
var ev struct {
|
|
Item struct {
|
|
ID string `json:"id"`
|
|
Type string `json:"type"`
|
|
Text string `json:"text"`
|
|
} `json:"item"`
|
|
}
|
|
if err := json.Unmarshal(line, &ev); err != nil {
|
|
continue
|
|
}
|
|
// "agent_message" is the assistant's text reply.
|
|
// Other item types (reasoning_summary, tool_use, etc.)
|
|
// surface as logs for now.
|
|
if ev.Item.Type == "agent_message" && ev.Item.Text != "" {
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventTextDelta, Text: ev.Item.Text,
|
|
})
|
|
} else if ev.Item.Type != "" {
|
|
host.Log("debug", "codex item",
|
|
map[string]any{"type": ev.Item.Type, "id": ev.Item.ID})
|
|
}
|
|
|
|
case "turn.completed":
|
|
var ev struct {
|
|
Usage struct {
|
|
InputTokens int `json:"input_tokens"`
|
|
CachedInputTokens int `json:"cached_input_tokens"`
|
|
OutputTokens int `json:"output_tokens"`
|
|
ReasoningTokens int `json:"reasoning_output_tokens"`
|
|
} `json:"usage"`
|
|
}
|
|
if err := json.Unmarshal(line, &ev); err == nil {
|
|
usage.InputTokens = ev.Usage.InputTokens
|
|
usage.OutputTokens = ev.Usage.OutputTokens
|
|
usage.CacheReadTokens = ev.Usage.CachedInputTokens
|
|
usage.ThinkingTokens = ev.Usage.ReasoningTokens
|
|
}
|
|
|
|
case "error", "turn.failed":
|
|
var ev struct {
|
|
Message string `json:"message"`
|
|
Error struct {
|
|
Message string `json:"message"`
|
|
} `json:"error"`
|
|
}
|
|
_ = json.Unmarshal(line, &ev)
|
|
hadError = true
|
|
errMsg = ev.Message
|
|
if errMsg == "" {
|
|
errMsg = ev.Error.Message
|
|
}
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventError,
|
|
Error: &canonical.ErrorInfo{Code: head.Type, Message: errMsg},
|
|
})
|
|
|
|
default:
|
|
host.Log("debug", "codex unknown event",
|
|
map[string]any{"type": head.Type, "line": truncate(string(line), 200)})
|
|
}
|
|
}
|
|
if serr := scanner.Err(); serr != nil {
|
|
host.Log("warn", "codex scanner", map[string]any{"err": serr.Error()})
|
|
}
|
|
|
|
werr := cmd.Wait()
|
|
stopReason := canonical.StopEndTurn
|
|
if hadError || werr != nil {
|
|
stopReason = canonical.StopReason("error")
|
|
}
|
|
// Read the session rollout once codex has finished and replay
|
|
// every recorded function_call / function_call_output pair as
|
|
// tool_call_start/end + EventToolResult — with codex's REAL
|
|
// call_X ids. The agentic loop attaches these to the iteration
|
|
// so dynamic-* consume can target the same ids SessionMutator
|
|
// later mirrors into the session JSONL.
|
|
if sessionPath, perr := FindCodexSessionFile(workspace); perr == nil {
|
|
calls, err := ParseRolloutToolCalls(sessionPath)
|
|
if err != nil {
|
|
host.Log("warn", "codex: parse rollout failed",
|
|
map[string]any{"err": err.Error(), "path": sessionPath})
|
|
} else {
|
|
EmitCodexToolCalls(calls, emit)
|
|
host.Log("debug", "codex: emitted tool calls from rollout",
|
|
map[string]any{"count": len(calls), "path": sessionPath})
|
|
}
|
|
}
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventMessageEnd, StopReason: stopReason, Usage: &usage,
|
|
})
|
|
}
|
|
|
|
// ---- helpers ----
|
|
|
|
func extractLastUserText(msgs []canonical.Message) string {
|
|
for i := len(msgs) - 1; i >= 0; i-- {
|
|
m := msgs[i]
|
|
if m.Role != canonical.RoleUser {
|
|
continue
|
|
}
|
|
var parts []string
|
|
for _, b := range m.Content {
|
|
if t, ok := b.(*canonical.TextBlock); ok {
|
|
parts = append(parts, t.Text)
|
|
}
|
|
}
|
|
if len(parts) > 0 {
|
|
out := parts[0]
|
|
for _, p := range parts[1:] {
|
|
out += "\n" + p
|
|
}
|
|
return out
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func loadSessionID(workspace string) string {
|
|
raw, err := os.ReadFile(filepath.Join(workspace, SessionFile))
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
// Trim possible trailing newline.
|
|
s := string(raw)
|
|
for len(s) > 0 && (s[len(s)-1] == '\n' || s[len(s)-1] == '\r' || s[len(s)-1] == ' ') {
|
|
s = s[:len(s)-1]
|
|
}
|
|
return s
|
|
}
|
|
|
|
func saveSessionID(workspace, id string, host plugin.HostAPI) {
|
|
if id == "" {
|
|
return
|
|
}
|
|
path := filepath.Join(workspace, SessionFile)
|
|
tmp := path + ".tmp"
|
|
if err := os.WriteFile(tmp, []byte(id), 0o600); err != nil {
|
|
host.Log("warn", "codex: save session id failed",
|
|
map[string]any{"err": err.Error(), "path": tmp})
|
|
return
|
|
}
|
|
if err := os.Rename(tmp, path); err != nil {
|
|
host.Log("warn", "codex: rename session id failed",
|
|
map[string]any{"err": err.Error(), "path": path})
|
|
}
|
|
}
|
|
|
|
// truncate shortens s to at most n bytes for logging, appending "…" when
// anything was cut.
//
// The cut is moved back to the nearest UTF-8 rune boundary, so a
// multi-byte character is never split into invalid UTF-8. A negative n is
// treated as 0.
func truncate(s string, n int) string {
	if n < 0 {
		n = 0
	}
	if len(s) <= n {
		return s
	}
	// Bytes of the form 10xxxxxx are UTF-8 continuation bytes. Cutting just
	// before one would split a rune, so back up until s[n] starts a rune.
	for n > 0 && s[n]&0xC0 == 0x80 {
		n--
	}
	return s[:n] + "…"
}
|