Files
Plexum-fabric-channel-plugin/internal/inbound/inbound.go
hzhang d6bea46d00 feat: Phase F-3 + F-4 — exp backoff + agent tool surface (batch 1)
F-3 refinements:
- internal/inbound: replace fixed 3s reconnect wait with exponential
  backoff (1s → 60s, ×2, reset when prior session lasted >30s); proxy
  for "healthy" vs "flapping" and avoids hot reconnect loops when the
  server is sick

F-4 agent tool surface (port of openclaw plugin's tools.ts):
- internal/tools/tools.go (~370 LOC): Registry binds Deps {Client,
  Tokens, Identities} and exposes 8 agent-facing tools:
    fabric-send-message     post a normal message to any channel
    fabric-send-sys-msg     post a kind=sys message (bypasses turn engine)
    fabric-channel-list     list channels visible in a guild
    fabric-guild-list       list guilds the agent is in
    fabric-message-history  paginate channel messages by seq
    fabric-channel-set-purpose  PATCH the channel's purpose
    fabric-channel          fetch metadata + members for one channel
    fabric-canvas           get/share/update/remove channel canvas
- internal/tools/contracts.go: static ToolContract list — kept in sync
  with install.sh's manifest emitter
- Every agent-scoped tool requires agent_id in input args (Plexum SDK
  doesn't propagate calling agent id through CallTool today)
- guild_node_id defaults to agent's first guild for fabric-send-message

internal/fabric/client.go: new REST methods needed by tools —
PostSystemMessage, CreateChannel, CloseChannel, JoinChannel,
LeaveChannel, SetChannelPurpose, GetCanvas, ShareCanvas, UpdateCanvas,
RemoveCanvas, SyncCommands.

cmd/plexum-fabric-channel-plugin/main.go:
- Manifest declares the tool surface via tools.New(...).Contracts()
- CallTool dispatches "send" to handleSend (outbound for channel
  manager), everything else to tools.Registry.Handler(name)

scripts/install.sh:
- Manifest tools[] now lists all 9 tools with schemas — matches what
  internal/tools/contracts.go advertises

Live verified against running Fabric stack:
  $ plexum plugin-call fabric-guild-list '{"agent_id":"fabrictester"}'
    → "guilds for agent fabrictester (1): test-guild2 @ http://localhost:7003"
  $ plexum plugin-call fabric-channel-list '{...,"guild_node_id":"test-guild2"}'
    → 2 channels listed
  $ plexum plugin-call fabric-message-history '{...,"limit":5}'
    → 5 messages with timestamps + authors

F-5+ deferred:
- create-{chat,work,report,discussion}-channel (batch 2)
- sub-discussion family (state store + 3 tools)
- presence-sync + command-sync
- attachments

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-31 15:35:39 +01:00

462 lines
14 KiB
Go

// Package inbound is the Fabric → Plexum message pump. For each
// (agent, guild) it maintains a long-lived socket.io connection to the
// guild's /realtime namespace, joins every channel the agent has
// membership in (resyncing periodically + on push events), and dispatches
// matching message.created events through a per-channel serial queue.
//
// Wakeup gate matches openclaw's plugin: dispatch iff
// - m.xType == "dm" (any non-self message), OR
// - m.wakeup == true.
//
// Other inbounds (non-dm with wakeup != true) are dropped silently
// (Plexum has no "record-only history injection" equivalent in v1;
// future phase can add it).
package inbound
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"time"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/socketio"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens"
)
// ChannelSyncInterval is how often the supervisor diffs joined channels
// against /api/channels to catch newly-membership-bestowed channels
// that arrived without a push event. Matches openclaw's 60s.
const ChannelSyncInterval = 60 * time.Second
// ReconnectBackoffInitial / Max / Factor drive exponential backoff
// between reconnect attempts. Starts at 1s, doubles up to 60s; resets
// on successful connect (signalled by connectOnce returning after at
// least one event was received — proxied via wall-clock duration).
const (
ReconnectBackoffInitial = 1 * time.Second
ReconnectBackoffMax = 60 * time.Second
ReconnectBackoffFactor = 2.0
// ReconnectResetAfter: if the previous connection survived at least
// this long, reset the backoff to Initial (we made meaningful
// progress; transient drop, not a flapping-failure loop).
ReconnectResetAfter = 30 * time.Second
)
// Notifier pushes one inbound message to the Plexum host. The plugin
// main wires this to HostAPI.EmitNotification.
type Notifier func(channelName, message, sessionID string)
// Supervisor owns the per-(agent, guild) socket.io connections. Run
// blocks until ctx is cancelled.
type Supervisor struct {
Client *fabric.Client
Tokens *tokens.Cache
Bindings []config.FabricBinding
ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding
AgentIDs []string // unique agents to bring up
Notify Notifier
Logger *slog.Logger
// Deduplicate (agentID, messageId) → already dispatched. Bounded
// to ~5000 entries (matches openclaw).
dedupMu sync.Mutex
seen map[string]struct{}
// Per-channel serial chain: each channel id maps to a chan of
// queued task funcs. Channel goroutine drains them in order.
chainMu sync.Mutex
chains map[string]chan func()
}
// New constructs a Supervisor with deduped agentIDs derived from bindings.
func New(client *fabric.Client, tokenCache *tokens.Cache, bindings []config.FabricBinding,
notify Notifier, logger *slog.Logger) *Supervisor {
agentSet := map[string]struct{}{}
for _, b := range bindings {
agentSet[b.AgentID] = struct{}{}
}
agents := make([]string, 0, len(agentSet))
for a := range agentSet {
agents = append(agents, a)
}
return &Supervisor{
Client: client, Tokens: tokenCache,
Bindings: bindings, ByFabric: config.Index(bindings),
AgentIDs: agents, Notify: notify, Logger: logger,
seen: map[string]struct{}{},
chains: map[string]chan func(){},
}
}
// Run blocks: spawns one goroutine per (agent, guild) socket.io
// supervisor. Each supervisor reconnects on drop until ctx cancels.
// Returns when ctx is cancelled and all supervisors have exited.
func (s *Supervisor) Run(ctx context.Context) error {
var wg sync.WaitGroup
for _, agentID := range s.AgentIDs {
sess, err := s.Tokens.Get(ctx, agentID)
if err != nil {
s.Logger.Warn("inbound: skip agent (no session)", "agent", agentID, "err", err)
continue
}
for _, g := range sess.Guilds {
if g.Endpoint == "" {
continue
}
wg.Add(1)
go func(agentID string, guild fabric.GuildInfo) {
defer wg.Done()
s.runAgentGuild(ctx, agentID, guild)
}(agentID, g)
}
}
wg.Wait()
return nil
}
// runAgentGuild keeps one socket.io connection alive for (agent, guild)
// until ctx cancels. Reconnects with fresh auth on every drop using
// exponential backoff (resets if the previous session survived long
// enough to look healthy).
func (s *Supervisor) runAgentGuild(ctx context.Context, agentID string, guild fabric.GuildInfo) {
logger := s.Logger.With("agent", agentID, "guild", guild.NodeID)
backoff := ReconnectBackoffInitial
for {
if err := ctx.Err(); err != nil {
return
}
startedAt := time.Now()
err := s.connectOnce(ctx, agentID, guild, logger)
if ctx.Err() != nil {
return
}
// Reset backoff if the prior session was meaningfully long
// (proxy for "we connected + did work" — not a flap loop).
if time.Since(startedAt) > ReconnectResetAfter {
backoff = ReconnectBackoffInitial
}
errStr := "(nil)"
if err != nil {
errStr = err.Error()
}
logger.Warn("inbound: socket ended; reconnecting",
"err", errStr, "backoff", backoff.String())
select {
case <-time.After(backoff):
case <-ctx.Done():
return
}
backoff = time.Duration(float64(backoff) * ReconnectBackoffFactor)
if backoff > ReconnectBackoffMax {
backoff = ReconnectBackoffMax
}
}
}
// connectOnce opens one socket connection, runs the read loop, and
// returns the terminating error (nil on graceful close).
func (s *Supervisor) connectOnce(ctx context.Context, agentID string, guild fabric.GuildInfo, logger *slog.Logger) error {
authFn := func(authCtx context.Context) (map[string]any, error) {
// Always fetch a fresh token before the CONNECT — fixes the
// stale-JWT-on-reconnect bug openclaw documented.
s.Tokens.Invalidate(agentID)
tok, err := s.Tokens.GuildToken(authCtx, agentID, guild.NodeID)
if err != nil {
return nil, err
}
return map[string]any{"token": tok}, nil
}
cli, err := socketio.New(guild.Endpoint, "/socket.io/", "/realtime", authFn)
if err != nil {
return err
}
// Tracked subscriptions for this connection. Cleared on each
// reconnect (server forgets join_channel state).
joined := map[string]struct{}{}
var joinedMu sync.Mutex
// Initial sync + periodic resync runs in a goroutine; it reads
// channels from REST and emits join_channel for new + leave_channel
// for gone.
syncCtx, cancelSync := context.WithCancel(ctx)
defer cancelSync()
syncOnce := func(kind string) {
tok, err := s.Tokens.GuildToken(syncCtx, agentID, guild.NodeID)
if err != nil {
logger.Warn("inbound: sync token", "err", err)
return
}
channels, err := s.Client.ListChannels(syncCtx, guild.Endpoint, tok, guild.NodeID)
if err != nil {
logger.Warn("inbound: list channels", "err", err)
return
}
current := map[string]struct{}{}
for _, c := range channels {
current[c.ID] = struct{}{}
}
var added, removed int
joinedMu.Lock()
for id := range current {
if _, ok := joined[id]; !ok {
_ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": id})
joined[id] = struct{}{}
added++
}
}
for id := range joined {
if _, ok := current[id]; !ok {
_ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": id})
delete(joined, id)
removed++
}
}
size := len(joined)
joinedMu.Unlock()
if kind == "initial" {
logger.Info("inbound: channels joined", "n", size)
} else if added > 0 || removed > 0 {
logger.Info("inbound: channels resync", "added", added, "removed", removed, "now", size)
}
}
// Event handlers. Registered BEFORE Connect so we don't miss the
// first events the server pushes right after handshake.
cli.On("connect", func([]json.RawMessage) { /* no-op; server CONNECT ack */ })
cli.On("channel.joined", func(args []json.RawMessage) {
var evt struct {
ChannelID string `json:"channelId"`
}
if len(args) == 0 {
return
}
_ = json.Unmarshal(args[0], &evt)
if evt.ChannelID == "" {
return
}
joinedMu.Lock()
if _, already := joined[evt.ChannelID]; already {
joinedMu.Unlock()
return
}
joined[evt.ChannelID] = struct{}{}
joinedMu.Unlock()
_ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": evt.ChannelID})
logger.Info("inbound: channel.joined push", "channel", evt.ChannelID)
})
cli.On("channel.left", func(args []json.RawMessage) {
var evt struct {
ChannelID string `json:"channelId"`
}
if len(args) == 0 {
return
}
_ = json.Unmarshal(args[0], &evt)
if evt.ChannelID == "" {
return
}
joinedMu.Lock()
if _, present := joined[evt.ChannelID]; !present {
joinedMu.Unlock()
return
}
delete(joined, evt.ChannelID)
joinedMu.Unlock()
_ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": evt.ChannelID})
logger.Info("inbound: channel.left push", "channel", evt.ChannelID)
})
// Capture self user id from current cached session for the
// self-author filter.
sess := s.Tokens.Peek(agentID)
var selfUserID string
if sess != nil {
selfUserID = sess.User.ID
}
cli.On("message.created", func(args []json.RawMessage) {
var m FabricMessage
if len(args) == 0 {
return
}
if err := json.Unmarshal(args[0], &m); err != nil {
logger.Warn("inbound: bad message.created", "err", err)
return
}
s.dispatch(agentID, guild.NodeID, selfUserID, &m, logger)
})
// Periodic resync goroutine.
syncTimer := time.NewTicker(ChannelSyncInterval)
defer syncTimer.Stop()
go func() {
// Wait for connect to complete; we kick the initial sync from
// within Connect's startup. Use the ticker for the periodic.
for {
select {
case <-syncCtx.Done():
return
case <-syncTimer.C:
syncOnce("resync")
}
}
}()
// On connect, fire the initial sync. The socketio client doesn't
// fire a Go-side "connect" event before Connect returns; we
// schedule the initial sync to run shortly after Connect starts.
go func() {
time.Sleep(100 * time.Millisecond)
select {
case <-syncCtx.Done():
return
default:
}
syncOnce("initial")
}()
return cli.Connect(ctx)
}
// dispatch applies self-author filter, dedup, channel-binding lookup,
// wakeup gate, then notifies Plexum.
func (s *Supervisor) dispatch(agentID, guildNodeID, selfUserID string, m *FabricMessage, logger *slog.Logger) {
if m.ChannelID == "" {
return
}
// Self-author filter: skip messages this user wrote.
if m.AuthorUserID != "" && m.AuthorUserID == selfUserID {
return
}
// Dedup per (agent, messageId).
key := agentID + ":" + m.MessageID
s.dedupMu.Lock()
if _, dup := s.seen[key]; dup {
s.dedupMu.Unlock()
return
}
s.seen[key] = struct{}{}
if len(s.seen) > 5000 {
s.seen = map[string]struct{}{} // simple bounded reset
}
s.dedupMu.Unlock()
// Channel-binding lookup. We only deliver messages for channels
// the operator has bound to a Plexum agent in channels/*.json.
binding, ok := s.ByFabric[config.Key(guildNodeID, m.ChannelID)]
if !ok {
return
}
// The binding must point at THIS agent (a guild may have several
// agents; an inbound on agent A's socket for a channel bound to
// agent B is ignored — agent B has their own socket for it).
if binding.AgentID != agentID {
return
}
// Wakeup gate (decision #36-equivalent on the openclaw side):
// dm channel → always deliver
// other → wakeup must be true
if m.XType != "dm" && !m.Wakeup {
logger.Debug("inbound: skip (no wakeup)",
"channel", m.ChannelID, "agent", agentID, "msg", m.MessageID)
return
}
logger.Info("inbound: dispatch",
"channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType)
// Per-channel serial queue. Concurrent inbounds on the same channel
// must run one at a time so Plexum's state machine sees clean turns.
sessionID := "s_fab_" + m.ChannelID
body := m.Content
plexumChannel := binding.PlexumChannelName
s.enqueueChannel(m.ChannelID, func() {
s.Notify(plexumChannel, body, sessionID)
})
}
// enqueueChannel chains a task onto the per-channel queue. If no chain
// exists, spawn the drain goroutine. Drain goroutine exits when the
// channel goes idle for >5s to avoid leaking goroutines per channel.
func (s *Supervisor) enqueueChannel(channelID string, task func()) {
s.chainMu.Lock()
ch, ok := s.chains[channelID]
if !ok {
ch = make(chan func(), 32)
s.chains[channelID] = ch
go s.drainChannel(channelID, ch)
}
s.chainMu.Unlock()
select {
case ch <- task:
default:
// Backlog full (32+) — log + drop. Shouldn't happen unless
// the bound agent is hopelessly behind.
s.Logger.Warn("inbound: per-channel queue full; dropping",
"channel", channelID)
}
}
func (s *Supervisor) drainChannel(channelID string, ch chan func()) {
idle := time.NewTimer(5 * time.Second)
defer idle.Stop()
for {
select {
case task, open := <-ch:
if !open {
return
}
if !idle.Stop() {
<-idle.C
}
task()
idle.Reset(5 * time.Second)
case <-idle.C:
s.chainMu.Lock()
delete(s.chains, channelID)
s.chainMu.Unlock()
return
}
}
}
// FabricMessage is the wire shape of Fabric's message.created event.
// Mirror of FabricMessage type in openclaw's inbound.ts.
type FabricMessage struct {
MessageID string `json:"messageId"`
Seq int `json:"seq"`
Content string `json:"content"`
AuthorUserID string `json:"authorUserId,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
ChannelID string `json:"channelId,omitempty"`
Attachments []FabricAttachment `json:"attachments,omitempty"`
Wakeup bool `json:"wakeup,omitempty"`
// XType matches the Fabric channel's x-type field. 'dm' bypasses
// the wakeup gate.
XType string `json:"xType,omitempty"`
}
// FabricAttachment is the wire shape of one attachment ref. v1
// (Phase F-2) doesn't download these; F-8 will.
type FabricAttachment struct {
URL string `json:"url"`
Name string `json:"name,omitempty"`
MimeType string `json:"mimeType,omitempty"`
}
// Equality check used for tests.
func (a FabricAttachment) Equal(b FabricAttachment) bool {
return a.URL == b.URL && a.Name == b.Name && a.MimeType == b.MimeType
}
// Sanity: ensure FabricMessage stays serializable.
var _ = fmt.Sprint