Same architecture as the codex sister change:
1. mcp-host subcommand routes argv[1] to mcpbridge.Run, reading
PLEXUM_MCP_SOCKET / PLEXUM_MCP_AGENT_ID from gemini-cli's
`gemini mcp add --env` env baking.
2. EnsureGeminiMCPRegistered handles per-agent stable registration
under `-s user --trust` so gemini auto-approves the tool surface.
3. Post-turn, ParseGeminiToolCalls + EmitGeminiToolCalls scan the
chat JSONL at ~/.gemini/tmp/<ws>/chats/session-*.jsonl, pairing
gemini's nested `toolCalls[].id` with the matching
`functionResponse.id` from later user lines.
4. MutateGeminiSession rewrites the chat JSONL: the (heavy) toolCalls[].args
are replaced with {}, and functionResponse.response.output is replaced with the consumed marker.
E2E verified: gemini calls exec via plexum-host → dynamic.jsonl
records the call → dynamic-tool-clear consumes it → the gemini session is
mirrored → a resumed gemini sees the consumed marker instead of the original output.
282 lines
7.8 KiB
Go
282 lines
7.8 KiB
Go
// Package runner forks the `gemini` CLI once per Plexum turn and translates
// its JSON output into canonical TurnEvents.
//
// Each Stream call spawns one subprocess in the agent's workspace dir:
//
//	gemini --skip-trust --output-format json [--model <m>] [--resume <sid>] [extra args...] -p <prompt>
//
// The CLI emits a single JSON blob at exit; we parse it, capture session_id
// (for next-turn --resume), and emit synthetic message_start + text_delta +
// message_end events to match Plexum's streaming agentic-loop expectations.
// Tool calls gemini made during the turn are recovered from its chat JSONL
// and emitted as canonical events before message_end.
//
// Session continuity: the per-workspace session id is stored at
// <workspace>/.plexum-gemini-session and passed as --resume on subsequent
// turns. Mirrors the contractor pattern in Plexum-anthropic-provider.
|
|
package runner
|
|
|
|
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"strings"

	"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
	"git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge"
	plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
)
|
|
|
|
// SessionFile is the name of the file, inside each agent workspace, that
// carries gemini's session id from one turn to the next so the following
// turn can pass it to --resume.
const (
	SessionFile = ".plexum-gemini-session"
)
|
|
|
|
// Run is the ProviderPlugin Stream implementation. It forks gemini,
|
|
// waits for completion, and pushes events on the returned channel.
|
|
// Channel closes when the subprocess exits (clean or error).
|
|
func Run(
|
|
ctx context.Context,
|
|
host plugin.HostAPI,
|
|
agent plugin.AgentContext,
|
|
model string,
|
|
binary string,
|
|
extraArgs []string,
|
|
req canonical.TurnRequest,
|
|
) (<-chan canonical.TurnEvent, error) {
|
|
workspace := agent.Workspace
|
|
if workspace == "" {
|
|
return nil, errors.New("gemini: agent workspace required (host must supply AgentContext)")
|
|
}
|
|
if err := os.MkdirAll(workspace, 0o755); err != nil {
|
|
return nil, fmt.Errorf("mkdir workspace: %w", err)
|
|
}
|
|
|
|
prompt := extractLastUserText(req.Messages)
|
|
if prompt == "" {
|
|
return nil, errors.New("gemini: no user text in request messages")
|
|
}
|
|
|
|
// Per-turn MCP bridge so gemini can call Plexum host tools the
|
|
// host advertised in req.Tools. Best-effort.
|
|
var br *mcpbridge.Bridge
|
|
if len(req.Tools) > 0 {
|
|
exe, err := os.Executable()
|
|
if err == nil {
|
|
if _, regErr := EnsureGeminiMCPRegistered(ctx, binary, exe, agent.AgentID); regErr != nil {
|
|
host.Log("warn", "gemini: mcp registration failed", map[string]any{"err": regErr.Error()})
|
|
} else {
|
|
b, err := mcpbridge.SetupOnPath(ctx, host, agent.AgentID, req.Tools, StableSocketPath(agent.AgentID))
|
|
if err != nil {
|
|
host.Log("warn", "gemini: bridge setup failed", map[string]any{"err": err.Error()})
|
|
} else {
|
|
br = b
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
resumeID := loadSessionID(workspace)
|
|
|
|
args := []string{"--skip-trust", "--output-format", "json"}
|
|
if model != "" {
|
|
args = append(args, "--model", model)
|
|
}
|
|
if resumeID != "" {
|
|
args = append(args, "--resume", resumeID)
|
|
}
|
|
args = append(args, extraArgs...)
|
|
args = append(args, "-p", prompt)
|
|
|
|
host.Log("info", "gemini: spawning CLI", map[string]any{
|
|
"workspace": workspace, "model": model, "resume": resumeID,
|
|
"prompt_len": len(prompt),
|
|
})
|
|
|
|
cmd := exec.CommandContext(ctx, binary, args...)
|
|
cmd.Dir = workspace
|
|
cmd.Env = os.Environ()
|
|
|
|
var stdoutBuf, stderrBuf bytes.Buffer
|
|
cmd.Stdout = &stdoutBuf
|
|
cmd.Stderr = &stderrBuf
|
|
if err := cmd.Start(); err != nil {
|
|
return nil, fmt.Errorf("gemini: start: %w", err)
|
|
}
|
|
|
|
out := make(chan canonical.TurnEvent, 8)
|
|
go func() {
|
|
defer close(out)
|
|
err := cmd.Wait()
|
|
emit := func(ev canonical.TurnEvent) {
|
|
select {
|
|
case out <- ev:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
emit(canonical.TurnEvent{Type: canonical.EventMessageStart})
|
|
|
|
// stderr is informational; log it but don't treat as fatal
|
|
// (gemini chatters about Ripgrep etc. on stderr).
|
|
if stderrBuf.Len() > 0 {
|
|
host.Log("debug", "gemini stderr",
|
|
map[string]any{"text": stderrBuf.String()})
|
|
}
|
|
|
|
// Parse the JSON output. gemini may prepend a "Ripgrep is not
|
|
// available." line BEFORE the JSON; skip leading non-{ bytes.
|
|
raw := stripPreamble(stdoutBuf.Bytes())
|
|
var resp struct {
|
|
SessionID string `json:"session_id"`
|
|
Response string `json:"response"`
|
|
Stats struct {
|
|
Models map[string]struct {
|
|
Tokens struct {
|
|
Input int `json:"input"`
|
|
Total int `json:"total"`
|
|
Cached int `json:"cached"`
|
|
} `json:"tokens"`
|
|
} `json:"models"`
|
|
} `json:"stats"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
if perr := json.Unmarshal(raw, &resp); perr != nil {
|
|
// Subprocess may have died before producing JSON.
|
|
msg := stderrBuf.String()
|
|
if err != nil {
|
|
msg = fmt.Sprintf("%s (wait err: %v)", msg, err)
|
|
}
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventError,
|
|
Error: &canonical.ErrorInfo{
|
|
Code: "gemini_unparsable",
|
|
Message: fmt.Sprintf("gemini output not JSON: %v\nstdout=%q\nstderr=%q",
|
|
perr, truncate(string(raw), 200), truncate(msg, 400)),
|
|
},
|
|
})
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventMessageEnd, StopReason: "error",
|
|
})
|
|
return
|
|
}
|
|
if resp.SessionID != "" {
|
|
saveSessionID(workspace, resp.SessionID, host)
|
|
}
|
|
if resp.Error != "" {
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventError,
|
|
Error: &canonical.ErrorInfo{Code: "gemini_error", Message: resp.Error},
|
|
})
|
|
}
|
|
if resp.Response != "" {
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventTextDelta, Text: resp.Response,
|
|
})
|
|
}
|
|
|
|
// Post-turn: scan gemini's chat JSONL for tool calls and emit
|
|
// them as canonical events so dynamic.jsonl captures the real
|
|
// ids (same ones SessionMutator will rewrite on consume).
|
|
if sessionPath, perr := FindGeminiSessionFile(workspace); perr == nil {
|
|
calls, perr := ParseGeminiToolCalls(sessionPath)
|
|
if perr != nil {
|
|
host.Log("warn", "gemini: parse session failed",
|
|
map[string]any{"err": perr.Error(), "path": sessionPath})
|
|
} else {
|
|
EmitGeminiToolCalls(calls, emit)
|
|
host.Log("debug", "gemini: emitted tool calls from session",
|
|
map[string]any{"count": len(calls), "path": sessionPath})
|
|
}
|
|
}
|
|
|
|
var usage canonical.Usage
|
|
for _, m := range resp.Stats.Models {
|
|
usage.InputTokens += m.Tokens.Input
|
|
usage.OutputTokens += m.Tokens.Total - m.Tokens.Input
|
|
usage.CacheReadTokens += m.Tokens.Cached
|
|
}
|
|
|
|
stopReason := canonical.StopEndTurn
|
|
if err != nil {
|
|
stopReason = "error"
|
|
}
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventMessageEnd,
|
|
StopReason: stopReason,
|
|
Usage: &usage,
|
|
})
|
|
br.Close() // nil-safe
|
|
}()
|
|
return out, nil
|
|
}
|
|
|
|
// ---- helpers ----
|
|
|
|
func extractLastUserText(msgs []canonical.Message) string {
|
|
for i := len(msgs) - 1; i >= 0; i-- {
|
|
m := msgs[i]
|
|
if m.Role != canonical.RoleUser {
|
|
continue
|
|
}
|
|
var parts []string
|
|
for _, b := range m.Content {
|
|
if t, ok := b.(*canonical.TextBlock); ok {
|
|
parts = append(parts, t.Text)
|
|
}
|
|
}
|
|
if len(parts) > 0 {
|
|
out := parts[0]
|
|
for _, p := range parts[1:] {
|
|
out += "\n" + p
|
|
}
|
|
return out
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func loadSessionID(workspace string) string {
|
|
raw, err := os.ReadFile(filepath.Join(workspace, SessionFile))
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
return string(bytes.TrimSpace(raw))
|
|
}
|
|
|
|
func saveSessionID(workspace, id string, host plugin.HostAPI) {
|
|
if id == "" {
|
|
return
|
|
}
|
|
path := filepath.Join(workspace, SessionFile)
|
|
tmp := path + ".tmp"
|
|
if err := os.WriteFile(tmp, []byte(id), 0o600); err != nil {
|
|
host.Log("warn", "gemini: save session id failed",
|
|
map[string]any{"err": err.Error(), "path": tmp})
|
|
return
|
|
}
|
|
if err := os.Rename(tmp, path); err != nil {
|
|
host.Log("warn", "gemini: rename session id failed",
|
|
map[string]any{"err": err.Error(), "path": path})
|
|
}
|
|
}
|
|
|
|
// stripPreamble returns b starting at its first '{'. This discards any text
// gemini writes before the JSON blob (such as the "Ripgrep is not
// available" notice) so json.Unmarshal sees only the object. When b has no
// '{' or already starts with one, b is returned unchanged. The result
// aliases b; nothing is copied.
func stripPreamble(b []byte) []byte {
	if start := bytes.IndexByte(b, '{'); start > 0 {
		return b[start:]
	}
	return b
}
|
|
|
|
// truncate shortens s to at most n bytes and appends "…" when anything was
// cut. The cut point is moved back to the nearest rune boundary so a
// multi-byte UTF-8 character is never split. The kept prefix may therefore
// be a few bytes shorter than n. A negative n is treated as 0 rather than
// panicking.
func truncate(s string, n int) string {
	if n < 0 {
		n = 0
	}
	if len(s) <= n {
		return s
	}
	// Ranging over a string yields the byte offset of each rune start; keep
	// the last one that does not exceed n.
	cut := 0
	for i := range s {
		if i > n {
			break
		}
		cut = i
	}
	return s[:cut] + "…"
}
|