Files
Plexum-gemini-provider/internal/runner/runner.go
hzhang a63f477463 feat(gemini): plexum-host MCP exposure + real consume mirror
Same architecture as the codex sister change:

1. mcp-host subcommand routes argv[1] to mcpbridge.Run, reading
   PLEXUM_MCP_SOCKET / PLEXUM_MCP_AGENT_ID from gemini-cli's
   `gemini mcp add --env` env baking.

2. EnsureGeminiMCPRegistered handles per-agent stable registration
   under `-s user --trust` so gemini auto-approves the tool surface.

3. Post-turn ParseGeminiToolCalls + EmitGeminiToolCalls scans the
   chat JSONL at ~/.gemini/tmp/<ws>/chats/session-*.jsonl. Pairs
   gemini's nested `toolCalls[].id` with the matching
   `functionResponse.id` from later user lines.

4. MutateGeminiSession rewrites the chat JSONL: toolCalls[].args
   → {} (heavy), functionResponse.response.output → marker.

E2E verified: gemini call exec via plexum-host → dynamic.jsonl
records the call → dynamic-tool-clear consumes → gemini session
mirrored → resume gemini sees consumed marker not the original.
2026-06-01 20:14:13 +01:00

282 lines
7.8 KiB
Go

// Package runner forks the `gemini` CLI per Plexum turn and translates
// its JSON output into canonical TurnEvents.
//
// Each Stream call spawns one subprocess:
//
// gemini --skip-trust --output-format json [--model <m>] [--resume <sid>] -p <prompt>
//
// in the agent's workspace dir. The CLI emits a single JSON blob at
// exit; we parse it, capture session_id (for next-turn --resume), and
// emit synthetic message_start + text_delta + message_end events to
// match Plexum's streaming agentic-loop expectations.
//
// Session continuity: per-workspace UUID stored at
// <workspace>/.plexum-gemini-session, used as --resume on subsequent
// turns. Mirrors the contractor pattern in Plexum-anthropic-provider.
package runner
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
"git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge"
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
)
// SessionFile is the per-workspace filename storing the captured
// session id between turns.
const SessionFile = ".plexum-gemini-session"
// Run is the ProviderPlugin Stream implementation. It forks gemini,
// waits for completion, and pushes events on the returned channel.
// Channel closes when the subprocess exits (clean or error).
func Run(
ctx context.Context,
host plugin.HostAPI,
agent plugin.AgentContext,
model string,
binary string,
extraArgs []string,
req canonical.TurnRequest,
) (<-chan canonical.TurnEvent, error) {
workspace := agent.Workspace
if workspace == "" {
return nil, errors.New("gemini: agent workspace required (host must supply AgentContext)")
}
if err := os.MkdirAll(workspace, 0o755); err != nil {
return nil, fmt.Errorf("mkdir workspace: %w", err)
}
prompt := extractLastUserText(req.Messages)
if prompt == "" {
return nil, errors.New("gemini: no user text in request messages")
}
// Per-turn MCP bridge so gemini can call Plexum host tools the
// host advertised in req.Tools. Best-effort.
var br *mcpbridge.Bridge
if len(req.Tools) > 0 {
exe, err := os.Executable()
if err == nil {
if _, regErr := EnsureGeminiMCPRegistered(ctx, binary, exe, agent.AgentID); regErr != nil {
host.Log("warn", "gemini: mcp registration failed", map[string]any{"err": regErr.Error()})
} else {
b, err := mcpbridge.SetupOnPath(ctx, host, agent.AgentID, req.Tools, StableSocketPath(agent.AgentID))
if err != nil {
host.Log("warn", "gemini: bridge setup failed", map[string]any{"err": err.Error()})
} else {
br = b
}
}
}
}
resumeID := loadSessionID(workspace)
args := []string{"--skip-trust", "--output-format", "json"}
if model != "" {
args = append(args, "--model", model)
}
if resumeID != "" {
args = append(args, "--resume", resumeID)
}
args = append(args, extraArgs...)
args = append(args, "-p", prompt)
host.Log("info", "gemini: spawning CLI", map[string]any{
"workspace": workspace, "model": model, "resume": resumeID,
"prompt_len": len(prompt),
})
cmd := exec.CommandContext(ctx, binary, args...)
cmd.Dir = workspace
cmd.Env = os.Environ()
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("gemini: start: %w", err)
}
out := make(chan canonical.TurnEvent, 8)
go func() {
defer close(out)
err := cmd.Wait()
emit := func(ev canonical.TurnEvent) {
select {
case out <- ev:
case <-ctx.Done():
}
}
emit(canonical.TurnEvent{Type: canonical.EventMessageStart})
// stderr is informational; log it but don't treat as fatal
// (gemini chatters about Ripgrep etc. on stderr).
if stderrBuf.Len() > 0 {
host.Log("debug", "gemini stderr",
map[string]any{"text": stderrBuf.String()})
}
// Parse the JSON output. gemini may prepend a "Ripgrep is not
// available." line BEFORE the JSON; skip leading non-{ bytes.
raw := stripPreamble(stdoutBuf.Bytes())
var resp struct {
SessionID string `json:"session_id"`
Response string `json:"response"`
Stats struct {
Models map[string]struct {
Tokens struct {
Input int `json:"input"`
Total int `json:"total"`
Cached int `json:"cached"`
} `json:"tokens"`
} `json:"models"`
} `json:"stats"`
Error string `json:"error,omitempty"`
}
if perr := json.Unmarshal(raw, &resp); perr != nil {
// Subprocess may have died before producing JSON.
msg := stderrBuf.String()
if err != nil {
msg = fmt.Sprintf("%s (wait err: %v)", msg, err)
}
emit(canonical.TurnEvent{
Type: canonical.EventError,
Error: &canonical.ErrorInfo{
Code: "gemini_unparsable",
Message: fmt.Sprintf("gemini output not JSON: %v\nstdout=%q\nstderr=%q",
perr, truncate(string(raw), 200), truncate(msg, 400)),
},
})
emit(canonical.TurnEvent{
Type: canonical.EventMessageEnd, StopReason: "error",
})
return
}
if resp.SessionID != "" {
saveSessionID(workspace, resp.SessionID, host)
}
if resp.Error != "" {
emit(canonical.TurnEvent{
Type: canonical.EventError,
Error: &canonical.ErrorInfo{Code: "gemini_error", Message: resp.Error},
})
}
if resp.Response != "" {
emit(canonical.TurnEvent{
Type: canonical.EventTextDelta, Text: resp.Response,
})
}
// Post-turn: scan gemini's chat JSONL for tool calls and emit
// them as canonical events so dynamic.jsonl captures the real
// ids (same ones SessionMutator will rewrite on consume).
if sessionPath, perr := FindGeminiSessionFile(workspace); perr == nil {
calls, perr := ParseGeminiToolCalls(sessionPath)
if perr != nil {
host.Log("warn", "gemini: parse session failed",
map[string]any{"err": perr.Error(), "path": sessionPath})
} else {
EmitGeminiToolCalls(calls, emit)
host.Log("debug", "gemini: emitted tool calls from session",
map[string]any{"count": len(calls), "path": sessionPath})
}
}
var usage canonical.Usage
for _, m := range resp.Stats.Models {
usage.InputTokens += m.Tokens.Input
usage.OutputTokens += m.Tokens.Total - m.Tokens.Input
usage.CacheReadTokens += m.Tokens.Cached
}
stopReason := canonical.StopEndTurn
if err != nil {
stopReason = "error"
}
emit(canonical.TurnEvent{
Type: canonical.EventMessageEnd,
StopReason: stopReason,
Usage: &usage,
})
br.Close() // nil-safe
}()
return out, nil
}
// ---- helpers ----
func extractLastUserText(msgs []canonical.Message) string {
for i := len(msgs) - 1; i >= 0; i-- {
m := msgs[i]
if m.Role != canonical.RoleUser {
continue
}
var parts []string
for _, b := range m.Content {
if t, ok := b.(*canonical.TextBlock); ok {
parts = append(parts, t.Text)
}
}
if len(parts) > 0 {
out := parts[0]
for _, p := range parts[1:] {
out += "\n" + p
}
return out
}
}
return ""
}
func loadSessionID(workspace string) string {
raw, err := os.ReadFile(filepath.Join(workspace, SessionFile))
if err != nil {
return ""
}
return string(bytes.TrimSpace(raw))
}
func saveSessionID(workspace, id string, host plugin.HostAPI) {
if id == "" {
return
}
path := filepath.Join(workspace, SessionFile)
tmp := path + ".tmp"
if err := os.WriteFile(tmp, []byte(id), 0o600); err != nil {
host.Log("warn", "gemini: save session id failed",
map[string]any{"err": err.Error(), "path": tmp})
return
}
if err := os.Rename(tmp, path); err != nil {
host.Log("warn", "gemini: rename session id failed",
map[string]any{"err": err.Error(), "path": path})
}
}
// stripPreamble drops any non-`{` bytes before the first `{` so the
// "Ripgrep is not available" line gemini sometimes prepends doesn't
// trip json.Unmarshal.
func stripPreamble(b []byte) []byte {
i := bytes.IndexByte(b, '{')
if i <= 0 {
return b
}
return b[i:]
}
func truncate(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n] + "…"
}