// Package anthropic is a minimal Anthropic Messages API HTTP+SSE // client. Works against the real Anthropic API and any Anthropic- // compatible endpoint (MiniMax exposes one at // https://api.minimax.io/anthropic). // // The MiniMax provider plugin's only consumer; if other providers // land later they can reuse this package. package anthropic import ( "bufio" "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" "strings" "time" ) // DefaultAPIVersion matches what Anthropic + Anthropic-compat servers expect. const DefaultAPIVersion = "2023-06-01" // Client is a thin HTTP wrapper. Stateless across calls. type Client struct { BaseURL string // e.g. "https://api.kimi.com/coding" or "https://api.minimax.io/anthropic" APIKey string APIVersion string // default "2023-06-01" // UserAgent, if set, replaces Go's default UA header. Kimi's // coding endpoint expects "claude-code/0.1.0"; other backends // (MiniMax etc.) are flexible. UserAgent string // ExtraHeaders are sent verbatim on every request. ExtraHeaders map[string]string HTTP *http.Client } // New constructs a Client with sensible defaults. func New(baseURL, apiKey string) *Client { return &Client{ BaseURL: strings.TrimRight(baseURL, "/"), APIKey: apiKey, APIVersion: DefaultAPIVersion, // 5 min ceiling per turn — long enough for a slow reasoning // response, short enough that wedged calls don't hang forever. HTTP: &http.Client{Timeout: 5 * time.Minute}, } } // MessagesRequest is the wire shape POSTed to /v1/messages. 
type MessagesRequest struct {
	Model string `json:"model"`

	// System is either a plain string or a []ContentBlock.
	System any `json:"system,omitempty"`

	Messages      []Message `json:"messages"`
	MaxTokens     int       `json:"max_tokens"`
	Temperature   *float64  `json:"temperature,omitempty"`
	StopSequences []string  `json:"stop_sequences,omitempty"`

	Tools      []ToolDef       `json:"tools,omitempty"`
	ToolChoice *ToolChoice     `json:"tool_choice,omitempty"`
	Thinking   *ThinkingConfig `json:"thinking,omitempty"`

	// Stream is always serialized (no omitempty).
	Stream bool `json:"stream"`

	Metadata *RequestMetadata `json:"metadata,omitempty"`
}

// Message is a single element of the request's messages array.
type Message struct {
	// Role is "user" or "assistant".
	Role string `json:"role"`

	Content []ContentBlock `json:"content"`
}

// ContentBlock represents a text / tool_use / tool_result / thinking
// block. On the request side the discriminated union is deliberately
// left loose (map[string]any) so callers can pass whatever shape the
// API accepts; on the response side the SSE parser produces typed
// deltas instead.
type ContentBlock = map[string]any

// ToolDef describes one tool the model is allowed to call.
type ToolDef struct {
	Name        string          `json:"name"`
	Description string          `json:"description,omitempty"`
	InputSchema json.RawMessage `json:"input_schema"`
}

// ToolChoice restricts how the model selects a tool.
type ToolChoice struct {
	// Type is one of "auto", "any", "tool", or "none".
	Type string `json:"type"`

	Name string `json:"name,omitempty"`
}

// ThinkingConfig turns extended thinking on or off for models that
// support it.
type ThinkingConfig struct {
	// Type is "enabled" or "disabled".
	Type string `json:"type"`

	BudgetTokens int `json:"budget_tokens,omitempty"`
}

// RequestMetadata carries optional, user-supplied tracing data.
type RequestMetadata struct {
	UserID string `json:"user_id,omitempty"`
}

// ---- Streaming SSE response ----

// Event is one parsed SSE event from the streaming endpoint. It
// mirrors Anthropic's event shapes directly; translating it into
// Plexum's canonical.TurnEvent is the caller's responsibility.
type Event struct { Type string `json:"type"` // message_start: complete initial message envelope Message *MessageEnvelope `json:"message,omitempty"` // content_block_start: block at .Index Index int `json:"index,omitempty"` ContentBlock *BlockStart `json:"content_block,omitempty"` // content_block_delta: text_delta / thinking_delta / input_json_delta / signature_delta Delta *BlockDelta `json:"delta,omitempty"` // message_delta: usage update + stop_reason Usage *Usage `json:"usage,omitempty"` // error event payload Error *ErrorBody `json:"error,omitempty"` } // MessageEnvelope is the top-level message info in message_start. type MessageEnvelope struct { ID string `json:"id"` Type string `json:"type"` Role string `json:"role"` Model string `json:"model"` StopReason string `json:"stop_reason"` Usage *Usage `json:"usage,omitempty"` } // BlockStart describes a block that just started. Type is "text", // "thinking", or "tool_use". For tool_use the ID + Name are set. type BlockStart struct { Type string `json:"type"` Text string `json:"text,omitempty"` Thinking string `json:"thinking,omitempty"` ID string `json:"id,omitempty"` Name string `json:"name,omitempty"` Input json.RawMessage `json:"input,omitempty"` } // BlockDelta carries incremental changes. Exactly one of the *_delta // fields is meaningful per delta. type BlockDelta struct { Type string `json:"type"` // "text_delta" | "thinking_delta" | "input_json_delta" | "signature_delta" Text string `json:"text,omitempty"` Thinking string `json:"thinking,omitempty"` PartialJSON string `json:"partial_json,omitempty"` Signature string `json:"signature,omitempty"` // On message_delta event, the same Delta carries stop_reason. StopReason string `json:"stop_reason,omitempty"` StopSequence string `json:"stop_sequence,omitempty"` } // Usage is the running input/output token counts. 
type Usage struct { InputTokens int `json:"input_tokens,omitempty"` OutputTokens int `json:"output_tokens,omitempty"` CacheReadInputTokens int `json:"cache_read_input_tokens,omitempty"` CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"` } // ErrorBody is the Anthropic error envelope. type ErrorBody struct { Type string `json:"type"` Message string `json:"message"` } // StreamMessages opens a streaming POST /v1/messages call and sends // parsed events down the returned channel. Channel closes when SSE // stream terminates (message_stop, server EOF, ctx cancel, or error). // On HTTP-level error (non-2xx, network failure), an EventType="error" // event lands on the channel with the wrapped error before close. func (c *Client) StreamMessages(ctx context.Context, req MessagesRequest) (<-chan Event, error) { req.Stream = true body, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("anthropic: marshal: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.BaseURL+"/v1/messages", bytes.NewReader(body)) if err != nil { return nil, err } httpReq.Header.Set("content-type", "application/json") httpReq.Header.Set("authorization", "Bearer "+c.APIKey) httpReq.Header.Set("anthropic-version", c.APIVersion) httpReq.Header.Set("accept", "text/event-stream") if c.UserAgent != "" { httpReq.Header.Set("User-Agent", c.UserAgent) } for k, v := range c.ExtraHeaders { httpReq.Header.Set(k, v) } resp, err := c.HTTP.Do(httpReq) if err != nil { return nil, fmt.Errorf("anthropic: %w", err) } if resp.StatusCode < 200 || resp.StatusCode >= 300 { raw, _ := io.ReadAll(resp.Body) resp.Body.Close() return nil, fmt.Errorf("anthropic: %s -> %d: %s", c.BaseURL, resp.StatusCode, string(raw)) } ch := make(chan Event, 32) go func() { defer close(ch) defer resp.Body.Close() if err := parseSSE(ctx, resp.Body, ch); err != nil && !errors.Is(err, io.EOF) { select { case ch <- Event{Type: "error", Error: &ErrorBody{Type: "stream_error", Message: 
err.Error()}}: default: } } }() return ch, nil } // parseSSE reads SSE frames (`event:` + `data:` lines, blank-line // separator) and dispatches Event values onto ch. Returns nil on // normal EOF. func parseSSE(ctx context.Context, r io.Reader, ch chan<- Event) error { scanner := bufio.NewScanner(r) scanner.Buffer(make([]byte, 64*1024), 1<<20) // 1 MiB line cap var dataBuf bytes.Buffer for scanner.Scan() { select { case <-ctx.Done(): return ctx.Err() default: } line := scanner.Bytes() if len(line) == 0 { // End-of-frame: dispatch buffered data, reset. if dataBuf.Len() == 0 { continue } var ev Event if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil { select { case ch <- ev: case <-ctx.Done(): return ctx.Err() } } dataBuf.Reset() continue } if bytes.HasPrefix(line, []byte("data:")) { payload := bytes.TrimSpace(line[5:]) if dataBuf.Len() > 0 { dataBuf.WriteByte('\n') } dataBuf.Write(payload) } // `event:` lines describe the event name. JSON `data:` payload // also carries `type` field — we use that. Ignore event: lines. } if err := scanner.Err(); err != nil { return fmt.Errorf("sse scan: %w", err) } // Flush any trailing data without final blank line. if dataBuf.Len() > 0 { var ev Event if err := json.Unmarshal(dataBuf.Bytes(), &ev); err == nil { select { case ch <- ev: case <-ctx.Done(): return ctx.Err() } } } return nil }