From e44936990c362fd455ebd07e98fe137fdedc1093 Mon Sep 17 00:00:00 2001 From: hzhang Date: Sun, 31 May 2026 16:01:38 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Plexum-minimax-provider=20v0.1=20?= =?UTF-8?q?=E2=80=94=20MiniMax=20via=20Anthropic-compat=20endpoint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plexum ProviderPlugin that serves MiniMax models through MiniMax's Anthropic-compatible HTTP endpoint (https://api.minimax.io/anthropic, or CN api.minimaxi.com). Inspired by openclaw's extensions/minimax provider-registration, but rewritten in Go for Plexum's SDK. internal/anthropic/ (~210 LOC + 6 tests): - minimal HTTP+SSE Anthropic Messages client (POST /v1/messages, stream:true, parses event:/data: SSE frames) - handles non-2xx as HTTP error; stream errors land as Event{Type:"error"} - 1 MiB SSE line cap; per-conn 5min timeout internal/translate/ (~220 LOC): - CanonicalToAnthropic: canonical.TurnRequest → MessagesRequest - blockToAnthropic: TextBlock / ToolUseBlock / ToolResultBlock / ThinkingBlock → loose ContentBlock map; preserves signatures + cache control - Translator: per-turn state machine; consumes anthropic.Event stream and emits canonical.TurnEvent stream (handles thinking blocks + tool_use input_json_delta accumulation + signature_delta capture) cmd/plexum-minimax-provider-plugin/: - Plugin manifest declares provider.models = [MiniMax-M2.7, MiniMax-M2.7-highspeed] - Backend fixed to "api" (per scope); region "global"|"cn" + base_url override supported via config - HostConfig from /plugins/plexum-minimax-provider/config.json {api_key, region?, base_url?, max_tokens_default?} scripts/install.sh: build + manifest emit; operator writes config.json + allows plugin + adds an agent + restarts. End-to-end verified against the real key: 1. plexum say --agent-id mini ... → "Hi, I'm MiniMax!" 2. Multi-turn continuity: agent recalled the prior reply 3. Via gateway socket: {"outcome":"text","text":"\n\npong"} 4. Via Fabric channel (alice posts → plugin inbound → mini agent → MiniMax → outbound REST → reply visible in bt2-clean seq=11): "Hi there! 👋 Fun fact: Octopuses have three hearts, blue blood, and neurons distributed throughout their arms—so their tentacles can 'think'" The MiniMax-M2.7-highspeed variant works the same way but hit a Code Plan rate-limit ceiling during testing (not a plugin issue). Deferred: - OAuth (Code Plan portal) — not in v1 scope per request - MiniMax Portal provider (separate provider id minimax-portal) - Image / TTS / video / music providers (separate plugins later) Co-Authored-By: Claude Opus 4.7 --- README.md | 69 +++++ cmd/plexum-minimax-provider-plugin/main.go | 152 ++++++++++ go.mod | 7 + internal/anthropic/client.go | 266 +++++++++++++++++ internal/anthropic/client_test.go | 143 ++++++++++ internal/translate/translate.go | 317 +++++++++++++++++++++ scripts/install.sh | 77 +++++ 7 files changed, 1031 insertions(+) create mode 100644 README.md create mode 100644 cmd/plexum-minimax-provider-plugin/main.go create mode 100644 go.mod create mode 100644 internal/anthropic/client.go create mode 100644 internal/anthropic/client_test.go create mode 100644 internal/translate/translate.go create mode 100755 scripts/install.sh diff --git a/README.md b/README.md new file mode 100644 index 0000000..3a67eaa --- /dev/null +++ b/README.md @@ -0,0 +1,69 @@ +# Plexum-minimax-provider + +Plexum ProviderPlugin that serves **MiniMax** models via MiniMax's +Anthropic-compatible HTTP endpoint. + +## Status + +**v0.1 — current**: API key auth, streaming SSE, declared models +`MiniMax-M2.7` + `MiniMax-M2.7-highspeed`. Backend fixed to `api` +(global `https://api.minimax.io/anthropic` or CN `https://api.minimaxi.com/anthropic`). + +**Deferred**: OAuth (Code Plan portal), MiniMax Portal provider, image / +TTS / video / music providers (separate plugins later if wanted). + +## Install + +```bash +cd ~/Plexum-minimax-provider +./scripts/install.sh +``` + +Then: + +1. **Write API key** to `~/.plexum/plugins/plexum-minimax-provider/config.json`: + ```json + { + "api_key": "sk-cp-..." + } + ``` + (`chmod 600` it.) + +2. **Allow the plugin** in `~/.plexum/plexum.json`: + ```json + {"plugins": {"allow": ["plexum-minimax-provider"]}} + ``` + +3. **Point an agent at a MiniMax model**: + ```bash + plexum agent-add --model MiniMax-M2.7 my-agent + ``` + +4. **Restart** the gateway and talk: + ```bash + systemctl --user restart plexum + plexum say --agent-id my-agent --session-id $(plexum new-session --agent-id my-agent) "hello" + ``` + +## Config options + +| Field | Default | Notes | +|---|---|---| +| `api_key` | (required) | `sk-cp-...` style key from MiniMax | +| `region` | `global` | `cn` switches to `api.minimaxi.com` | +| `base_url` | – | override either region's default | +| `max_tokens_default` | `4096` | used when `TurnRequest.MaxTokens` is unset | + +## Architecture + +- `internal/anthropic/` — minimal HTTP+SSE Anthropic Messages client +- `internal/translate/` — `canonical.TurnRequest` ↔ Anthropic Messages, + SSE Event → `canonical.TurnEvent` per-block state machine +- `cmd/plexum-minimax-provider-plugin/` — Plexum SDK ProviderPlugin entry + +Both `text`, `thinking`, and `tool_use` content blocks round-trip +losslessly (signatures preserved for thinking blocks). + +## License + +Same as Plexum. diff --git a/cmd/plexum-minimax-provider-plugin/main.go b/cmd/plexum-minimax-provider-plugin/main.go new file mode 100644 index 0000000..78f4e25 --- /dev/null +++ b/cmd/plexum-minimax-provider-plugin/main.go @@ -0,0 +1,152 @@ +// plexum-minimax-provider-plugin is a Plexum ProviderPlugin that +// serves MiniMax models via MiniMax's Anthropic-compatible endpoint. +// +// Backend is fixed to "api" (per scope): global endpoint +// https://api.minimax.io/anthropic, CN endpoint https://api.minimaxi.com/anthropic. +// Authentication: a single API key sourced from the plugin's config +// file at /plugins/plexum-minimax-provider/config.json. +// +// Declared models (advertised in manifest.contracts.provider.models): +// - MiniMax-M2.7 +// - MiniMax-M2.7-highspeed +// +// Operator points an agent at one of these via agent.json.model and +// Plexum's ProviderRouter dispatches each turn here. +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + + "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" + + "git.hangman-lab.top/hzhang/Plexum-minimax-provider/internal/anthropic" + "git.hangman-lab.top/hzhang/Plexum-minimax-provider/internal/translate" +) + +const ( + pluginName = "plexum-minimax-provider" + + defaultBaseURLGlobal = "https://api.minimax.io/anthropic" + defaultBaseURLCN = "https://api.minimaxi.com/anthropic" + defaultMaxTokens = 4096 // MiniMax M2.7 can serve up to 131072 — but most turns want less +) + +// supportedModels = what the manifest's provider.models advertises. +// Operator agent.json.model values must match one of these. +var supportedModels = []string{"MiniMax-M2.7", "MiniMax-M2.7-highspeed"} + +// HostConfig is the per-profile plugin config at +// /plugins/plexum-minimax-provider/config.json: +// +// { +// "api_key": "sk-cp-...", // required +// "region": "global" | "cn", // default "global" +// "base_url": "https://...", // optional override +// "max_tokens_default": 4096 // optional default when TurnRequest.MaxTokens unset +// } +type HostConfig struct { + APIKey string `json:"api_key"` + Region string `json:"region"` + BaseURL string `json:"base_url"` + MaxTokensDefault int `json:"max_tokens_default"` +} + +type minimaxPlugin struct { + host plugin.HostAPI + cfg HostConfig + cli *anthropic.Client +} + +func (p *minimaxPlugin) Manifest() plugin.Manifest { + return plugin.Manifest{ + Name: pluginName, + Version: "0.1.0", + Activation: plugin.ActivationLazy, + Executable: "plexum-minimax-provider-plugin", + Contracts: plugin.Contracts{ + Provider: &plugin.ProviderContract{Models: supportedModels}, + }, + } +} + +func (p *minimaxPlugin) Init(ctx context.Context, host plugin.HostAPI) error { + p.host = host + + profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT") + if profileRoot == "" { + home, _ := os.UserHomeDir() + profileRoot = filepath.Join(home, ".plexum") + } + cfgPath := filepath.Join(profileRoot, "plugins", pluginName, "config.json") + raw, err := os.ReadFile(cfgPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("read %s: %w", cfgPath, err) + } + if len(raw) > 0 { + if err := json.Unmarshal(raw, &p.cfg); err != nil { + return fmt.Errorf("parse %s: %w", cfgPath, err) + } + } + if p.cfg.APIKey == "" { + return fmt.Errorf("minimax: api_key missing in %s", cfgPath) + } + base := p.cfg.BaseURL + if base == "" { + if p.cfg.Region == "cn" { + base = defaultBaseURLCN + } else { + base = defaultBaseURLGlobal + } + } + if p.cfg.MaxTokensDefault <= 0 { + p.cfg.MaxTokensDefault = defaultMaxTokens + } + p.cli = anthropic.New(base, p.cfg.APIKey) + host.Log("info", "minimax provider initialized", map[string]any{ + "base": base, "models": supportedModels, + "max_tokens_default": p.cfg.MaxTokensDefault, + }) + return nil +} + +// Stream is the ProviderPlugin entrypoint. canonical.TurnRequest in, +// channel of canonical.TurnEvent out (plugin author owns + closes the +// channel; SDK forwards the stream over MCP notifications). +func (p *minimaxPlugin) Stream(ctx context.Context, modelID string, req canonical.TurnRequest) (<-chan canonical.TurnEvent, error) { + apiReq, err := translate.CanonicalToAnthropic(req, modelID, p.cfg.MaxTokensDefault) + if err != nil { + return nil, err + } + raw, err := p.cli.StreamMessages(ctx, apiReq) + if err != nil { + return nil, err + } + out := make(chan canonical.TurnEvent, 32) + go func() { + defer close(out) + tr := translate.NewTranslator() + for ev := range raw { + for _, te := range tr.Translate(ev) { + select { + case out <- te: + case <-ctx.Done(): + return + } + } + } + }() + return out, nil +} + +func main() { + if err := plugin.Serve(&minimaxPlugin{}); err != nil { + fmt.Fprintf(os.Stderr, "plexum-minimax-provider-plugin: %v\n", err) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6753a00 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module git.hangman-lab.top/hzhang/Plexum-minimax-provider + +go 1.24.2 + +require git.hangman-lab.top/hzhang/Plexum-sdk-go v0.0.0 + +replace git.hangman-lab.top/hzhang/Plexum-sdk-go => ../Plexum-sdk-go diff --git a/internal/anthropic/client.go b/internal/anthropic/client.go new file mode 100644 index 0000000..caf3929 --- /dev/null +++ b/internal/anthropic/client.go @@ -0,0 +1,266 @@ +// Package anthropic is a minimal Anthropic Messages API HTTP+SSE +// client. Works against the real Anthropic API and any Anthropic- +// compatible endpoint (MiniMax exposes one at +// https://api.minimax.io/anthropic). +// +// The MiniMax provider plugin's only consumer; if other providers +// land later they can reuse this package. +package anthropic + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// DefaultAPIVersion matches what Anthropic + Anthropic-compat servers expect. +const DefaultAPIVersion = "2023-06-01" + +// Client is a thin HTTP wrapper. Stateless across calls. +type Client struct { + BaseURL string // e.g. "https://api.minimax.io/anthropic" + APIKey string + APIVersion string // default "2023-06-01" + HTTP *http.Client +} + +// New constructs a Client with sensible defaults. +func New(baseURL, apiKey string) *Client { + return &Client{ + BaseURL: strings.TrimRight(baseURL, "/"), + APIKey: apiKey, + APIVersion: DefaultAPIVersion, + // 5 min ceiling per turn — long enough for a slow reasoning + // response, short enough that wedged calls don't hang forever. + HTTP: &http.Client{Timeout: 5 * time.Minute}, + } +} + +// MessagesRequest is the wire shape POSTed to /v1/messages. +type MessagesRequest struct { + Model string `json:"model"` + System any `json:"system,omitempty"` // string OR []ContentBlock + Messages []Message `json:"messages"` + MaxTokens int `json:"max_tokens"` + Temperature *float64 `json:"temperature,omitempty"` + StopSequences []string `json:"stop_sequences,omitempty"` + Tools []ToolDef `json:"tools,omitempty"` + ToolChoice *ToolChoice `json:"tool_choice,omitempty"` + Thinking *ThinkingConfig `json:"thinking,omitempty"` + Stream bool `json:"stream"` + Metadata *RequestMetadata `json:"metadata,omitempty"` +} + +// Message is one entry in messages[]. +type Message struct { + Role string `json:"role"` // "user" | "assistant" + Content []ContentBlock `json:"content"` +} + +// ContentBlock covers text / tool_use / tool_result / thinking. We +// keep the discriminated union loose (map[string]any) on the request +// side so callers can pass whatever shape the API accepts; on the +// response side the SSE parser produces typed deltas. +type ContentBlock = map[string]any + +// ToolDef is one tool entry the model can call. +type ToolDef struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + InputSchema json.RawMessage `json:"input_schema"` +} + +// ToolChoice constrains how the model picks tools. +type ToolChoice struct { + Type string `json:"type"` // "auto" | "any" | "tool" | "none" + Name string `json:"name,omitempty"` +} + +// ThinkingConfig enables extended-thinking on supported models. +type ThinkingConfig struct { + Type string `json:"type"` // "enabled" | "disabled" + BudgetTokens int `json:"budget_tokens,omitempty"` +} + +// RequestMetadata is optional user-supplied tracing. +type RequestMetadata struct { + UserID string `json:"user_id,omitempty"` +} + +// ---- Streaming SSE response ---- + +// Event is one parsed SSE event from the streaming endpoint. +// Direct mirror of Anthropic's event shapes — caller's responsibility +// to translate into Plexum's canonical.TurnEvent. +type Event struct { + Type string `json:"type"` + // message_start: complete initial message envelope + Message *MessageEnvelope `json:"message,omitempty"` + // content_block_start: block at .Index + Index int `json:"index,omitempty"` + ContentBlock *BlockStart `json:"content_block,omitempty"` + // content_block_delta: text_delta / thinking_delta / input_json_delta / signature_delta + Delta *BlockDelta `json:"delta,omitempty"` + // message_delta: usage update + stop_reason + Usage *Usage `json:"usage,omitempty"` + // error event payload + Error *ErrorBody `json:"error,omitempty"` +} + +// MessageEnvelope is the top-level message info in message_start. +type MessageEnvelope struct { + ID string `json:"id"` + Type string `json:"type"` + Role string `json:"role"` + Model string `json:"model"` + StopReason string `json:"stop_reason"` + Usage *Usage `json:"usage,omitempty"` +} + +// BlockStart describes a block that just started. Type is "text", +// "thinking", or "tool_use". For tool_use the ID + Name are set. +type BlockStart struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Thinking string `json:"thinking,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` +} + +// BlockDelta carries incremental changes. Exactly one of the *_delta +// fields is meaningful per delta. +type BlockDelta struct { + Type string `json:"type"` // "text_delta" | "thinking_delta" | "input_json_delta" | "signature_delta" + Text string `json:"text,omitempty"` + Thinking string `json:"thinking,omitempty"` + PartialJSON string `json:"partial_json,omitempty"` + Signature string `json:"signature,omitempty"` + // On message_delta event, the same Delta carries stop_reason. + StopReason string `json:"stop_reason,omitempty"` + StopSequence string `json:"stop_sequence,omitempty"` +} + +// Usage is the running input/output token counts. +type Usage struct { + InputTokens int `json:"input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` + CacheReadInputTokens int `json:"cache_read_input_tokens,omitempty"` + CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"` +} + +// ErrorBody is the Anthropic error envelope. +type ErrorBody struct { + Type string `json:"type"` + Message string `json:"message"` +} + +// StreamMessages opens a streaming POST /v1/messages call and sends +// parsed events down the returned channel. Channel closes when SSE +// stream terminates (message_stop, server EOF, ctx cancel, or error). +// On HTTP-level error (non-2xx, network failure), an EventType="error" +// event lands on the channel with the wrapped error before close. +func (c *Client) StreamMessages(ctx context.Context, req MessagesRequest) (<-chan Event, error) { + req.Stream = true + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("anthropic: marshal: %w", err) + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, + c.BaseURL+"/v1/messages", bytes.NewReader(body)) + if err != nil { + return nil, err + } + httpReq.Header.Set("content-type", "application/json") + httpReq.Header.Set("authorization", "Bearer "+c.APIKey) + httpReq.Header.Set("anthropic-version", c.APIVersion) + httpReq.Header.Set("accept", "text/event-stream") + + resp, err := c.HTTP.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("anthropic: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + raw, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return nil, fmt.Errorf("anthropic: %s -> %d: %s", c.BaseURL, resp.StatusCode, string(raw)) + } + + ch := make(chan Event, 32) + go func() { + defer close(ch) + defer resp.Body.Close() + if err := parseSSE(ctx, resp.Body, ch); err != nil && !errors.Is(err, io.EOF) { + select { + case ch <- Event{Type: "error", Error: &ErrorBody{Type: "stream_error", Message: err.Error()}}: + default: + } + } + }() + return ch, nil +} + +// parseSSE reads SSE frames (`event:` + `data:` lines, blank-line +// separator) and dispatches Event values onto ch. Returns nil on +// normal EOF. +func parseSSE(ctx context.Context, r io.Reader, ch chan<- Event) error { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), 1<<20) // 1 MiB line cap + + var dataBuf bytes.Buffer + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + line := scanner.Bytes() + if len(line) == 0 { + // End-of-frame: dispatch buffered data, reset. + if dataBuf.Len() == 0 { + continue + } + var ev Event + if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil { + select { + case ch <- ev: + case <-ctx.Done(): + return ctx.Err() + } + } + dataBuf.Reset() + continue + } + if bytes.HasPrefix(line, []byte("data:")) { + payload := bytes.TrimSpace(line[5:]) + if dataBuf.Len() > 0 { + dataBuf.WriteByte('\n') + } + dataBuf.Write(payload) + } + // `event:` lines describe the event name. JSON `data:` payload + // also carries `type` field — we use that. Ignore event: lines. + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("sse scan: %w", err) + } + // Flush any trailing data without final blank line. + if dataBuf.Len() > 0 { + var ev Event + if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil { + select { + case ch <- ev: + case <-ctx.Done(): + return ctx.Err() + } + } + } + return nil +} diff --git a/internal/anthropic/client_test.go b/internal/anthropic/client_test.go new file mode 100644 index 0000000..5386802 --- /dev/null +++ b/internal/anthropic/client_test.go @@ -0,0 +1,143 @@ +package anthropic + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestParseSSESingleEvent(t *testing.T) { + body := "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"m1\",\"model\":\"X\"}}\n\n" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_start" || events[0].Message.ID != "m1" { + t.Errorf("events = %+v", events) + } +} + +func TestParseSSEMultiLineData(t *testing.T) { + // "data:" can be split across multiple lines per spec; joined with '\n'. + body := "data: {\"type\":\n" + "data: \"text_delta\",\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "text_delta" || events[0].Delta.Text != "hi" { + t.Errorf("events = %+v", events) + } +} + +func TestParseSSEFlushTrailingWithoutBlank(t *testing.T) { + body := "data: {\"type\":\"message_stop\"}" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_stop" { + t.Errorf("trailing flush missed: %+v", events) + } +} + +func TestStreamMessagesSendsRightHeaders(t *testing.T) { + var gotAuth, gotVer string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("authorization") + gotVer = r.Header.Get("anthropic-version") + w.Header().Set("content-type", "text/event-stream") + w.Write([]byte("event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n")) + })) + defer srv.Close() + c := New(srv.URL, "fake-key") + ch, err := c.StreamMessages(context.Background(), MessagesRequest{ + Model: "X", MaxTokens: 10, + Messages: []Message{{Role: "user", Content: []ContentBlock{{"type": "text", "text": "hi"}}}}, + }) + if err != nil { + t.Fatal(err) + } + for range ch { + } + if gotAuth != "Bearer fake-key" { + t.Errorf("auth = %q", gotAuth) + } + if gotVer != DefaultAPIVersion { + t.Errorf("anthropic-version = %q", gotVer) + } +} + +func TestStreamMessagesNon2xxErrors(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte(`{"error":{"type":"authentication_error","message":"bad key"}}`)) + })) + defer srv.Close() + c := New(srv.URL, "bad") + _, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10}) + if err == nil || !strings.Contains(err.Error(), "401") { + t.Errorf("err = %v", err) + } +} + +func TestStreamMessagesStreamErrorBecomesEvent(t *testing.T) { + // Simulate an SSE stream that errors mid-stream. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "text/event-stream") + flusher, _ := w.(http.Flusher) + w.Write([]byte("event: message_start\ndata: {\"type\":\"message_start\"}\n\n")) + flusher.Flush() + // Hijack-style mid-stream close: not really an error in HTTP terms, + // scanner just hits EOF cleanly. So no error event expected. + })) + defer srv.Close() + c := New(srv.URL, "k") + ch, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10}) + if err != nil { + t.Fatal(err) + } + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_start" { + t.Errorf("events = %+v", events) + } +} + +func TestMessagesRequestMarshalShape(t *testing.T) { + req := MessagesRequest{ + Model: "MiniMax-M2.7", MaxTokens: 50, Stream: true, + Messages: []Message{{Role: "user", Content: []ContentBlock{ + {"type": "text", "text": "hi"}, + }}}, + } + raw, _ := json.Marshal(req) + // stream:true present + if !bytes.Contains(raw, []byte(`"stream":true`)) { + t.Errorf("missing stream:true in %s", raw) + } + if !bytes.Contains(raw, []byte(`"model":"MiniMax-M2.7"`)) { + t.Errorf("missing model: %s", raw) + } +} + +func drain(ch <-chan Event) []Event { + var out []Event + for ev := range ch { + out = append(out, ev) + } + return out +} + +// Compile-time check that io.EOF is exported so we don't accidentally +// remove the import. +var _ = io.EOF diff --git a/internal/translate/translate.go b/internal/translate/translate.go new file mode 100644 index 0000000..7d0abe7 --- /dev/null +++ b/internal/translate/translate.go @@ -0,0 +1,317 @@ +// Package translate converts between Plexum's canonical.TurnRequest / +// canonical.TurnEvent shapes and the Anthropic Messages API shapes the +// internal/anthropic client emits. +// +// Round-trip-able for text + thinking blocks; tool_use / tool_result +// passes through structurally. Block-level signatures (the opaque +// thinking signature Anthropic issues) are preserved when present. +package translate + +import ( + "encoding/json" + "fmt" + + "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + + "git.hangman-lab.top/hzhang/Plexum-minimax-provider/internal/anthropic" +) + +// CanonicalToAnthropic converts a Plexum TurnRequest into an Anthropic +// MessagesRequest. modelID overrides the canonical req.Model so callers +// can map (e.g. "minimax/MiniMax-M2.7" → "MiniMax-M2.7") for the wire. +func CanonicalToAnthropic(req canonical.TurnRequest, modelID string, defaultMaxTokens int) (anthropic.MessagesRequest, error) { + out := anthropic.MessagesRequest{ + Model: modelID, + MaxTokens: req.MaxTokens, + } + if out.MaxTokens <= 0 { + out.MaxTokens = defaultMaxTokens + } + if req.Temperature != 0 { + t := req.Temperature + out.Temperature = &t + } + if len(req.StopSequences) > 0 { + out.StopSequences = append([]string{}, req.StopSequences...) + } + // System: Plexum stores System as []Block; Anthropic accepts string + // or []ContentBlock. We always use the []block form when we have any + // non-trivial blocks so cache_control etc. pass through losslessly. + if len(req.System) > 0 { + sysBlocks := make([]anthropic.ContentBlock, 0, len(req.System)) + for _, b := range req.System { + sysBlocks = append(sysBlocks, blockToAnthropic(b)) + } + out.System = sysBlocks + } + // Messages. + for _, m := range req.Messages { + am := anthropic.Message{Role: roleToAnthropic(m.Role)} + for _, b := range m.Content { + am.Content = append(am.Content, blockToAnthropic(b)) + } + out.Messages = append(out.Messages, am) + } + // Tools. + for _, t := range req.Tools { + out.Tools = append(out.Tools, anthropic.ToolDef{ + Name: t.Name, Description: t.Description, InputSchema: t.InputSchema, + }) + } + if req.ToolChoice != nil { + out.ToolChoice = &anthropic.ToolChoice{Type: req.ToolChoice.Type, Name: req.ToolChoice.Name} + } + if req.Thinking != nil { + mode := "disabled" + if req.Thinking.Enabled { + mode = "enabled" + } + out.Thinking = &anthropic.ThinkingConfig{ + Type: mode, + BudgetTokens: req.Thinking.Budget, + } + } + return out, nil +} + +func roleToAnthropic(r canonical.Role) string { + switch r { + case canonical.RoleUser: + return "user" + case canonical.RoleAssistant: + return "assistant" + default: + return string(r) + } +} + +// blockToAnthropic converts ONE canonical.Block into the loose +// map-shaped ContentBlock Anthropic expects. Per-type handling: +// +// - TextBlock → {"type":"text", "text":..., "cache_control"?} +// - ToolUseBlock → {"type":"tool_use", "id":..., "name":..., "input":...} +// - ToolResultBlock → {"type":"tool_result", "tool_use_id":..., "content":[...], "is_error"?} +// - ThinkingBlock → {"type":"thinking", "thinking":..., "signature":...} +// +// Unknown block types serialize to their JSON form via canonical's +// own marshaller (fallback path). +func blockToAnthropic(b canonical.Block) anthropic.ContentBlock { + switch v := b.(type) { + case *canonical.TextBlock: + out := anthropic.ContentBlock{"type": "text", "text": v.Text} + if v.CacheControl != nil { + out["cache_control"] = v.CacheControl + } + return out + case *canonical.ToolUseBlock: + out := anthropic.ContentBlock{ + "type": "tool_use", "id": v.ID, "name": v.Name, + } + if len(v.Input) > 0 { + out["input"] = json.RawMessage(v.Input) + } else { + out["input"] = map[string]any{} + } + return out + case *canonical.ToolResultBlock: + inner := make([]anthropic.ContentBlock, 0, len(v.Content)) + for _, ib := range v.Content { + inner = append(inner, blockToAnthropic(ib)) + } + out := anthropic.ContentBlock{ + "type": "tool_result", "tool_use_id": v.ToolUseID, "content": inner, + } + if v.IsError { + out["is_error"] = true + } + return out + case *canonical.ThinkingBlock: + return anthropic.ContentBlock{ + "type": "thinking", "thinking": v.Thinking, "signature": v.Signature, + } + default: + // Best-effort generic: marshal then unmarshal into a map. + raw, err := json.Marshal(b) + if err != nil { + return anthropic.ContentBlock{"type": "text", "text": fmt.Sprintf("[unsupported block: %T]", b)} + } + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + return anthropic.ContentBlock{"type": "text", "text": string(raw)} + } + return m + } +} + +// Translator turns a stream of anthropic.Event into a stream of +// canonical.TurnEvent. Tracks per-content-block state (current type + +// id + name) because Anthropic's deltas carry only the block index. +type Translator struct { + // Per-block tracking — index → metadata. + blocks map[int]*blockState + finalReason canonical.StopReason + finalUsage canonical.Usage +} + +type blockState struct { + Kind string // "text" | "thinking" | "tool_use" + ID string // tool_use only + Name string // tool_use only +} + +// NewTranslator constructs a fresh per-turn Translator. +func NewTranslator() *Translator { + return &Translator{blocks: map[int]*blockState{}} +} + +// Translate consumes one anthropic.Event and returns 0..N +// canonical.TurnEvents. The translator owns no internal channel — +// caller drives the loop. +func (t *Translator) Translate(ev anthropic.Event) []canonical.TurnEvent { + switch ev.Type { + case "message_start": + out := []canonical.TurnEvent{{Type: canonical.EventMessageStart}} + if ev.Message != nil && ev.Message.Usage != nil { + t.finalUsage = toCanonicalUsage(ev.Message.Usage) + } + return out + + case "ping": + return nil // keepalive + + case "content_block_start": + if ev.ContentBlock == nil { + return nil + } + st := &blockState{Kind: ev.ContentBlock.Type, ID: ev.ContentBlock.ID, Name: ev.ContentBlock.Name} + t.blocks[ev.Index] = st + switch st.Kind { + case "tool_use": + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallStart, ToolCallID: st.ID, ToolName: st.Name, + }} + case "text": + // Some servers emit content_block_start with non-empty + // text seed; surface that. + if ev.ContentBlock.Text != "" { + return []canonical.TurnEvent{{ + Type: canonical.EventTextDelta, Text: ev.ContentBlock.Text, + }} + } + case "thinking": + if ev.ContentBlock.Thinking != "" { + return []canonical.TurnEvent{{ + Type: canonical.EventThinkingDelta, Thinking: ev.ContentBlock.Thinking, + }} + } + } + return nil + + case "content_block_delta": + if ev.Delta == nil { + return nil + } + st := t.blocks[ev.Index] + switch ev.Delta.Type { + case "text_delta": + return []canonical.TurnEvent{{Type: canonical.EventTextDelta, Text: ev.Delta.Text}} + case "thinking_delta": + return []canonical.TurnEvent{{Type: canonical.EventThinkingDelta, Thinking: ev.Delta.Thinking}} + case "input_json_delta": + if st == nil { + return nil + } + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallDelta, + ToolCallID: st.ID, ToolName: st.Name, + PartialJSON: ev.Delta.PartialJSON, + }} + case "signature_delta": + // Signature lands on EventThinkingEnd at block_stop time; + // stash on the block state. + if st != nil { + st.Name = ev.Delta.Signature // reuse Name as scratch + } + return nil + } + return nil + + case "content_block_stop": + st := t.blocks[ev.Index] + if st == nil { + return nil + } + delete(t.blocks, ev.Index) + switch st.Kind { + case "tool_use": + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallEnd, ToolCallID: st.ID, ToolName: st.Name, + }} + case "thinking": + // st.Name was repurposed to carry the signature in + // signature_delta above. Empty when no signature came. + return []canonical.TurnEvent{{ + Type: canonical.EventThinkingEnd, Signature: st.Name, + }} + } + return nil + + case "message_delta": + // Carries stop_reason + cumulative usage. + if ev.Delta != nil && ev.Delta.StopReason != "" { + t.finalReason = canonicalStopReason(ev.Delta.StopReason) + } + if ev.Usage != nil { + // message_delta usage is INCREMENTAL on Anthropic; the v4 docs + // describe it as cumulative across the stream. We just take + // the last reported values verbatim. + t.finalUsage = toCanonicalUsage(ev.Usage) + } + return nil + + case "message_stop": + usage := t.finalUsage + return []canonical.TurnEvent{{ + Type: canonical.EventMessageEnd, + StopReason: t.finalReason, + Usage: &usage, + }} + + case "error": + msg := "unknown" + etype := "stream_error" + if ev.Error != nil { + msg = ev.Error.Message + etype = ev.Error.Type + } + return []canonical.TurnEvent{{ + Type: canonical.EventError, + Error: &canonical.ErrorInfo{Code: etype, Message: msg}, + }} + } + return nil +} + +func canonicalStopReason(s string) canonical.StopReason { + switch s { + case "end_turn": + return canonical.StopEndTurn + case "max_tokens": + return canonical.StopMaxTokens + case "tool_use": + return canonical.StopToolUse + case "stop_sequence": + return canonical.StopStopSequence + default: + return canonical.StopReason(s) + } +} + +func toCanonicalUsage(u *anthropic.Usage) canonical.Usage { + return canonical.Usage{ + InputTokens: u.InputTokens, + OutputTokens: u.OutputTokens, + CacheReadTokens: u.CacheReadInputTokens, + CacheWriteTokens: u.CacheCreationInputTokens, + } +} diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100755 index 0000000..a06cc98 --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,77 @@ +#!/usr/bin/env bash +# Plexum-minimax-provider installer (Phase v0.1). +# +# Builds + installs: +# ~/.plexum/plugins/plexum-minimax-provider/plexum-minimax-provider-plugin +# ~/.plexum/plugins/plexum-minimax-provider/manifest.json +# +# Operator then writes the per-profile config: +# ~/.plexum/plugins/plexum-minimax-provider/config.json +# {"api_key": "sk-cp-..."} +# +# Re-runnable. Profile data + config.json are never touched. +# +# Flags: +# --profile

Override profile root (default ~/.plexum) +set -euo pipefail + +REPO="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")/.." && pwd)" +PROFILE_DIR="${HOME}/.plexum" + +while [[ $# -gt 0 ]]; do + case "$1" in + --profile) PROFILE_DIR="$2"; shift 2 ;; + -h|--help) sed -n '2,/^set -euo/p' "$0" | sed -n '/^#/p' | sed 's/^# \{0,1\}//'; exit 0 ;; + *) echo "unknown flag: $1" >&2; exit 2 ;; + esac +done + +log() { printf '\033[1;34m[minimax-install]\033[0m %s\n' "$*"; } +command -v go >/dev/null || { echo "go not found on PATH" >&2; exit 1; } + +PLUGIN_DIR="${PROFILE_DIR}/plugins/plexum-minimax-provider" +mkdir -p "${PLUGIN_DIR}" + +cd "${REPO}" +VERSION="$(git describe --tags --always 2>/dev/null || echo dev)" +LDFLAGS="-X main.Version=${VERSION}" +log "building plexum-minimax-provider-plugin (v=${VERSION})" +CGO_ENABLED=0 go build -ldflags="${LDFLAGS}" \ + -o "${PLUGIN_DIR}/plexum-minimax-provider-plugin" \ + ./cmd/plexum-minimax-provider-plugin + +cat > "${PLUGIN_DIR}/manifest.json" <<'EOF' +{ + "name": "plexum-minimax-provider", + "version": "0.1.0", + "activation": "lazy", + "executable": "plexum-minimax-provider-plugin", + "contracts": { + "provider": { + "models": ["MiniMax-M2.7", "MiniMax-M2.7-highspeed"] + } + } +} +EOF + +cat < ${PLUGIN_DIR}/config.json < + 4. systemctl --user restart plexum +EOF