// Package runner forks the OpenAI `codex` CLI per Plexum turn and // translates its JSONL output into canonical TurnEvents. // // Each Stream call spawns one subprocess: // // codex exec --skip-git-repo-check --dangerously-bypass-approvals-and-sandbox --json [] // < /dev/null // // For multi-turn continuity: // // codex exec resume --skip-git-repo-check ... --json // // in the agent's workspace dir. Codex emits a stream of JSONL events: // {"type":"thread.started","thread_id":"..."} // {"type":"turn.started"} // {"type":"item.completed","item":{"id":"...","type":"agent_message","text":"..."}} // {"type":"turn.completed","usage":{"input_tokens":..., ...}} // // Session continuity: per-workspace thread_id stored at // /.plexum-codex-session, used as the resume arg. package runner import ( "bufio" "context" "encoding/json" "errors" "fmt" "io" "os" "os/exec" "path/filepath" "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" "git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge" plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" ) // SessionFile is the per-workspace filename storing the codex // thread_id between turns. const SessionFile = ".plexum-codex-session" // Run is the ProviderPlugin Stream implementation. func Run( ctx context.Context, host plugin.HostAPI, agent plugin.AgentContext, model string, binary string, extraArgs []string, req canonical.TurnRequest, ) (<-chan canonical.TurnEvent, error) { workspace := agent.Workspace if workspace == "" { return nil, errors.New("codex: agent workspace required (host must supply AgentContext)") } if err := os.MkdirAll(workspace, 0o755); err != nil { return nil, fmt.Errorf("mkdir workspace: %w", err) } prompt := extractLastUserText(req.Messages) if prompt == "" { return nil, errors.New("codex: no user text in request messages") } // Per-turn MCP bridge so codex can call Plexum host tools the // host advertised in req.Tools. Best-effort: setup failure logs // + continues; codex still runs with its built-in tools. var br *mcpbridge.Bridge if len(req.Tools) > 0 { exe, err := os.Executable() if err == nil { if _, regErr := EnsureCodexMCPRegistered(ctx, binary, exe, agent.AgentID); regErr != nil { host.Log("warn", "codex: mcp registration failed", map[string]any{"err": regErr.Error()}) } else { // Open the listener on the stable per-agent socket // path; codex's mcp-host subprocess will dial it. b, err := mcpbridge.SetupOnPath(ctx, host, agent.AgentID, req.Tools, StableSocketPath(agent.AgentID)) if err != nil { host.Log("warn", "codex: bridge setup failed", map[string]any{"err": err.Error()}) } else { br = b } } } } resumeID := loadSessionID(workspace) // Build the argv. `codex exec [resume ]` then flags then prompt. args := []string{"exec"} if resumeID != "" { args = append(args, "resume", resumeID) } args = append(args, "--skip-git-repo-check", "--dangerously-bypass-approvals-and-sandbox", "--json", ) if model != "" { args = append(args, "--model", model) } args = append(args, extraArgs...) // Prompt is the last positional. Codex's exec/resume both accept // it positionally; we pass via arg to avoid the stdin path. args = append(args, prompt) host.Log("info", "codex: spawning CLI", map[string]any{ "workspace": workspace, "model": model, "resume": resumeID, "prompt_len": len(prompt), }) cmd := exec.CommandContext(ctx, binary, args...) cmd.Dir = workspace cmd.Env = os.Environ() cmd.Stdin = nil // detach from inherited stdin (codex would otherwise wait) stdout, err := cmd.StdoutPipe() if err != nil { return nil, fmt.Errorf("stdout pipe: %w", err) } stderr, err := cmd.StderrPipe() if err != nil { return nil, fmt.Errorf("stderr pipe: %w", err) } if err := cmd.Start(); err != nil { return nil, fmt.Errorf("codex: start: %w", err) } out := make(chan canonical.TurnEvent, 16) go func() { pumpEvents(ctx, host, cmd, stdout, stderr, workspace, out) br.Close() // nil-safe }() return out, nil } // pumpEvents drains stdout (JSONL), translates each line to a // canonical TurnEvent, and closes the channel on subprocess exit. func pumpEvents( ctx context.Context, host plugin.HostAPI, cmd *exec.Cmd, stdout io.Reader, stderr io.Reader, workspace string, out chan<- canonical.TurnEvent, ) { defer close(out) // Drain stderr in a side goroutine — codex chatters informational // stuff there ("Reading additional input from stdin..." etc.). go func() { sc := bufio.NewScanner(stderr) for sc.Scan() { host.Log("debug", "codex stderr: "+sc.Text(), nil) } }() emit := func(ev canonical.TurnEvent) { select { case out <- ev: case <-ctx.Done(): } } emit(canonical.TurnEvent{Type: canonical.EventMessageStart}) scanner := bufio.NewScanner(stdout) scanner.Buffer(make([]byte, 64*1024), 4*1024*1024) var ( usage canonical.Usage hadError bool errMsg string ) for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } var head struct { Type string `json:"type"` } if err := json.Unmarshal(line, &head); err != nil { host.Log("warn", "codex: unparseable JSONL line", map[string]any{"err": err.Error(), "line": string(line)}) continue } switch head.Type { case "thread.started": var ev struct { ThreadID string `json:"thread_id"` } _ = json.Unmarshal(line, &ev) if ev.ThreadID != "" { saveSessionID(workspace, ev.ThreadID, host) } case "turn.started": // nothing user-facing case "item.started": // codex announces an item; we wait for completed. case "item.completed": var ev struct { Item struct { ID string `json:"id"` Type string `json:"type"` Text string `json:"text"` } `json:"item"` } if err := json.Unmarshal(line, &ev); err != nil { continue } // "agent_message" is the assistant's text reply. // Other item types (reasoning_summary, tool_use, etc.) // surface as logs for now. if ev.Item.Type == "agent_message" && ev.Item.Text != "" { emit(canonical.TurnEvent{ Type: canonical.EventTextDelta, Text: ev.Item.Text, }) } else if ev.Item.Type != "" { host.Log("debug", "codex item", map[string]any{"type": ev.Item.Type, "id": ev.Item.ID}) } case "turn.completed": var ev struct { Usage struct { InputTokens int `json:"input_tokens"` CachedInputTokens int `json:"cached_input_tokens"` OutputTokens int `json:"output_tokens"` ReasoningTokens int `json:"reasoning_output_tokens"` } `json:"usage"` } if err := json.Unmarshal(line, &ev); err == nil { usage.InputTokens = ev.Usage.InputTokens usage.OutputTokens = ev.Usage.OutputTokens usage.CacheReadTokens = ev.Usage.CachedInputTokens usage.ThinkingTokens = ev.Usage.ReasoningTokens } case "error", "turn.failed": var ev struct { Message string `json:"message"` Error struct { Message string `json:"message"` } `json:"error"` } _ = json.Unmarshal(line, &ev) hadError = true errMsg = ev.Message if errMsg == "" { errMsg = ev.Error.Message } emit(canonical.TurnEvent{ Type: canonical.EventError, Error: &canonical.ErrorInfo{Code: head.Type, Message: errMsg}, }) default: host.Log("debug", "codex unknown event", map[string]any{"type": head.Type, "line": truncate(string(line), 200)}) } } if serr := scanner.Err(); serr != nil { host.Log("warn", "codex scanner", map[string]any{"err": serr.Error()}) } werr := cmd.Wait() stopReason := canonical.StopEndTurn if hadError || werr != nil { stopReason = canonical.StopReason("error") } // Read the session rollout once codex has finished and replay // every recorded function_call / function_call_output pair as // tool_call_start/end + EventToolResult — with codex's REAL // call_X ids. The agentic loop attaches these to the iteration // so dynamic-* consume can target the same ids SessionMutator // later mirrors into the session JSONL. if sessionPath, perr := FindCodexSessionFile(workspace); perr == nil { calls, err := ParseRolloutToolCalls(sessionPath) if err != nil { host.Log("warn", "codex: parse rollout failed", map[string]any{"err": err.Error(), "path": sessionPath}) } else { EmitCodexToolCalls(calls, emit) host.Log("debug", "codex: emitted tool calls from rollout", map[string]any{"count": len(calls), "path": sessionPath}) } } emit(canonical.TurnEvent{ Type: canonical.EventMessageEnd, StopReason: stopReason, Usage: &usage, }) } // ---- helpers ---- func extractLastUserText(msgs []canonical.Message) string { for i := len(msgs) - 1; i >= 0; i-- { m := msgs[i] if m.Role != canonical.RoleUser { continue } var parts []string for _, b := range m.Content { if t, ok := b.(*canonical.TextBlock); ok { parts = append(parts, t.Text) } } if len(parts) > 0 { out := parts[0] for _, p := range parts[1:] { out += "\n" + p } return out } } return "" } func loadSessionID(workspace string) string { raw, err := os.ReadFile(filepath.Join(workspace, SessionFile)) if err != nil { return "" } // Trim possible trailing newline. s := string(raw) for len(s) > 0 && (s[len(s)-1] == '\n' || s[len(s)-1] == '\r' || s[len(s)-1] == ' ') { s = s[:len(s)-1] } return s } func saveSessionID(workspace, id string, host plugin.HostAPI) { if id == "" { return } path := filepath.Join(workspace, SessionFile) tmp := path + ".tmp" if err := os.WriteFile(tmp, []byte(id), 0o600); err != nil { host.Log("warn", "codex: save session id failed", map[string]any{"err": err.Error(), "path": tmp}) return } if err := os.Rename(tmp, path); err != nil { host.Log("warn", "codex: rename session id failed", map[string]any{"err": err.Error(), "path": path}) } } func truncate(s string, n int) string { if len(s) <= n { return s } return s[:n] + "…" }