// Package inbound is the Fabric → Plexum message pump. For each
// (agent, guild) it maintains a long-lived socket.io connection to the
// guild's /realtime namespace, joins every channel the agent has
// membership in (resyncing periodically + on push events), and dispatches
// matching message.created events through a per-channel serial queue.
//
// Wakeup gate matches openclaw's plugin: dispatch iff
//   - m.xType == "dm" (any non-self message), OR
//   - m.wakeup == true.
//
// Other inbounds (non-dm with wakeup != true) are dropped; the only
// trace is a debug-level log line (Plexum has no "record-only history
// injection" equivalent in v1; a future phase can add it).
package inbound

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/attachments"
	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/socketio"
	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens"
)

// ChannelSyncInterval is how often the supervisor diffs joined channels
// against /api/channels to catch newly-membership-bestowed channels
// that arrived without a push event. Matches openclaw's 60s.
const ChannelSyncInterval = 60 * time.Second

// ReconnectBackoffInitial / Max / Factor drive exponential backoff
// between reconnect attempts. Starts at 1s, doubles up to 60s. The
// backoff resets to Initial when the previous connection attempt lasted
// longer than ReconnectResetAfter (wall-clock duration is used as a
// proxy for "the connection was healthy and did work").
const (
	ReconnectBackoffInitial = 1 * time.Second
	ReconnectBackoffMax     = 60 * time.Second
	ReconnectBackoffFactor  = 2.0
	// ReconnectResetAfter: if the previous connection survived at least
	// this long, reset the backoff to Initial (we made meaningful
	// progress; transient drop, not a flapping-failure loop).
	ReconnectResetAfter = 30 * time.Second
)

// Notifier pushes one inbound message to the Plexum host. The plugin
// main wires this to HostAPI.EmitNotification. Media items are
// passed through as the `media` field in the channel-inbound notification
// (Plexum's channel manager turns them into canonical.ImageBlock
// content on the user message).
type Notifier func(channelName, message, sessionID string, media []MediaItem)

// MediaItem is one attachment file already downloaded to a local path.
// Mirrors the host's channel.MediaRef wire shape (path/media_type/name).
type MediaItem struct {
	Path      string
	MediaType string
	Name      string
}

// Supervisor owns the per-(agent, guild) socket.io connections. Run
// blocks until ctx is cancelled.
type Supervisor struct {
	Client   *fabric.Client
	Tokens   *tokens.Cache
	Bindings []config.FabricBinding
	ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding
	AgentIDs []string               // unique agents to bring up
	Notify   Notifier
	Logger   *slog.Logger
	// Attachments, if non-nil, downloads message attachments to local
	// files; the resulting paths are handed to Notify as media items.
	// nil → attachments are not downloaded and the message is delivered
	// without media.
	Attachments *attachments.Downloader

	// Deduplicate (agentID, messageId) → already dispatched. Bounded
	// to ~5000 entries (matches openclaw); the map is reset once it
	// grows past that bound.
	dedupMu sync.Mutex
	seen    map[string]struct{}

	// Per-channel serial chain: each channel id maps to a chan of
	// queued task funcs. A per-channel goroutine drains them in order.
	chainMu sync.Mutex
	chains  map[string]chan func()
}

// New constructs a Supervisor with deduped agentIDs derived from bindings.
func New(client *fabric.Client, tokenCache *tokens.Cache, bindings []config.FabricBinding, notify Notifier, logger *slog.Logger) *Supervisor { agentSet := map[string]struct{}{} for _, b := range bindings { agentSet[b.AgentID] = struct{}{} } agents := make([]string, 0, len(agentSet)) for a := range agentSet { agents = append(agents, a) } return &Supervisor{ Client: client, Tokens: tokenCache, Bindings: bindings, ByFabric: config.Index(bindings), AgentIDs: agents, Notify: notify, Logger: logger, seen: map[string]struct{}{}, chains: map[string]chan func(){}, } } // Run blocks: spawns one goroutine per (agent, guild) socket.io // supervisor. Each supervisor reconnects on drop until ctx cancels. // Returns when ctx is cancelled and all supervisors have exited. func (s *Supervisor) Run(ctx context.Context) error { var wg sync.WaitGroup for _, agentID := range s.AgentIDs { sess, err := s.Tokens.Get(ctx, agentID) if err != nil { s.Logger.Warn("inbound: skip agent (no session)", "agent", agentID, "err", err) continue } for _, g := range sess.Guilds { if g.Endpoint == "" { continue } wg.Add(1) go func(agentID string, guild fabric.GuildInfo) { defer wg.Done() s.runAgentGuild(ctx, agentID, guild) }(agentID, g) } } wg.Wait() return nil } // runAgentGuild keeps one socket.io connection alive for (agent, guild) // until ctx cancels. Reconnects with fresh auth on every drop using // exponential backoff (resets if the previous session survived long // enough to look healthy). func (s *Supervisor) runAgentGuild(ctx context.Context, agentID string, guild fabric.GuildInfo) { logger := s.Logger.With("agent", agentID, "guild", guild.NodeID) backoff := ReconnectBackoffInitial for { if err := ctx.Err(); err != nil { return } startedAt := time.Now() err := s.connectOnce(ctx, agentID, guild, logger) if ctx.Err() != nil { return } // Reset backoff if the prior session was meaningfully long // (proxy for "we connected + did work" — not a flap loop). 
if time.Since(startedAt) > ReconnectResetAfter { backoff = ReconnectBackoffInitial } errStr := "(nil)" if err != nil { errStr = err.Error() } logger.Warn("inbound: socket ended; reconnecting", "err", errStr, "backoff", backoff.String()) select { case <-time.After(backoff): case <-ctx.Done(): return } backoff = time.Duration(float64(backoff) * ReconnectBackoffFactor) if backoff > ReconnectBackoffMax { backoff = ReconnectBackoffMax } } } // connectOnce opens one socket connection, runs the read loop, and // returns the terminating error (nil on graceful close). func (s *Supervisor) connectOnce(ctx context.Context, agentID string, guild fabric.GuildInfo, logger *slog.Logger) error { authFn := func(authCtx context.Context) (map[string]any, error) { // Always fetch a fresh token before the CONNECT — fixes the // stale-JWT-on-reconnect bug openclaw documented. s.Tokens.Invalidate(agentID) tok, err := s.Tokens.GuildToken(authCtx, agentID, guild.NodeID) if err != nil { return nil, err } return map[string]any{"token": tok}, nil } cli, err := socketio.New(guild.Endpoint, "/socket.io/", "/realtime", authFn) if err != nil { return err } // Tracked subscriptions for this connection. Cleared on each // reconnect (server forgets join_channel state). joined := map[string]struct{}{} var joinedMu sync.Mutex // Initial sync + periodic resync runs in a goroutine; it reads // channels from REST and emits join_channel for new + leave_channel // for gone. 
syncCtx, cancelSync := context.WithCancel(ctx) defer cancelSync() syncOnce := func(kind string) { tok, err := s.Tokens.GuildToken(syncCtx, agentID, guild.NodeID) if err != nil { logger.Warn("inbound: sync token", "err", err) return } channels, err := s.Client.ListChannels(syncCtx, guild.Endpoint, tok, guild.NodeID) if err != nil { logger.Warn("inbound: list channels", "err", err) return } current := map[string]struct{}{} for _, c := range channels { current[c.ID] = struct{}{} } var added, removed int joinedMu.Lock() for id := range current { if _, ok := joined[id]; !ok { _ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": id}) joined[id] = struct{}{} added++ } } for id := range joined { if _, ok := current[id]; !ok { _ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": id}) delete(joined, id) removed++ } } size := len(joined) joinedMu.Unlock() if kind == "initial" { logger.Info("inbound: channels joined", "n", size) } else if added > 0 || removed > 0 { logger.Info("inbound: channels resync", "added", added, "removed", removed, "now", size) } } // Event handlers. Registered BEFORE Connect so we don't miss the // first events the server pushes right after handshake. 
cli.On("connect", func([]json.RawMessage) { /* no-op; server CONNECT ack */ }) cli.On("channel.joined", func(args []json.RawMessage) { var evt struct { ChannelID string `json:"channelId"` } if len(args) == 0 { return } _ = json.Unmarshal(args[0], &evt) if evt.ChannelID == "" { return } joinedMu.Lock() if _, already := joined[evt.ChannelID]; already { joinedMu.Unlock() return } joined[evt.ChannelID] = struct{}{} joinedMu.Unlock() _ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": evt.ChannelID}) logger.Info("inbound: channel.joined push", "channel", evt.ChannelID) }) cli.On("channel.left", func(args []json.RawMessage) { var evt struct { ChannelID string `json:"channelId"` } if len(args) == 0 { return } _ = json.Unmarshal(args[0], &evt) if evt.ChannelID == "" { return } joinedMu.Lock() if _, present := joined[evt.ChannelID]; !present { joinedMu.Unlock() return } delete(joined, evt.ChannelID) joinedMu.Unlock() _ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": evt.ChannelID}) logger.Info("inbound: channel.left push", "channel", evt.ChannelID) }) // Capture self user id from current cached session for the // self-author filter. sess := s.Tokens.Peek(agentID) var selfUserID string if sess != nil { selfUserID = sess.User.ID } cli.On("message.created", func(args []json.RawMessage) { var m FabricMessage if len(args) == 0 { return } if err := json.Unmarshal(args[0], &m); err != nil { logger.Warn("inbound: bad message.created", "err", err) return } s.dispatch(agentID, guild.NodeID, guild.Endpoint, selfUserID, &m, logger) }) // Periodic resync goroutine. syncTimer := time.NewTicker(ChannelSyncInterval) defer syncTimer.Stop() go func() { // Wait for connect to complete; we kick the initial sync from // within Connect's startup. Use the ticker for the periodic. for { select { case <-syncCtx.Done(): return case <-syncTimer.C: syncOnce("resync") } } }() // On connect, fire the initial sync. 
The socketio client doesn't // fire a Go-side "connect" event before Connect returns; we // schedule the initial sync to run shortly after Connect starts. go func() { time.Sleep(100 * time.Millisecond) select { case <-syncCtx.Done(): return default: } syncOnce("initial") }() return cli.Connect(ctx) } // dispatch applies self-author filter, dedup, channel-binding lookup, // wakeup gate, then notifies Plexum. func (s *Supervisor) dispatch(agentID, guildNodeID, guildEndpoint, selfUserID string, m *FabricMessage, logger *slog.Logger) { if m.ChannelID == "" { return } // Self-author filter: skip messages this user wrote. if m.AuthorUserID != "" && m.AuthorUserID == selfUserID { return } // Dedup per (agent, messageId). key := agentID + ":" + m.MessageID s.dedupMu.Lock() if _, dup := s.seen[key]; dup { s.dedupMu.Unlock() return } s.seen[key] = struct{}{} if len(s.seen) > 5000 { s.seen = map[string]struct{}{} // simple bounded reset } s.dedupMu.Unlock() // Channel-binding lookup. We only deliver messages for channels // the operator has bound to a Plexum agent in channels/*.json. binding, ok := s.ByFabric[config.Key(guildNodeID, m.ChannelID)] if !ok { return } // The binding must point at THIS agent (a guild may have several // agents; an inbound on agent A's socket for a channel bound to // agent B is ignored — agent B has their own socket for it). if binding.AgentID != agentID { return } // Wakeup gate (decision #36-equivalent on the openclaw side): // dm channel → always deliver // other → wakeup must be true if m.XType != "dm" && !m.Wakeup { logger.Debug("inbound: skip (no wakeup)", "channel", m.ChannelID, "agent", agentID, "msg", m.MessageID) return } logger.Info("inbound: dispatch", "channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType, "attachments", len(m.Attachments)) // Per-channel serial queue. Concurrent inbounds on the same channel // must run one at a time so Plexum's state machine sees clean turns. 
sessionID := "s_fab_" + m.ChannelID body := m.Content plexumChannel := binding.PlexumChannelName var media []MediaItem // Download attachments to local paths so the host can hand them // to provider plugins as canonical.ImageBlock (Plexum host-side // media block support). if len(m.Attachments) > 0 && s.Attachments != nil { tok, terr := s.Tokens.GuildToken(context.Background(), agentID, guildNodeID) if terr == nil { refs := make([]attachments.AttachmentRef, len(m.Attachments)) for i, a := range m.Attachments { // Fabric returns attachment URLs as RELATIVE paths // (`/api/files/`); resolve against the guild endpoint // before fetching. absURL := a.URL if absURL != "" && absURL[0] == '/' { absURL = guildEndpoint + absURL } refs[i] = attachments.AttachmentRef{ URL: absURL, Name: a.Name, MimeType: a.MimeType, } } files := s.Attachments.FetchAll(context.Background(), tok, m.MessageID, refs) for _, f := range files { media = append(media, MediaItem{ Path: f.LocalPath, MediaType: f.MimeType, Name: f.Name, }) } if len(files) < len(refs) { logger.Warn("inbound: some attachments failed to download", "requested", len(refs), "got", len(files)) } } else { logger.Warn("inbound: attachment guild token failed", "err", terr) } } s.enqueueChannel(m.ChannelID, func() { s.Notify(plexumChannel, body, sessionID, media) }) } // enqueueChannel chains a task onto the per-channel queue. If no chain // exists, spawn the drain goroutine. Drain goroutine exits when the // channel goes idle for >5s to avoid leaking goroutines per channel. func (s *Supervisor) enqueueChannel(channelID string, task func()) { s.chainMu.Lock() ch, ok := s.chains[channelID] if !ok { ch = make(chan func(), 32) s.chains[channelID] = ch go s.drainChannel(channelID, ch) } s.chainMu.Unlock() select { case ch <- task: default: // Backlog full (32+) — log + drop. Shouldn't happen unless // the bound agent is hopelessly behind. 
s.Logger.Warn("inbound: per-channel queue full; dropping", "channel", channelID) } } func (s *Supervisor) drainChannel(channelID string, ch chan func()) { idle := time.NewTimer(5 * time.Second) defer idle.Stop() for { select { case task, open := <-ch: if !open { return } if !idle.Stop() { <-idle.C } task() idle.Reset(5 * time.Second) case <-idle.C: s.chainMu.Lock() delete(s.chains, channelID) s.chainMu.Unlock() return } } } // FabricMessage is the wire shape of Fabric's message.created event. // Mirror of FabricMessage type in openclaw's inbound.ts. type FabricMessage struct { MessageID string `json:"messageId"` Seq int `json:"seq"` Content string `json:"content"` AuthorUserID string `json:"authorUserId,omitempty"` CreatedAt string `json:"createdAt,omitempty"` ChannelID string `json:"channelId,omitempty"` Attachments []FabricAttachment `json:"attachments,omitempty"` Wakeup bool `json:"wakeup,omitempty"` // XType matches the Fabric channel's x-type field. 'dm' bypasses // the wakeup gate. XType string `json:"xType,omitempty"` } // FabricAttachment is the wire shape of one attachment ref. v1 // (Phase F-2) doesn't download these; F-8 will. type FabricAttachment struct { URL string `json:"url"` Name string `json:"name,omitempty"` MimeType string `json:"mimeType,omitempty"` } // Equality check used for tests. func (a FabricAttachment) Equal(b FabricAttachment) bool { return a.URL == b.URL && a.Name == b.Name && a.MimeType == b.MimeType } // Sanity: ensure FabricMessage stays serializable. var _ = fmt.Sprint