Files
Plexum-openai-provider/internal/runner/runner.go
hzhang e1c4add9fe fix(runner): write thread id atomically (tmp+rename)
Aligns with anthropic-contractor: a crash mid-write leaves a stale
.plexum-codex-session.tmp instead of an empty real file that would
make the next turn start fresh and lose conversation thread.
2026-06-01 13:45:11 +01:00

318 lines
8.1 KiB
Go

// Package runner forks the OpenAI `codex` CLI per Plexum turn and
// translates its JSONL output into canonical TurnEvents.
//
// Each Stream call spawns one subprocess:
//
//	codex exec --skip-git-repo-check --dangerously-bypass-approvals-and-sandbox --json [<prompt>]
//	    < /dev/null
//
// For multi-turn continuity:
//
//	codex exec resume <thread_id> --skip-git-repo-check ... --json <prompt>
//
// Both forms run in the agent's workspace dir. Codex emits a stream of
// JSONL events:
//
//	{"type":"thread.started","thread_id":"..."}
//	{"type":"turn.started"}
//	{"type":"item.completed","item":{"id":"...","type":"agent_message","text":"..."}}
//	{"type":"turn.completed","usage":{"input_tokens":..., ...}}
//
// Session continuity: per-workspace thread_id stored at
// <workspace>/.plexum-codex-session, used as the resume arg.
package runner
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
)
// SessionFile is the per-workspace filename storing the codex
// thread_id between turns. The file lives directly inside the agent
// workspace directory; its contents are passed to `codex exec resume`
// on the next turn.
const SessionFile = ".plexum-codex-session"
// Run is the ProviderPlugin Stream implementation. It launches one
// `codex exec` subprocess inside the agent's workspace and returns the
// channel on which the translated TurnEvents are delivered.
//
// The prompt is the text of the most recent user message in req. When
// the workspace holds a saved thread id, the subprocess is started as
// `codex exec resume <id>` so the conversation continues. A non-empty
// model is forwarded as `--model`; extraArgs are appended verbatim after
// the built-in flags and before the prompt.
//
// An error is returned when the workspace is empty or cannot be
// created, when the request carries no user text, or when the pipes or
// the subprocess cannot be set up. On success the returned channel is
// fed, and eventually closed, by pumpEvents.
func Run(
	ctx context.Context,
	host plugin.HostAPI,
	agent plugin.AgentContext,
	model string,
	binary string,
	extraArgs []string,
	req canonical.TurnRequest,
) (<-chan canonical.TurnEvent, error) {
	dir := agent.Workspace
	if dir == "" {
		return nil, errors.New("codex: agent workspace required (host must supply AgentContext)")
	}
	if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
		return nil, fmt.Errorf("mkdir workspace: %w", mkErr)
	}

	userText := extractLastUserText(req.Messages)
	if userText == "" {
		return nil, errors.New("codex: no user text in request messages")
	}

	threadID := loadSessionID(dir)

	// argv layout:
	//   exec [resume <id>] <fixed flags> [--model <m>] <extraArgs...> <prompt>
	argv := []string{"exec"}
	if threadID != "" {
		argv = append(argv, "resume", threadID)
	}
	argv = append(argv, "--skip-git-repo-check", "--dangerously-bypass-approvals-and-sandbox", "--json")
	if model != "" {
		argv = append(argv, "--model", model)
	}
	argv = append(argv, extraArgs...)
	// The prompt goes last as a positional argument rather than over
	// stdin; both exec and exec resume accept it there.
	argv = append(argv, userText)

	host.Log("info", "codex: spawning CLI", map[string]any{
		"workspace":  dir,
		"model":      model,
		"resume":     threadID,
		"prompt_len": len(userText),
	})

	proc := exec.CommandContext(ctx, binary, argv...)
	proc.Dir = dir
	proc.Env = os.Environ()
	// A nil Stdin makes os/exec connect the child to the null device, so
	// codex never waits on input inherited from this process.
	proc.Stdin = nil

	outPipe, err := proc.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("stdout pipe: %w", err)
	}
	errPipe, err := proc.StderrPipe()
	if err != nil {
		return nil, fmt.Errorf("stderr pipe: %w", err)
	}
	if err = proc.Start(); err != nil {
		return nil, fmt.Errorf("codex: start: %w", err)
	}

	events := make(chan canonical.TurnEvent, 16)
	go pumpEvents(ctx, host, proc, outPipe, errPipe, dir, events)
	return events, nil
}
// pumpEvents drains the subprocess's stdout (JSONL), translates each
// line into canonical TurnEvents sent on out, and closes out once the
// subprocess has exited.
//
// The event sequence is: one EventMessageStart, then an EventTextDelta
// per completed agent_message item and an EventError per "error" /
// "turn.failed" line, and finally one EventMessageEnd carrying the usage
// from the last "turn.completed" line. The stop reason is "error" when
// codex reported an error, when stdout could not be read to the end, or
// when cmd.Wait returns an error; otherwise it is StopEndTurn.
//
// A "thread.started" line persists its thread_id into workspace via
// saveSessionID. stderr is forwarded line by line to host.Log at debug
// level. Sends on out are abandoned once ctx is done, so a consumer that
// has gone away cannot block this goroutine.
func pumpEvents(
	ctx context.Context,
	host plugin.HostAPI,
	cmd *exec.Cmd,
	stdout io.Reader,
	stderr io.Reader,
	workspace string,
	out chan<- canonical.TurnEvent,
) {
	defer close(out)

	// Drain stderr in a side goroutine — codex chatters informational
	// stuff there ("Reading additional input from stdin..." etc.).
	// stderrDone lets us hold off cmd.Wait until every read has finished:
	// os/exec documents that calling Wait before all reads from a pipe
	// have completed is incorrect, because Wait closes the pipe.
	stderrDone := make(chan struct{})
	go func() {
		defer close(stderrDone)
		sc := bufio.NewScanner(stderr)
		for sc.Scan() {
			host.Log("debug", "codex stderr: "+sc.Text(), nil)
		}
		// If the scanner gave up early (e.g. an over-long line), keep
		// discarding so the child never blocks on a full stderr pipe.
		_, _ = io.Copy(io.Discard, stderr)
	}()

	// emit delivers ev unless the context is cancelled first.
	emit := func(ev canonical.TurnEvent) {
		select {
		case out <- ev:
		case <-ctx.Done():
		}
	}

	emit(canonical.TurnEvent{Type: canonical.EventMessageStart})

	scanner := bufio.NewScanner(stdout)
	// Allow JSONL lines of up to 4 MiB; the default 64 KiB token limit
	// is too small for long agent messages.
	scanner.Buffer(make([]byte, 64*1024), 4*1024*1024)

	var (
		usage    canonical.Usage
		hadError bool
	)
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		// First pass: decode only the discriminator field.
		var head struct {
			Type string `json:"type"`
		}
		if err := json.Unmarshal(line, &head); err != nil {
			host.Log("warn", "codex: unparseable JSONL line",
				map[string]any{"err": err.Error(), "line": string(line)})
			continue
		}
		switch head.Type {
		case "thread.started":
			var ev struct {
				ThreadID string `json:"thread_id"`
			}
			// Best effort: the line already parsed as JSON above, and an
			// empty ThreadID is simply ignored.
			_ = json.Unmarshal(line, &ev)
			if ev.ThreadID != "" {
				saveSessionID(workspace, ev.ThreadID, host)
			}
		case "turn.started":
			// nothing user-facing
		case "item.started":
			// codex announces an item; we wait for completed.
		case "item.completed":
			var ev struct {
				Item struct {
					ID   string `json:"id"`
					Type string `json:"type"`
					Text string `json:"text"`
				} `json:"item"`
			}
			if err := json.Unmarshal(line, &ev); err != nil {
				host.Log("warn", "codex: malformed item.completed",
					map[string]any{"err": err.Error(), "line": truncate(string(line), 200)})
				continue
			}
			// "agent_message" is the assistant's text reply.
			// Other item types (reasoning_summary, tool_use, etc.)
			// surface as logs for now.
			if ev.Item.Type == "agent_message" && ev.Item.Text != "" {
				emit(canonical.TurnEvent{
					Type: canonical.EventTextDelta, Text: ev.Item.Text,
				})
			} else if ev.Item.Type != "" {
				host.Log("debug", "codex item",
					map[string]any{"type": ev.Item.Type, "id": ev.Item.ID})
			}
		case "turn.completed":
			var ev struct {
				Usage struct {
					InputTokens       int `json:"input_tokens"`
					CachedInputTokens int `json:"cached_input_tokens"`
					OutputTokens      int `json:"output_tokens"`
					ReasoningTokens   int `json:"reasoning_output_tokens"`
				} `json:"usage"`
			}
			if err := json.Unmarshal(line, &ev); err == nil {
				usage.InputTokens = ev.Usage.InputTokens
				usage.OutputTokens = ev.Usage.OutputTokens
				usage.CacheReadTokens = ev.Usage.CachedInputTokens
				usage.ThinkingTokens = ev.Usage.ReasoningTokens
			}
		case "error", "turn.failed":
			var ev struct {
				Message string `json:"message"`
				Error   struct {
					Message string `json:"message"`
				} `json:"error"`
			}
			// Best effort: the error event is emitted even when the
			// details cannot be decoded.
			_ = json.Unmarshal(line, &ev)
			hadError = true
			// The message is read from the top level first, then from
			// the nested "error" object.
			msg := ev.Message
			if msg == "" {
				msg = ev.Error.Message
			}
			emit(canonical.TurnEvent{
				Type:  canonical.EventError,
				Error: &canonical.ErrorInfo{Code: head.Type, Message: msg},
			})
		default:
			host.Log("debug", "codex unknown event",
				map[string]any{"type": head.Type, "line": truncate(string(line), 200)})
		}
	}
	if serr := scanner.Err(); serr != nil {
		host.Log("warn", "codex scanner", map[string]any{"err": serr.Error()})
		// The output was cut short, so the turn must not be reported as
		// a clean end.
		hadError = true
		// Keep draining: with nobody reading, the child would block on a
		// full stdout pipe and cmd.Wait below would never return.
		_, _ = io.Copy(io.Discard, stdout)
	}

	// All reads from both pipes must be finished before Wait closes them.
	<-stderrDone
	werr := cmd.Wait()
	if werr != nil {
		host.Log("warn", "codex: subprocess exited with error",
			map[string]any{"err": werr.Error()})
	}

	stopReason := canonical.StopEndTurn
	if hadError || werr != nil {
		stopReason = canonical.StopReason("error")
	}
	emit(canonical.TurnEvent{
		Type: canonical.EventMessageEnd, StopReason: stopReason, Usage: &usage,
	})
}
// ---- helpers ----
// extractLastUserText walks msgs from newest to oldest and returns the
// text of the first user-role message that contains at least one
// *canonical.TextBlock. Multiple text blocks in that message are joined
// with "\n"; non-text blocks are skipped. It returns "" when no user
// message carries a text block.
func extractLastUserText(msgs []canonical.Message) string {
	for idx := len(msgs) - 1; idx >= 0; idx-- {
		if msgs[idx].Role != canonical.RoleUser {
			continue
		}
		var (
			joined string
			seen   bool // true once any text block was found, even an empty one
		)
		for _, block := range msgs[idx].Content {
			text, isText := block.(*canonical.TextBlock)
			if !isText {
				continue
			}
			if seen {
				joined += "\n"
			}
			joined += text.Text
			seen = true
		}
		if seen {
			return joined
		}
	}
	return ""
}
// loadSessionID returns the thread id stored in the workspace's
// SessionFile with trailing newlines, carriage returns and spaces
// removed. It returns "" when the file cannot be read.
func loadSessionID(workspace string) string {
	data, err := os.ReadFile(filepath.Join(workspace, SessionFile))
	if err != nil {
		return ""
	}
	// Move end left past any trailing '\n', '\r' or ' ' bytes.
	end := len(data)
	for ; end > 0; end-- {
		if c := data[end-1]; c != '\n' && c != '\r' && c != ' ' {
			break
		}
	}
	return string(data[:end])
}
// saveSessionID stores id as the workspace's codex thread id. An empty
// id is ignored.
//
// The id is written to "<SessionFile>.tmp", flushed to stable storage,
// and then renamed over the real file, so a reader sees either the old
// id or the complete new one. Failures are reported through host.Log at
// warn level and are otherwise non-fatal; on failure the temporary file
// is removed on a best-effort basis.
func saveSessionID(workspace, id string, host plugin.HostAPI) {
	if id == "" {
		return
	}
	path := filepath.Join(workspace, SessionFile)
	tmp := path + ".tmp"
	if err := writeSessionTmp(tmp, []byte(id)); err != nil {
		_ = os.Remove(tmp) // best effort; do not leave a partial tmp file behind
		host.Log("warn", "codex: save session id failed",
			map[string]any{"err": err.Error(), "path": tmp})
		return
	}
	if err := os.Rename(tmp, path); err != nil {
		_ = os.Remove(tmp) // best effort; the rename did not consume the tmp file
		host.Log("warn", "codex: rename session id failed",
			map[string]any{"err": err.Error(), "path": path})
	}
}

// writeSessionTmp creates or truncates name with mode 0600, writes data
// and syncs the file before closing it, so the bytes are on disk before
// the caller renames the file into place.
func writeSessionTmp(name string, data []byte) error {
	f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		_ = f.Close() // the write error is the one worth reporting
		return err
	}
	if err := f.Sync(); err != nil {
		_ = f.Close() // the sync error is the one worth reporting
		return err
	}
	return f.Close()
}
// truncate returns s unchanged when it is at most n bytes long.
// Otherwise it returns a prefix of at most n bytes followed by "…".
// The cut is moved back to the nearest rune boundary so a multi-byte
// UTF-8 character is never split. A negative n is treated as 0.
func truncate(s string, n int) string {
	if n < 0 {
		n = 0
	}
	if len(s) <= n {
		return s
	}
	// s[n] exists because n < len(s). While it is a UTF-8 continuation
	// byte (bit pattern 10xxxxxx), cutting here would split a rune, so
	// step back to the byte that starts it.
	cut := n
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut] + "…"
}