Plexum ProviderPlugin that serves MiniMax models through MiniMax's Anthropic-compatible HTTP endpoint (https://api.minimax.io/anthropic, or CN api.minimaxi.com). Inspired by openclaw's extensions/minimax provider-registration, but rewritten in Go for Plexum's SDK. internal/anthropic/ (~210 LOC + 6 tests): - minimal HTTP+SSE Anthropic Messages client (POST /v1/messages, stream:true, parses event:/data: SSE frames) - handles non-2xx as HTTP error; stream errors land as Event{Type:"error"} - 1 MiB SSE line cap; per-conn 5min timeout internal/translate/ (~220 LOC): - CanonicalToAnthropic: canonical.TurnRequest → MessagesRequest - blockToAnthropic: TextBlock / ToolUseBlock / ToolResultBlock / ThinkingBlock → loose ContentBlock map; preserves signatures + cache control - Translator: per-turn state machine; consumes anthropic.Event stream and emits canonical.TurnEvent stream (handles thinking blocks + tool_use input_json_delta accumulation + signature_delta capture) cmd/plexum-minimax-provider-plugin/: - Plugin manifest declares provider.models = [MiniMax-M2.7, MiniMax-M2.7-highspeed] - Backend fixed to "api" (per scope); region "global"|"cn" + base_url override supported via config - HostConfig from <profile>/plugins/plexum-minimax-provider/config.json {api_key, region?, base_url?, max_tokens_default?} scripts/install.sh: build + manifest emit; operator writes config.json + allows plugin + adds an agent + restarts. End-to-end verified against the real key: 1. plexum say --agent-id mini ... → "Hi, I'm MiniMax!" 2. Multi-turn continuity: agent recalled the prior reply 3. Via gateway socket: {"outcome":"text","text":"\n\npong"} 4. Via Fabric channel (alice posts → plugin inbound → mini agent → MiniMax → outbound REST → reply visible in bt2-clean seq=11): "Hi there! 👋 Fun fact: Octopuses have three hearts, blue blood, and neurons distributed throughout their arms—so their tentacles can 'think'" The MiniMax-M2.7-highspeed variant works the same way but hit a Code Plan rate-limit ceiling during testing (not a plugin issue). Deferred: - OAuth (Code Plan portal) — not in v1 scope per request - MiniMax Portal provider (separate provider id minimax-portal) - Image / TTS / video / music providers (separate plugins later) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
144 lines
4.3 KiB
Go
144 lines
4.3 KiB
Go
package anthropic
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"strings"
|
|
"testing"
|
|
)
|
|
|
|
func TestParseSSESingleEvent(t *testing.T) {
|
|
body := "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"m1\",\"model\":\"X\"}}\n\n"
|
|
ch := make(chan Event, 4)
|
|
if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
close(ch)
|
|
events := drain(ch)
|
|
if len(events) != 1 || events[0].Type != "message_start" || events[0].Message.ID != "m1" {
|
|
t.Errorf("events = %+v", events)
|
|
}
|
|
}
|
|
|
|
func TestParseSSEMultiLineData(t *testing.T) {
|
|
// "data:" can be split across multiple lines per spec; joined with '\n'.
|
|
body := "data: {\"type\":\n" + "data: \"text_delta\",\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n"
|
|
ch := make(chan Event, 4)
|
|
if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
close(ch)
|
|
events := drain(ch)
|
|
if len(events) != 1 || events[0].Type != "text_delta" || events[0].Delta.Text != "hi" {
|
|
t.Errorf("events = %+v", events)
|
|
}
|
|
}
|
|
|
|
func TestParseSSEFlushTrailingWithoutBlank(t *testing.T) {
|
|
body := "data: {\"type\":\"message_stop\"}"
|
|
ch := make(chan Event, 4)
|
|
if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
close(ch)
|
|
events := drain(ch)
|
|
if len(events) != 1 || events[0].Type != "message_stop" {
|
|
t.Errorf("trailing flush missed: %+v", events)
|
|
}
|
|
}
|
|
|
|
func TestStreamMessagesSendsRightHeaders(t *testing.T) {
|
|
var gotAuth, gotVer string
|
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
gotAuth = r.Header.Get("authorization")
|
|
gotVer = r.Header.Get("anthropic-version")
|
|
w.Header().Set("content-type", "text/event-stream")
|
|
w.Write([]byte("event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n"))
|
|
}))
|
|
defer srv.Close()
|
|
c := New(srv.URL, "fake-key")
|
|
ch, err := c.StreamMessages(context.Background(), MessagesRequest{
|
|
Model: "X", MaxTokens: 10,
|
|
Messages: []Message{{Role: "user", Content: []ContentBlock{{"type": "text", "text": "hi"}}}},
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
for range ch {
|
|
}
|
|
if gotAuth != "Bearer fake-key" {
|
|
t.Errorf("auth = %q", gotAuth)
|
|
}
|
|
if gotVer != DefaultAPIVersion {
|
|
t.Errorf("anthropic-version = %q", gotVer)
|
|
}
|
|
}
|
|
|
|
func TestStreamMessagesNon2xxErrors(t *testing.T) {
|
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
w.Write([]byte(`{"error":{"type":"authentication_error","message":"bad key"}}`))
|
|
}))
|
|
defer srv.Close()
|
|
c := New(srv.URL, "bad")
|
|
_, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10})
|
|
if err == nil || !strings.Contains(err.Error(), "401") {
|
|
t.Errorf("err = %v", err)
|
|
}
|
|
}
|
|
|
|
func TestStreamMessagesStreamErrorBecomesEvent(t *testing.T) {
|
|
// Simulate an SSE stream that errors mid-stream.
|
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("content-type", "text/event-stream")
|
|
flusher, _ := w.(http.Flusher)
|
|
w.Write([]byte("event: message_start\ndata: {\"type\":\"message_start\"}\n\n"))
|
|
flusher.Flush()
|
|
// Hijack-style mid-stream close: not really an error in HTTP terms,
|
|
// scanner just hits EOF cleanly. So no error event expected.
|
|
}))
|
|
defer srv.Close()
|
|
c := New(srv.URL, "k")
|
|
ch, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
events := drain(ch)
|
|
if len(events) != 1 || events[0].Type != "message_start" {
|
|
t.Errorf("events = %+v", events)
|
|
}
|
|
}
|
|
|
|
func TestMessagesRequestMarshalShape(t *testing.T) {
|
|
req := MessagesRequest{
|
|
Model: "MiniMax-M2.7", MaxTokens: 50, Stream: true,
|
|
Messages: []Message{{Role: "user", Content: []ContentBlock{
|
|
{"type": "text", "text": "hi"},
|
|
}}},
|
|
}
|
|
raw, _ := json.Marshal(req)
|
|
// stream:true present
|
|
if !bytes.Contains(raw, []byte(`"stream":true`)) {
|
|
t.Errorf("missing stream:true in %s", raw)
|
|
}
|
|
if !bytes.Contains(raw, []byte(`"model":"MiniMax-M2.7"`)) {
|
|
t.Errorf("missing model: %s", raw)
|
|
}
|
|
}
|
|
|
|
func drain(ch <-chan Event) []Event {
|
|
var out []Event
|
|
for ev := range ch {
|
|
out = append(out, ev)
|
|
}
|
|
return out
|
|
}
|
|
|
|
// Compile-time check that io.EOF is exported so we don't accidentally
|
|
// remove the import.
|
|
var _ = io.EOF
|