Plexum ProviderPlugin that wraps the local `gemini` CLI binary
(Google Gemini CLI ≥0.37). Same CLI-driven approach as
Plexum-anthropic-provider's contractor mode: each Plexum turn forks
one `gemini -p` subprocess in the agent's workspace dir.
Reference: openclaw/extensions/google/cli-backend.ts.
internal/runner/ (~190 LOC):
- Run() spawns: gemini --skip-trust --output-format json [--model X]
[--resume <sid>] -p <prompt> in agent workspace
- Parses the single JSON blob gemini emits at exit
({"session_id":"...", "response":"...", "stats":{...}})
- session_id persisted at <workspace>/.plexum-gemini-session so the
next turn passes --resume — multi-turn context continuity through
the CLI's own session state
- Emits synthetic message_start + text_delta + message_end events to
match Plexum's streaming agentic-loop contract
- Tolerates the "Ripgrep is not available." preamble gemini may print
ahead of the JSON on stdout (strips leading non-{ bytes before json.Unmarshal)
cmd/plexum-gemini-provider-plugin/ implements ProviderPluginWithAgent
(receives AgentContext.Workspace from the host).
Models advertised in provider.models:
gemini (no --model flag; CLI default = flash-preview)
gemini-pro → CLI alias "pro" → gemini-3.1-pro-preview
gemini-flash → CLI alias "flash" → gemini-3.1-flash-preview
gemini-flash-lite → CLI alias "flash-lite" → gemini-3.1-flash-lite-preview
Unrecognized model id passes through as --model <id> so operator can
pin a specific gemini-3.x id.
HostConfig (all optional):
binary (default "gemini" — operator should set absolute path
if running under systemd; PATH won't include nvm dirs)
extra_args (appended before -p)
No api_key field — gemini CLI handles auth via ~/.gemini/ state
(OAuth or GEMINI_API_KEY env).
End-to-end verified against local install:
1. CLI embedded turn 1: "Hi, I'm Gemini, your autonomous CLI
agent for software engineering tasks."
2. CLI embedded turn 2: "I said hi as Gemini." (multi-turn ✓ via
--resume)
3. Gateway socket: {"outcome":"text","text":"pong"}
4. Fabric channel e2e: alice → bt2-clean → gem agent → gemini
CLI → outbound REST → seq=19:
"A shrimp's heart is located in its head,
which is quite an unusual biological
arrangement."
Known: when daemon runs under systemd, PATH doesn't include nvm
dirs by default. Operator must set the absolute binary path in
config.json (README notes this).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
237 lines
6.2 KiB
Go
// Package runner forks the `gemini` CLI once per Plexum turn and
// translates its JSON output into canonical TurnEvents.
//
// Each Stream call (Run) spawns one subprocess:
//
//	gemini --skip-trust --output-format json [--model <m>] [--resume <sid>] [extra args...] -p <prompt>
//
// in the agent's workspace dir. The CLI emits a single JSON blob at
// exit; we parse it, capture session_id (for next-turn --resume), and
// emit synthetic message_start + text_delta + message_end events to
// match Plexum's streaming agentic-loop expectations.
//
// Session continuity: the session id reported by the CLI is stored at
// <workspace>/.plexum-gemini-session and passed as --resume on
// subsequent turns. Mirrors the contractor pattern in
// Plexum-anthropic-provider.
|
|
package runner
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
|
|
"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
|
|
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
|
)
|
|
|
|
const (
	// SessionFile is the name, relative to the agent workspace, of the
	// file holding the gemini session id captured from the previous turn.
	// Its trimmed contents are passed back to the CLI as --resume.
	SessionFile = ".plexum-gemini-session"
)
|
|
|
|
// Run is the ProviderPlugin Stream implementation. It forks gemini,
|
|
// waits for completion, and pushes events on the returned channel.
|
|
// Channel closes when the subprocess exits (clean or error).
|
|
func Run(
|
|
ctx context.Context,
|
|
host plugin.HostAPI,
|
|
agent plugin.AgentContext,
|
|
model string,
|
|
binary string,
|
|
extraArgs []string,
|
|
req canonical.TurnRequest,
|
|
) (<-chan canonical.TurnEvent, error) {
|
|
workspace := agent.Workspace
|
|
if workspace == "" {
|
|
return nil, errors.New("gemini: agent workspace required (host must supply AgentContext)")
|
|
}
|
|
if err := os.MkdirAll(workspace, 0o755); err != nil {
|
|
return nil, fmt.Errorf("mkdir workspace: %w", err)
|
|
}
|
|
|
|
prompt := extractLastUserText(req.Messages)
|
|
if prompt == "" {
|
|
return nil, errors.New("gemini: no user text in request messages")
|
|
}
|
|
|
|
resumeID := loadSessionID(workspace)
|
|
|
|
args := []string{"--skip-trust", "--output-format", "json"}
|
|
if model != "" {
|
|
args = append(args, "--model", model)
|
|
}
|
|
if resumeID != "" {
|
|
args = append(args, "--resume", resumeID)
|
|
}
|
|
args = append(args, extraArgs...)
|
|
args = append(args, "-p", prompt)
|
|
|
|
host.Log("info", "gemini: spawning CLI", map[string]any{
|
|
"workspace": workspace, "model": model, "resume": resumeID,
|
|
"prompt_len": len(prompt),
|
|
})
|
|
|
|
cmd := exec.CommandContext(ctx, binary, args...)
|
|
cmd.Dir = workspace
|
|
cmd.Env = os.Environ()
|
|
|
|
var stdoutBuf, stderrBuf bytes.Buffer
|
|
cmd.Stdout = &stdoutBuf
|
|
cmd.Stderr = &stderrBuf
|
|
if err := cmd.Start(); err != nil {
|
|
return nil, fmt.Errorf("gemini: start: %w", err)
|
|
}
|
|
|
|
out := make(chan canonical.TurnEvent, 8)
|
|
go func() {
|
|
defer close(out)
|
|
err := cmd.Wait()
|
|
emit := func(ev canonical.TurnEvent) {
|
|
select {
|
|
case out <- ev:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
emit(canonical.TurnEvent{Type: canonical.EventMessageStart})
|
|
|
|
// stderr is informational; log it but don't treat as fatal
|
|
// (gemini chatters about Ripgrep etc. on stderr).
|
|
if stderrBuf.Len() > 0 {
|
|
host.Log("debug", "gemini stderr",
|
|
map[string]any{"text": stderrBuf.String()})
|
|
}
|
|
|
|
// Parse the JSON output. gemini may prepend a "Ripgrep is not
|
|
// available." line BEFORE the JSON; skip leading non-{ bytes.
|
|
raw := stripPreamble(stdoutBuf.Bytes())
|
|
var resp struct {
|
|
SessionID string `json:"session_id"`
|
|
Response string `json:"response"`
|
|
Stats struct {
|
|
Models map[string]struct {
|
|
Tokens struct {
|
|
Input int `json:"input"`
|
|
Total int `json:"total"`
|
|
Cached int `json:"cached"`
|
|
} `json:"tokens"`
|
|
} `json:"models"`
|
|
} `json:"stats"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
if perr := json.Unmarshal(raw, &resp); perr != nil {
|
|
// Subprocess may have died before producing JSON.
|
|
msg := stderrBuf.String()
|
|
if err != nil {
|
|
msg = fmt.Sprintf("%s (wait err: %v)", msg, err)
|
|
}
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventError,
|
|
Error: &canonical.ErrorInfo{
|
|
Code: "gemini_unparsable",
|
|
Message: fmt.Sprintf("gemini output not JSON: %v\nstdout=%q\nstderr=%q",
|
|
perr, truncate(string(raw), 200), truncate(msg, 400)),
|
|
},
|
|
})
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventMessageEnd, StopReason: "error",
|
|
})
|
|
return
|
|
}
|
|
if resp.SessionID != "" {
|
|
saveSessionID(workspace, resp.SessionID, host)
|
|
}
|
|
if resp.Error != "" {
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventError,
|
|
Error: &canonical.ErrorInfo{Code: "gemini_error", Message: resp.Error},
|
|
})
|
|
}
|
|
if resp.Response != "" {
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventTextDelta, Text: resp.Response,
|
|
})
|
|
}
|
|
|
|
var usage canonical.Usage
|
|
for _, m := range resp.Stats.Models {
|
|
usage.InputTokens += m.Tokens.Input
|
|
usage.OutputTokens += m.Tokens.Total - m.Tokens.Input
|
|
usage.CacheReadTokens += m.Tokens.Cached
|
|
}
|
|
|
|
stopReason := canonical.StopEndTurn
|
|
if err != nil {
|
|
stopReason = "error"
|
|
}
|
|
emit(canonical.TurnEvent{
|
|
Type: canonical.EventMessageEnd,
|
|
StopReason: stopReason,
|
|
Usage: &usage,
|
|
})
|
|
}()
|
|
return out, nil
|
|
}
|
|
|
|
// ---- helpers ----
|
|
|
|
func extractLastUserText(msgs []canonical.Message) string {
|
|
for i := len(msgs) - 1; i >= 0; i-- {
|
|
m := msgs[i]
|
|
if m.Role != canonical.RoleUser {
|
|
continue
|
|
}
|
|
var parts []string
|
|
for _, b := range m.Content {
|
|
if t, ok := b.(*canonical.TextBlock); ok {
|
|
parts = append(parts, t.Text)
|
|
}
|
|
}
|
|
if len(parts) > 0 {
|
|
out := parts[0]
|
|
for _, p := range parts[1:] {
|
|
out += "\n" + p
|
|
}
|
|
return out
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func loadSessionID(workspace string) string {
|
|
raw, err := os.ReadFile(filepath.Join(workspace, SessionFile))
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
return string(bytes.TrimSpace(raw))
|
|
}
|
|
|
|
func saveSessionID(workspace, id string, host plugin.HostAPI) {
|
|
path := filepath.Join(workspace, SessionFile)
|
|
if err := os.WriteFile(path, []byte(id), 0o600); err != nil {
|
|
host.Log("warn", "gemini: save session id failed",
|
|
map[string]any{"err": err.Error(), "path": path})
|
|
}
|
|
}
|
|
|
|
// stripPreamble returns b starting at its first '{' byte, discarding any
// leading text such as the "Ripgrep is not available" line gemini
// sometimes prints ahead of its JSON. If b has no '{', or already starts
// with one, b is returned unchanged.
func stripPreamble(b []byte) []byte {
	if start := bytes.IndexByte(b, '{'); start > 0 {
		return b[start:]
	}
	return b
}
|
|
|
|
// truncate returns s unchanged if it is at most n bytes long. Otherwise
// it returns a prefix of at most n bytes followed by "…". The cut is
// moved back to a rune boundary so a multi-byte UTF-8 sequence is never
// split. A negative n is treated as 0.
func truncate(s string, n int) string {
	if n < 0 {
		n = 0
	}
	if len(s) <= n {
		return s
	}
	// Here n < len(s), so s[n] is valid. Step back over UTF-8 continuation
	// bytes (0b10xxxxxx) until n sits on the first byte of a rune.
	for n > 0 && s[n]&0xC0 == 0x80 {
		n--
	}
	return s[:n] + "…"
}
|