From a63f477463107330391d73ee2742c102971d1e4f Mon Sep 17 00:00:00 2001 From: hzhang Date: Mon, 1 Jun 2026 20:14:13 +0100 Subject: [PATCH] feat(gemini): plexum-host MCP exposure + real consume mirror MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same architecture as the codex sister change: 1. mcp-host subcommand routes argv[1] to mcpbridge.Run, reading PLEXUM_MCP_SOCKET / PLEXUM_MCP_AGENT_ID from gemini-cli's `gemini mcp add --env` env baking. 2. EnsureGeminiMCPRegistered handles per-agent stable registration under `-s user --trust` so gemini auto-approves the tool surface. 3. Post-turn ParseGeminiToolCalls + EmitGeminiToolCalls scans the chat JSONL at ~/.gemini/tmp//chats/session-*.jsonl. Pairs gemini's nested `toolCalls[].id` with the matching `functionResponse.id` from later user lines. 4. MutateGeminiSession rewrites the chat JSONL: toolCalls[].args → {} (heavy), functionResponse.response.output → marker. E2E verified: gemini call exec via plexum-host → dynamic.jsonl records the call → dynamic-tool-clear consumes → gemini session mirrored → resume gemini sees consumed marker not the original. --- cmd/plexum-gemini-provider-plugin/main.go | 76 +++++-- internal/runner/mcp_register.go | 70 +++++++ internal/runner/runner.go | 36 ++++ internal/runner/session_mutate.go | 91 --------- internal/runner/session_mutate_apply.go | 237 ++++++++++++++++++++++ internal/runner/session_parse.go | 161 +++++++++++++++ 6 files changed, 558 insertions(+), 113 deletions(-) create mode 100644 internal/runner/mcp_register.go delete mode 100644 internal/runner/session_mutate.go create mode 100644 internal/runner/session_mutate_apply.go create mode 100644 internal/runner/session_parse.go diff --git a/cmd/plexum-gemini-provider-plugin/main.go b/cmd/plexum-gemini-provider-plugin/main.go index 6634c37..0935be9 100644 --- a/cmd/plexum-gemini-provider-plugin/main.go +++ b/cmd/plexum-gemini-provider-plugin/main.go @@ -25,6 +25,7 @@ import ( "path/filepath" "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + "git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge" plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" "git.hangman-lab.top/hzhang/Plexum-gemini-provider/internal/runner" @@ -125,31 +126,29 @@ func (p *geminiPlugin) StreamWithAgent(ctx context.Context, modelID string, agen return runner.Run(ctx, p.host, agent, cliModel, p.cfg.Binary, p.cfg.ExtraArgs, req) } -// MutateSession is the session-mirror hook (decision #29x). gemini-cli -// stores transcripts under ~/.gemini/tmp//chats/ -// session--.jsonl with a `$set`-style mutation -// log (each line either a session-meta header or `{"$set":{...}}` -// applied in order). Tool-call records use gemini-cli's own id -// namespace (no shape doc as of this writing) which doesn't match -// Plexum's toolu_*-style block ids. -// -// v1 ships as a logged no-op: we resolve the file path so operator -// telemetry confirms wiring, but we don't attempt to rewrite. When -// Plexum starts routing tool-call ids through gemini's native surface -// (preserving the round-trip), the rewriter plugs in here. +// MutateSession mirrors host dynamic-* consume mutations onto the +// gemini chat JSONL at ~/.gemini/tmp//chats/session-*.jsonl so +// the next `gemini --resume` sees the consumed view. Tool-call +// round-trip is captured by runner.ParseGeminiToolCalls + +// EmitGeminiToolCalls (post-turn), so the BlockMutations we receive +// here resolve to ids actually in gemini's session. func (p *geminiPlugin) MutateSession(ctx context.Context, req plugin.SessionMutateRequest) error { - path, err := runner.FindGeminiSessionFile(req.Workspace) - if err != nil { - p.host.Log("debug", "gemini session mutate: find failed", map[string]any{ - "agent": req.AgentID, "err": err.Error(), - }) + if len(req.Mutations) == 0 { return nil } - p.host.Log("info", "gemini session mutate (no-op v1)", map[string]any{ - "agent": req.AgentID, - "session_path": path, - "requested": len(req.Mutations), - "reason": "gemini tool-call id round-trip not yet wired", + mu := make([]runner.GeminiMutation, 0, len(req.Mutations)) + for _, m := range req.Mutations { + mu = append(mu, runner.GeminiMutation{BlockID: m.BlockID, Op: m.Op}) + } + touched, err := runner.MutateGeminiSession(req.Workspace, mu) + if err != nil { + p.host.Log("warn", "gemini session mutate failed", map[string]any{ + "agent": req.AgentID, "err": err.Error(), + }) + return err + } + p.host.Log("info", "gemini session mutate complete", map[string]any{ + "agent": req.AgentID, "touched": touched, "requested": len(req.Mutations), }) return nil } @@ -169,8 +168,41 @@ func mapModel(plexumModel string) string { } func main() { + if len(os.Args) > 1 && os.Args[1] == "mcp-host" { + if err := runMCPHost(os.Args[2:]); err != nil { + fmt.Fprintf(os.Stderr, "mcp-host: %v\n", err) + os.Exit(1) + } + return + } if err := plugin.Serve(&geminiPlugin{}); err != nil { fmt.Fprintf(os.Stderr, "plexum-gemini-provider-plugin: %v\n", err) os.Exit(1) } } + +func runMCPHost(argv []string) error { + opts := mcpbridge.Opts{ + SocketPath: os.Getenv("PLEXUM_MCP_SOCKET"), + AgentID: os.Getenv("PLEXUM_MCP_AGENT_ID"), + } + for i := 0; i < len(argv); i++ { + switch argv[i] { + case "--socket": + if i+1 >= len(argv) { + return errors.New("--socket needs value") + } + opts.SocketPath = argv[i+1] + i++ + case "--agent-id": + if i+1 >= len(argv) { + return errors.New("--agent-id needs value") + } + opts.AgentID = argv[i+1] + i++ + default: + return fmt.Errorf("unknown mcp-host flag: %s", argv[i]) + } + } + return mcpbridge.Run(context.Background(), opts) +} diff --git a/internal/runner/mcp_register.go b/internal/runner/mcp_register.go new file mode 100644 index 0000000..2d9163b --- /dev/null +++ b/internal/runner/mcp_register.go @@ -0,0 +1,70 @@ +// MCP server registration. gemini's `gemini mcp add -s user` writes +// into the user's settings.json; we use a per-agent stable name so +// multiple Plexum agents using gemini don't collide. Registration is +// idempotent and performed lazily before the first turn that needs it. + +package runner + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" +) + +var registered sync.Map // agentID → bool (registration done this process) + +// EnsureGeminiMCPRegistered makes sure gemini's user settings has an +// MCP server entry for this agent named plexum-host-. +// Socket path is the stable StableSocketPath(agentID); same path the +// per-turn bridge listens on. +func EnsureGeminiMCPRegistered(ctx context.Context, geminiBinary, pluginBinary, agentID string) (string, error) { + name := geminiServerName(agentID) + if _, ok := registered.Load(name); ok { + return name, nil + } + // Remove first so we don't double-register; ignore error. + _ = exec.CommandContext(ctx, geminiBinary, "mcp", "remove", "-s", "user", name).Run() + + args := []string{ + "mcp", "add", + "-s", "user", "--trust", + "-e", "PLEXUM_MCP_SOCKET=" + StableSocketPath(agentID), + "-e", "PLEXUM_MCP_AGENT_ID=" + agentID, + name, pluginBinary, "mcp-host", + } + cmd := exec.CommandContext(ctx, geminiBinary, args...) + if out, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("gemini mcp add: %w: %s", err, strings.TrimSpace(string(out))) + } + registered.Store(name, true) + return name, nil +} + +// StableSocketPath returns the per-agent unix socket path the bridge +// listens on AND gemini's mcp-host subprocess dials into. +func StableSocketPath(agentID string) string { + return filepath.Join(os.TempDir(), "plexum-gemini-mcp-"+sanitize(agentID)+".sock") +} + +func geminiServerName(agentID string) string { + return "plexum-host-" + sanitize(agentID) +} + +func sanitize(s string) string { + out := make([]byte, 0, len(s)) + for i := 0; i < len(s); i++ { + c := s[i] + switch { + case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', + c >= '0' && c <= '9', c == '-', c == '_': + out = append(out, c) + default: + out = append(out, '_') + } + } + return string(out) +} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index ddff1c5..189072e 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -26,6 +26,7 @@ import ( "path/filepath" "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + "git.hangman-lab.top/hzhang/Plexum-sdk-go/mcpbridge" plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" ) @@ -58,6 +59,25 @@ func Run( return nil, errors.New("gemini: no user text in request messages") } + // Per-turn MCP bridge so gemini can call Plexum host tools the + // host advertised in req.Tools. Best-effort. + var br *mcpbridge.Bridge + if len(req.Tools) > 0 { + exe, err := os.Executable() + if err == nil { + if _, regErr := EnsureGeminiMCPRegistered(ctx, binary, exe, agent.AgentID); regErr != nil { + host.Log("warn", "gemini: mcp registration failed", map[string]any{"err": regErr.Error()}) + } else { + b, err := mcpbridge.SetupOnPath(ctx, host, agent.AgentID, req.Tools, StableSocketPath(agent.AgentID)) + if err != nil { + host.Log("warn", "gemini: bridge setup failed", map[string]any{"err": err.Error()}) + } else { + br = b + } + } + } + } + resumeID := loadSessionID(workspace) args := []string{"--skip-trust", "--output-format", "json"} @@ -156,6 +176,21 @@ func Run( }) } + // Post-turn: scan gemini's chat JSONL for tool calls and emit + // them as canonical events so dynamic.jsonl captures the real + // ids (same ones SessionMutator will rewrite on consume). + if sessionPath, perr := FindGeminiSessionFile(workspace); perr == nil { + calls, perr := ParseGeminiToolCalls(sessionPath) + if perr != nil { + host.Log("warn", "gemini: parse session failed", + map[string]any{"err": perr.Error(), "path": sessionPath}) + } else { + EmitGeminiToolCalls(calls, emit) + host.Log("debug", "gemini: emitted tool calls from session", + map[string]any{"count": len(calls), "path": sessionPath}) + } + } + var usage canonical.Usage for _, m := range resp.Stats.Models { usage.InputTokens += m.Tokens.Input @@ -172,6 +207,7 @@ func Run( StopReason: stopReason, Usage: &usage, }) + br.Close() // nil-safe }() return out, nil } diff --git a/internal/runner/session_mutate.go b/internal/runner/session_mutate.go deleted file mode 100644 index 7971fbf..0000000 --- a/internal/runner/session_mutate.go +++ /dev/null @@ -1,91 +0,0 @@ -// session_mutate.go — gemini session file path resolution. The actual -// JSONL rewrite is deferred (see plugin main.go MutateSession comment): -// gemini-cli uses its own tool-call id namespace that doesn't match -// Plexum's, so v1 ships as a logged no-op. This file owns the path -// resolver so the v2 rewriter has its entry point. -// -// Layout (observed): -// -// ~/.gemini/tmp//chats/session--.jsonl -// -// where is the first 8 chars of the session_id captured into -// workspace/.plexum-gemini-session. We pick the file whose name -// contains that prefix; multiple matches → the most-recent by mtime. - -package runner - -import ( - "errors" - "fmt" - "os" - "path/filepath" - "sort" - "strings" -) - -// FindGeminiSessionFile resolves the chat JSONL gemini-cli writes for -// the session id captured in workspace/.plexum-gemini-session. -// Returns ("", error) if no session id is recorded yet OR no chat -// file is on disk. -func FindGeminiSessionFile(workspace string) (string, error) { - sid := loadSessionID(workspace) - if sid == "" { - return "", errors.New("no gemini session id captured yet") - } - prefix := sid - if len(prefix) > 8 { - prefix = prefix[:8] - } - home, err := os.UserHomeDir() - if err != nil { - return "", err - } - root := filepath.Join(home, ".gemini", "tmp") - wsName := filepath.Base(workspace) - candidates := []string{ - filepath.Join(root, wsName, "chats"), - // Fallback dirs (older / alternate gemini-cli versions). We - // glob defensively so an operator's custom layout still - // surfaces something for telemetry. - } - entries := []os.DirEntry{} - chosenDir := "" - for _, dir := range candidates { - es, err := os.ReadDir(dir) - if err == nil { - entries = es - chosenDir = dir - break - } - } - if chosenDir == "" { - return "", fmt.Errorf("no gemini chats dir under %s", root) - } - // Pick the newest session-*.jsonl whose name contains the sid prefix. - type cand struct { - path string - mtime int64 - } - var matches []cand - for _, e := range entries { - if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") { - continue - } - if !strings.Contains(e.Name(), prefix) { - continue - } - info, err := e.Info() - if err != nil { - continue - } - matches = append(matches, cand{ - path: filepath.Join(chosenDir, e.Name()), - mtime: info.ModTime().UnixNano(), - }) - } - if len(matches) == 0 { - return "", fmt.Errorf("no chat file matching session_id prefix %q in %s", prefix, chosenDir) - } - sort.Slice(matches, func(i, j int) bool { return matches[i].mtime > matches[j].mtime }) - return matches[0].path, nil -} diff --git a/internal/runner/session_mutate_apply.go b/internal/runner/session_mutate_apply.go new file mode 100644 index 0000000..b3c4399 --- /dev/null +++ b/internal/runner/session_mutate_apply.go @@ -0,0 +1,237 @@ +// session_mutate_apply — gemini consume mirror. Rewrites the chat +// JSONL so the consumed view propagates to next-resume gemini. +// +// Per-line mutation: +// +// type="gemini" line: every toolCalls[i].id matching → if heavy, +// reset args to {} +// type="user" line: every content[j].functionResponse.id matching +// → set response.output to the marker + +package runner + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "os" + "strings" +) + +const ( + consumedMarker = "...(consumed)" + toolCalledMarker = "...(tool called)" +) + +// GeminiMutation mirrors plugin.BlockMutation. +type GeminiMutation struct { + BlockID string + Op string +} + +// MutateGeminiSession edits gemini's chat JSONL in lockstep with +// Plexum's dynamic.jsonl. Returns the count of distinct ids touched. +func MutateGeminiSession(workspace string, mutations []GeminiMutation) (int, error) { + if len(mutations) == 0 { + return 0, nil + } + path, err := FindGeminiSessionFile(workspace) + if err != nil { + return 0, nil // no session yet + } + byID := make(map[string]string, len(mutations)) + for _, m := range mutations { + byID[m.BlockID] = m.Op + } + f, err := os.Open(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return 0, nil + } + return 0, fmt.Errorf("open gemini session: %w", err) + } + defer f.Close() + + var out strings.Builder + touched := map[string]struct{}{} + mutated := false + sc := bufio.NewScanner(f) + sc.Buffer(make([]byte, 64*1024), 16*1024*1024) + for sc.Scan() { + line := sc.Bytes() + if len(line) == 0 { + out.WriteByte('\n') + continue + } + rewritten, hits := rewriteGeminiLine(line, byID) + out.Write(rewritten) + out.WriteByte('\n') + for _, h := range hits { + touched[h] = struct{}{} + mutated = true + } + } + if err := sc.Err(); err != nil { + return 0, fmt.Errorf("scan gemini session: %w", err) + } + if !mutated { + return 0, nil + } + if err := writeAtomic(path, []byte(out.String())); err != nil { + return len(touched), fmt.Errorf("rewrite gemini session: %w", err) + } + return len(touched), nil +} + +// rewriteGeminiLine returns rewritten bytes + list of touched ids. +func rewriteGeminiLine(line []byte, byID map[string]string) ([]byte, []string) { + var obj map[string]any + if err := json.Unmarshal(line, &obj); err != nil { + return line, nil + } + var touched []string + mutated := false + + switch obj["type"] { + case "gemini": + tcs, ok := obj["toolCalls"].([]any) + if !ok { + return line, nil + } + for i, tcAny := range tcs { + tc, ok := tcAny.(map[string]any) + if !ok { + continue + } + id, _ := tc["id"].(string) + op, ok := byID[id] + if !ok || id == "" { + continue + } + if op == "consume-heavy" { + tc["args"] = map[string]any{} + tcs[i] = tc + touched = append(touched, id) + mutated = true + } + } + obj["toolCalls"] = tcs + case "user": + contents, ok := obj["content"].([]any) + if !ok { + return line, nil + } + for i, ctAny := range contents { + ct, ok := ctAny.(map[string]any) + if !ok { + continue + } + fr, ok := ct["functionResponse"].(map[string]any) + if !ok { + continue + } + id, _ := fr["id"].(string) + op, ok := byID[id] + if !ok || id == "" { + continue + } + marker := consumedMarker + if op == "consume-heavy" { + marker = toolCalledMarker + } + resp, _ := fr["response"].(map[string]any) + if resp == nil { + resp = map[string]any{} + } + resp["output"] = marker + fr["response"] = resp + ct["functionResponse"] = fr + contents[i] = ct + touched = append(touched, id) + mutated = true + } + obj["content"] = contents + default: + return line, nil + } + if !mutated { + return line, nil + } + rewritten, err := json.Marshal(obj) + if err != nil { + return line, nil + } + return rewritten, touched +} + +func writeAtomic(path string, data []byte) error { + tmp := path + ".tmp" + if err := os.WriteFile(tmp, data, 0o600); err != nil { + return err + } + return os.Rename(tmp, path) +} + +// FindGeminiSessionFile resolves ~/.gemini/tmp//chats/session-*-.jsonl +// for the captured session id. Picks the most-recent match by mtime +// since gemini may roll the chat across multiple files within the +// same logical session id. +func FindGeminiSessionFile(workspace string) (string, error) { + sid := loadSessionID(workspace) + if sid == "" { + return "", errors.New("no gemini session id captured yet") + } + prefix := sid + if len(prefix) > 8 { + prefix = prefix[:8] + } + home, err := os.UserHomeDir() + if err != nil { + return "", err + } + wsName := filepathBase(workspace) + dir := home + "/.gemini/tmp/" + wsName + "/chats" + entries, err := os.ReadDir(dir) + if err != nil { + return "", fmt.Errorf("read gemini chats dir %s: %w", dir, err) + } + type cand struct { + path string + mtime int64 + } + var matches []cand + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".jsonl") { + continue + } + if !strings.Contains(e.Name(), prefix) { + continue + } + info, err := e.Info() + if err != nil { + continue + } + matches = append(matches, cand{path: dir + "/" + e.Name(), mtime: info.ModTime().UnixNano()}) + } + if len(matches) == 0 { + return "", fmt.Errorf("no chat file matching session_id prefix %q in %s", prefix, dir) + } + // pick newest + best := matches[0] + for _, m := range matches[1:] { + if m.mtime > best.mtime { + best = m + } + } + return best.path, nil +} + +func filepathBase(p string) string { + // minimal local impl to keep imports tight + i := strings.LastIndex(p, "/") + if i < 0 { + return p + } + return p[i+1:] +} diff --git a/internal/runner/session_parse.go b/internal/runner/session_parse.go new file mode 100644 index 0000000..204dd40 --- /dev/null +++ b/internal/runner/session_parse.go @@ -0,0 +1,161 @@ +// session_parse — after gemini exits, scan its chat JSONL for tool +// calls and matching function responses. Same shape across native +// gemini tools and MCP tools — id is gemini's full canonical id +// (e.g. "mcp_______"), name is +// the namespaced tool name, args is a JSON map. + +package runner + +import ( + "bufio" + "encoding/json" + "errors" + "os" + + "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" +) + +// GeminiToolCall pairs a toolCalls entry with the matching +// functionResponse from a later user line. +type GeminiToolCall struct { + ID string + Name string + Args string // JSON + Output string // functionResponse.response.output +} + +// ParseGeminiToolCalls walks the chat JSONL file once and returns +// every (id, name, args, output) tuple in input order. Pairs without +// a matching functionResponse still appear with Output == "". +func ParseGeminiToolCalls(path string) ([]GeminiToolCall, error) { + f, err := os.Open(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, err + } + defer f.Close() + + calls := map[string]*GeminiToolCall{} + var order []string + + sc := bufio.NewScanner(f) + sc.Buffer(make([]byte, 64*1024), 16*1024*1024) + for sc.Scan() { + line := sc.Bytes() + if len(line) == 0 { + continue + } + var rec map[string]any + if err := json.Unmarshal(line, &rec); err != nil { + continue + } + // Type "gemini" lines carry toolCalls; type "user" lines may + // carry functionResponse objects inside content[]. + switch rec["type"] { + case "gemini": + for _, tcAny := range asSlice(rec["toolCalls"]) { + tc, ok := tcAny.(map[string]any) + if !ok { + continue + } + id, _ := tc["id"].(string) + if id == "" { + continue + } + c, ok := calls[id] + if !ok { + c = &GeminiToolCall{ID: id} + calls[id] = c + order = append(order, id) + } + c.Name, _ = tc["name"].(string) + if args, ok := tc["args"]; ok { + if b, err := json.Marshal(args); err == nil { + c.Args = string(b) + } + } + } + case "user": + for _, ctAny := range asSlice(rec["content"]) { + ct, ok := ctAny.(map[string]any) + if !ok { + continue + } + fr, ok := ct["functionResponse"].(map[string]any) + if !ok { + continue + } + id, _ := fr["id"].(string) + if id == "" { + continue + } + c, ok := calls[id] + if !ok { + c = &GeminiToolCall{ID: id} + calls[id] = c + order = append(order, id) + } + if fr["name"] != nil { + c.Name, _ = fr["name"].(string) + } + resp, _ := fr["response"].(map[string]any) + if resp != nil { + if out, ok := resp["output"].(string); ok { + c.Output = out + } else if out, ok := resp["output"]; ok { + if b, err := json.Marshal(out); err == nil { + c.Output = string(b) + } + } + } + } + } + } + if err := sc.Err(); err != nil { + return nil, err + } + out := make([]GeminiToolCall, 0, len(order)) + for _, id := range order { + out = append(out, *calls[id]) + } + return out, nil +} + +func asSlice(v any) []any { + if s, ok := v.([]any); ok { + return s + } + return nil +} + +// EmitGeminiToolCalls translates the parsed slice into TurnEvents. +func EmitGeminiToolCalls(calls []GeminiToolCall, emit func(canonical.TurnEvent)) { + for _, c := range calls { + args := c.Args + if args == "" { + args = "{}" + } + emit(canonical.TurnEvent{ + Type: canonical.EventToolCallStart, + ToolCallID: c.ID, + ToolName: c.Name, + PartialJSON: args, + }) + emit(canonical.TurnEvent{ + Type: canonical.EventToolCallEnd, + ToolCallID: c.ID, + }) + emit(canonical.TurnEvent{ + Type: canonical.EventToolResult, + ToolResult: &canonical.ToolResultBlock{ + Type: canonical.BlockTypeToolResult, + ToolUseID: c.ID, + Content: []canonical.Block{ + canonical.NewTextBlock(c.Output), + }, + }, + }) + } +}