Plexum ProviderPlugin that wraps the local `codex` CLI binary
(OpenAI Codex CLI ≥ 0.135). Same CLI-driven pattern as
Plexum-gemini-provider and Plexum-anthropic-provider's contractor.
internal/runner/ (~200 LOC):
- Per Plexum turn, fork:
codex exec [resume <thread_id>] \
--skip-git-repo-check \
--dangerously-bypass-approvals-and-sandbox \
--json \
[--model <m>] \
"<last user msg>"
in agent workspace, with stdin = /dev/null (codex would otherwise
block waiting for additional input).
- Stream-parses JSONL events:
thread.started → save thread_id to .plexum-codex-session
item.completed → if type==agent_message, emit text_delta
turn.completed → capture usage (input/output/cached/reasoning)
error / turn.failed → emit canonical EventError
- Other event types (turn.started, item.started, reasoning, etc.)
logged at debug, don't produce user-facing events for v1
cmd/plexum-openai-provider-plugin/ implements ProviderPluginWithAgent
(needs AgentContext.Workspace as cwd).
Model surface:
default: ["codex"] (no --model flag → CLI default; only thing
ChatGPT-account installs support)
override via config.supported_models + config.model_args for
API-key installs that want gpt-5 / o3 / etc.
HostConfig (all optional):
binary default "codex" (operator should set abs path for
systemd PATH)
extra_args appended before the prompt
supported_models override the advertised model list
model_args map plexum id → CLI --model value
No api_key field — codex CLI handles auth via ~/.codex/ state
(ChatGPT login OR OPENAI_API_KEY env).
End-to-end verified against local install (ChatGPT login):
1. CLI turn 1: "OpenAI built me."
2. CLI turn 2 (resume): "I said: 'OpenAI built me.'" (multi-turn ✓)
3. Fabric channel e2e: alice → cx agent → codex CLI → outbound REST →
seq=23: "Use list comprehensions for concise, readable filtering and
transformation of iterables."
Known: when daemon runs under systemd PATH doesn't include nvm/local
bin dirs; operator must set absolute binary path in config.json.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
309 lines
7.9 KiB
Go
309 lines
7.9 KiB
Go
// Package runner forks the OpenAI `codex` CLI per Plexum turn and
|
|
// translates its JSONL output into canonical TurnEvents.
|
|
//
|
|
// Each Stream call spawns one subprocess:
|
|
//
|
|
// codex exec --skip-git-repo-check --dangerously-bypass-approvals-and-sandbox --json [<prompt>]
|
|
// < /dev/null
|
|
//
|
|
// For multi-turn continuity:
|
|
//
|
|
// codex exec resume <thread_id> --skip-git-repo-check ... --json <prompt>
|
|
//
|
|
// in the agent's workspace dir. Codex emits a stream of JSONL events:
|
|
// {"type":"thread.started","thread_id":"..."}
|
|
// {"type":"turn.started"}
|
|
// {"type":"item.completed","item":{"id":"...","type":"agent_message","text":"..."}}
|
|
// {"type":"turn.completed","usage":{"input_tokens":..., ...}}
|
|
//
|
|
// Session continuity: per-workspace thread_id stored at
|
|
// <workspace>/.plexum-codex-session, used as the resume arg.
|
|
package runner
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
|
|
"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
|
|
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
|
)
|
|
|
|
// SessionFile is the per-workspace filename storing the codex
|
|
// thread_id between turns.
|
|
const SessionFile = ".plexum-codex-session"
|
|
|
|
// Run is the ProviderPlugin Stream implementation.
|
|
func Run(
|
|
ctx context.Context,
|
|
host plugin.HostAPI,
|
|
agent plugin.AgentContext,
|
|
model string,
|
|
binary string,
|
|
extraArgs []string,
|
|
req canonical.TurnRequest,
|
|
) (<-chan canonical.TurnEvent, error) {
|
|
workspace := agent.Workspace
|
|
if workspace == "" {
|
|
return nil, errors.New("codex: agent workspace required (host must supply AgentContext)")
|
|
}
|
|
if err := os.MkdirAll(workspace, 0o755); err != nil {
|
|
return nil, fmt.Errorf("mkdir workspace: %w", err)
|
|
}
|
|
|
|
prompt := extractLastUserText(req.Messages)
|
|
if prompt == "" {
|
|
return nil, errors.New("codex: no user text in request messages")
|
|
}
|
|
|
|
resumeID := loadSessionID(workspace)
|
|
|
|
// Build the argv. `codex exec [resume <id>]` then flags then prompt.
|
|
args := []string{"exec"}
|
|
if resumeID != "" {
|
|
args = append(args, "resume", resumeID)
|
|
}
|
|
args = append(args,
|
|
"--skip-git-repo-check",
|
|
"--dangerously-bypass-approvals-and-sandbox",
|
|
"--json",
|
|
)
|
|
if model != "" {
|
|
args = append(args, "--model", model)
|
|
}
|
|
args = append(args, extraArgs...)
|
|
// Prompt is the last positional. Codex's exec/resume both accept
|
|
// it positionally; we pass via arg to avoid the stdin path.
|
|
args = append(args, prompt)
|
|
|
|
host.Log("info", "codex: spawning CLI", map[string]any{
|
|
"workspace": workspace, "model": model, "resume": resumeID,
|
|
"prompt_len": len(prompt),
|
|
})
|
|
|
|
cmd := exec.CommandContext(ctx, binary, args...)
|
|
cmd.Dir = workspace
|
|
cmd.Env = os.Environ()
|
|
cmd.Stdin = nil // detach from inherited stdin (codex would otherwise wait)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("stdout pipe: %w", err)
|
|
}
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("stderr pipe: %w", err)
|
|
}
|
|
if err := cmd.Start(); err != nil {
|
|
return nil, fmt.Errorf("codex: start: %w", err)
|
|
}
|
|
|
|
out := make(chan canonical.TurnEvent, 16)
|
|
go pumpEvents(ctx, host, cmd, stdout, stderr, workspace, out)
|
|
return out, nil
|
|
}
|
|
|
|
// pumpEvents drains stdout (JSONL), translates each line to a
|
|
// canonical TurnEvent, and closes the channel on subprocess exit.
|
|
func pumpEvents(
|
|
ctx context.Context,
|
|
host plugin.HostAPI,
|
|
cmd *exec.Cmd,
|
|
stdout io.Reader,
|
|
stderr io.Reader,
|
|
workspace string,
|
|
out chan<- canonical.TurnEvent,
|
|
) {
|
|
defer close(out)
|
|
|
|
// Drain stderr in a side goroutine — codex chatters informational
|
|
// stuff there ("Reading additional input from stdin..." etc.).
|
|
go func() {
|
|
sc := bufio.NewScanner(stderr)
|
|
for sc.Scan() {
|
|
host.Log("debug", "codex stderr: "+sc.Text(), nil)
|
|
}
|
|
}()
|
|
|
|
emit := func(ev canonical.TurnEvent) {
|
|
select {
|
|
case out <- ev:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
emit(canonical.TurnEvent{Type: canonical.EventMessageStart})
|
|
|
|
scanner := bufio.NewScanner(stdout)
|
|
scanner.Buffer(make([]byte, 64*1024), 4*1024*1024)
|
|
|
|
var (
|
|
usage canonical.Usage
|
|
hadError bool
|
|
errMsg string
|
|
)
|
|
|
|
for scanner.Scan() {
|
|
line := scanner.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
var head struct {
|
|
Type string `json:"type"`
|
|
}
|
|
if err := json.Unmarshal(line, &head); err != nil {
|
|
host.Log("warn", "codex: unparseable JSONL line",
|
|
map[string]any{"err": err.Error(), "line": string(line)})
|
|
continue
|
|
}
|
|
|
|
switch head.Type {
|
|
case "thread.started":
|
|
var ev struct {
|
|
ThreadID string `json:"thread_id"`
|
|
}
|
|
_ = json.Unmarshal(line, &ev)
|
|
if ev.ThreadID != "" {
|
|
saveSessionID(workspace, ev.ThreadID, host)
|
|
}
|
|
|
|
case "turn.started":
|
|
// nothing user-facing
|
|
|
|
case "item.started":
|
|
// codex announces an item; we wait for completed.
|
|
|
|
case "item.completed":
|
|
var ev struct {
|
|
Item struct {
|
|
ID string `json:"id"`
|
|
Type string `json:"type"`
|
|
Text string `json:"text"`
|
|
} `json:"item"`
|
|
}
|
|
if err := json.Unmarshal(line, &ev); err != nil {
|
|
continue
|
|
}
|
|
// "agent_message" is the assistant's text reply.
|
|
// Other item types (reasoning_summary, tool_use, etc.)
|
|
// surface as logs for now.
|
|
if ev.Item.Type == "agent_message" && ev.Item.Text != "" {
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventTextDelta, Text: ev.Item.Text,
|
|
})
|
|
} else if ev.Item.Type != "" {
|
|
host.Log("debug", "codex item",
|
|
map[string]any{"type": ev.Item.Type, "id": ev.Item.ID})
|
|
}
|
|
|
|
case "turn.completed":
|
|
var ev struct {
|
|
Usage struct {
|
|
InputTokens int `json:"input_tokens"`
|
|
CachedInputTokens int `json:"cached_input_tokens"`
|
|
OutputTokens int `json:"output_tokens"`
|
|
ReasoningTokens int `json:"reasoning_output_tokens"`
|
|
} `json:"usage"`
|
|
}
|
|
if err := json.Unmarshal(line, &ev); err == nil {
|
|
usage.InputTokens = ev.Usage.InputTokens
|
|
usage.OutputTokens = ev.Usage.OutputTokens
|
|
usage.CacheReadTokens = ev.Usage.CachedInputTokens
|
|
usage.ThinkingTokens = ev.Usage.ReasoningTokens
|
|
}
|
|
|
|
case "error", "turn.failed":
|
|
var ev struct {
|
|
Message string `json:"message"`
|
|
Error struct {
|
|
Message string `json:"message"`
|
|
} `json:"error"`
|
|
}
|
|
_ = json.Unmarshal(line, &ev)
|
|
hadError = true
|
|
errMsg = ev.Message
|
|
if errMsg == "" {
|
|
errMsg = ev.Error.Message
|
|
}
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventError,
|
|
Error: &canonical.ErrorInfo{Code: head.Type, Message: errMsg},
|
|
})
|
|
|
|
default:
|
|
host.Log("debug", "codex unknown event",
|
|
map[string]any{"type": head.Type, "line": truncate(string(line), 200)})
|
|
}
|
|
}
|
|
if serr := scanner.Err(); serr != nil {
|
|
host.Log("warn", "codex scanner", map[string]any{"err": serr.Error()})
|
|
}
|
|
|
|
werr := cmd.Wait()
|
|
stopReason := canonical.StopEndTurn
|
|
if hadError || werr != nil {
|
|
stopReason = canonical.StopReason("error")
|
|
}
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventMessageEnd, StopReason: stopReason, Usage: &usage,
|
|
})
|
|
}
|
|
|
|
// ---- helpers ----
|
|
|
|
func extractLastUserText(msgs []canonical.Message) string {
|
|
for i := len(msgs) - 1; i >= 0; i-- {
|
|
m := msgs[i]
|
|
if m.Role != canonical.RoleUser {
|
|
continue
|
|
}
|
|
var parts []string
|
|
for _, b := range m.Content {
|
|
if t, ok := b.(*canonical.TextBlock); ok {
|
|
parts = append(parts, t.Text)
|
|
}
|
|
}
|
|
if len(parts) > 0 {
|
|
out := parts[0]
|
|
for _, p := range parts[1:] {
|
|
out += "\n" + p
|
|
}
|
|
return out
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func loadSessionID(workspace string) string {
|
|
raw, err := os.ReadFile(filepath.Join(workspace, SessionFile))
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
// Trim possible trailing newline.
|
|
s := string(raw)
|
|
for len(s) > 0 && (s[len(s)-1] == '\n' || s[len(s)-1] == '\r' || s[len(s)-1] == ' ') {
|
|
s = s[:len(s)-1]
|
|
}
|
|
return s
|
|
}
|
|
|
|
func saveSessionID(workspace, id string, host plugin.HostAPI) {
|
|
path := filepath.Join(workspace, SessionFile)
|
|
if err := os.WriteFile(path, []byte(id), 0o600); err != nil {
|
|
host.Log("warn", "codex: save session id failed",
|
|
map[string]any{"err": err.Error(), "path": path})
|
|
}
|
|
}
|
|
|
|
func truncate(s string, n int) string {
|
|
if len(s) <= n {
|
|
return s
|
|
}
|
|
return s[:n] + "…"
|
|
}
|