Channel plugins now have a structured way to propagate attachments — Plexum's host turns each MediaItem into a canonical.ImageBlock content on the user message. This replaces the F-6 workaround that crammed local file paths into a markdown footer at the end of the message text. internal/inbound/inbound.go: - Notifier signature gains `media []MediaItem`; MediaItem mirrors the host's channel.MediaRef (Path/MediaType/Name) - dispatch downloads attachments (unchanged) then forwards results as MediaItem to Notify — no more footer appending - dispatch now also receives guildEndpoint so it can resolve Fabric's RELATIVE attachment URLs (`/api/files/<id>`) against the guild base. Previously the downloader received the relative path verbatim and failed every fetch silently. cmd/plexum-fabric-channel-plugin/main.go: - notifier closure pushes media[] in the EmitNotification payload Live verified: alice uploads blue 32x32 PNG → Fabric guild → plugin downloads to /tmp/plexum-fabric/<msg>/blue32.png → emits inbound with media → host routes to kimi agent → Kimi: "Blue". (MiniMax M2.7 is text-only per openclaw model definitions, so the same flow against MiniMax returns "I don't see an image" — that's a model capability limit, not a plugin issue.) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
513 lines
16 KiB
Go
513 lines
16 KiB
Go
// Package inbound is the Fabric → Plexum message pump. For each
|
|
// (agent, guild) it maintains a long-lived socket.io connection to the
|
|
// guild's /realtime namespace, joins every channel the agent has
|
|
// membership in (resyncing periodically + on push events), and dispatches
|
|
// matching message.created events through a per-channel serial queue.
|
|
//
|
|
// Wakeup gate matches openclaw's plugin: dispatch iff
|
|
// - m.xType == "dm" (any non-self message), OR
|
|
// - m.wakeup == true.
|
|
//
|
|
// Other inbounds (non-dm with wakeup != true) are dropped silently
|
|
// (Plexum has no "record-only history injection" equivalent in v1;
|
|
// future phase can add it).
|
|
package inbound
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/attachments"
|
|
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
|
|
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
|
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/socketio"
|
|
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens"
|
|
)
|
|
|
|
// ChannelSyncInterval is how often the supervisor diffs joined channels
|
|
// against /api/channels to catch newly-membership-bestowed channels
|
|
// that arrived without a push event. Matches openclaw's 60s.
|
|
const ChannelSyncInterval = 60 * time.Second
|
|
|
|
// ReconnectBackoffInitial / Max / Factor drive exponential backoff
|
|
// between reconnect attempts. Starts at 1s, doubles up to 60s; resets
|
|
// on successful connect (signalled by connectOnce returning after at
|
|
// least one event was received — proxied via wall-clock duration).
|
|
const (
|
|
ReconnectBackoffInitial = 1 * time.Second
|
|
ReconnectBackoffMax = 60 * time.Second
|
|
ReconnectBackoffFactor = 2.0
|
|
// ReconnectResetAfter: if the previous connection survived at least
|
|
// this long, reset the backoff to Initial (we made meaningful
|
|
// progress; transient drop, not a flapping-failure loop).
|
|
ReconnectResetAfter = 30 * time.Second
|
|
)
|
|
|
|
// Notifier pushes one inbound message to the Plexum host. The plugin
|
|
// main wires this to HostAPI.EmitNotification. media items are
|
|
// passed through as `media` field in the channel-inbound notification
|
|
// (Plexum's channel manager turns them into canonical.ImageBlock
|
|
// content on the user message).
|
|
type Notifier func(channelName, message, sessionID string, media []MediaItem)
|
|
|
|
// MediaItem is one attachment file already downloaded to a local path.
|
|
// Mirrors the host's channel.MediaRef wire shape (path/media_type/name).
|
|
type MediaItem struct {
|
|
Path string
|
|
MediaType string
|
|
Name string
|
|
}
|
|
|
|
// Supervisor owns the per-(agent, guild) socket.io connections. Run
|
|
// blocks until ctx is cancelled.
|
|
type Supervisor struct {
|
|
Client *fabric.Client
|
|
Tokens *tokens.Cache
|
|
Bindings []config.FabricBinding
|
|
ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding
|
|
AgentIDs []string // unique agents to bring up
|
|
Notify Notifier
|
|
Logger *slog.Logger
|
|
// Attachments, if non-nil, downloads message attachments to a temp
|
|
// dir and appends paths to the message body before notify. nil →
|
|
// attachments are dropped silently (warning logged).
|
|
Attachments *attachments.Downloader
|
|
|
|
// Deduplicate (agentID, messageId) → already dispatched. Bounded
|
|
// to ~5000 entries (matches openclaw).
|
|
dedupMu sync.Mutex
|
|
seen map[string]struct{}
|
|
|
|
// Per-channel serial chain: each channel id maps to a chan of
|
|
// queued task funcs. Channel goroutine drains them in order.
|
|
chainMu sync.Mutex
|
|
chains map[string]chan func()
|
|
}
|
|
|
|
// New constructs a Supervisor with deduped agentIDs derived from bindings.
|
|
func New(client *fabric.Client, tokenCache *tokens.Cache, bindings []config.FabricBinding,
|
|
notify Notifier, logger *slog.Logger) *Supervisor {
|
|
agentSet := map[string]struct{}{}
|
|
for _, b := range bindings {
|
|
agentSet[b.AgentID] = struct{}{}
|
|
}
|
|
agents := make([]string, 0, len(agentSet))
|
|
for a := range agentSet {
|
|
agents = append(agents, a)
|
|
}
|
|
return &Supervisor{
|
|
Client: client, Tokens: tokenCache,
|
|
Bindings: bindings, ByFabric: config.Index(bindings),
|
|
AgentIDs: agents, Notify: notify, Logger: logger,
|
|
seen: map[string]struct{}{},
|
|
chains: map[string]chan func(){},
|
|
}
|
|
}
|
|
|
|
// Run blocks: spawns one goroutine per (agent, guild) socket.io
|
|
// supervisor. Each supervisor reconnects on drop until ctx cancels.
|
|
// Returns when ctx is cancelled and all supervisors have exited.
|
|
func (s *Supervisor) Run(ctx context.Context) error {
|
|
var wg sync.WaitGroup
|
|
for _, agentID := range s.AgentIDs {
|
|
sess, err := s.Tokens.Get(ctx, agentID)
|
|
if err != nil {
|
|
s.Logger.Warn("inbound: skip agent (no session)", "agent", agentID, "err", err)
|
|
continue
|
|
}
|
|
for _, g := range sess.Guilds {
|
|
if g.Endpoint == "" {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(agentID string, guild fabric.GuildInfo) {
|
|
defer wg.Done()
|
|
s.runAgentGuild(ctx, agentID, guild)
|
|
}(agentID, g)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// runAgentGuild keeps one socket.io connection alive for (agent, guild)
|
|
// until ctx cancels. Reconnects with fresh auth on every drop using
|
|
// exponential backoff (resets if the previous session survived long
|
|
// enough to look healthy).
|
|
func (s *Supervisor) runAgentGuild(ctx context.Context, agentID string, guild fabric.GuildInfo) {
|
|
logger := s.Logger.With("agent", agentID, "guild", guild.NodeID)
|
|
backoff := ReconnectBackoffInitial
|
|
for {
|
|
if err := ctx.Err(); err != nil {
|
|
return
|
|
}
|
|
startedAt := time.Now()
|
|
err := s.connectOnce(ctx, agentID, guild, logger)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
// Reset backoff if the prior session was meaningfully long
|
|
// (proxy for "we connected + did work" — not a flap loop).
|
|
if time.Since(startedAt) > ReconnectResetAfter {
|
|
backoff = ReconnectBackoffInitial
|
|
}
|
|
errStr := "(nil)"
|
|
if err != nil {
|
|
errStr = err.Error()
|
|
}
|
|
logger.Warn("inbound: socket ended; reconnecting",
|
|
"err", errStr, "backoff", backoff.String())
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
backoff = time.Duration(float64(backoff) * ReconnectBackoffFactor)
|
|
if backoff > ReconnectBackoffMax {
|
|
backoff = ReconnectBackoffMax
|
|
}
|
|
}
|
|
}
|
|
|
|
// connectOnce opens one socket connection, runs the read loop, and
|
|
// returns the terminating error (nil on graceful close).
|
|
func (s *Supervisor) connectOnce(ctx context.Context, agentID string, guild fabric.GuildInfo, logger *slog.Logger) error {
|
|
authFn := func(authCtx context.Context) (map[string]any, error) {
|
|
// Always fetch a fresh token before the CONNECT — fixes the
|
|
// stale-JWT-on-reconnect bug openclaw documented.
|
|
s.Tokens.Invalidate(agentID)
|
|
tok, err := s.Tokens.GuildToken(authCtx, agentID, guild.NodeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return map[string]any{"token": tok}, nil
|
|
}
|
|
cli, err := socketio.New(guild.Endpoint, "/socket.io/", "/realtime", authFn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Tracked subscriptions for this connection. Cleared on each
|
|
// reconnect (server forgets join_channel state).
|
|
joined := map[string]struct{}{}
|
|
var joinedMu sync.Mutex
|
|
|
|
// Initial sync + periodic resync runs in a goroutine; it reads
|
|
// channels from REST and emits join_channel for new + leave_channel
|
|
// for gone.
|
|
syncCtx, cancelSync := context.WithCancel(ctx)
|
|
defer cancelSync()
|
|
syncOnce := func(kind string) {
|
|
tok, err := s.Tokens.GuildToken(syncCtx, agentID, guild.NodeID)
|
|
if err != nil {
|
|
logger.Warn("inbound: sync token", "err", err)
|
|
return
|
|
}
|
|
channels, err := s.Client.ListChannels(syncCtx, guild.Endpoint, tok, guild.NodeID)
|
|
if err != nil {
|
|
logger.Warn("inbound: list channels", "err", err)
|
|
return
|
|
}
|
|
current := map[string]struct{}{}
|
|
for _, c := range channels {
|
|
current[c.ID] = struct{}{}
|
|
}
|
|
var added, removed int
|
|
joinedMu.Lock()
|
|
for id := range current {
|
|
if _, ok := joined[id]; !ok {
|
|
_ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": id})
|
|
joined[id] = struct{}{}
|
|
added++
|
|
}
|
|
}
|
|
for id := range joined {
|
|
if _, ok := current[id]; !ok {
|
|
_ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": id})
|
|
delete(joined, id)
|
|
removed++
|
|
}
|
|
}
|
|
size := len(joined)
|
|
joinedMu.Unlock()
|
|
if kind == "initial" {
|
|
logger.Info("inbound: channels joined", "n", size)
|
|
} else if added > 0 || removed > 0 {
|
|
logger.Info("inbound: channels resync", "added", added, "removed", removed, "now", size)
|
|
}
|
|
}
|
|
|
|
// Event handlers. Registered BEFORE Connect so we don't miss the
|
|
// first events the server pushes right after handshake.
|
|
cli.On("connect", func([]json.RawMessage) { /* no-op; server CONNECT ack */ })
|
|
|
|
cli.On("channel.joined", func(args []json.RawMessage) {
|
|
var evt struct {
|
|
ChannelID string `json:"channelId"`
|
|
}
|
|
if len(args) == 0 {
|
|
return
|
|
}
|
|
_ = json.Unmarshal(args[0], &evt)
|
|
if evt.ChannelID == "" {
|
|
return
|
|
}
|
|
joinedMu.Lock()
|
|
if _, already := joined[evt.ChannelID]; already {
|
|
joinedMu.Unlock()
|
|
return
|
|
}
|
|
joined[evt.ChannelID] = struct{}{}
|
|
joinedMu.Unlock()
|
|
_ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": evt.ChannelID})
|
|
logger.Info("inbound: channel.joined push", "channel", evt.ChannelID)
|
|
})
|
|
|
|
cli.On("channel.left", func(args []json.RawMessage) {
|
|
var evt struct {
|
|
ChannelID string `json:"channelId"`
|
|
}
|
|
if len(args) == 0 {
|
|
return
|
|
}
|
|
_ = json.Unmarshal(args[0], &evt)
|
|
if evt.ChannelID == "" {
|
|
return
|
|
}
|
|
joinedMu.Lock()
|
|
if _, present := joined[evt.ChannelID]; !present {
|
|
joinedMu.Unlock()
|
|
return
|
|
}
|
|
delete(joined, evt.ChannelID)
|
|
joinedMu.Unlock()
|
|
_ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": evt.ChannelID})
|
|
logger.Info("inbound: channel.left push", "channel", evt.ChannelID)
|
|
})
|
|
|
|
// Capture self user id from current cached session for the
|
|
// self-author filter.
|
|
sess := s.Tokens.Peek(agentID)
|
|
var selfUserID string
|
|
if sess != nil {
|
|
selfUserID = sess.User.ID
|
|
}
|
|
|
|
cli.On("message.created", func(args []json.RawMessage) {
|
|
var m FabricMessage
|
|
if len(args) == 0 {
|
|
return
|
|
}
|
|
if err := json.Unmarshal(args[0], &m); err != nil {
|
|
logger.Warn("inbound: bad message.created", "err", err)
|
|
return
|
|
}
|
|
s.dispatch(agentID, guild.NodeID, guild.Endpoint, selfUserID, &m, logger)
|
|
})
|
|
|
|
// Periodic resync goroutine.
|
|
syncTimer := time.NewTicker(ChannelSyncInterval)
|
|
defer syncTimer.Stop()
|
|
go func() {
|
|
// Wait for connect to complete; we kick the initial sync from
|
|
// within Connect's startup. Use the ticker for the periodic.
|
|
for {
|
|
select {
|
|
case <-syncCtx.Done():
|
|
return
|
|
case <-syncTimer.C:
|
|
syncOnce("resync")
|
|
}
|
|
}
|
|
}()
|
|
|
|
// On connect, fire the initial sync. The socketio client doesn't
|
|
// fire a Go-side "connect" event before Connect returns; we
|
|
// schedule the initial sync to run shortly after Connect starts.
|
|
go func() {
|
|
time.Sleep(100 * time.Millisecond)
|
|
select {
|
|
case <-syncCtx.Done():
|
|
return
|
|
default:
|
|
}
|
|
syncOnce("initial")
|
|
}()
|
|
|
|
return cli.Connect(ctx)
|
|
}
|
|
|
|
// dispatch applies self-author filter, dedup, channel-binding lookup,
|
|
// wakeup gate, then notifies Plexum.
|
|
func (s *Supervisor) dispatch(agentID, guildNodeID, guildEndpoint, selfUserID string, m *FabricMessage, logger *slog.Logger) {
|
|
if m.ChannelID == "" {
|
|
return
|
|
}
|
|
// Self-author filter: skip messages this user wrote.
|
|
if m.AuthorUserID != "" && m.AuthorUserID == selfUserID {
|
|
return
|
|
}
|
|
// Dedup per (agent, messageId).
|
|
key := agentID + ":" + m.MessageID
|
|
s.dedupMu.Lock()
|
|
if _, dup := s.seen[key]; dup {
|
|
s.dedupMu.Unlock()
|
|
return
|
|
}
|
|
s.seen[key] = struct{}{}
|
|
if len(s.seen) > 5000 {
|
|
s.seen = map[string]struct{}{} // simple bounded reset
|
|
}
|
|
s.dedupMu.Unlock()
|
|
|
|
// Channel-binding lookup. We only deliver messages for channels
|
|
// the operator has bound to a Plexum agent in channels/*.json.
|
|
binding, ok := s.ByFabric[config.Key(guildNodeID, m.ChannelID)]
|
|
if !ok {
|
|
return
|
|
}
|
|
// The binding must point at THIS agent (a guild may have several
|
|
// agents; an inbound on agent A's socket for a channel bound to
|
|
// agent B is ignored — agent B has their own socket for it).
|
|
if binding.AgentID != agentID {
|
|
return
|
|
}
|
|
|
|
// Wakeup gate (decision #36-equivalent on the openclaw side):
|
|
// dm channel → always deliver
|
|
// other → wakeup must be true
|
|
if m.XType != "dm" && !m.Wakeup {
|
|
logger.Debug("inbound: skip (no wakeup)",
|
|
"channel", m.ChannelID, "agent", agentID, "msg", m.MessageID)
|
|
return
|
|
}
|
|
|
|
logger.Info("inbound: dispatch",
|
|
"channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType,
|
|
"attachments", len(m.Attachments))
|
|
|
|
// Per-channel serial queue. Concurrent inbounds on the same channel
|
|
// must run one at a time so Plexum's state machine sees clean turns.
|
|
sessionID := "s_fab_" + m.ChannelID
|
|
body := m.Content
|
|
plexumChannel := binding.PlexumChannelName
|
|
var media []MediaItem
|
|
// Download attachments to local paths so the host can hand them
|
|
// to provider plugins as canonical.ImageBlock (Plexum host-side
|
|
// media block support).
|
|
if len(m.Attachments) > 0 && s.Attachments != nil {
|
|
tok, terr := s.Tokens.GuildToken(context.Background(), agentID, guildNodeID)
|
|
if terr == nil {
|
|
refs := make([]attachments.AttachmentRef, len(m.Attachments))
|
|
for i, a := range m.Attachments {
|
|
// Fabric returns attachment URLs as RELATIVE paths
|
|
// (`/api/files/<id>`); resolve against the guild endpoint
|
|
// before fetching.
|
|
absURL := a.URL
|
|
if absURL != "" && absURL[0] == '/' {
|
|
absURL = guildEndpoint + absURL
|
|
}
|
|
refs[i] = attachments.AttachmentRef{
|
|
URL: absURL, Name: a.Name, MimeType: a.MimeType,
|
|
}
|
|
}
|
|
files := s.Attachments.FetchAll(context.Background(), tok, m.MessageID, refs)
|
|
for _, f := range files {
|
|
media = append(media, MediaItem{
|
|
Path: f.LocalPath, MediaType: f.MimeType, Name: f.Name,
|
|
})
|
|
}
|
|
if len(files) < len(refs) {
|
|
logger.Warn("inbound: some attachments failed to download",
|
|
"requested", len(refs), "got", len(files))
|
|
}
|
|
} else {
|
|
logger.Warn("inbound: attachment guild token failed", "err", terr)
|
|
}
|
|
}
|
|
s.enqueueChannel(m.ChannelID, func() {
|
|
s.Notify(plexumChannel, body, sessionID, media)
|
|
})
|
|
}
|
|
|
|
// enqueueChannel chains a task onto the per-channel queue. If no chain
|
|
// exists, spawn the drain goroutine. Drain goroutine exits when the
|
|
// channel goes idle for >5s to avoid leaking goroutines per channel.
|
|
func (s *Supervisor) enqueueChannel(channelID string, task func()) {
|
|
s.chainMu.Lock()
|
|
ch, ok := s.chains[channelID]
|
|
if !ok {
|
|
ch = make(chan func(), 32)
|
|
s.chains[channelID] = ch
|
|
go s.drainChannel(channelID, ch)
|
|
}
|
|
s.chainMu.Unlock()
|
|
select {
|
|
case ch <- task:
|
|
default:
|
|
// Backlog full (32+) — log + drop. Shouldn't happen unless
|
|
// the bound agent is hopelessly behind.
|
|
s.Logger.Warn("inbound: per-channel queue full; dropping",
|
|
"channel", channelID)
|
|
}
|
|
}
|
|
|
|
func (s *Supervisor) drainChannel(channelID string, ch chan func()) {
|
|
idle := time.NewTimer(5 * time.Second)
|
|
defer idle.Stop()
|
|
for {
|
|
select {
|
|
case task, open := <-ch:
|
|
if !open {
|
|
return
|
|
}
|
|
if !idle.Stop() {
|
|
<-idle.C
|
|
}
|
|
task()
|
|
idle.Reset(5 * time.Second)
|
|
case <-idle.C:
|
|
s.chainMu.Lock()
|
|
delete(s.chains, channelID)
|
|
s.chainMu.Unlock()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// FabricMessage is the wire shape of Fabric's message.created event.
|
|
// Mirror of FabricMessage type in openclaw's inbound.ts.
|
|
type FabricMessage struct {
|
|
MessageID string `json:"messageId"`
|
|
Seq int `json:"seq"`
|
|
Content string `json:"content"`
|
|
AuthorUserID string `json:"authorUserId,omitempty"`
|
|
CreatedAt string `json:"createdAt,omitempty"`
|
|
ChannelID string `json:"channelId,omitempty"`
|
|
Attachments []FabricAttachment `json:"attachments,omitempty"`
|
|
Wakeup bool `json:"wakeup,omitempty"`
|
|
// XType matches the Fabric channel's x-type field. 'dm' bypasses
|
|
// the wakeup gate.
|
|
XType string `json:"xType,omitempty"`
|
|
}
|
|
|
|
// FabricAttachment is the wire shape of one attachment ref. v1
|
|
// (Phase F-2) doesn't download these; F-8 will.
|
|
type FabricAttachment struct {
|
|
URL string `json:"url"`
|
|
Name string `json:"name,omitempty"`
|
|
MimeType string `json:"mimeType,omitempty"`
|
|
}
|
|
|
|
// Equality check used for tests.
|
|
func (a FabricAttachment) Equal(b FabricAttachment) bool {
|
|
return a.URL == b.URL && a.Name == b.Name && a.MimeType == b.MimeType
|
|
}
|
|
|
|
// Sanity: ensure FabricMessage stays serializable.
|
|
var _ = fmt.Sprint
|