From 01577ddfe8fa9f483d3ea37b2a54b2223a199952 Mon Sep 17 00:00:00 2001 From: hzhang Date: Sun, 31 May 2026 16:23:18 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Plexum-kimi-provider=20v0.1=20=E2=80=94?= =?UTF-8?q?=20Moonshot=20Kimi=20via=20coding=20endpoint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plexum ProviderPlugin that serves Moonshot Kimi K2.6 ("Kimi for Coding") via Kimi's Anthropic-compatible coding endpoint at https://api.kimi.com/coding/v1/messages. Port of openclaw's extensions/kimi-coding/provider-catalog.ts to Go + Plexum SDK. Repo structure parallels Plexum-minimax-provider: - internal/anthropic/ HTTP+SSE Anthropic Messages client, with new UserAgent field (Kimi expects "claude-code/ 0.1.0" — openclaw plugin parity) - internal/translate/ canonical ↔ Anthropic translator (re-used shape from MiniMax — no Kimi-specific quirks needed for v1 plain-text path) - cmd/plexum-kimi-provider-plugin/ ProviderPlugin entry Declared models (Kimi server accepts all three; plugin normalizes legacy aliases to the canonical id on the wire): kimi-for-coding (current, default) kimi-code (legacy alias) k2p5 (legacy alias) HostConfig: api_key (required), base_url (override), user_agent (default "claude-code/0.1.0"), max_tokens_default (default 8192). End-to-end verified against the live `sk-kimi-` subscription key: 1. CLI embedded turn 1: "Hi there! I'm Kimi." 2. CLI embedded turn 2: "I said hi, I'm Kimi." (multi-turn context OK) 3. Via gateway socket: {"outcome":"text","text":"...pong"} 4. Via Fabric channel: alice → bt2-clean → kimi agent → Kimi K2.6 → outbound REST → reply in channel seq=15: "Concise concurrency: goroutines and channels make parallel code readable, safe, and efficient without the usual threading complexity." Test the bidirectional channel pipeline works with a fresh provider: Fabric (channel plugin) + Kimi (provider plugin) wired through Plexum agentloop, MiniMax-style plugin packaging. Co-Authored-By: Claude Opus 4.7 --- README.md | 79 ++++++ cmd/plexum-kimi-provider-plugin/main.go | 169 +++++++++++++ go.mod | 7 + internal/anthropic/client.go | 278 +++++++++++++++++++++ internal/anthropic/client_test.go | 143 +++++++++++ internal/translate/translate.go | 317 ++++++++++++++++++++++++ scripts/install.sh | 77 ++++++ 7 files changed, 1070 insertions(+) create mode 100644 README.md create mode 100644 cmd/plexum-kimi-provider-plugin/main.go create mode 100644 go.mod create mode 100644 internal/anthropic/client.go create mode 100644 internal/anthropic/client_test.go create mode 100644 internal/translate/translate.go create mode 100755 scripts/install.sh diff --git a/README.md b/README.md new file mode 100644 index 0000000..b3be84d --- /dev/null +++ b/README.md @@ -0,0 +1,79 @@ +# Plexum-kimi-provider + +Plexum ProviderPlugin that serves **Moonshot Kimi K2.6 ("Kimi for Coding")** +via Kimi's Anthropic-compatible coding endpoint +(`https://api.kimi.com/coding/v1/messages`). + +Port of `openclaw/extensions/kimi-coding/provider-catalog.ts` to Go + +Plexum SDK. + +## Status + +**v0.1 — current**: API key auth (Kimi subscription `sk-kimi-...`), +streaming SSE, models `kimi-for-coding` / `kimi-code` / `k2p5`. Sends +`User-Agent: claude-code/0.1.0` (mirrors openclaw plugin) for parity +with Kimi's coding subscription auth path. + +## Install + +```bash +cd ~/Plexum-kimi-provider +./scripts/install.sh +``` + +Then: + +1. **Write API key** to `~/.plexum/plugins/plexum-kimi-provider/config.json`: + ```json + {"api_key": "sk-kimi-..."} + ``` + (`chmod 600` it.) + +2. **Allow the plugin** in `~/.plexum/plexum.json`: + ```json + {"plugins": {"allow": ["plexum-kimi-provider"]}} + ``` + +3. **Point an agent at Kimi**: + ```bash + plexum agent-add --model kimi-for-coding my-agent + ``` + +4. **Restart** and talk: + ```bash + systemctl --user restart plexum + plexum say --agent-id my-agent --session-id $(plexum new-session --agent-id my-agent) "hello" + ``` + +## Config options + +| Field | Default | Notes | +|---|---|---| +| `api_key` | (required) | `sk-kimi-...` subscription key | +| `base_url` | `https://api.kimi.com/coding` | override only if you proxy | +| `user_agent` | `claude-code/0.1.0` | matches openclaw plugin | +| `max_tokens_default` | `8192` | used when TurnRequest.MaxTokens unset | + +## Model id aliases + +Kimi server accepts all three; the plugin normalizes to +`kimi-for-coding` on the wire for consistent logs: + +| advertised id | wire id | +|---|---| +| `kimi-for-coding` | `kimi-for-coding` | +| `kimi-code` (legacy) | `kimi-for-coding` | +| `k2p5` (legacy) | `kimi-for-coding` | + +## Architecture + +Same shape as `Plexum-minimax-provider`: + +- `internal/anthropic/` — HTTP+SSE Messages client (now with `UserAgent` + field; can be shared by future Anthropic-compat providers) +- `internal/translate/` — canonical ↔ Anthropic translator +- `cmd/plexum-kimi-provider-plugin/` — ProviderPlugin entry + +## License + +Same as Plexum. diff --git a/cmd/plexum-kimi-provider-plugin/main.go b/cmd/plexum-kimi-provider-plugin/main.go new file mode 100644 index 0000000..b7c4284 --- /dev/null +++ b/cmd/plexum-kimi-provider-plugin/main.go @@ -0,0 +1,169 @@ +// plexum-kimi-provider-plugin is a Plexum ProviderPlugin that serves +// Moonshot Kimi K2.6 ("Kimi for Coding") via Kimi's Anthropic-compatible +// coding endpoint (https://api.kimi.com/coding/v1/messages). +// +// Reference: openclaw/extensions/kimi-coding/provider-catalog.ts — +// same base URL + User-Agent convention. The endpoint accepts an +// `sk-kimi-` subscription API key minted from kimi.com/code. +// +// Declared models: +// - kimi-for-coding (current; default) +// - kimi-code (legacy alias; same model server-side) +// - k2p5 (legacy alias; same) +// +// Operator agent.json.model = "kimi-for-coding" routes here via +// Plexum's ProviderRouter. +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + + "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" + + "git.hangman-lab.top/hzhang/Plexum-kimi-provider/internal/anthropic" + "git.hangman-lab.top/hzhang/Plexum-kimi-provider/internal/translate" +) + +const ( + pluginName = "plexum-kimi-provider" + + // Kimi coding endpoint. The path /v1/messages is appended by the + // anthropic client; baseURL must NOT include /v1. + defaultBaseURL = "https://api.kimi.com/coding" + + // UA that openclaw's kimi-coding plugin sets. Likely a server-side + // allowlist marker for the coding subscription auth path; omitting + // it currently still works but we send it for parity. + defaultUserAgent = "claude-code/0.1.0" + + // Kimi K2.6 supports up to 32768 output tokens per request. + defaultMaxTokens = 8192 +) + +// supportedModels = what manifest.contracts.provider.models advertises. +// The legacy aliases (kimi-code, k2p5) route the same way; Kimi server +// accepts them and returns kimi-for-coding-equivalent output. +var supportedModels = []string{"kimi-for-coding", "kimi-code", "k2p5"} + +// HostConfig is per-profile plugin config at +// /plugins/plexum-kimi-provider/config.json: +// +// { +// "api_key": "sk-kimi-...", // required +// "base_url": "https://...", // optional override +// "user_agent": "...", // optional; default "claude-code/0.1.0" +// "max_tokens_default": 8192 // optional +// } +type HostConfig struct { + APIKey string `json:"api_key"` + BaseURL string `json:"base_url"` + UserAgent string `json:"user_agent"` + MaxTokensDefault int `json:"max_tokens_default"` +} + +type kimiPlugin struct { + host plugin.HostAPI + cfg HostConfig + cli *anthropic.Client +} + +func (p *kimiPlugin) Manifest() plugin.Manifest { + return plugin.Manifest{ + Name: pluginName, + Version: "0.1.0", + Activation: plugin.ActivationLazy, + Executable: "plexum-kimi-provider-plugin", + Contracts: plugin.Contracts{ + Provider: &plugin.ProviderContract{Models: supportedModels}, + }, + } +} + +func (p *kimiPlugin) Init(ctx context.Context, host plugin.HostAPI) error { + p.host = host + + profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT") + if profileRoot == "" { + home, _ := os.UserHomeDir() + profileRoot = filepath.Join(home, ".plexum") + } + cfgPath := filepath.Join(profileRoot, "plugins", pluginName, "config.json") + raw, err := os.ReadFile(cfgPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("read %s: %w", cfgPath, err) + } + if len(raw) > 0 { + if err := json.Unmarshal(raw, &p.cfg); err != nil { + return fmt.Errorf("parse %s: %w", cfgPath, err) + } + } + if p.cfg.APIKey == "" { + return fmt.Errorf("kimi: api_key missing in %s", cfgPath) + } + base := p.cfg.BaseURL + if base == "" { + base = defaultBaseURL + } + ua := p.cfg.UserAgent + if ua == "" { + ua = defaultUserAgent + } + if p.cfg.MaxTokensDefault <= 0 { + p.cfg.MaxTokensDefault = defaultMaxTokens + } + p.cli = anthropic.New(base, p.cfg.APIKey) + p.cli.UserAgent = ua + host.Log("info", "kimi provider initialized", map[string]any{ + "base": base, "user_agent": ua, "models": supportedModels, + "max_tokens_default": p.cfg.MaxTokensDefault, + }) + return nil +} + +// Stream is the ProviderPlugin entrypoint. canonical.TurnRequest in, +// channel of canonical.TurnEvent out. +func (p *kimiPlugin) Stream(ctx context.Context, modelID string, req canonical.TurnRequest) (<-chan canonical.TurnEvent, error) { + // Normalize legacy aliases to the current canonical id so the wire + // model name is consistent (Kimi server accepts all 3, but this + // keeps logs + telemetry tidy). + wireModel := modelID + if wireModel == "kimi-code" || wireModel == "k2p5" { + wireModel = "kimi-for-coding" + } + apiReq, err := translate.CanonicalToAnthropic(req, wireModel, p.cfg.MaxTokensDefault) + if err != nil { + return nil, err + } + raw, err := p.cli.StreamMessages(ctx, apiReq) + if err != nil { + return nil, err + } + out := make(chan canonical.TurnEvent, 32) + go func() { + defer close(out) + tr := translate.NewTranslator() + for ev := range raw { + for _, te := range tr.Translate(ev) { + select { + case out <- te: + case <-ctx.Done(): + return + } + } + } + }() + return out, nil +} + +func main() { + if err := plugin.Serve(&kimiPlugin{}); err != nil { + fmt.Fprintf(os.Stderr, "plexum-kimi-provider-plugin: %v\n", err) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..91d440d --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module git.hangman-lab.top/hzhang/Plexum-kimi-provider + +go 1.24.2 + +require git.hangman-lab.top/hzhang/Plexum-sdk-go v0.0.0 + +replace git.hangman-lab.top/hzhang/Plexum-sdk-go => ../Plexum-sdk-go diff --git a/internal/anthropic/client.go b/internal/anthropic/client.go new file mode 100644 index 0000000..8aac7ed --- /dev/null +++ b/internal/anthropic/client.go @@ -0,0 +1,278 @@ +// Package anthropic is a minimal Anthropic Messages API HTTP+SSE +// client. Works against the real Anthropic API and any Anthropic- +// compatible endpoint (MiniMax exposes one at +// https://api.minimax.io/anthropic). +// +// The MiniMax provider plugin's only consumer; if other providers +// land later they can reuse this package. +package anthropic + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// DefaultAPIVersion matches what Anthropic + Anthropic-compat servers expect. +const DefaultAPIVersion = "2023-06-01" + +// Client is a thin HTTP wrapper. Stateless across calls. +type Client struct { + BaseURL string // e.g. "https://api.kimi.com/coding" or "https://api.minimax.io/anthropic" + APIKey string + APIVersion string // default "2023-06-01" + // UserAgent, if set, replaces Go's default UA header. Kimi's + // coding endpoint expects "claude-code/0.1.0"; other backends + // (MiniMax etc.) are flexible. + UserAgent string + // ExtraHeaders are sent verbatim on every request. + ExtraHeaders map[string]string + HTTP *http.Client +} + +// New constructs a Client with sensible defaults. +func New(baseURL, apiKey string) *Client { + return &Client{ + BaseURL: strings.TrimRight(baseURL, "/"), + APIKey: apiKey, + APIVersion: DefaultAPIVersion, + // 5 min ceiling per turn — long enough for a slow reasoning + // response, short enough that wedged calls don't hang forever. + HTTP: &http.Client{Timeout: 5 * time.Minute}, + } +} + +// MessagesRequest is the wire shape POSTed to /v1/messages. +type MessagesRequest struct { + Model string `json:"model"` + System any `json:"system,omitempty"` // string OR []ContentBlock + Messages []Message `json:"messages"` + MaxTokens int `json:"max_tokens"` + Temperature *float64 `json:"temperature,omitempty"` + StopSequences []string `json:"stop_sequences,omitempty"` + Tools []ToolDef `json:"tools,omitempty"` + ToolChoice *ToolChoice `json:"tool_choice,omitempty"` + Thinking *ThinkingConfig `json:"thinking,omitempty"` + Stream bool `json:"stream"` + Metadata *RequestMetadata `json:"metadata,omitempty"` +} + +// Message is one entry in messages[]. +type Message struct { + Role string `json:"role"` // "user" | "assistant" + Content []ContentBlock `json:"content"` +} + +// ContentBlock covers text / tool_use / tool_result / thinking. We +// keep the discriminated union loose (map[string]any) on the request +// side so callers can pass whatever shape the API accepts; on the +// response side the SSE parser produces typed deltas. +type ContentBlock = map[string]any + +// ToolDef is one tool entry the model can call. +type ToolDef struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + InputSchema json.RawMessage `json:"input_schema"` +} + +// ToolChoice constrains how the model picks tools. +type ToolChoice struct { + Type string `json:"type"` // "auto" | "any" | "tool" | "none" + Name string `json:"name,omitempty"` +} + +// ThinkingConfig enables extended-thinking on supported models. +type ThinkingConfig struct { + Type string `json:"type"` // "enabled" | "disabled" + BudgetTokens int `json:"budget_tokens,omitempty"` +} + +// RequestMetadata is optional user-supplied tracing. +type RequestMetadata struct { + UserID string `json:"user_id,omitempty"` +} + +// ---- Streaming SSE response ---- + +// Event is one parsed SSE event from the streaming endpoint. +// Direct mirror of Anthropic's event shapes — caller's responsibility +// to translate into Plexum's canonical.TurnEvent. +type Event struct { + Type string `json:"type"` + // message_start: complete initial message envelope + Message *MessageEnvelope `json:"message,omitempty"` + // content_block_start: block at .Index + Index int `json:"index,omitempty"` + ContentBlock *BlockStart `json:"content_block,omitempty"` + // content_block_delta: text_delta / thinking_delta / input_json_delta / signature_delta + Delta *BlockDelta `json:"delta,omitempty"` + // message_delta: usage update + stop_reason + Usage *Usage `json:"usage,omitempty"` + // error event payload + Error *ErrorBody `json:"error,omitempty"` +} + +// MessageEnvelope is the top-level message info in message_start. +type MessageEnvelope struct { + ID string `json:"id"` + Type string `json:"type"` + Role string `json:"role"` + Model string `json:"model"` + StopReason string `json:"stop_reason"` + Usage *Usage `json:"usage,omitempty"` +} + +// BlockStart describes a block that just started. Type is "text", +// "thinking", or "tool_use". For tool_use the ID + Name are set. +type BlockStart struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Thinking string `json:"thinking,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` +} + +// BlockDelta carries incremental changes. Exactly one of the *_delta +// fields is meaningful per delta. +type BlockDelta struct { + Type string `json:"type"` // "text_delta" | "thinking_delta" | "input_json_delta" | "signature_delta" + Text string `json:"text,omitempty"` + Thinking string `json:"thinking,omitempty"` + PartialJSON string `json:"partial_json,omitempty"` + Signature string `json:"signature,omitempty"` + // On message_delta event, the same Delta carries stop_reason. + StopReason string `json:"stop_reason,omitempty"` + StopSequence string `json:"stop_sequence,omitempty"` +} + +// Usage is the running input/output token counts. +type Usage struct { + InputTokens int `json:"input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` + CacheReadInputTokens int `json:"cache_read_input_tokens,omitempty"` + CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"` +} + +// ErrorBody is the Anthropic error envelope. +type ErrorBody struct { + Type string `json:"type"` + Message string `json:"message"` +} + +// StreamMessages opens a streaming POST /v1/messages call and sends +// parsed events down the returned channel. Channel closes when SSE +// stream terminates (message_stop, server EOF, ctx cancel, or error). +// On HTTP-level error (non-2xx, network failure), an EventType="error" +// event lands on the channel with the wrapped error before close. +func (c *Client) StreamMessages(ctx context.Context, req MessagesRequest) (<-chan Event, error) { + req.Stream = true + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("anthropic: marshal: %w", err) + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, + c.BaseURL+"/v1/messages", bytes.NewReader(body)) + if err != nil { + return nil, err + } + httpReq.Header.Set("content-type", "application/json") + httpReq.Header.Set("authorization", "Bearer "+c.APIKey) + httpReq.Header.Set("anthropic-version", c.APIVersion) + httpReq.Header.Set("accept", "text/event-stream") + if c.UserAgent != "" { + httpReq.Header.Set("User-Agent", c.UserAgent) + } + for k, v := range c.ExtraHeaders { + httpReq.Header.Set(k, v) + } + + resp, err := c.HTTP.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("anthropic: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + raw, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return nil, fmt.Errorf("anthropic: %s -> %d: %s", c.BaseURL, resp.StatusCode, string(raw)) + } + + ch := make(chan Event, 32) + go func() { + defer close(ch) + defer resp.Body.Close() + if err := parseSSE(ctx, resp.Body, ch); err != nil && !errors.Is(err, io.EOF) { + select { + case ch <- Event{Type: "error", Error: &ErrorBody{Type: "stream_error", Message: err.Error()}}: + default: + } + } + }() + return ch, nil +} + +// parseSSE reads SSE frames (`event:` + `data:` lines, blank-line +// separator) and dispatches Event values onto ch. Returns nil on +// normal EOF. +func parseSSE(ctx context.Context, r io.Reader, ch chan<- Event) error { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), 1<<20) // 1 MiB line cap + + var dataBuf bytes.Buffer + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + line := scanner.Bytes() + if len(line) == 0 { + // End-of-frame: dispatch buffered data, reset. + if dataBuf.Len() == 0 { + continue + } + var ev Event + if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil { + select { + case ch <- ev: + case <-ctx.Done(): + return ctx.Err() + } + } + dataBuf.Reset() + continue + } + if bytes.HasPrefix(line, []byte("data:")) { + payload := bytes.TrimSpace(line[5:]) + if dataBuf.Len() > 0 { + dataBuf.WriteByte('\n') + } + dataBuf.Write(payload) + } + // `event:` lines describe the event name. JSON `data:` payload + // also carries `type` field — we use that. Ignore event: lines. + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("sse scan: %w", err) + } + // Flush any trailing data without final blank line. + if dataBuf.Len() > 0 { + var ev Event + if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil { + select { + case ch <- ev: + case <-ctx.Done(): + return ctx.Err() + } + } + } + return nil +} diff --git a/internal/anthropic/client_test.go b/internal/anthropic/client_test.go new file mode 100644 index 0000000..5386802 --- /dev/null +++ b/internal/anthropic/client_test.go @@ -0,0 +1,143 @@ +package anthropic + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestParseSSESingleEvent(t *testing.T) { + body := "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"m1\",\"model\":\"X\"}}\n\n" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_start" || events[0].Message.ID != "m1" { + t.Errorf("events = %+v", events) + } +} + +func TestParseSSEMultiLineData(t *testing.T) { + // "data:" can be split across multiple lines per spec; joined with '\n'. + body := "data: {\"type\":\n" + "data: \"text_delta\",\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "text_delta" || events[0].Delta.Text != "hi" { + t.Errorf("events = %+v", events) + } +} + +func TestParseSSEFlushTrailingWithoutBlank(t *testing.T) { + body := "data: {\"type\":\"message_stop\"}" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_stop" { + t.Errorf("trailing flush missed: %+v", events) + } +} + +func TestStreamMessagesSendsRightHeaders(t *testing.T) { + var gotAuth, gotVer string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("authorization") + gotVer = r.Header.Get("anthropic-version") + w.Header().Set("content-type", "text/event-stream") + w.Write([]byte("event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n")) + })) + defer srv.Close() + c := New(srv.URL, "fake-key") + ch, err := c.StreamMessages(context.Background(), MessagesRequest{ + Model: "X", MaxTokens: 10, + Messages: []Message{{Role: "user", Content: []ContentBlock{{"type": "text", "text": "hi"}}}}, + }) + if err != nil { + t.Fatal(err) + } + for range ch { + } + if gotAuth != "Bearer fake-key" { + t.Errorf("auth = %q", gotAuth) + } + if gotVer != DefaultAPIVersion { + t.Errorf("anthropic-version = %q", gotVer) + } +} + +func TestStreamMessagesNon2xxErrors(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte(`{"error":{"type":"authentication_error","message":"bad key"}}`)) + })) + defer srv.Close() + c := New(srv.URL, "bad") + _, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10}) + if err == nil || !strings.Contains(err.Error(), "401") { + t.Errorf("err = %v", err) + } +} + +func TestStreamMessagesStreamErrorBecomesEvent(t *testing.T) { + // Simulate an SSE stream that errors mid-stream. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "text/event-stream") + flusher, _ := w.(http.Flusher) + w.Write([]byte("event: message_start\ndata: {\"type\":\"message_start\"}\n\n")) + flusher.Flush() + // Hijack-style mid-stream close: not really an error in HTTP terms, + // scanner just hits EOF cleanly. So no error event expected. + })) + defer srv.Close() + c := New(srv.URL, "k") + ch, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10}) + if err != nil { + t.Fatal(err) + } + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_start" { + t.Errorf("events = %+v", events) + } +} + +func TestMessagesRequestMarshalShape(t *testing.T) { + req := MessagesRequest{ + Model: "MiniMax-M2.7", MaxTokens: 50, Stream: true, + Messages: []Message{{Role: "user", Content: []ContentBlock{ + {"type": "text", "text": "hi"}, + }}}, + } + raw, _ := json.Marshal(req) + // stream:true present + if !bytes.Contains(raw, []byte(`"stream":true`)) { + t.Errorf("missing stream:true in %s", raw) + } + if !bytes.Contains(raw, []byte(`"model":"MiniMax-M2.7"`)) { + t.Errorf("missing model: %s", raw) + } +} + +func drain(ch <-chan Event) []Event { + var out []Event + for ev := range ch { + out = append(out, ev) + } + return out +} + +// Compile-time check that io.EOF is exported so we don't accidentally +// remove the import. +var _ = io.EOF diff --git a/internal/translate/translate.go b/internal/translate/translate.go new file mode 100644 index 0000000..5fcbce1 --- /dev/null +++ b/internal/translate/translate.go @@ -0,0 +1,317 @@ +// Package translate converts between Plexum's canonical.TurnRequest / +// canonical.TurnEvent shapes and the Anthropic Messages API shapes the +// internal/anthropic client emits. +// +// Round-trip-able for text + thinking blocks; tool_use / tool_result +// passes through structurally. Block-level signatures (the opaque +// thinking signature Anthropic issues) are preserved when present. +package translate + +import ( + "encoding/json" + "fmt" + + "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + + "git.hangman-lab.top/hzhang/Plexum-kimi-provider/internal/anthropic" +) + +// CanonicalToAnthropic converts a Plexum TurnRequest into an Anthropic +// MessagesRequest. modelID overrides the canonical req.Model so callers +// can map (e.g. "minimax/MiniMax-M2.7" → "MiniMax-M2.7") for the wire. +func CanonicalToAnthropic(req canonical.TurnRequest, modelID string, defaultMaxTokens int) (anthropic.MessagesRequest, error) { + out := anthropic.MessagesRequest{ + Model: modelID, + MaxTokens: req.MaxTokens, + } + if out.MaxTokens <= 0 { + out.MaxTokens = defaultMaxTokens + } + if req.Temperature != 0 { + t := req.Temperature + out.Temperature = &t + } + if len(req.StopSequences) > 0 { + out.StopSequences = append([]string{}, req.StopSequences...) + } + // System: Plexum stores System as []Block; Anthropic accepts string + // or []ContentBlock. We always use the []block form when we have any + // non-trivial blocks so cache_control etc. pass through losslessly. + if len(req.System) > 0 { + sysBlocks := make([]anthropic.ContentBlock, 0, len(req.System)) + for _, b := range req.System { + sysBlocks = append(sysBlocks, blockToAnthropic(b)) + } + out.System = sysBlocks + } + // Messages. + for _, m := range req.Messages { + am := anthropic.Message{Role: roleToAnthropic(m.Role)} + for _, b := range m.Content { + am.Content = append(am.Content, blockToAnthropic(b)) + } + out.Messages = append(out.Messages, am) + } + // Tools. + for _, t := range req.Tools { + out.Tools = append(out.Tools, anthropic.ToolDef{ + Name: t.Name, Description: t.Description, InputSchema: t.InputSchema, + }) + } + if req.ToolChoice != nil { + out.ToolChoice = &anthropic.ToolChoice{Type: req.ToolChoice.Type, Name: req.ToolChoice.Name} + } + if req.Thinking != nil { + mode := "disabled" + if req.Thinking.Enabled { + mode = "enabled" + } + out.Thinking = &anthropic.ThinkingConfig{ + Type: mode, + BudgetTokens: req.Thinking.Budget, + } + } + return out, nil +} + +func roleToAnthropic(r canonical.Role) string { + switch r { + case canonical.RoleUser: + return "user" + case canonical.RoleAssistant: + return "assistant" + default: + return string(r) + } +} + +// blockToAnthropic converts ONE canonical.Block into the loose +// map-shaped ContentBlock Anthropic expects. Per-type handling: +// +// - TextBlock → {"type":"text", "text":..., "cache_control"?} +// - ToolUseBlock → {"type":"tool_use", "id":..., "name":..., "input":...} +// - ToolResultBlock → {"type":"tool_result", "tool_use_id":..., "content":[...], "is_error"?} +// - ThinkingBlock → {"type":"thinking", "thinking":..., "signature":...} +// +// Unknown block types serialize to their JSON form via canonical's +// own marshaller (fallback path). +func blockToAnthropic(b canonical.Block) anthropic.ContentBlock { + switch v := b.(type) { + case *canonical.TextBlock: + out := anthropic.ContentBlock{"type": "text", "text": v.Text} + if v.CacheControl != nil { + out["cache_control"] = v.CacheControl + } + return out + case *canonical.ToolUseBlock: + out := anthropic.ContentBlock{ + "type": "tool_use", "id": v.ID, "name": v.Name, + } + if len(v.Input) > 0 { + out["input"] = json.RawMessage(v.Input) + } else { + out["input"] = map[string]any{} + } + return out + case *canonical.ToolResultBlock: + inner := make([]anthropic.ContentBlock, 0, len(v.Content)) + for _, ib := range v.Content { + inner = append(inner, blockToAnthropic(ib)) + } + out := anthropic.ContentBlock{ + "type": "tool_result", "tool_use_id": v.ToolUseID, "content": inner, + } + if v.IsError { + out["is_error"] = true + } + return out + case *canonical.ThinkingBlock: + return anthropic.ContentBlock{ + "type": "thinking", "thinking": v.Thinking, "signature": v.Signature, + } + default: + // Best-effort generic: marshal then unmarshal into a map. + raw, err := json.Marshal(b) + if err != nil { + return anthropic.ContentBlock{"type": "text", "text": fmt.Sprintf("[unsupported block: %T]", b)} + } + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + return anthropic.ContentBlock{"type": "text", "text": string(raw)} + } + return m + } +} + +// Translator turns a stream of anthropic.Event into a stream of +// canonical.TurnEvent. Tracks per-content-block state (current type + +// id + name) because Anthropic's deltas carry only the block index. +type Translator struct { + // Per-block tracking — index → metadata. + blocks map[int]*blockState + finalReason canonical.StopReason + finalUsage canonical.Usage +} + +type blockState struct { + Kind string // "text" | "thinking" | "tool_use" + ID string // tool_use only + Name string // tool_use only +} + +// NewTranslator constructs a fresh per-turn Translator. +func NewTranslator() *Translator { + return &Translator{blocks: map[int]*blockState{}} +} + +// Translate consumes one anthropic.Event and returns 0..N +// canonical.TurnEvents. The translator owns no internal channel — +// caller drives the loop. +func (t *Translator) Translate(ev anthropic.Event) []canonical.TurnEvent { + switch ev.Type { + case "message_start": + out := []canonical.TurnEvent{{Type: canonical.EventMessageStart}} + if ev.Message != nil && ev.Message.Usage != nil { + t.finalUsage = toCanonicalUsage(ev.Message.Usage) + } + return out + + case "ping": + return nil // keepalive + + case "content_block_start": + if ev.ContentBlock == nil { + return nil + } + st := &blockState{Kind: ev.ContentBlock.Type, ID: ev.ContentBlock.ID, Name: ev.ContentBlock.Name} + t.blocks[ev.Index] = st + switch st.Kind { + case "tool_use": + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallStart, ToolCallID: st.ID, ToolName: st.Name, + }} + case "text": + // Some servers emit content_block_start with non-empty + // text seed; surface that. + if ev.ContentBlock.Text != "" { + return []canonical.TurnEvent{{ + Type: canonical.EventTextDelta, Text: ev.ContentBlock.Text, + }} + } + case "thinking": + if ev.ContentBlock.Thinking != "" { + return []canonical.TurnEvent{{ + Type: canonical.EventThinkingDelta, Thinking: ev.ContentBlock.Thinking, + }} + } + } + return nil + + case "content_block_delta": + if ev.Delta == nil { + return nil + } + st := t.blocks[ev.Index] + switch ev.Delta.Type { + case "text_delta": + return []canonical.TurnEvent{{Type: canonical.EventTextDelta, Text: ev.Delta.Text}} + case "thinking_delta": + return []canonical.TurnEvent{{Type: canonical.EventThinkingDelta, Thinking: ev.Delta.Thinking}} + case "input_json_delta": + if st == nil { + return nil + } + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallDelta, + ToolCallID: st.ID, ToolName: st.Name, + PartialJSON: ev.Delta.PartialJSON, + }} + case "signature_delta": + // Signature lands on EventThinkingEnd at block_stop time; + // stash on the block state. + if st != nil { + st.Name = ev.Delta.Signature // reuse Name as scratch + } + return nil + } + return nil + + case "content_block_stop": + st := t.blocks[ev.Index] + if st == nil { + return nil + } + delete(t.blocks, ev.Index) + switch st.Kind { + case "tool_use": + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallEnd, ToolCallID: st.ID, ToolName: st.Name, + }} + case "thinking": + // st.Name was repurposed to carry the signature in + // signature_delta above. Empty when no signature came. + return []canonical.TurnEvent{{ + Type: canonical.EventThinkingEnd, Signature: st.Name, + }} + } + return nil + + case "message_delta": + // Carries stop_reason + cumulative usage. + if ev.Delta != nil && ev.Delta.StopReason != "" { + t.finalReason = canonicalStopReason(ev.Delta.StopReason) + } + if ev.Usage != nil { + // message_delta usage is INCREMENTAL on Anthropic; the v4 docs + // describe it as cumulative across the stream. We just take + // the last reported values verbatim. + t.finalUsage = toCanonicalUsage(ev.Usage) + } + return nil + + case "message_stop": + usage := t.finalUsage + return []canonical.TurnEvent{{ + Type: canonical.EventMessageEnd, + StopReason: t.finalReason, + Usage: &usage, + }} + + case "error": + msg := "unknown" + etype := "stream_error" + if ev.Error != nil { + msg = ev.Error.Message + etype = ev.Error.Type + } + return []canonical.TurnEvent{{ + Type: canonical.EventError, + Error: &canonical.ErrorInfo{Code: etype, Message: msg}, + }} + } + return nil +} + +func canonicalStopReason(s string) canonical.StopReason { + switch s { + case "end_turn": + return canonical.StopEndTurn + case "max_tokens": + return canonical.StopMaxTokens + case "tool_use": + return canonical.StopToolUse + case "stop_sequence": + return canonical.StopStopSequence + default: + return canonical.StopReason(s) + } +} + +func toCanonicalUsage(u *anthropic.Usage) canonical.Usage { + return canonical.Usage{ + InputTokens: u.InputTokens, + OutputTokens: u.OutputTokens, + CacheReadTokens: u.CacheReadInputTokens, + CacheWriteTokens: u.CacheCreationInputTokens, + } +} diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100755 index 0000000..8b554b1 --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,77 @@ +#!/usr/bin/env bash +# Plexum-kimi-provider installer (Phase v0.1). +# +# Builds + installs: +# ~/.plexum/plugins/plexum-kimi-provider/plexum-kimi-provider-plugin +# ~/.plexum/plugins/plexum-kimi-provider/manifest.json +# +# Operator then writes the per-profile config: +# ~/.plexum/plugins/plexum-kimi-provider/config.json +# {"api_key": "sk-kimi-..."} +# +# Re-runnable. Profile data + config.json are never touched. +# +# Flags: +# --profile

Override profile root (default ~/.plexum) +set -euo pipefail + +REPO="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")/.." && pwd)" +PROFILE_DIR="${HOME}/.plexum" + +while [[ $# -gt 0 ]]; do + case "$1" in + --profile) PROFILE_DIR="$2"; shift 2 ;; + -h|--help) sed -n '2,/^set -euo/p' "$0" | sed -n '/^#/p' | sed 's/^# \{0,1\}//'; exit 0 ;; + *) echo "unknown flag: $1" >&2; exit 2 ;; + esac +done + +log() { printf '\033[1;34m[kimi-install]\033[0m %s\n' "$*"; } +command -v go >/dev/null || { echo "go not found on PATH" >&2; exit 1; } + +PLUGIN_DIR="${PROFILE_DIR}/plugins/plexum-kimi-provider" +mkdir -p "${PLUGIN_DIR}" + +cd "${REPO}" +VERSION="$(git describe --tags --always 2>/dev/null || echo dev)" +LDFLAGS="-X main.Version=${VERSION}" +log "building plexum-kimi-provider-plugin (v=${VERSION})" +CGO_ENABLED=0 go build -ldflags="${LDFLAGS}" \ + -o "${PLUGIN_DIR}/plexum-kimi-provider-plugin" \ + ./cmd/plexum-kimi-provider-plugin + +cat > "${PLUGIN_DIR}/manifest.json" <<'EOF' +{ + "name": "plexum-kimi-provider", + "version": "0.1.0", + "activation": "lazy", + "executable": "plexum-kimi-provider-plugin", + "contracts": { + "provider": { + "models": ["kimi-for-coding", "kimi-code", "k2p5"] + } + } +} +EOF + +cat < ${PLUGIN_DIR}/config.json < + 4. systemctl --user restart plexum +EOF