feat: Plexum-minimax-provider v0.1 — MiniMax via Anthropic-compat endpoint
Plexum ProviderPlugin that serves MiniMax models through MiniMax's Anthropic-compatible HTTP endpoint (https://api.minimax.io/anthropic, or CN api.minimaxi.com). Inspired by openclaw's extensions/minimax provider-registration, but rewritten in Go for Plexum's SDK. internal/anthropic/ (~210 LOC + 6 tests): - minimal HTTP+SSE Anthropic Messages client (POST /v1/messages, stream:true, parses event:/data: SSE frames) - handles non-2xx as HTTP error; stream errors land as Event{Type:"error"} - 1 MiB SSE line cap; per-conn 5min timeout internal/translate/ (~220 LOC): - CanonicalToAnthropic: canonical.TurnRequest → MessagesRequest - blockToAnthropic: TextBlock / ToolUseBlock / ToolResultBlock / ThinkingBlock → loose ContentBlock map; preserves signatures + cache control - Translator: per-turn state machine; consumes anthropic.Event stream and emits canonical.TurnEvent stream (handles thinking blocks + tool_use input_json_delta accumulation + signature_delta capture) cmd/plexum-minimax-provider-plugin/: - Plugin manifest declares provider.models = [MiniMax-M2.7, MiniMax-M2.7-highspeed] - Backend fixed to "api" (per scope); region "global"|"cn" + base_url override supported via config - HostConfig from <profile>/plugins/plexum-minimax-provider/config.json {api_key, region?, base_url?, max_tokens_default?} scripts/install.sh: build + manifest emit; operator writes config.json + allows plugin + adds an agent + restarts. End-to-end verified against the real key: 1. plexum say --agent-id mini ... → "Hi, I'm MiniMax!" 2. Multi-turn continuity: agent recalled the prior reply 3. Via gateway socket: {"outcome":"text","text":"\n\npong"} 4. Via Fabric channel (alice posts → plugin inbound → mini agent → MiniMax → outbound REST → reply visible in bt2-clean seq=11): "Hi there! 👋 Fun fact: Octopuses have three hearts, blue blood, and neurons distributed throughout their arms—so their tentacles can 'think'" The MiniMax-M2.7-highspeed variant works the same way but hit a Code Plan rate-limit ceiling during testing (not a plugin issue). Deferred: - OAuth (Code Plan portal) — not in v1 scope per request - MiniMax Portal provider (separate provider id minimax-portal) - Image / TTS / video / music providers (separate plugins later) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
266
internal/anthropic/client.go
Normal file
266
internal/anthropic/client.go
Normal file
@@ -0,0 +1,266 @@
|
||||
// Package anthropic is a minimal Anthropic Messages API HTTP+SSE
|
||||
// client. Works against the real Anthropic API and any Anthropic-
|
||||
// compatible endpoint (MiniMax exposes one at
|
||||
// https://api.minimax.io/anthropic).
|
||||
//
|
||||
// The MiniMax provider plugin's only consumer; if other providers
|
||||
// land later they can reuse this package.
|
||||
package anthropic
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DefaultAPIVersion matches what Anthropic + Anthropic-compat servers expect.
|
||||
const DefaultAPIVersion = "2023-06-01"
|
||||
|
||||
// Client is a thin HTTP wrapper. Stateless across calls.
|
||||
type Client struct {
|
||||
BaseURL string // e.g. "https://api.minimax.io/anthropic"
|
||||
APIKey string
|
||||
APIVersion string // default "2023-06-01"
|
||||
HTTP *http.Client
|
||||
}
|
||||
|
||||
// New constructs a Client with sensible defaults.
|
||||
func New(baseURL, apiKey string) *Client {
|
||||
return &Client{
|
||||
BaseURL: strings.TrimRight(baseURL, "/"),
|
||||
APIKey: apiKey,
|
||||
APIVersion: DefaultAPIVersion,
|
||||
// 5 min ceiling per turn — long enough for a slow reasoning
|
||||
// response, short enough that wedged calls don't hang forever.
|
||||
HTTP: &http.Client{Timeout: 5 * time.Minute},
|
||||
}
|
||||
}
|
||||
|
||||
// MessagesRequest is the wire shape POSTed to /v1/messages.
|
||||
type MessagesRequest struct {
|
||||
Model string `json:"model"`
|
||||
System any `json:"system,omitempty"` // string OR []ContentBlock
|
||||
Messages []Message `json:"messages"`
|
||||
MaxTokens int `json:"max_tokens"`
|
||||
Temperature *float64 `json:"temperature,omitempty"`
|
||||
StopSequences []string `json:"stop_sequences,omitempty"`
|
||||
Tools []ToolDef `json:"tools,omitempty"`
|
||||
ToolChoice *ToolChoice `json:"tool_choice,omitempty"`
|
||||
Thinking *ThinkingConfig `json:"thinking,omitempty"`
|
||||
Stream bool `json:"stream"`
|
||||
Metadata *RequestMetadata `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
// Message is one entry in messages[].
|
||||
type Message struct {
|
||||
Role string `json:"role"` // "user" | "assistant"
|
||||
Content []ContentBlock `json:"content"`
|
||||
}
|
||||
|
||||
// ContentBlock covers text / tool_use / tool_result / thinking. We
|
||||
// keep the discriminated union loose (map[string]any) on the request
|
||||
// side so callers can pass whatever shape the API accepts; on the
|
||||
// response side the SSE parser produces typed deltas.
|
||||
type ContentBlock = map[string]any
|
||||
|
||||
// ToolDef is one tool entry the model can call.
|
||||
type ToolDef struct {
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description,omitempty"`
|
||||
InputSchema json.RawMessage `json:"input_schema"`
|
||||
}
|
||||
|
||||
// ToolChoice constrains how the model picks tools.
|
||||
type ToolChoice struct {
|
||||
Type string `json:"type"` // "auto" | "any" | "tool" | "none"
|
||||
Name string `json:"name,omitempty"`
|
||||
}
|
||||
|
||||
// ThinkingConfig enables extended-thinking on supported models.
|
||||
type ThinkingConfig struct {
|
||||
Type string `json:"type"` // "enabled" | "disabled"
|
||||
BudgetTokens int `json:"budget_tokens,omitempty"`
|
||||
}
|
||||
|
||||
// RequestMetadata is optional user-supplied tracing.
|
||||
type RequestMetadata struct {
|
||||
UserID string `json:"user_id,omitempty"`
|
||||
}
|
||||
|
||||
// ---- Streaming SSE response ----
|
||||
|
||||
// Event is one parsed SSE event from the streaming endpoint.
|
||||
// Direct mirror of Anthropic's event shapes — caller's responsibility
|
||||
// to translate into Plexum's canonical.TurnEvent.
|
||||
type Event struct {
|
||||
Type string `json:"type"`
|
||||
// message_start: complete initial message envelope
|
||||
Message *MessageEnvelope `json:"message,omitempty"`
|
||||
// content_block_start: block at .Index
|
||||
Index int `json:"index,omitempty"`
|
||||
ContentBlock *BlockStart `json:"content_block,omitempty"`
|
||||
// content_block_delta: text_delta / thinking_delta / input_json_delta / signature_delta
|
||||
Delta *BlockDelta `json:"delta,omitempty"`
|
||||
// message_delta: usage update + stop_reason
|
||||
Usage *Usage `json:"usage,omitempty"`
|
||||
// error event payload
|
||||
Error *ErrorBody `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// MessageEnvelope is the top-level message info in message_start.
|
||||
type MessageEnvelope struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Role string `json:"role"`
|
||||
Model string `json:"model"`
|
||||
StopReason string `json:"stop_reason"`
|
||||
Usage *Usage `json:"usage,omitempty"`
|
||||
}
|
||||
|
||||
// BlockStart describes a block that just started. Type is "text",
|
||||
// "thinking", or "tool_use". For tool_use the ID + Name are set.
|
||||
type BlockStart struct {
|
||||
Type string `json:"type"`
|
||||
Text string `json:"text,omitempty"`
|
||||
Thinking string `json:"thinking,omitempty"`
|
||||
ID string `json:"id,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Input json.RawMessage `json:"input,omitempty"`
|
||||
}
|
||||
|
||||
// BlockDelta carries incremental changes. Exactly one of the *_delta
|
||||
// fields is meaningful per delta.
|
||||
type BlockDelta struct {
|
||||
Type string `json:"type"` // "text_delta" | "thinking_delta" | "input_json_delta" | "signature_delta"
|
||||
Text string `json:"text,omitempty"`
|
||||
Thinking string `json:"thinking,omitempty"`
|
||||
PartialJSON string `json:"partial_json,omitempty"`
|
||||
Signature string `json:"signature,omitempty"`
|
||||
// On message_delta event, the same Delta carries stop_reason.
|
||||
StopReason string `json:"stop_reason,omitempty"`
|
||||
StopSequence string `json:"stop_sequence,omitempty"`
|
||||
}
|
||||
|
||||
// Usage is the running input/output token counts.
|
||||
type Usage struct {
|
||||
InputTokens int `json:"input_tokens,omitempty"`
|
||||
OutputTokens int `json:"output_tokens,omitempty"`
|
||||
CacheReadInputTokens int `json:"cache_read_input_tokens,omitempty"`
|
||||
CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"`
|
||||
}
|
||||
|
||||
// ErrorBody is the Anthropic error envelope.
|
||||
type ErrorBody struct {
|
||||
Type string `json:"type"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
// StreamMessages opens a streaming POST /v1/messages call and sends
|
||||
// parsed events down the returned channel. Channel closes when SSE
|
||||
// stream terminates (message_stop, server EOF, ctx cancel, or error).
|
||||
// On HTTP-level error (non-2xx, network failure), an EventType="error"
|
||||
// event lands on the channel with the wrapped error before close.
|
||||
func (c *Client) StreamMessages(ctx context.Context, req MessagesRequest) (<-chan Event, error) {
|
||||
req.Stream = true
|
||||
body, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("anthropic: marshal: %w", err)
|
||||
}
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
|
||||
c.BaseURL+"/v1/messages", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpReq.Header.Set("content-type", "application/json")
|
||||
httpReq.Header.Set("authorization", "Bearer "+c.APIKey)
|
||||
httpReq.Header.Set("anthropic-version", c.APIVersion)
|
||||
httpReq.Header.Set("accept", "text/event-stream")
|
||||
|
||||
resp, err := c.HTTP.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("anthropic: %w", err)
|
||||
}
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
raw, _ := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("anthropic: %s -> %d: %s", c.BaseURL, resp.StatusCode, string(raw))
|
||||
}
|
||||
|
||||
ch := make(chan Event, 32)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
defer resp.Body.Close()
|
||||
if err := parseSSE(ctx, resp.Body, ch); err != nil && !errors.Is(err, io.EOF) {
|
||||
select {
|
||||
case ch <- Event{Type: "error", Error: &ErrorBody{Type: "stream_error", Message: err.Error()}}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
// parseSSE reads SSE frames (`event:` + `data:` lines, blank-line
|
||||
// separator) and dispatches Event values onto ch. Returns nil on
|
||||
// normal EOF.
|
||||
func parseSSE(ctx context.Context, r io.Reader, ch chan<- Event) error {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 64*1024), 1<<20) // 1 MiB line cap
|
||||
|
||||
var dataBuf bytes.Buffer
|
||||
for scanner.Scan() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
line := scanner.Bytes()
|
||||
if len(line) == 0 {
|
||||
// End-of-frame: dispatch buffered data, reset.
|
||||
if dataBuf.Len() == 0 {
|
||||
continue
|
||||
}
|
||||
var ev Event
|
||||
if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil {
|
||||
select {
|
||||
case ch <- ev:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
dataBuf.Reset()
|
||||
continue
|
||||
}
|
||||
if bytes.HasPrefix(line, []byte("data:")) {
|
||||
payload := bytes.TrimSpace(line[5:])
|
||||
if dataBuf.Len() > 0 {
|
||||
dataBuf.WriteByte('\n')
|
||||
}
|
||||
dataBuf.Write(payload)
|
||||
}
|
||||
// `event:` lines describe the event name. JSON `data:` payload
|
||||
// also carries `type` field — we use that. Ignore event: lines.
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return fmt.Errorf("sse scan: %w", err)
|
||||
}
|
||||
// Flush any trailing data without final blank line.
|
||||
if dataBuf.Len() > 0 {
|
||||
var ev Event
|
||||
if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil {
|
||||
select {
|
||||
case ch <- ev:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user