feat: initial extract of shared anthropic-compat client + translator

Extracted from Plexum-minimax-provider and Plexum-kimi-provider (both
had near-identical copies under internal/anthropic + internal/translate).
Future provider plugins (Qwen, Doubao, …) import these two packages
instead of re-implementing the protocol.

anthropic/ (~220 LOC + 6 tests):
- minimal HTTP+SSE Anthropic Messages client (POST /v1/messages,
  stream:true, parses event:/data: SSE)
- Client.UserAgent + Client.ExtraHeaders fields for per-backend
  customization (Kimi needs "claude-code/0.1.0"; MiniMax is flexible)
- non-2xx surfaces as Go error; mid-stream errors as final
  Event{Type:"error"}

translate/ (~220 LOC):
- CanonicalToAnthropic: TurnRequest → MessagesRequest
- blockToAnthropic: TextBlock / ToolUseBlock / ToolResultBlock /
  ThinkingBlock → loose ContentBlock map; preserves signatures
- Translator: per-turn state machine; anthropic.Event stream →
  canonical.TurnEvent stream (text + thinking + tool_use deltas;
  signature_delta capture; message_delta stop_reason + usage)

Both MiniMax and Kimi now import from here; their internal/* dirs are
gone. Live verified after refactor — both still answer "ready" via
plexum say.
This commit is contained in:
h z
2026-05-31 20:34:57 +01:00
commit 94f33a003a
5 changed files with 817 additions and 0 deletions

278
anthropic/client.go Normal file
View File

@@ -0,0 +1,278 @@
// Package anthropic is a minimal Anthropic Messages API HTTP+SSE
// client. Works against the real Anthropic API and any Anthropic-
// compatible endpoint (MiniMax exposes one at
// https://api.minimax.io/anthropic).
//
// Shared by the MiniMax and Kimi provider plugins; future Anthropic-
// compatible providers can import this package instead of re-implementing it.
package anthropic
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// DefaultAPIVersion matches what Anthropic + Anthropic-compat servers expect.
// New assigns it to Client.APIVersion, and StreamMessages sends that field as
// the "anthropic-version" request header.
const DefaultAPIVersion = "2023-06-01"
// Client is a thin HTTP wrapper around the streaming Messages endpoint.
// Stateless across calls: StreamMessages only reads these fields.
type Client struct {
// BaseURL is the endpoint root; "/v1/messages" is appended to it. New
// strips trailing slashes.
BaseURL string // e.g. "https://api.kimi.com/coding" or "https://api.minimax.io/anthropic"
// APIKey is sent as "Bearer <key>" in the authorization header.
APIKey string
// APIVersion is sent as the anthropic-version header.
APIVersion string // default "2023-06-01"
// UserAgent, if set, replaces Go's default UA header. Kimi's
// coding endpoint expects "claude-code/0.1.0"; other backends
// (MiniMax etc.) are flexible.
UserAgent string
// ExtraHeaders are sent verbatim on every request. They are applied
// after the built-in headers, so a matching key overrides the default.
ExtraHeaders map[string]string
// HTTP performs the request. New installs a client with a 5-minute
// overall timeout.
HTTP *http.Client
}
// New returns a Client aimed at baseURL and authenticating with apiKey.
// Trailing slashes are stripped from baseURL so the request path can be
// appended directly, and the API version starts at DefaultAPIVersion.
func New(baseURL, apiKey string) *Client {
	// 5 min ceiling per turn — long enough for a slow reasoning
	// response, short enough that wedged calls don't hang forever.
	httpClient := &http.Client{Timeout: 5 * time.Minute}

	c := new(Client)
	c.BaseURL = strings.TrimRight(baseURL, "/")
	c.APIKey = apiKey
	c.APIVersion = DefaultAPIVersion
	c.HTTP = httpClient
	return c
}
// MessagesRequest is the wire shape POSTed to /v1/messages. Optional fields
// carry omitempty (and are pointers where a zero value must stay
// distinguishable from "unset") so they are left out of the JSON body when
// not provided.
type MessagesRequest struct {
Model string `json:"model"`
System any `json:"system,omitempty"` // string OR []ContentBlock
Messages []Message `json:"messages"`
MaxTokens int `json:"max_tokens"`
// Temperature is a pointer so an explicit 0 is still serialized.
Temperature *float64 `json:"temperature,omitempty"`
StopSequences []string `json:"stop_sequences,omitempty"`
Tools []ToolDef `json:"tools,omitempty"`
ToolChoice *ToolChoice `json:"tool_choice,omitempty"`
Thinking *ThinkingConfig `json:"thinking,omitempty"`
// Stream has no omitempty, so it is always present on the wire.
// StreamMessages forces it to true before marshalling.
Stream bool `json:"stream"`
Metadata *RequestMetadata `json:"metadata,omitempty"`
}
// Message is one entry in messages[]: a role plus its ordered list of
// content blocks.
type Message struct {
Role string `json:"role"` // "user" | "assistant"
Content []ContentBlock `json:"content"`
}
// ContentBlock covers text / tool_use / tool_result / thinking. We
// keep the discriminated union loose (map[string]any) on the request
// side so callers can pass whatever shape the API accepts; on the
// response side the SSE parser produces typed deltas.
//
// It is a type alias, not a new type, so a plain map[string]any value can
// be used wherever a ContentBlock is expected.
type ContentBlock = map[string]any
// ToolDef is one tool entry the model can call.
type ToolDef struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
// InputSchema is emitted as-is (raw JSON) under "input_schema".
// Presumably a JSON Schema object — confirm against callers.
InputSchema json.RawMessage `json:"input_schema"`
}
// ToolChoice constrains how the model picks tools.
type ToolChoice struct {
Type string `json:"type"` // "auto" | "any" | "tool" | "none"
// Name is omitted from the JSON when empty. Presumably only meaningful
// when Type is "tool" — confirm against the API reference.
Name string `json:"name,omitempty"`
}
// ThinkingConfig enables extended-thinking on supported models.
type ThinkingConfig struct {
Type string `json:"type"` // "enabled" | "disabled"
// BudgetTokens is omitted from the JSON when zero.
BudgetTokens int `json:"budget_tokens,omitempty"`
}
// RequestMetadata is optional user-supplied tracing, serialized under the
// request's "metadata" key.
type RequestMetadata struct {
// UserID is omitted from the JSON when empty.
UserID string `json:"user_id,omitempty"`
}
// ---- Streaming SSE response ----
// Event is one parsed SSE event from the streaming endpoint.
// Direct mirror of Anthropic's event shapes — caller's responsibility
// to translate into Plexum's canonical.TurnEvent.
//
// Type discriminates the event; the remaining fields are populated only for
// the event kinds noted beside them and are nil/zero otherwise.
type Event struct {
Type string `json:"type"`
// message_start: complete initial message envelope
Message *MessageEnvelope `json:"message,omitempty"`
// content_block_start: block at .Index
Index int `json:"index,omitempty"`
ContentBlock *BlockStart `json:"content_block,omitempty"`
// content_block_delta: text_delta / thinking_delta / input_json_delta / signature_delta
Delta *BlockDelta `json:"delta,omitempty"`
// message_delta: usage update + stop_reason
Usage *Usage `json:"usage,omitempty"`
// error event payload
Error *ErrorBody `json:"error,omitempty"`
}
// MessageEnvelope is the top-level message info in message_start,
// decoded from the event's "message" object.
type MessageEnvelope struct {
ID string `json:"id"`
Type string `json:"type"`
Role string `json:"role"`
Model string `json:"model"`
StopReason string `json:"stop_reason"`
// Usage is nil when the envelope carries no "usage" object.
Usage *Usage `json:"usage,omitempty"`
}
// BlockStart describes a block that just started. Type is "text",
// "thinking", or "tool_use". For tool_use the ID + Name are set.
type BlockStart struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
Thinking string `json:"thinking,omitempty"`
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
// Input is kept as undecoded JSON exactly as received.
Input json.RawMessage `json:"input,omitempty"`
}
// BlockDelta carries incremental changes. Exactly one of the *_delta
// fields is meaningful per delta; Type names which one.
type BlockDelta struct {
Type string `json:"type"` // "text_delta" | "thinking_delta" | "input_json_delta" | "signature_delta"
Text string `json:"text,omitempty"`
Thinking string `json:"thinking,omitempty"`
// PartialJSON is a fragment of a tool_use input (input_json_delta).
PartialJSON string `json:"partial_json,omitempty"`
Signature string `json:"signature,omitempty"`
// On message_delta event, the same Delta carries stop_reason.
StopReason string `json:"stop_reason,omitempty"`
StopSequence string `json:"stop_sequence,omitempty"`
}
// Usage is the running input/output token counts, plus the cache read and
// cache creation input-token counters. Fields absent from the payload decode
// as zero.
type Usage struct {
InputTokens int `json:"input_tokens,omitempty"`
OutputTokens int `json:"output_tokens,omitempty"`
CacheReadInputTokens int `json:"cache_read_input_tokens,omitempty"`
CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"`
}
// ErrorBody is the Anthropic error envelope. StreamMessages also builds one
// locally, with Type "stream_error", when reading the SSE body fails.
type ErrorBody struct {
Type string `json:"type"`
Message string `json:"message"`
}
// StreamMessages opens a streaming POST {BaseURL}/v1/messages call and
// returns a channel of parsed SSE events. req.Stream is forced to true.
//
// Failures before the stream opens — marshalling the request, building it,
// transport errors, and any non-2xx status — are returned as a Go error and
// no channel is created. For non-2xx responses the error text includes the
// status code and (a bounded prefix of) the response body.
//
// Once the stream is open the channel is closed when the response body ends
// or ctx is cancelled. If reading the body fails part-way through, a final
// Event{Type: "error"} with Error.Type "stream_error" is delivered before the
// close. The caller must drain the channel or cancel ctx; otherwise the
// reader goroutine stays blocked on a full channel.
//
// An empty Client.APIVersion falls back to DefaultAPIVersion, and a nil
// Client.HTTP falls back to http.DefaultClient (which has no timeout, so ctx
// should carry a deadline in that case).
func (c *Client) StreamMessages(ctx context.Context, req MessagesRequest) (<-chan Event, error) {
	// Cap on how much of a non-2xx body is copied into the returned error.
	const maxErrorBody = 64 << 10

	req.Stream = true
	body, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("anthropic: marshal: %w", err)
	}

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
		c.BaseURL+"/v1/messages", bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("anthropic: build request: %w", err)
	}

	// A Client built as a struct literal may leave these unset; fall back
	// to defaults instead of sending an empty header or panicking on nil.
	apiVersion := c.APIVersion
	if apiVersion == "" {
		apiVersion = DefaultAPIVersion
	}
	httpClient := c.HTTP
	if httpClient == nil {
		httpClient = http.DefaultClient
	}

	httpReq.Header.Set("content-type", "application/json")
	httpReq.Header.Set("authorization", "Bearer "+c.APIKey)
	httpReq.Header.Set("anthropic-version", apiVersion)
	httpReq.Header.Set("accept", "text/event-stream")
	if c.UserAgent != "" {
		httpReq.Header.Set("User-Agent", c.UserAgent)
	}
	// Applied last so per-backend headers can override the defaults above.
	for k, v := range c.ExtraHeaders {
		httpReq.Header.Set(k, v)
	}

	resp, err := httpClient.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("anthropic: %w", err)
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Best-effort read: the status code alone still makes a useful
		// error, so a failed or truncated body read is not fatal.
		raw, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		resp.Body.Close()
		return nil, fmt.Errorf("anthropic: %s -> %d: %s", c.BaseURL, resp.StatusCode, string(raw))
	}

	ch := make(chan Event, 32)
	go func() {
		defer close(ch)
		defer resp.Body.Close()

		err := parseSSE(ctx, resp.Body, ch)
		if err == nil || errors.Is(err, io.EOF) {
			return
		}
		failure := Event{
			Type:  "error",
			Error: &ErrorBody{Type: "stream_error", Message: err.Error()},
		}
		select {
		case ch <- failure:
		default:
			// Buffer is full. Wait for the consumer rather than dropping
			// the error, which would make a broken stream look like a
			// clean end; give up only once ctx is cancelled.
			select {
			case ch <- failure:
			case <-ctx.Done():
			}
		}
	}()
	return ch, nil
}
// parseSSE consumes an SSE body from r and forwards each decoded frame to ch.
//
// A frame is the run of `data:` lines up to the next blank line; the payloads
// are joined with '\n' and decoded as one JSON Event. Frames whose payload is
// not valid JSON are skipped. `event:` lines (and any other line) are ignored
// because the JSON payload already carries the event name in its "type"
// field. Data left over when the stream ends without a closing blank line is
// still dispatched.
//
// Returns nil on normal EOF, ctx.Err() if ctx is cancelled between lines or
// while waiting to send, and a wrapped scanner error otherwise. Individual
// lines are capped at 1 MiB.
func parseSSE(ctx context.Context, r io.Reader, ch chan<- Event) error {
	var pending bytes.Buffer

	// emit decodes whatever is buffered, sends it unless it is empty or not
	// valid JSON, and clears the buffer for the next frame.
	emit := func() error {
		if pending.Len() == 0 {
			return nil
		}
		defer pending.Reset()

		var ev Event
		if json.Unmarshal(pending.Bytes(), &ev) != nil {
			return nil
		}
		select {
		case ch <- ev:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	dataPrefix := []byte("data:")
	lines := bufio.NewScanner(r)
	lines.Buffer(make([]byte, 64*1024), 1<<20) // 1 MiB line cap

	for lines.Scan() {
		if err := ctx.Err(); err != nil {
			return err
		}
		line := lines.Bytes()
		switch {
		case len(line) == 0:
			// Blank line marks end-of-frame.
			if err := emit(); err != nil {
				return err
			}
		case bytes.HasPrefix(line, dataPrefix):
			if pending.Len() > 0 {
				pending.WriteByte('\n')
			}
			pending.Write(bytes.TrimSpace(line[len(dataPrefix):]))
		}
	}
	if err := lines.Err(); err != nil {
		return fmt.Errorf("sse scan: %w", err)
	}
	// Flush any trailing data without final blank line.
	return emit()
}

143
anthropic/client_test.go Normal file
View File

@@ -0,0 +1,143 @@
package anthropic
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
// TestParseSSESingleEvent feeds one complete frame (event: line, data: line,
// blank terminator) and expects exactly one message_start event whose
// envelope ID was decoded.
func TestParseSSESingleEvent(t *testing.T) {
	const stream = "event: message_start\n" +
		"data: {\"type\":\"message_start\",\"message\":{\"id\":\"m1\",\"model\":\"X\"}}\n\n"

	out := make(chan Event, 4)
	err := parseSSE(context.Background(), strings.NewReader(stream), out)
	if err != nil {
		t.Fatal(err)
	}
	close(out)

	got := drain(out)
	if len(got) == 1 && got[0].Type == "message_start" && got[0].Message.ID == "m1" {
		return
	}
	t.Errorf("events = %+v", got)
}
// TestParseSSEMultiLineData checks that consecutive data: lines are joined
// with '\n' into a single JSON payload before decoding.
func TestParseSSEMultiLineData(t *testing.T) {
	// "data:" can be split across multiple lines per spec; joined with '\n'.
	firstLine := "data: {\"type\":\n"
	secondLine := "data: \"text_delta\",\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n"

	out := make(chan Event, 4)
	err := parseSSE(context.Background(), strings.NewReader(firstLine+secondLine), out)
	if err != nil {
		t.Fatal(err)
	}
	close(out)

	got := drain(out)
	if len(got) == 1 && got[0].Type == "text_delta" && got[0].Delta.Text == "hi" {
		return
	}
	t.Errorf("events = %+v", got)
}
// TestParseSSEFlushTrailingWithoutBlank checks that a frame is still
// dispatched when the stream ends without the terminating blank line.
func TestParseSSEFlushTrailingWithoutBlank(t *testing.T) {
	const stream = "data: {\"type\":\"message_stop\"}"

	out := make(chan Event, 4)
	err := parseSSE(context.Background(), strings.NewReader(stream), out)
	if err != nil {
		t.Fatal(err)
	}
	close(out)

	got := drain(out)
	if len(got) == 1 && got[0].Type == "message_stop" {
		return
	}
	t.Errorf("trailing flush missed: %+v", got)
}
func TestStreamMessagesSendsRightHeaders(t *testing.T) {
var gotAuth, gotVer string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotAuth = r.Header.Get("authorization")
gotVer = r.Header.Get("anthropic-version")
w.Header().Set("content-type", "text/event-stream")
w.Write([]byte("event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n"))
}))
defer srv.Close()
c := New(srv.URL, "fake-key")
ch, err := c.StreamMessages(context.Background(), MessagesRequest{
Model: "X", MaxTokens: 10,
Messages: []Message{{Role: "user", Content: []ContentBlock{{"type": "text", "text": "hi"}}}},
})
if err != nil {
t.Fatal(err)
}
for range ch {
}
if gotAuth != "Bearer fake-key" {
t.Errorf("auth = %q", gotAuth)
}
if gotVer != DefaultAPIVersion {
t.Errorf("anthropic-version = %q", gotVer)
}
}
// TestStreamMessagesNon2xxErrors verifies that a 401 response comes back as a
// Go error mentioning the status code rather than as a channel.
func TestStreamMessagesNon2xxErrors(t *testing.T) {
	reject := func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusUnauthorized)
		w.Write([]byte(`{"error":{"type":"authentication_error","message":"bad key"}}`))
	}
	srv := httptest.NewServer(http.HandlerFunc(reject))
	defer srv.Close()

	client := New(srv.URL, "bad")
	_, err := client.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10})
	if err != nil && strings.Contains(err.Error(), "401") {
		return
	}
	t.Errorf("err = %v", err)
}
// TestStreamMessagesStreamErrorBecomesEvent covers the clean-EOF side of the
// mid-stream contract. The handler sends one frame and returns, so the
// response body ends normally: the channel must yield only that frame and no
// synthetic "error" event. (Despite the name, no transport failure is
// simulated here.)
func TestStreamMessagesStreamErrorBecomesEvent(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("content-type", "text/event-stream")
		w.Write([]byte("event: message_start\ndata: {\"type\":\"message_start\"}\n\n"))
		// Flush only when the writer supports it, instead of calling
		// through a possibly-nil interface value.
		if flusher, ok := w.(http.Flusher); ok {
			flusher.Flush()
		}
		// Returning here ends the body: not an error in HTTP terms, the
		// scanner just hits EOF cleanly. So no error event is expected.
	}))
	defer srv.Close()

	c := New(srv.URL, "k")
	ch, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10})
	if err != nil {
		t.Fatal(err)
	}
	events := drain(ch)
	if len(events) != 1 || events[0].Type != "message_start" {
		t.Errorf("events = %+v", events)
	}
}
// TestMessagesRequestMarshalShape pins two properties of the request's JSON
// encoding: the stream flag is emitted as "stream":true and the model name is
// serialized under "model".
func TestMessagesRequestMarshalShape(t *testing.T) {
	req := MessagesRequest{
		Model: "MiniMax-M2.7", MaxTokens: 50, Stream: true,
		Messages: []Message{{Role: "user", Content: []ContentBlock{
			{"type": "text", "text": "hi"},
		}}},
	}
	// Fail loudly on a marshal error instead of asserting on empty output.
	raw, err := json.Marshal(req)
	if err != nil {
		t.Fatalf("marshal: %v", err)
	}
	// stream:true present
	if !bytes.Contains(raw, []byte(`"stream":true`)) {
		t.Errorf("missing stream:true in %s", raw)
	}
	if !bytes.Contains(raw, []byte(`"model":"MiniMax-M2.7"`)) {
		t.Errorf("missing model: %s", raw)
	}
}
// drain receives from ch until it is closed and returns everything read, in
// arrival order. The result is nil when nothing was received.
func drain(ch <-chan Event) []Event {
	var collected []Event
	for {
		ev, open := <-ch
		if !open {
			return collected
		}
		collected = append(collected, ev)
	}
}
// Blank reference that keeps the "io" import in use; nothing else in this
// test file refers to the package.
var _ = io.EOF