From 94f33a003acb52965639f1bb656a95d53e671d64 Mon Sep 17 00:00:00 2001 From: hzhang Date: Sun, 31 May 2026 20:34:57 +0100 Subject: [PATCH] feat: initial extract of shared anthropic-compat client + translator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extracted from Plexum-minimax-provider and Plexum-kimi-provider (both had near-identical copies under internal/anthropic + internal/translate). Future provider plugins (Qwen, Doubao, …) import these two packages instead of re-implementing the protocol. anthropic/ (~220 LOC + 6 tests): - minimal HTTP+SSE Anthropic Messages client (POST /v1/messages, stream:true, parses event:/data: SSE) - Client.UserAgent + Client.ExtraHeaders fields for per-backend customization (Kimi needs "claude-code/0.1.0"; MiniMax is flexible) - non-2xx surfaces as Go error; mid-stream errors as final Event{Type:"error"} translate/ (~220 LOC): - CanonicalToAnthropic: TurnRequest → MessagesRequest - blockToAnthropic: TextBlock / ToolUseBlock / ToolResultBlock / ThinkingBlock → loose ContentBlock map; preserves signatures - Translator: per-turn state machine; anthropic.Event stream → canonical.TurnEvent stream (text + thinking + tool_use deltas; signature_delta capture; message_delta stop_reason + usage) Both MiniMax and Kimi now import from here; their internal/* dirs are gone. Live verified after refactor — both still answer "ready" via plexum say. --- README.md | 72 +++++++++ anthropic/client.go | 278 ++++++++++++++++++++++++++++++++++ anthropic/client_test.go | 143 ++++++++++++++++++ go.mod | 7 + translate/translate.go | 317 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 817 insertions(+) create mode 100644 README.md create mode 100644 anthropic/client.go create mode 100644 anthropic/client_test.go create mode 100644 go.mod create mode 100644 translate/translate.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..1a5f878 --- /dev/null +++ b/README.md @@ -0,0 +1,72 @@ +# Plexum-anthropic-compat-client + +Shared HTTP+SSE Anthropic Messages API client + canonical ↔ Anthropic +translator. Used by Plexum provider plugins that talk to any +"Anthropic-compatible" endpoint (the one Anthropic itself, MiniMax, +Kimi, Qwen, etc. all expose at `/v1/messages`). + +Each provider plugin imports these two packages and adds: +- its own `cmd/plexum--provider-plugin/main.go` entry +- a list of supported model ids in the manifest +- an endpoint URL + auth key + optional User-Agent override + +## Packages + +### `anthropic/` + +Minimal HTTP+SSE client. Single entry point: + +```go +client := anthropic.New(baseURL, apiKey) +client.UserAgent = "claude-code/0.1.0" // optional +events, err := client.StreamMessages(ctx, anthropic.MessagesRequest{ + Model: "MiniMax-M2.7", MaxTokens: 4096, + Messages: []anthropic.Message{...}, +}) +for ev := range events { /* anthropic.Event */ } +``` + +Features: +- POST `/v1/messages` with `stream: true` +- Parses standard SSE frames (`event:` + `data:` + blank line) +- Surfaces non-2xx as HTTP error (before channel opens) +- Surfaces mid-stream errors as `Event{Type:"error"}` final entry +- `UserAgent` + `ExtraHeaders` fields for per-backend customization +- 5min per-call timeout + +### `translate/` + +Bidirectional canonical-types ↔ Anthropic-types adapter: + +- `CanonicalToAnthropic(req canonical.TurnRequest, modelID, defaultMaxTokens) → anthropic.MessagesRequest` +- `Translator` state machine consumes `<-chan anthropic.Event`, + emits `[]canonical.TurnEvent` per event (handles text / thinking / + tool_use / signature deltas + per-block close) + +## Usage + +In a provider plugin's `go.mod`: + +``` +require git.hangman-lab.top/hzhang/Plexum-anthropic-compat-client v0.0.0 +replace git.hangman-lab.top/hzhang/Plexum-anthropic-compat-client => ../Plexum-anthropic-compat-client +``` + +In code: + +```go +import ( + "git.hangman-lab.top/hzhang/Plexum-anthropic-compat-client/anthropic" + "git.hangman-lab.top/hzhang/Plexum-anthropic-compat-client/translate" +) +``` + +## History + +Extracted from `Plexum-minimax-provider` and `Plexum-kimi-provider` — +both had a near-identical copy. New providers (Qwen, Doubao, …) drop in +without re-implementing the protocol. + +## License + +Same as Plexum. diff --git a/anthropic/client.go b/anthropic/client.go new file mode 100644 index 0000000..8aac7ed --- /dev/null +++ b/anthropic/client.go @@ -0,0 +1,278 @@ +// Package anthropic is a minimal Anthropic Messages API HTTP+SSE +// client. Works against the real Anthropic API and any Anthropic- +// compatible endpoint (MiniMax exposes one at +// https://api.minimax.io/anthropic). +// +// The MiniMax provider plugin's only consumer; if other providers +// land later they can reuse this package. +package anthropic + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// DefaultAPIVersion matches what Anthropic + Anthropic-compat servers expect. +const DefaultAPIVersion = "2023-06-01" + +// Client is a thin HTTP wrapper. Stateless across calls. +type Client struct { + BaseURL string // e.g. "https://api.kimi.com/coding" or "https://api.minimax.io/anthropic" + APIKey string + APIVersion string // default "2023-06-01" + // UserAgent, if set, replaces Go's default UA header. Kimi's + // coding endpoint expects "claude-code/0.1.0"; other backends + // (MiniMax etc.) are flexible. + UserAgent string + // ExtraHeaders are sent verbatim on every request. + ExtraHeaders map[string]string + HTTP *http.Client +} + +// New constructs a Client with sensible defaults. +func New(baseURL, apiKey string) *Client { + return &Client{ + BaseURL: strings.TrimRight(baseURL, "/"), + APIKey: apiKey, + APIVersion: DefaultAPIVersion, + // 5 min ceiling per turn — long enough for a slow reasoning + // response, short enough that wedged calls don't hang forever. + HTTP: &http.Client{Timeout: 5 * time.Minute}, + } +} + +// MessagesRequest is the wire shape POSTed to /v1/messages. +type MessagesRequest struct { + Model string `json:"model"` + System any `json:"system,omitempty"` // string OR []ContentBlock + Messages []Message `json:"messages"` + MaxTokens int `json:"max_tokens"` + Temperature *float64 `json:"temperature,omitempty"` + StopSequences []string `json:"stop_sequences,omitempty"` + Tools []ToolDef `json:"tools,omitempty"` + ToolChoice *ToolChoice `json:"tool_choice,omitempty"` + Thinking *ThinkingConfig `json:"thinking,omitempty"` + Stream bool `json:"stream"` + Metadata *RequestMetadata `json:"metadata,omitempty"` +} + +// Message is one entry in messages[]. +type Message struct { + Role string `json:"role"` // "user" | "assistant" + Content []ContentBlock `json:"content"` +} + +// ContentBlock covers text / tool_use / tool_result / thinking. We +// keep the discriminated union loose (map[string]any) on the request +// side so callers can pass whatever shape the API accepts; on the +// response side the SSE parser produces typed deltas. +type ContentBlock = map[string]any + +// ToolDef is one tool entry the model can call. +type ToolDef struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + InputSchema json.RawMessage `json:"input_schema"` +} + +// ToolChoice constrains how the model picks tools. +type ToolChoice struct { + Type string `json:"type"` // "auto" | "any" | "tool" | "none" + Name string `json:"name,omitempty"` +} + +// ThinkingConfig enables extended-thinking on supported models. +type ThinkingConfig struct { + Type string `json:"type"` // "enabled" | "disabled" + BudgetTokens int `json:"budget_tokens,omitempty"` +} + +// RequestMetadata is optional user-supplied tracing. +type RequestMetadata struct { + UserID string `json:"user_id,omitempty"` +} + +// ---- Streaming SSE response ---- + +// Event is one parsed SSE event from the streaming endpoint. +// Direct mirror of Anthropic's event shapes — caller's responsibility +// to translate into Plexum's canonical.TurnEvent. +type Event struct { + Type string `json:"type"` + // message_start: complete initial message envelope + Message *MessageEnvelope `json:"message,omitempty"` + // content_block_start: block at .Index + Index int `json:"index,omitempty"` + ContentBlock *BlockStart `json:"content_block,omitempty"` + // content_block_delta: text_delta / thinking_delta / input_json_delta / signature_delta + Delta *BlockDelta `json:"delta,omitempty"` + // message_delta: usage update + stop_reason + Usage *Usage `json:"usage,omitempty"` + // error event payload + Error *ErrorBody `json:"error,omitempty"` +} + +// MessageEnvelope is the top-level message info in message_start. +type MessageEnvelope struct { + ID string `json:"id"` + Type string `json:"type"` + Role string `json:"role"` + Model string `json:"model"` + StopReason string `json:"stop_reason"` + Usage *Usage `json:"usage,omitempty"` +} + +// BlockStart describes a block that just started. Type is "text", +// "thinking", or "tool_use". For tool_use the ID + Name are set. +type BlockStart struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Thinking string `json:"thinking,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Input json.RawMessage `json:"input,omitempty"` +} + +// BlockDelta carries incremental changes. Exactly one of the *_delta +// fields is meaningful per delta. +type BlockDelta struct { + Type string `json:"type"` // "text_delta" | "thinking_delta" | "input_json_delta" | "signature_delta" + Text string `json:"text,omitempty"` + Thinking string `json:"thinking,omitempty"` + PartialJSON string `json:"partial_json,omitempty"` + Signature string `json:"signature,omitempty"` + // On message_delta event, the same Delta carries stop_reason. + StopReason string `json:"stop_reason,omitempty"` + StopSequence string `json:"stop_sequence,omitempty"` +} + +// Usage is the running input/output token counts. +type Usage struct { + InputTokens int `json:"input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` + CacheReadInputTokens int `json:"cache_read_input_tokens,omitempty"` + CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"` +} + +// ErrorBody is the Anthropic error envelope. +type ErrorBody struct { + Type string `json:"type"` + Message string `json:"message"` +} + +// StreamMessages opens a streaming POST /v1/messages call and sends +// parsed events down the returned channel. Channel closes when SSE +// stream terminates (message_stop, server EOF, ctx cancel, or error). +// On HTTP-level error (non-2xx, network failure), an EventType="error" +// event lands on the channel with the wrapped error before close. +func (c *Client) StreamMessages(ctx context.Context, req MessagesRequest) (<-chan Event, error) { + req.Stream = true + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("anthropic: marshal: %w", err) + } + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, + c.BaseURL+"/v1/messages", bytes.NewReader(body)) + if err != nil { + return nil, err + } + httpReq.Header.Set("content-type", "application/json") + httpReq.Header.Set("authorization", "Bearer "+c.APIKey) + httpReq.Header.Set("anthropic-version", c.APIVersion) + httpReq.Header.Set("accept", "text/event-stream") + if c.UserAgent != "" { + httpReq.Header.Set("User-Agent", c.UserAgent) + } + for k, v := range c.ExtraHeaders { + httpReq.Header.Set(k, v) + } + + resp, err := c.HTTP.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("anthropic: %w", err) + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + raw, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return nil, fmt.Errorf("anthropic: %s -> %d: %s", c.BaseURL, resp.StatusCode, string(raw)) + } + + ch := make(chan Event, 32) + go func() { + defer close(ch) + defer resp.Body.Close() + if err := parseSSE(ctx, resp.Body, ch); err != nil && !errors.Is(err, io.EOF) { + select { + case ch <- Event{Type: "error", Error: &ErrorBody{Type: "stream_error", Message: err.Error()}}: + default: + } + } + }() + return ch, nil +} + +// parseSSE reads SSE frames (`event:` + `data:` lines, blank-line +// separator) and dispatches Event values onto ch. Returns nil on +// normal EOF. +func parseSSE(ctx context.Context, r io.Reader, ch chan<- Event) error { + scanner := bufio.NewScanner(r) + scanner.Buffer(make([]byte, 64*1024), 1<<20) // 1 MiB line cap + + var dataBuf bytes.Buffer + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + line := scanner.Bytes() + if len(line) == 0 { + // End-of-frame: dispatch buffered data, reset. + if dataBuf.Len() == 0 { + continue + } + var ev Event + if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil { + select { + case ch <- ev: + case <-ctx.Done(): + return ctx.Err() + } + } + dataBuf.Reset() + continue + } + if bytes.HasPrefix(line, []byte("data:")) { + payload := bytes.TrimSpace(line[5:]) + if dataBuf.Len() > 0 { + dataBuf.WriteByte('\n') + } + dataBuf.Write(payload) + } + // `event:` lines describe the event name. JSON `data:` payload + // also carries `type` field — we use that. Ignore event: lines. + } + if err := scanner.Err(); err != nil { + return fmt.Errorf("sse scan: %w", err) + } + // Flush any trailing data without final blank line. + if dataBuf.Len() > 0 { + var ev Event + if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil { + select { + case ch <- ev: + case <-ctx.Done(): + return ctx.Err() + } + } + } + return nil +} diff --git a/anthropic/client_test.go b/anthropic/client_test.go new file mode 100644 index 0000000..5386802 --- /dev/null +++ b/anthropic/client_test.go @@ -0,0 +1,143 @@ +package anthropic + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestParseSSESingleEvent(t *testing.T) { + body := "event: message_start\ndata: {\"type\":\"message_start\",\"message\":{\"id\":\"m1\",\"model\":\"X\"}}\n\n" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_start" || events[0].Message.ID != "m1" { + t.Errorf("events = %+v", events) + } +} + +func TestParseSSEMultiLineData(t *testing.T) { + // "data:" can be split across multiple lines per spec; joined with '\n'. + body := "data: {\"type\":\n" + "data: \"text_delta\",\"delta\":{\"type\":\"text_delta\",\"text\":\"hi\"}}\n\n" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "text_delta" || events[0].Delta.Text != "hi" { + t.Errorf("events = %+v", events) + } +} + +func TestParseSSEFlushTrailingWithoutBlank(t *testing.T) { + body := "data: {\"type\":\"message_stop\"}" + ch := make(chan Event, 4) + if err := parseSSE(context.Background(), strings.NewReader(body), ch); err != nil { + t.Fatal(err) + } + close(ch) + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_stop" { + t.Errorf("trailing flush missed: %+v", events) + } +} + +func TestStreamMessagesSendsRightHeaders(t *testing.T) { + var gotAuth, gotVer string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("authorization") + gotVer = r.Header.Get("anthropic-version") + w.Header().Set("content-type", "text/event-stream") + w.Write([]byte("event: message_stop\ndata: {\"type\":\"message_stop\"}\n\n")) + })) + defer srv.Close() + c := New(srv.URL, "fake-key") + ch, err := c.StreamMessages(context.Background(), MessagesRequest{ + Model: "X", MaxTokens: 10, + Messages: []Message{{Role: "user", Content: []ContentBlock{{"type": "text", "text": "hi"}}}}, + }) + if err != nil { + t.Fatal(err) + } + for range ch { + } + if gotAuth != "Bearer fake-key" { + t.Errorf("auth = %q", gotAuth) + } + if gotVer != DefaultAPIVersion { + t.Errorf("anthropic-version = %q", gotVer) + } +} + +func TestStreamMessagesNon2xxErrors(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte(`{"error":{"type":"authentication_error","message":"bad key"}}`)) + })) + defer srv.Close() + c := New(srv.URL, "bad") + _, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10}) + if err == nil || !strings.Contains(err.Error(), "401") { + t.Errorf("err = %v", err) + } +} + +func TestStreamMessagesStreamErrorBecomesEvent(t *testing.T) { + // Simulate an SSE stream that errors mid-stream. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "text/event-stream") + flusher, _ := w.(http.Flusher) + w.Write([]byte("event: message_start\ndata: {\"type\":\"message_start\"}\n\n")) + flusher.Flush() + // Hijack-style mid-stream close: not really an error in HTTP terms, + // scanner just hits EOF cleanly. So no error event expected. + })) + defer srv.Close() + c := New(srv.URL, "k") + ch, err := c.StreamMessages(context.Background(), MessagesRequest{Model: "X", MaxTokens: 10}) + if err != nil { + t.Fatal(err) + } + events := drain(ch) + if len(events) != 1 || events[0].Type != "message_start" { + t.Errorf("events = %+v", events) + } +} + +func TestMessagesRequestMarshalShape(t *testing.T) { + req := MessagesRequest{ + Model: "MiniMax-M2.7", MaxTokens: 50, Stream: true, + Messages: []Message{{Role: "user", Content: []ContentBlock{ + {"type": "text", "text": "hi"}, + }}}, + } + raw, _ := json.Marshal(req) + // stream:true present + if !bytes.Contains(raw, []byte(`"stream":true`)) { + t.Errorf("missing stream:true in %s", raw) + } + if !bytes.Contains(raw, []byte(`"model":"MiniMax-M2.7"`)) { + t.Errorf("missing model: %s", raw) + } +} + +func drain(ch <-chan Event) []Event { + var out []Event + for ev := range ch { + out = append(out, ev) + } + return out +} + +// Compile-time check that io.EOF is exported so we don't accidentally +// remove the import. +var _ = io.EOF diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..1a2f295 --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module git.hangman-lab.top/hzhang/Plexum-anthropic-compat-client + +go 1.24.2 + +require git.hangman-lab.top/hzhang/Plexum-sdk-go v0.0.0 + +replace git.hangman-lab.top/hzhang/Plexum-sdk-go => ../Plexum-sdk-go diff --git a/translate/translate.go b/translate/translate.go new file mode 100644 index 0000000..4ce71e9 --- /dev/null +++ b/translate/translate.go @@ -0,0 +1,317 @@ +// Package translate converts between Plexum's canonical.TurnRequest / +// canonical.TurnEvent shapes and the Anthropic Messages API shapes the +// internal/anthropic client emits. +// +// Round-trip-able for text + thinking blocks; tool_use / tool_result +// passes through structurally. Block-level signatures (the opaque +// thinking signature Anthropic issues) are preserved when present. +package translate + +import ( + "encoding/json" + "fmt" + + "git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical" + + "git.hangman-lab.top/hzhang/Plexum-anthropic-compat-client/anthropic" +) + +// CanonicalToAnthropic converts a Plexum TurnRequest into an Anthropic +// MessagesRequest. modelID overrides the canonical req.Model so callers +// can map (e.g. "minimax/MiniMax-M2.7" → "MiniMax-M2.7") for the wire. +func CanonicalToAnthropic(req canonical.TurnRequest, modelID string, defaultMaxTokens int) (anthropic.MessagesRequest, error) { + out := anthropic.MessagesRequest{ + Model: modelID, + MaxTokens: req.MaxTokens, + } + if out.MaxTokens <= 0 { + out.MaxTokens = defaultMaxTokens + } + if req.Temperature != 0 { + t := req.Temperature + out.Temperature = &t + } + if len(req.StopSequences) > 0 { + out.StopSequences = append([]string{}, req.StopSequences...) + } + // System: Plexum stores System as []Block; Anthropic accepts string + // or []ContentBlock. We always use the []block form when we have any + // non-trivial blocks so cache_control etc. pass through losslessly. + if len(req.System) > 0 { + sysBlocks := make([]anthropic.ContentBlock, 0, len(req.System)) + for _, b := range req.System { + sysBlocks = append(sysBlocks, blockToAnthropic(b)) + } + out.System = sysBlocks + } + // Messages. + for _, m := range req.Messages { + am := anthropic.Message{Role: roleToAnthropic(m.Role)} + for _, b := range m.Content { + am.Content = append(am.Content, blockToAnthropic(b)) + } + out.Messages = append(out.Messages, am) + } + // Tools. + for _, t := range req.Tools { + out.Tools = append(out.Tools, anthropic.ToolDef{ + Name: t.Name, Description: t.Description, InputSchema: t.InputSchema, + }) + } + if req.ToolChoice != nil { + out.ToolChoice = &anthropic.ToolChoice{Type: req.ToolChoice.Type, Name: req.ToolChoice.Name} + } + if req.Thinking != nil { + mode := "disabled" + if req.Thinking.Enabled { + mode = "enabled" + } + out.Thinking = &anthropic.ThinkingConfig{ + Type: mode, + BudgetTokens: req.Thinking.Budget, + } + } + return out, nil +} + +func roleToAnthropic(r canonical.Role) string { + switch r { + case canonical.RoleUser: + return "user" + case canonical.RoleAssistant: + return "assistant" + default: + return string(r) + } +} + +// blockToAnthropic converts ONE canonical.Block into the loose +// map-shaped ContentBlock Anthropic expects. Per-type handling: +// +// - TextBlock → {"type":"text", "text":..., "cache_control"?} +// - ToolUseBlock → {"type":"tool_use", "id":..., "name":..., "input":...} +// - ToolResultBlock → {"type":"tool_result", "tool_use_id":..., "content":[...], "is_error"?} +// - ThinkingBlock → {"type":"thinking", "thinking":..., "signature":...} +// +// Unknown block types serialize to their JSON form via canonical's +// own marshaller (fallback path). +func blockToAnthropic(b canonical.Block) anthropic.ContentBlock { + switch v := b.(type) { + case *canonical.TextBlock: + out := anthropic.ContentBlock{"type": "text", "text": v.Text} + if v.CacheControl != nil { + out["cache_control"] = v.CacheControl + } + return out + case *canonical.ToolUseBlock: + out := anthropic.ContentBlock{ + "type": "tool_use", "id": v.ID, "name": v.Name, + } + if len(v.Input) > 0 { + out["input"] = json.RawMessage(v.Input) + } else { + out["input"] = map[string]any{} + } + return out + case *canonical.ToolResultBlock: + inner := make([]anthropic.ContentBlock, 0, len(v.Content)) + for _, ib := range v.Content { + inner = append(inner, blockToAnthropic(ib)) + } + out := anthropic.ContentBlock{ + "type": "tool_result", "tool_use_id": v.ToolUseID, "content": inner, + } + if v.IsError { + out["is_error"] = true + } + return out + case *canonical.ThinkingBlock: + return anthropic.ContentBlock{ + "type": "thinking", "thinking": v.Thinking, "signature": v.Signature, + } + default: + // Best-effort generic: marshal then unmarshal into a map. + raw, err := json.Marshal(b) + if err != nil { + return anthropic.ContentBlock{"type": "text", "text": fmt.Sprintf("[unsupported block: %T]", b)} + } + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + return anthropic.ContentBlock{"type": "text", "text": string(raw)} + } + return m + } +} + +// Translator turns a stream of anthropic.Event into a stream of +// canonical.TurnEvent. Tracks per-content-block state (current type + +// id + name) because Anthropic's deltas carry only the block index. +type Translator struct { + // Per-block tracking — index → metadata. + blocks map[int]*blockState + finalReason canonical.StopReason + finalUsage canonical.Usage +} + +type blockState struct { + Kind string // "text" | "thinking" | "tool_use" + ID string // tool_use only + Name string // tool_use only +} + +// NewTranslator constructs a fresh per-turn Translator. +func NewTranslator() *Translator { + return &Translator{blocks: map[int]*blockState{}} +} + +// Translate consumes one anthropic.Event and returns 0..N +// canonical.TurnEvents. The translator owns no internal channel — +// caller drives the loop. +func (t *Translator) Translate(ev anthropic.Event) []canonical.TurnEvent { + switch ev.Type { + case "message_start": + out := []canonical.TurnEvent{{Type: canonical.EventMessageStart}} + if ev.Message != nil && ev.Message.Usage != nil { + t.finalUsage = toCanonicalUsage(ev.Message.Usage) + } + return out + + case "ping": + return nil // keepalive + + case "content_block_start": + if ev.ContentBlock == nil { + return nil + } + st := &blockState{Kind: ev.ContentBlock.Type, ID: ev.ContentBlock.ID, Name: ev.ContentBlock.Name} + t.blocks[ev.Index] = st + switch st.Kind { + case "tool_use": + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallStart, ToolCallID: st.ID, ToolName: st.Name, + }} + case "text": + // Some servers emit content_block_start with non-empty + // text seed; surface that. + if ev.ContentBlock.Text != "" { + return []canonical.TurnEvent{{ + Type: canonical.EventTextDelta, Text: ev.ContentBlock.Text, + }} + } + case "thinking": + if ev.ContentBlock.Thinking != "" { + return []canonical.TurnEvent{{ + Type: canonical.EventThinkingDelta, Thinking: ev.ContentBlock.Thinking, + }} + } + } + return nil + + case "content_block_delta": + if ev.Delta == nil { + return nil + } + st := t.blocks[ev.Index] + switch ev.Delta.Type { + case "text_delta": + return []canonical.TurnEvent{{Type: canonical.EventTextDelta, Text: ev.Delta.Text}} + case "thinking_delta": + return []canonical.TurnEvent{{Type: canonical.EventThinkingDelta, Thinking: ev.Delta.Thinking}} + case "input_json_delta": + if st == nil { + return nil + } + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallDelta, + ToolCallID: st.ID, ToolName: st.Name, + PartialJSON: ev.Delta.PartialJSON, + }} + case "signature_delta": + // Signature lands on EventThinkingEnd at block_stop time; + // stash on the block state. + if st != nil { + st.Name = ev.Delta.Signature // reuse Name as scratch + } + return nil + } + return nil + + case "content_block_stop": + st := t.blocks[ev.Index] + if st == nil { + return nil + } + delete(t.blocks, ev.Index) + switch st.Kind { + case "tool_use": + return []canonical.TurnEvent{{ + Type: canonical.EventToolCallEnd, ToolCallID: st.ID, ToolName: st.Name, + }} + case "thinking": + // st.Name was repurposed to carry the signature in + // signature_delta above. Empty when no signature came. + return []canonical.TurnEvent{{ + Type: canonical.EventThinkingEnd, Signature: st.Name, + }} + } + return nil + + case "message_delta": + // Carries stop_reason + cumulative usage. + if ev.Delta != nil && ev.Delta.StopReason != "" { + t.finalReason = canonicalStopReason(ev.Delta.StopReason) + } + if ev.Usage != nil { + // message_delta usage is INCREMENTAL on Anthropic; the v4 docs + // describe it as cumulative across the stream. We just take + // the last reported values verbatim. + t.finalUsage = toCanonicalUsage(ev.Usage) + } + return nil + + case "message_stop": + usage := t.finalUsage + return []canonical.TurnEvent{{ + Type: canonical.EventMessageEnd, + StopReason: t.finalReason, + Usage: &usage, + }} + + case "error": + msg := "unknown" + etype := "stream_error" + if ev.Error != nil { + msg = ev.Error.Message + etype = ev.Error.Type + } + return []canonical.TurnEvent{{ + Type: canonical.EventError, + Error: &canonical.ErrorInfo{Code: etype, Message: msg}, + }} + } + return nil +} + +func canonicalStopReason(s string) canonical.StopReason { + switch s { + case "end_turn": + return canonical.StopEndTurn + case "max_tokens": + return canonical.StopMaxTokens + case "tool_use": + return canonical.StopToolUse + case "stop_sequence": + return canonical.StopStopSequence + default: + return canonical.StopReason(s) + } +} + +func toCanonicalUsage(u *anthropic.Usage) canonical.Usage { + return canonical.Usage{ + InputTokens: u.InputTokens, + OutputTokens: u.OutputTokens, + CacheReadTokens: u.CacheReadInputTokens, + CacheWriteTokens: u.CacheCreationInputTokens, + } +}