Files
hzhang e5ca677113 feat(translate): handle canonical.ImageBlock → Anthropic image content
Picks the most efficient source shape available:

  URL set        → {"type":"url", "url":...}
  DataBase64 set → {"type":"base64", "media_type":..., "data":...}
  Path set       → read file → base64 → {"type":"base64", ...}

MIME defaults to application/octet-stream when ImageBlock.MediaType is
empty (some servers accept that, some don't — operator's responsibility
to fill it in upstream).

File read failure → emits a "[image %q unavailable]" text block in its
place so the message still goes through with provenance preserved.

Verified live against Kimi K2.6 via the Fabric channel: alice uploads
a blue 32x32 PNG → Fabric plugin downloads it + emits Media in inbound →
host turncore builds a canonical.Message with an ImageBlock → translator
reads the file + base64-encodes it → Kimi responds "Blue".
2026-05-31 21:03:04 +01:00

364 lines
11 KiB
Go

// Package translate converts between Plexum's canonical.TurnRequest /
// canonical.TurnEvent shapes and the Anthropic Messages API shapes the
// internal/anthropic client emits.
//
// Round-trip-able for text + thinking blocks; tool_use / tool_result
// passes through structurally. Block-level signatures (the opaque
// thinking signature Anthropic issues) are preserved when present.
package translate
import (
"encoding/base64"
"encoding/json"
"fmt"
"os"
"git.hangman-lab.top/hzhang/Plexum-sdk-go/canonical"
"git.hangman-lab.top/hzhang/Plexum-anthropic-compat-client/anthropic"
)
// CanonicalToAnthropic builds the Anthropic MessagesRequest that
// corresponds to a Plexum TurnRequest.
//
// modelID is what goes on the wire as the model name, so callers can
// map a canonical id (e.g. "minimax/MiniMax-M2.7") to the provider's
// own spelling ("MiniMax-M2.7"). defaultMaxTokens is substituted
// whenever req.MaxTokens is zero or negative.
//
// A zero Temperature is treated as "not set" and left off the request.
// The returned error is always nil in the current implementation.
func CanonicalToAnthropic(req canonical.TurnRequest, modelID string, defaultMaxTokens int) (anthropic.MessagesRequest, error) {
	maxTokens := req.MaxTokens
	if maxTokens <= 0 {
		maxTokens = defaultMaxTokens
	}
	out := anthropic.MessagesRequest{Model: modelID, MaxTokens: maxTokens}

	if temp := req.Temperature; temp != 0 {
		out.Temperature = &temp
	}
	if n := len(req.StopSequences); n > 0 {
		// Copy so the outgoing request does not alias the caller's slice.
		stops := make([]string, n)
		copy(stops, req.StopSequences)
		out.StopSequences = stops
	}

	// System prompt: canonical keeps it as a block list, and it is sent
	// in block form too so per-block extras such as cache_control
	// survive the translation.
	if n := len(req.System); n > 0 {
		system := make([]anthropic.ContentBlock, n)
		for i, blk := range req.System {
			system[i] = blockToAnthropic(blk)
		}
		out.System = system
	}

	// Conversation history, one Anthropic message per canonical message.
	for _, msg := range req.Messages {
		converted := anthropic.Message{Role: roleToAnthropic(msg.Role)}
		for _, blk := range msg.Content {
			converted.Content = append(converted.Content, blockToAnthropic(blk))
		}
		out.Messages = append(out.Messages, converted)
	}

	// Tool definitions and the optional tool-choice directive.
	for _, tool := range req.Tools {
		def := anthropic.ToolDef{
			Name:        tool.Name,
			Description: tool.Description,
			InputSchema: tool.InputSchema,
		}
		out.Tools = append(out.Tools, def)
	}
	if choice := req.ToolChoice; choice != nil {
		out.ToolChoice = &anthropic.ToolChoice{Type: choice.Type, Name: choice.Name}
	}

	// Extended thinking: the canonical on/off flag becomes the
	// "enabled"/"disabled" type string; the budget is copied as-is.
	if th := req.Thinking; th != nil {
		cfg := &anthropic.ThinkingConfig{Type: "disabled", BudgetTokens: th.Budget}
		if th.Enabled {
			cfg.Type = "enabled"
		}
		out.Thinking = cfg
	}

	return out, nil
}
// imageSource builds the Anthropic-shaped "source" object for a
// canonical.ImageBlock, preferring the cheapest representation the
// block already carries:
//
//  1. URL        → {"type":"url", "url":...}
//  2. DataBase64 → {"type":"base64", "media_type":..., "data":...}
//  3. Path       → file is read and base64-encoded → {"type":"base64", ...}
//
// An empty MediaType is reported as "application/octet-stream".
// The result is nil when the block has no source at all or when the
// file at Path cannot be read.
func imageSource(v *canonical.ImageBlock) map[string]any {
	// A URL wins outright and needs no media type.
	if v.URL != "" {
		return map[string]any{"type": "url", "url": v.URL}
	}

	mime := v.MediaType
	if mime == "" {
		mime = "application/octet-stream"
	}

	// Use inline data when present; otherwise fall back to the file.
	encoded := v.DataBase64
	if encoded == "" {
		if v.Path == "" {
			return nil
		}
		contents, err := os.ReadFile(v.Path)
		if err != nil {
			return nil
		}
		encoded = base64.StdEncoding.EncodeToString(contents)
	}

	return map[string]any{
		"type":       "base64",
		"media_type": mime,
		"data":       encoded,
	}
}
// roleToAnthropic maps a canonical role to the role string used on the
// Anthropic wire. The user and assistant roles map to "user" and
// "assistant"; any other role is passed through as its string value.
func roleToAnthropic(r canonical.Role) string {
	if r == canonical.RoleUser {
		return "user"
	}
	if r == canonical.RoleAssistant {
		return "assistant"
	}
	return string(r)
}
// blockToAnthropic renders a single canonical.Block as the loosely
// typed, map-backed ContentBlock used on the Anthropic wire.
//
// Shapes produced per concrete type:
//
//   - *canonical.TextBlock       → "text" with "text", plus "cache_control" when set
//   - *canonical.ToolUseBlock    → "tool_use" with "id", "name", "input" ({} when the input is empty)
//   - *canonical.ToolResultBlock → "tool_result" with "tool_use_id", recursively converted "content", "is_error" only when true
//   - *canonical.ThinkingBlock   → "thinking" with "thinking" and "signature"
//   - *canonical.ImageBlock      → "image" with the "source" from imageSource, or a text placeholder naming the path when no source is usable
//
// Any other block type is marshalled to JSON and decoded back into a
// generic map. If that round trip fails, a text block is returned
// instead so the message can still be sent.
func blockToAnthropic(b canonical.Block) anthropic.ContentBlock {
	switch blk := b.(type) {
	case *canonical.ThinkingBlock:
		return anthropic.ContentBlock{
			"type":      "thinking",
			"thinking":  blk.Thinking,
			"signature": blk.Signature,
		}

	case *canonical.ImageBlock:
		if src := imageSource(blk); src != nil {
			return anthropic.ContentBlock{"type": "image", "source": src}
		}
		// No usable source: keep a textual trace of the image instead.
		placeholder := fmt.Sprintf("[image %q unavailable]", blk.Path)
		return anthropic.ContentBlock{"type": "text", "text": placeholder}

	case *canonical.TextBlock:
		res := anthropic.ContentBlock{"type": "text", "text": blk.Text}
		if blk.CacheControl == nil {
			return res
		}
		res["cache_control"] = blk.CacheControl
		return res

	case *canonical.ToolUseBlock:
		// "input" is always present; an empty payload becomes {}.
		var input any = map[string]any{}
		if len(blk.Input) > 0 {
			input = json.RawMessage(blk.Input)
		}
		return anthropic.ContentBlock{
			"type":  "tool_use",
			"id":    blk.ID,
			"name":  blk.Name,
			"input": input,
		}

	case *canonical.ToolResultBlock:
		nested := make([]anthropic.ContentBlock, len(blk.Content))
		for i, child := range blk.Content {
			nested[i] = blockToAnthropic(child)
		}
		res := anthropic.ContentBlock{
			"type":        "tool_result",
			"tool_use_id": blk.ToolUseID,
			"content":     nested,
		}
		if blk.IsError {
			res["is_error"] = true
		}
		return res
	}

	// Unknown block type: best-effort JSON round trip into a generic map.
	encoded, err := json.Marshal(b)
	if err != nil {
		return anthropic.ContentBlock{"type": "text", "text": fmt.Sprintf("[unsupported block: %T]", b)}
	}
	var generic map[string]any
	if json.Unmarshal(encoded, &generic) != nil {
		return anthropic.ContentBlock{"type": "text", "text": string(encoded)}
	}
	return generic
}
// Translator turns a stream of anthropic.Event into a stream of
// canonical.TurnEvent. Tracks per-content-block state (current type +
// id + name) because Anthropic's deltas carry only the block index.
// It also remembers the stop reason and usage seen during the stream
// so they can be attached to the final message-end event.
type Translator struct {
// blocks maps a content-block index to the state recorded when that
// block started; the entry is removed again when the block stops.
blocks map[int]*blockState
// finalReason is the most recent stop reason reported by a
// message_delta event.
finalReason canonical.StopReason
// finalUsage is the usage recorded from message_start and
// message_delta events; it is reported on message_stop.
finalUsage canonical.Usage
}
// blockState is the metadata remembered for one open content block,
// captured from its content_block_start event.
type blockState struct {
Kind string // block type copied from the start event, e.g. "text" | "thinking" | "tool_use"
ID string // tool_use only
Name string // tool name for tool_use; also reused as scratch storage for the signature delivered by signature_delta
}
// NewTranslator returns a Translator with an empty block table, ready
// to translate the events of one turn.
func NewTranslator() *Translator {
	tr := new(Translator)
	tr.blocks = make(map[int]*blockState)
	return tr
}
// Translate consumes one anthropic.Event and returns 0..N
// canonical.TurnEvents. The translator owns no internal channel —
// the caller drives the loop and feeds events in stream order.
//
// Mapping per event type:
//
//   - message_start       → EventMessageStart; any usage on the message is recorded.
//   - ping                → nothing (keepalive).
//   - content_block_start → block state is stored under ev.Index; tool_use
//     yields EventToolCallStart, and a non-empty text/thinking seed yields
//     the matching delta event.
//   - content_block_delta → EventTextDelta / EventThinkingDelta /
//     EventToolCallDelta; a signature_delta is stashed for the block end.
//   - content_block_stop  → EventToolCallEnd or EventThinkingEnd (with the
//     stashed signature); other block kinds produce nothing.
//   - message_delta       → stop reason and usage are recorded; nothing is emitted.
//   - message_stop        → EventMessageEnd carrying the recorded stop
//     reason and usage.
//   - error               → EventError; code and message default to
//     "stream_error" / "unknown" when the event has no error payload.
//
// Unrecognised event types, and events missing the payload they need,
// return nil.
func (t *Translator) Translate(ev anthropic.Event) []canonical.TurnEvent {
	switch ev.Type {
	case "message_start":
		out := []canonical.TurnEvent{{Type: canonical.EventMessageStart}}
		if ev.Message != nil && ev.Message.Usage != nil {
			t.finalUsage = toCanonicalUsage(ev.Message.Usage)
		}
		return out

	case "ping":
		return nil // keepalive

	case "content_block_start":
		if ev.ContentBlock == nil {
			return nil
		}
		st := &blockState{Kind: ev.ContentBlock.Type, ID: ev.ContentBlock.ID, Name: ev.ContentBlock.Name}
		t.blocks[ev.Index] = st
		switch st.Kind {
		case "tool_use":
			return []canonical.TurnEvent{{
				Type: canonical.EventToolCallStart, ToolCallID: st.ID, ToolName: st.Name,
			}}
		case "text":
			// Some servers emit content_block_start with a non-empty
			// text seed; surface that.
			if ev.ContentBlock.Text != "" {
				return []canonical.TurnEvent{{
					Type: canonical.EventTextDelta, Text: ev.ContentBlock.Text,
				}}
			}
		case "thinking":
			if ev.ContentBlock.Thinking != "" {
				return []canonical.TurnEvent{{
					Type: canonical.EventThinkingDelta, Thinking: ev.ContentBlock.Thinking,
				}}
			}
		}
		return nil

	case "content_block_delta":
		if ev.Delta == nil {
			return nil
		}
		st := t.blocks[ev.Index]
		switch ev.Delta.Type {
		case "text_delta":
			return []canonical.TurnEvent{{Type: canonical.EventTextDelta, Text: ev.Delta.Text}}
		case "thinking_delta":
			return []canonical.TurnEvent{{Type: canonical.EventThinkingDelta, Thinking: ev.Delta.Thinking}}
		case "input_json_delta":
			// Without a known block there is no tool call to attribute
			// the fragment to.
			if st == nil {
				return nil
			}
			return []canonical.TurnEvent{{
				Type:        canonical.EventToolCallDelta,
				ToolCallID:  st.ID,
				ToolName:    st.Name,
				PartialJSON: ev.Delta.PartialJSON,
			}}
		case "signature_delta":
			// The signature is emitted on EventThinkingEnd at block_stop
			// time, so stash it on the block state. Name is reused as
			// scratch space, which is only safe for thinking blocks: on
			// a tool_use block it would clobber the tool name reported
			// by later tool-call events.
			if st != nil && st.Kind == "thinking" {
				st.Name = ev.Delta.Signature
			}
			return nil
		}
		return nil

	case "content_block_stop":
		st := t.blocks[ev.Index]
		if st == nil {
			return nil
		}
		delete(t.blocks, ev.Index)
		switch st.Kind {
		case "tool_use":
			return []canonical.TurnEvent{{
				Type: canonical.EventToolCallEnd, ToolCallID: st.ID, ToolName: st.Name,
			}}
		case "thinking":
			// st.Name carries the signature stored by signature_delta
			// above; it is empty when no signature arrived.
			return []canonical.TurnEvent{{
				Type: canonical.EventThinkingEnd, Signature: st.Name,
			}}
		}
		return nil

	case "message_delta":
		// Carries stop_reason + usage.
		if ev.Delta != nil && ev.Delta.StopReason != "" {
			t.finalReason = canonicalStopReason(ev.Delta.StopReason)
		}
		if ev.Usage != nil {
			// Usage here is a running total, but servers frequently
			// report only a subset of the counters (often just
			// output_tokens). Replacing finalUsage wholesale would zero
			// the input/cache counts captured at message_start, so only
			// counters that are actually reported overwrite earlier ones.
			u := toCanonicalUsage(ev.Usage)
			if u.InputTokens != 0 {
				t.finalUsage.InputTokens = u.InputTokens
			}
			if u.OutputTokens != 0 {
				t.finalUsage.OutputTokens = u.OutputTokens
			}
			if u.CacheReadTokens != 0 {
				t.finalUsage.CacheReadTokens = u.CacheReadTokens
			}
			if u.CacheWriteTokens != 0 {
				t.finalUsage.CacheWriteTokens = u.CacheWriteTokens
			}
		}
		return nil

	case "message_stop":
		// Copy so the emitted event does not alias translator state.
		usage := t.finalUsage
		return []canonical.TurnEvent{{
			Type:       canonical.EventMessageEnd,
			StopReason: t.finalReason,
			Usage:      &usage,
		}}

	case "error":
		msg := "unknown"
		etype := "stream_error"
		if ev.Error != nil {
			msg = ev.Error.Message
			etype = ev.Error.Type
		}
		return []canonical.TurnEvent{{
			Type:  canonical.EventError,
			Error: &canonical.ErrorInfo{Code: etype, Message: msg},
		}}
	}
	return nil
}
// canonicalStopReason converts an Anthropic stop_reason string into the
// canonical StopReason. The four known reasons map to their canonical
// constants; any other value is carried over unchanged as a StopReason.
func canonicalStopReason(s string) canonical.StopReason {
	// Start from the pass-through value and override for known reasons.
	reason := canonical.StopReason(s)
	switch s {
	case "end_turn":
		reason = canonical.StopEndTurn
	case "max_tokens":
		reason = canonical.StopMaxTokens
	case "tool_use":
		reason = canonical.StopToolUse
	case "stop_sequence":
		reason = canonical.StopStopSequence
	}
	return reason
}
// toCanonicalUsage copies the token counters of an Anthropic Usage into
// a canonical.Usage, renaming the cache counters on the way
// (cache_read_input → CacheRead, cache_creation_input → CacheWrite).
// u must be non-nil; it is dereferenced unconditionally.
func toCanonicalUsage(u *anthropic.Usage) canonical.Usage {
	var usage canonical.Usage
	usage.InputTokens = u.InputTokens
	usage.OutputTokens = u.OutputTokens
	usage.CacheReadTokens = u.CacheReadInputTokens
	usage.CacheWriteTokens = u.CacheCreationInputTokens
	return usage
}