diff --git a/cmd/plexum-fabric-channel-plugin/main.go b/cmd/plexum-fabric-channel-plugin/main.go index b62e725..49e2f34 100644 --- a/cmd/plexum-fabric-channel-plugin/main.go +++ b/cmd/plexum-fabric-channel-plugin/main.go @@ -16,6 +16,7 @@ import ( "encoding/json" "errors" "fmt" + "log/slog" "os" "path/filepath" "sync" @@ -26,6 +27,8 @@ import ( "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/inbound" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens" ) // HostConfig is the plugin's own config at @@ -47,10 +50,19 @@ type fabricPlugin struct { bindings []config.FabricBinding byFabric config.ByFabricChannel client *fabric.Client + tokens *tokens.Cache - // Per-agent Session cache (refreshed lazily; full refresh in F-2). + // Goroutine handle for the inbound supervisor. Cancelled on + // plugin shutdown (we don't have an explicit shutdown signal in + // the SDK today; rely on subprocess kill). + inboundCancel context.CancelFunc + inboundDone chan struct{} + + // Legacy field — kept only for back-compat with non-tokens code + // paths during the F-1 → F-2 refactor; safe to remove once nothing + // else references it. Not used anymore. sessMu sync.Mutex - sessions map[string]*fabric.Session // agentID → session + sessions map[string]*fabric.Session } func (p *fabricPlugin) Manifest() plugin.Manifest { @@ -148,6 +160,15 @@ func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error { } p.byFabric = config.Index(p.bindings) + // Token cache: re-login per agent on TTL miss (8min default). 
+ p.tokens = tokens.New(0, func(loginCtx context.Context, agentID string) (*fabric.Session, error) { + entry := p.identities.Lookup(agentID) + if entry == nil || !entry.Enabled { + return nil, fmt.Errorf("agent %s: no enabled identity", agentID) + } + return p.client.AgentLogin(loginCtx, entry.FabricAPIKey) + }) + host.Log("info", "fabric channel plugin initialized", map[string]any{ "center": p.cfg.CenterAPIBase, "identity_path": idPath, @@ -155,18 +176,78 @@ func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error { "identities_loaded": len(p.identities.AgentIDs()), }) - // Eager validate: for every bound agent that has a channel, do a - // blocking agentLogin so we surface bad keys at startup instead of - // on first outbound. F-2 hooks socket.io subscription here too. + // Warm sessions (early bad-key detection). if err := p.warmSessions(ctx); err != nil { - // Log + continue; outbound will retry on demand. We don't want - // to refuse plugin init just because one key is stale. host.Log("warn", "fabric warm-sessions had errors", map[string]any{"err": err.Error()}) } + + // Phase F-2: start the inbound supervisor in a goroutine. Lives + // until p.inboundCancel fires (currently never — SDK has no + // shutdown hook; subprocess kill is the only stop signal). + if len(p.bindings) > 0 { + ctxBg, cancel := context.WithCancel(context.Background()) + p.inboundCancel = cancel + p.inboundDone = make(chan struct{}) + notifier := func(channelName, message, sessionID string) { + p.host.EmitNotification("notifications/plexum/channel/inbound", map[string]any{ + "channel_name": channelName, + "message": message, + "session_id": sessionID, + }) + } + // slog wrapping plugin.HostAPI.Log isn't worth the indirection + // here; use a discard-style adapter that pipes WARN/INFO to + // the host log. 
+ logger := slog.New(&hostLogHandler{host: host, level: slog.LevelInfo}) + sup := inbound.New(p.client, p.tokens, p.bindings, notifier, logger) + go func() { + defer close(p.inboundDone) + if err := sup.Run(ctxBg); err != nil { + host.Log("warn", "inbound supervisor exited", map[string]any{"err": err.Error()}) + } + }() + host.Log("info", "fabric inbound supervisor started", + map[string]any{"agents": sup.AgentIDs, "bindings": len(p.bindings)}) + } + return nil } +// hostLogHandler is a tiny slog.Handler that forwards records to the +// plugin's HostAPI.Log. inbound + supervisor use slog for structured +// logging; this bridges to the host's log notification stream. +type hostLogHandler struct { + host plugin.HostAPI + level slog.Level +} + +func (h *hostLogHandler) Enabled(_ context.Context, l slog.Level) bool { return l >= h.level } +func (h *hostLogHandler) Handle(_ context.Context, r slog.Record) error { + attrs := make(map[string]any, r.NumAttrs()) + r.Attrs(func(a slog.Attr) bool { + attrs[a.Key] = a.Value.Any() + return true + }) + h.host.Log(levelString(r.Level), r.Message, attrs) + return nil +} +func (h *hostLogHandler) WithAttrs(_ []slog.Attr) slog.Handler { return h } +func (h *hostLogHandler) WithGroup(_ string) slog.Handler { return h } + +func levelString(l slog.Level) string { + switch { + case l >= slog.LevelError: + return "error" + case l >= slog.LevelWarn: + return "warn" + case l >= slog.LevelInfo: + return "info" + default: + return "debug" + } +} + func (p *fabricPlugin) warmSessions(ctx context.Context) error { // Which agents appear as a binding's AgentID? 
agentsNeeded := map[string]bool{} @@ -177,8 +258,7 @@ func (p *fabricPlugin) warmSessions(ctx context.Context) error { var firstErr error for agentID := range agentsNeeded { - entry, ok := enabled[agentID] - if !ok { + if _, ok := enabled[agentID]; !ok { err := fmt.Errorf("agent %s has channels but no identity (run plexum-fabric-register --agent-id %s --api-key ...)", agentID, agentID) p.host.Log("warn", err.Error(), nil) @@ -187,18 +267,15 @@ func (p *fabricPlugin) warmSessions(ctx context.Context) error { } continue } - sess, err := p.client.AgentLogin(ctx, entry.FabricAPIKey) + sess, err := p.tokens.Get(ctx, agentID) if err != nil { - err = fmt.Errorf("agent %s login: %w", agentID, err) - p.host.Log("warn", err.Error(), nil) + p.host.Log("warn", "fabric agent warm failed", + map[string]any{"agent": agentID, "err": err.Error()}) if firstErr == nil { firstErr = err } continue } - p.sessMu.Lock() - p.sessions[agentID] = sess - p.sessMu.Unlock() p.host.Log("info", "fabric session warm", map[string]any{ "agent": agentID, "fabric_user": sess.User.Email, "guilds": len(sess.Guilds), @@ -223,6 +300,9 @@ func (p *fabricPlugin) CallTool(ctx context.Context, name string, input json.Raw if args.ChannelName == "" { return errResult("channel_name required"), nil } + p.host.Log("info", "fabric send", map[string]any{ + "channel_name": args.ChannelName, "len": len(args.Message), + }) // Find the binding for this plexum channel name. 
var binding *config.FabricBinding @@ -274,26 +354,9 @@ func (p *fabricPlugin) CallTool(ctx context.Context, name string, input json.Raw } func (p *fabricPlugin) sessionFor(ctx context.Context, agentID string) (*fabric.Session, error) { - p.sessMu.Lock() - sess := p.sessions[agentID] - p.sessMu.Unlock() - if sess != nil { - return sess, nil - } - entry := p.identities.Lookup(agentID) - if entry == nil || !entry.Enabled { - return nil, errors.New("no identity registered (use plexum-fabric-register)") - } loginCtx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() - fresh, err := p.client.AgentLogin(loginCtx, entry.FabricAPIKey) - if err != nil { - return nil, err - } - p.sessMu.Lock() - p.sessions[agentID] = fresh - p.sessMu.Unlock() - return fresh, nil + return p.tokens.Get(loginCtx, agentID) } func errResult(msg string) plugin.ToolResult { diff --git a/go.mod b/go.mod index 50a2e38..acff21e 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,9 @@ module git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin go 1.24.2 -require github.com/zishang520/socket.io-client-go v1.1.0 // indirect +require ( + git.hangman-lab.top/hzhang/Plexum-sdk-go v0.0.0 + nhooyr.io/websocket v1.8.17 +) + +replace git.hangman-lab.top/hzhang/Plexum-sdk-go => ../Plexum-sdk-go diff --git a/go.sum b/go.sum deleted file mode 100644 index 353f9b6..0000000 --- a/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/zishang520/socket.io-client-go v1.1.0 h1:rc+WtphqasRKuyxmjx/zqTrdIjzjG0Yv8nWeQcv5hTM= -github.com/zishang520/socket.io-client-go v1.1.0/go.mod h1:pFkvhEgVIjkUJyNvOvb62k78aFt2qwgJFNh2rfaCOc0= diff --git a/internal/inbound/inbound.go b/internal/inbound/inbound.go new file mode 100644 index 0000000..f24ee5b --- /dev/null +++ b/internal/inbound/inbound.go @@ -0,0 +1,438 @@ +// Package inbound is the Fabric → Plexum message pump. 
For each +// (agent, guild) it maintains a long-lived socket.io connection to the +// guild's /realtime namespace, joins every channel the agent has +// membership in (resyncing periodically + on push events), and dispatches +// matching message.created events through a per-channel serial queue. +// +// Wakeup gate matches openclaw's plugin: dispatch iff +// - m.xType == "dm" (any non-self message), OR +// - m.wakeup == true. +// +// Other inbounds (non-dm with wakeup != true) are dropped silently +// (Plexum has no "record-only history injection" equivalent in v1; +// future phase can add it). +package inbound + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/socketio" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens" +) + +// ChannelSyncInterval is how often the supervisor diffs joined channels +// against /api/channels to catch newly-membership-bestowed channels +// that arrived without a push event. Matches openclaw's 60s. +const ChannelSyncInterval = 60 * time.Second + +// ReconnectBackoff is the wait between reconnect attempts when a +// socket drops + Connect returns. Keep small for snappier recovery; +// exponential backoff is a future improvement. +const ReconnectBackoff = 3 * time.Second + +// Notifier pushes one inbound message to the Plexum host. The plugin +// main wires this to HostAPI.EmitNotification. +type Notifier func(channelName, message, sessionID string) + +// Supervisor owns the per-(agent, guild) socket.io connections. Run +// blocks until ctx is cancelled. 
+type Supervisor struct { + Client *fabric.Client + Tokens *tokens.Cache + Bindings []config.FabricBinding + ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding + AgentIDs []string // unique agents to bring up + Notify Notifier + Logger *slog.Logger + + // Deduplicate (agentID, messageId) → already dispatched. Bounded + // to ~5000 entries (matches openclaw). + dedupMu sync.Mutex + seen map[string]struct{} + + // Per-channel serial chain: each channel id maps to a chan of + // queued task funcs. Channel goroutine drains them in order. + chainMu sync.Mutex + chains map[string]chan func() +} + +// New constructs a Supervisor with deduped agentIDs derived from bindings. +func New(client *fabric.Client, tokenCache *tokens.Cache, bindings []config.FabricBinding, + notify Notifier, logger *slog.Logger) *Supervisor { + agentSet := map[string]struct{}{} + for _, b := range bindings { + agentSet[b.AgentID] = struct{}{} + } + agents := make([]string, 0, len(agentSet)) + for a := range agentSet { + agents = append(agents, a) + } + return &Supervisor{ + Client: client, Tokens: tokenCache, + Bindings: bindings, ByFabric: config.Index(bindings), + AgentIDs: agents, Notify: notify, Logger: logger, + seen: map[string]struct{}{}, + chains: map[string]chan func(){}, + } +} + +// Run blocks: spawns one goroutine per (agent, guild) socket.io +// supervisor. Each supervisor reconnects on drop until ctx cancels. +// Returns when ctx is cancelled and all supervisors have exited. 
+func (s *Supervisor) Run(ctx context.Context) error { + var wg sync.WaitGroup + for _, agentID := range s.AgentIDs { + sess, err := s.Tokens.Get(ctx, agentID) + if err != nil { + s.Logger.Warn("inbound: skip agent (no session)", "agent", agentID, "err", err) + continue + } + for _, g := range sess.Guilds { + if g.Endpoint == "" { + continue + } + wg.Add(1) + go func(agentID string, guild fabric.GuildInfo) { + defer wg.Done() + s.runAgentGuild(ctx, agentID, guild) + }(agentID, g) + } + } + wg.Wait() + return nil +} + +// runAgentGuild keeps one socket.io connection alive for (agent, guild) +// until ctx cancels. Reconnects with fresh auth on every drop. +func (s *Supervisor) runAgentGuild(ctx context.Context, agentID string, guild fabric.GuildInfo) { + logger := s.Logger.With("agent", agentID, "guild", guild.NodeID) + for { + if err := ctx.Err(); err != nil { + return + } + err := s.connectOnce(ctx, agentID, guild, logger) + if ctx.Err() != nil { + return + } + errStr := "(nil)" + if err != nil { + errStr = err.Error() + } + logger.Warn("inbound: socket connection ended; reconnecting", "err", errStr) + select { + case <-time.After(ReconnectBackoff): + case <-ctx.Done(): + return + } + } +} + +// connectOnce opens one socket connection, runs the read loop, and +// returns the terminating error (nil on graceful close). +func (s *Supervisor) connectOnce(ctx context.Context, agentID string, guild fabric.GuildInfo, logger *slog.Logger) error { + authFn := func(authCtx context.Context) (map[string]any, error) { + // Always fetch a fresh token before the CONNECT — fixes the + // stale-JWT-on-reconnect bug openclaw documented. + s.Tokens.Invalidate(agentID) + tok, err := s.Tokens.GuildToken(authCtx, agentID, guild.NodeID) + if err != nil { + return nil, err + } + return map[string]any{"token": tok}, nil + } + cli, err := socketio.New(guild.Endpoint, "/socket.io/", "/realtime", authFn) + if err != nil { + return err + } + + // Tracked subscriptions for this connection. 
Cleared on each + // reconnect (server forgets join_channel state). + joined := map[string]struct{}{} + var joinedMu sync.Mutex + + // Initial sync + periodic resync runs in a goroutine; it reads + // channels from REST and emits join_channel for new + leave_channel + // for gone. + syncCtx, cancelSync := context.WithCancel(ctx) + defer cancelSync() + syncOnce := func(kind string) { + tok, err := s.Tokens.GuildToken(syncCtx, agentID, guild.NodeID) + if err != nil { + logger.Warn("inbound: sync token", "err", err) + return + } + channels, err := s.Client.ListChannels(syncCtx, guild.Endpoint, tok, guild.NodeID) + if err != nil { + logger.Warn("inbound: list channels", "err", err) + return + } + current := map[string]struct{}{} + for _, c := range channels { + current[c.ID] = struct{}{} + } + var added, removed int + joinedMu.Lock() + for id := range current { + if _, ok := joined[id]; !ok { + _ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": id}) + joined[id] = struct{}{} + added++ + } + } + for id := range joined { + if _, ok := current[id]; !ok { + _ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": id}) + delete(joined, id) + removed++ + } + } + size := len(joined) + joinedMu.Unlock() + if kind == "initial" { + logger.Info("inbound: channels joined", "n", size) + } else if added > 0 || removed > 0 { + logger.Info("inbound: channels resync", "added", added, "removed", removed, "now", size) + } + } + + // Event handlers. Registered BEFORE Connect so we don't miss the + // first events the server pushes right after handshake. 
+ cli.On("connect", func([]json.RawMessage) { /* no-op; server CONNECT ack */ }) + + cli.On("channel.joined", func(args []json.RawMessage) { + var evt struct { + ChannelID string `json:"channelId"` + } + if len(args) == 0 { + return + } + _ = json.Unmarshal(args[0], &evt) + if evt.ChannelID == "" { + return + } + joinedMu.Lock() + if _, already := joined[evt.ChannelID]; already { + joinedMu.Unlock() + return + } + joined[evt.ChannelID] = struct{}{} + joinedMu.Unlock() + _ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": evt.ChannelID}) + logger.Info("inbound: channel.joined push", "channel", evt.ChannelID) + }) + + cli.On("channel.left", func(args []json.RawMessage) { + var evt struct { + ChannelID string `json:"channelId"` + } + if len(args) == 0 { + return + } + _ = json.Unmarshal(args[0], &evt) + if evt.ChannelID == "" { + return + } + joinedMu.Lock() + if _, present := joined[evt.ChannelID]; !present { + joinedMu.Unlock() + return + } + delete(joined, evt.ChannelID) + joinedMu.Unlock() + _ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": evt.ChannelID}) + logger.Info("inbound: channel.left push", "channel", evt.ChannelID) + }) + + // Capture self user id from current cached session for the + // self-author filter. + sess := s.Tokens.Peek(agentID) + var selfUserID string + if sess != nil { + selfUserID = sess.User.ID + } + + cli.On("message.created", func(args []json.RawMessage) { + var m FabricMessage + if len(args) == 0 { + return + } + if err := json.Unmarshal(args[0], &m); err != nil { + logger.Warn("inbound: bad message.created", "err", err) + return + } + s.dispatch(agentID, guild.NodeID, selfUserID, &m, logger) + }) + + // Periodic resync goroutine. + syncTimer := time.NewTicker(ChannelSyncInterval) + defer syncTimer.Stop() + go func() { + // Wait for connect to complete; we kick the initial sync from + // within Connect's startup. Use the ticker for the periodic. 
+ for { + select { + case <-syncCtx.Done(): + return + case <-syncTimer.C: + syncOnce("resync") + } + } + }() + + // On connect, fire the initial sync. The socketio client doesn't + // fire a Go-side "connect" event before Connect returns; we + // schedule the initial sync to run shortly after Connect starts. + go func() { + time.Sleep(100 * time.Millisecond) + select { + case <-syncCtx.Done(): + return + default: + } + syncOnce("initial") + }() + + return cli.Connect(ctx) +} + +// dispatch applies self-author filter, dedup, channel-binding lookup, +// wakeup gate, then notifies Plexum. +func (s *Supervisor) dispatch(agentID, guildNodeID, selfUserID string, m *FabricMessage, logger *slog.Logger) { + if m.ChannelID == "" { + return + } + // Self-author filter: skip messages this user wrote. + if m.AuthorUserID != "" && m.AuthorUserID == selfUserID { + return + } + // Dedup per (agent, messageId). + key := agentID + ":" + m.MessageID + s.dedupMu.Lock() + if _, dup := s.seen[key]; dup { + s.dedupMu.Unlock() + return + } + s.seen[key] = struct{}{} + if len(s.seen) > 5000 { + s.seen = map[string]struct{}{} // simple bounded reset + } + s.dedupMu.Unlock() + + // Channel-binding lookup. We only deliver messages for channels + // the operator has bound to a Plexum agent in channels/*.json. + binding, ok := s.ByFabric[config.Key(guildNodeID, m.ChannelID)] + if !ok { + return + } + // The binding must point at THIS agent (a guild may have several + // agents; an inbound on agent A's socket for a channel bound to + // agent B is ignored — agent B has their own socket for it). 
+ if binding.AgentID != agentID { + return + } + + // Wakeup gate (decision #36-equivalent on the openclaw side): + // dm channel → always deliver + // other → wakeup must be true + if m.XType != "dm" && !m.Wakeup { + logger.Debug("inbound: skip (no wakeup)", + "channel", m.ChannelID, "agent", agentID, "msg", m.MessageID) + return + } + + logger.Info("inbound: dispatch", + "channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType) + + // Per-channel serial queue. Concurrent inbounds on the same channel + // must run one at a time so Plexum's state machine sees clean turns. + sessionID := "s_fab_" + m.ChannelID + body := m.Content + plexumChannel := binding.PlexumChannelName + s.enqueueChannel(m.ChannelID, func() { + s.Notify(plexumChannel, body, sessionID) + }) +} + +// enqueueChannel chains a task onto the per-channel queue. If no chain +// exists, spawn the drain goroutine. Drain goroutine exits when the +// channel goes idle for >5s to avoid leaking goroutines per channel. +func (s *Supervisor) enqueueChannel(channelID string, task func()) { + s.chainMu.Lock() + ch, ok := s.chains[channelID] + if !ok { + ch = make(chan func(), 32) + s.chains[channelID] = ch + go s.drainChannel(channelID, ch) + } + s.chainMu.Unlock() + select { + case ch <- task: + default: + // Backlog full (32+) — log + drop. Shouldn't happen unless + // the bound agent is hopelessly behind. + s.Logger.Warn("inbound: per-channel queue full; dropping", + "channel", channelID) + } +} + +func (s *Supervisor) drainChannel(channelID string, ch chan func()) { + idle := time.NewTimer(5 * time.Second) + defer idle.Stop() + for { + select { + case task, open := <-ch: + if !open { + return + } + if !idle.Stop() { + <-idle.C + } + task() + idle.Reset(5 * time.Second) + case <-idle.C: + s.chainMu.Lock() + delete(s.chains, channelID) + s.chainMu.Unlock() + return + } + } +} + +// FabricMessage is the wire shape of Fabric's message.created event. 
+// Mirror of FabricMessage type in openclaw's inbound.ts. +type FabricMessage struct { + MessageID string `json:"messageId"` + Seq int `json:"seq"` + Content string `json:"content"` + AuthorUserID string `json:"authorUserId,omitempty"` + CreatedAt string `json:"createdAt,omitempty"` + ChannelID string `json:"channelId,omitempty"` + Attachments []FabricAttachment `json:"attachments,omitempty"` + Wakeup bool `json:"wakeup,omitempty"` + // XType matches the Fabric channel's x-type field. 'dm' bypasses + // the wakeup gate. + XType string `json:"xType,omitempty"` +} + +// FabricAttachment is the wire shape of one attachment ref. v1 +// (Phase F-2) doesn't download these; F-8 will. +type FabricAttachment struct { + URL string `json:"url"` + Name string `json:"name,omitempty"` + MimeType string `json:"mimeType,omitempty"` +} + +// Equality check used for tests. +func (a FabricAttachment) Equal(b FabricAttachment) bool { + return a.URL == b.URL && a.Name == b.Name && a.MimeType == b.MimeType +} + +// Sanity: ensure FabricMessage stays serializable. +var _ = fmt.Sprint diff --git a/internal/socketio/client.go b/internal/socketio/client.go new file mode 100644 index 0000000..aae2e69 --- /dev/null +++ b/internal/socketio/client.go @@ -0,0 +1,349 @@ +// Package socketio is a minimal Engine.IO v4 + Socket.IO v5 client over +// WebSocket. 
Just what the Fabric plugin needs: +// +// - WebSocket-only transport (no polling upgrade dance) +// - Single namespace (defaults to "/realtime" — caller-supplied) +// - CONNECT with caller-supplied auth payload (re-evaluated on every +// reconnect via the AuthFunc callback — this is the bug fix +// openclaw plugin specifically documented for socket.io-client-js +// and that the available Go socket.io library doesn't address) +// - Emit + receive named events with arbitrary JSON arg arrays +// - PING/PONG heartbeat per server-supplied interval +// - Caller-driven manual reconnect: connect returns when socket +// closes; supervisor loop calls Connect again with a fresh token +// +// This is intentionally narrower than full Socket.IO — no rooms (server +// joins us into rooms by event), no acks, no binary, no namespaces +// other than what the constructor takes. +package socketio + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/url" + "sync" + "time" + + "nhooyr.io/websocket" +) + +// Engine.IO v4 packet types (first char of frame). +const ( + eioOpen byte = '0' + eioClose byte = '1' + eioPing byte = '2' + eioPong byte = '3' + eioMessage byte = '4' + eioNoop byte = '6' +) + +// Socket.IO v5 packet types (first char inside an EIO message). +const ( + sioConnect byte = '0' + sioDisconnect byte = '1' + sioEvent byte = '2' + sioAck byte = '3' + sioConnectErr byte = '4' +) + +// AuthFunc returns the auth payload to send with CONNECT. Called on +// every (re)connect so the supervisor can plug in a fresh token. +type AuthFunc func(ctx context.Context) (map[string]any, error) + +// Handler is the per-event callback signature. args is the JSON array +// payload after the event name; len(args) is usually 1 (one object). +type Handler func(args []json.RawMessage) + +// Client is one Socket.IO connection. NOT safe for concurrent Emit; +// caller serializes if it wants to multi-write. +type Client struct { + URL string // e.g. 
"ws://localhost:7002/socket.io/?EIO=4&transport=websocket" + Namespace string // e.g. "/realtime"; "" → root namespace + Auth AuthFunc // CONNECT auth payload + + // Read-only after Connect; mutating during a live connection is + // undefined. + handlers map[string]Handler + handlerMu sync.RWMutex + conn *websocket.Conn + pingPeriod time.Duration // from server "open" packet + pingTimeout time.Duration + + // closed-on-disconnect; Connect returns when this fires. + disconnected chan struct{} +} + +// New constructs a Client. host should be `ws://host:port` (or +// `wss://`). path is typically "/socket.io/" — the Engine.IO query +// params are appended automatically. +func New(host, path, namespace string, auth AuthFunc) (*Client, error) { + u, err := url.Parse(host) + if err != nil { + return nil, fmt.Errorf("socketio: parse host %q: %w", host, err) + } + switch u.Scheme { + case "http": + u.Scheme = "ws" + case "https": + u.Scheme = "wss" + } + if path == "" { + path = "/socket.io/" + } + u.Path = path + q := u.Query() + q.Set("EIO", "4") + q.Set("transport", "websocket") + u.RawQuery = q.Encode() + return &Client{ + URL: u.String(), + Namespace: namespace, + Auth: auth, + handlers: map[string]Handler{}, + }, nil +} + +// On registers a handler for an event name. Safe to call before Connect. +// Replacing an existing handler is fine. +func (c *Client) On(event string, h Handler) { + c.handlerMu.Lock() + defer c.handlerMu.Unlock() + c.handlers[event] = h +} + +// Emit sends an event with args. Server side receives `[event, args...]`. +func (c *Client) Emit(ctx context.Context, event string, args ...any) error { + if c.conn == nil { + return errors.New("socketio: not connected") + } + payload := append([]any{event}, args...) 
+ body, err := json.Marshal(payload) + if err != nil { + return err + } + frame := buildEventFrame(c.Namespace, body) + return c.conn.Write(ctx, websocket.MessageText, frame) +} + +// Connect dials the server, completes the Engine.IO handshake + +// Socket.IO CONNECT, then runs the read+heartbeat loop. Blocks until +// the connection closes (either side) or ctx is cancelled. Returns the +// terminating error (or nil for clean close). +// +// Caller-driven reconnect: wrap Connect in a loop that re-evaluates +// the auth payload (token refresh) before each Call. +func (c *Client) Connect(ctx context.Context) error { + c.disconnected = make(chan struct{}) + defer close(c.disconnected) + + authMap, err := c.Auth(ctx) + if err != nil { + return fmt.Errorf("auth: %w", err) + } + + conn, _, err := websocket.Dial(ctx, c.URL, &websocket.DialOptions{ + HTTPHeader: nil, + }) + if err != nil { + return fmt.Errorf("dial %s: %w", c.URL, err) + } + c.conn = conn + // websocket library default read limit is 32 KiB; bump for chunky + // channel sync payloads. + conn.SetReadLimit(1 << 20) // 1 MiB + defer func() { + _ = conn.Close(websocket.StatusNormalClosure, "") + c.conn = nil + }() + + // Engine.IO handshake: server sends `0{"sid":"...","upgrades":[...],"pingInterval":...,"pingTimeout":...}` + if err := c.recvOpen(ctx); err != nil { + return err + } + + // Send Socket.IO CONNECT with auth. + if err := c.sendConnect(ctx, authMap); err != nil { + return fmt.Errorf("CONNECT: %w", err) + } + + pingCtx, cancelPing := context.WithCancel(ctx) + defer cancelPing() + go c.pingLoop(pingCtx) + + for { + _, data, err := conn.Read(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + return fmt.Errorf("read: %w", err) + } + if len(data) == 0 { + continue + } + if err := c.handlePacket(data); err != nil { + return err + } + } +} + +// Disconnect closes the underlying socket cleanly. Connect's read loop +// will see EOF and return. 
+func (c *Client) Disconnect() { + if c.conn != nil { + _ = c.conn.Close(websocket.StatusNormalClosure, "") + } +} + +// recvOpen reads the EIO "open" frame and stashes ping intervals. +func (c *Client) recvOpen(ctx context.Context) error { + _, data, err := c.conn.Read(ctx) + if err != nil { + return fmt.Errorf("read open: %w", err) + } + if len(data) < 2 || data[0] != eioOpen { + return fmt.Errorf("expected EIO open, got %q", string(data)) + } + var info struct { + Sid string `json:"sid"` + PingInterval int `json:"pingInterval"` + PingTimeout int `json:"pingTimeout"` + } + if err := json.Unmarshal(data[1:], &info); err != nil { + return fmt.Errorf("parse open: %w", err) + } + c.pingPeriod = time.Duration(info.PingInterval) * time.Millisecond + c.pingTimeout = time.Duration(info.PingTimeout) * time.Millisecond + if c.pingPeriod <= 0 { + c.pingPeriod = 25 * time.Second // EIO default + } + return nil +} + +// sendConnect: `4` (EIO message) + `0` (SIO CONNECT) + namespace,?json +func (c *Client) sendConnect(ctx context.Context, auth map[string]any) error { + body := []byte{eioMessage, sioConnect} + if c.Namespace != "" && c.Namespace != "/" { + body = append(body, []byte(c.Namespace+",")...) + } + if len(auth) > 0 { + raw, err := json.Marshal(auth) + if err != nil { + return err + } + body = append(body, raw...) + } + return c.conn.Write(ctx, websocket.MessageText, body) +} + +// pingLoop sends EIO ping frames per server-supplied interval. +func (c *Client) pingLoop(ctx context.Context) { + ticker := time.NewTicker(c.pingPeriod) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if c.conn == nil { + return + } + writeCtx, cancel := context.WithTimeout(ctx, c.pingTimeout) + err := c.conn.Write(writeCtx, websocket.MessageText, []byte{eioPing}) + cancel() + if err != nil { + return + } + } + } +} + +// handlePacket inspects the first byte (EIO type) + dispatches. 
+func (c *Client) handlePacket(data []byte) error {
+	if len(data) == 0 {
+		return nil // empty frame carries no packet type; indexing data[0] would panic
+	}
+	switch data[0] {
+	case eioPong, eioNoop:
+		return nil // pong: server responding to our ping (or vice versa); noop: nothing to do
+	case eioPing: // server-initiated ping; reply with pong
+		if c.conn == nil {
+			return nil
+		}
+		return c.conn.Write(context.Background(), websocket.MessageText, []byte{eioPong})
+	case eioClose:
+		return io_EOF
+	case eioMessage:
+		if len(data) < 2 {
+			return nil
+		}
+		return c.handleSIO(data[1:])
+	}
+	return nil
+}
+
+// io_EOF is returned on EIO close packet so the supervisor loop knows
+// the server cleanly closed (vs network error).
+var io_EOF = errors.New("socketio: server initiated close")
+
+func (c *Client) handleSIO(data []byte) error {
+	if len(data) == 0 {
+		return nil
+	}
+	sioType := data[0]
+	rest := data[1:]
+	// Skip namespace prefix if present (e.g. "/realtime,").
+	if c.Namespace != "" && c.Namespace != "/" && len(rest) > len(c.Namespace) &&
+		string(rest[:len(c.Namespace)]) == c.Namespace && rest[len(c.Namespace)] == ',' {
+		rest = rest[len(c.Namespace)+1:]
+	}
+	switch sioType {
+	case sioConnect:
+		// Server ack of our CONNECT. Body is `{"sid":"..."}`; we don't
+		// need anything from it.
+		return nil
+	case sioDisconnect:
+		return io_EOF
+	case sioConnectErr:
+		return fmt.Errorf("socketio: CONNECT_ERROR: %s", string(rest))
+	case sioEvent:
+		return c.dispatchEvent(rest)
+	case sioAck:
+		// We don't use acks; ignore.
+		return nil
+	}
+	return nil
+}
+
+func (c *Client) dispatchEvent(body []byte) error {
+	var arr []json.RawMessage
+	if err := json.Unmarshal(body, &arr); err != nil {
+		return fmt.Errorf("dispatch parse: %w (body=%q)", err, string(body))
+	}
+	if len(arr) == 0 {
+		return nil
+	}
+	var event string
+	if err := json.Unmarshal(arr[0], &event); err != nil {
+		return fmt.Errorf("dispatch event-name: %w", err)
+	}
+	c.handlerMu.RLock()
+	h := c.handlers[event]
+	c.handlerMu.RUnlock()
+	if h == nil {
+		return nil // no subscriber, drop silently
+	}
+	h(arr[1:])
+	return nil
+}
+
+// buildEventFrame is exposed for the encoder unit test.
+func buildEventFrame(namespace string, body []byte) []byte {
+	out := []byte{eioMessage, sioEvent}
+	if namespace != "" && namespace != "/" {
+		out = append(out, []byte(namespace+",")...)
+	}
+	return append(out, body...)
+}
diff --git a/internal/socketio/client_test.go b/internal/socketio/client_test.go
new file mode 100644
index 0000000..5343c94
--- /dev/null
+++ b/internal/socketio/client_test.go
@@ -0,0 +1,34 @@
+package socketio
+
+import (
+	"encoding/json"
+	"testing"
+)
+
+func TestBuildEventFrame(t *testing.T) {
+	body, _ := json.Marshal([]any{"hello", map[string]any{"x": 1}})
+	got := string(buildEventFrame("/realtime", body))
+	want := `42/realtime,["hello",{"x":1}]`
+	if got != want {
+		t.Errorf("got %q\nwant %q", got, want)
+	}
+}
+
+func TestBuildEventFrameRootNamespace(t *testing.T) {
+	body, _ := json.Marshal([]any{"ping"})
+	got := string(buildEventFrame("", body))
+	if got != `42["ping"]` {
+		t.Errorf("got %q", got)
+	}
+	got2 := string(buildEventFrame("/", body))
+	if got2 != `42["ping"]` {
+		t.Errorf("got %q", got2)
+	}
+}
+
+func TestHandlePacketEmptyFrame(t *testing.T) {
+	c := &Client{}
+	if err := c.handlePacket(nil); err != nil {
+		t.Errorf("empty frame: err = %v", err)
+	}
+}
diff --git a/internal/tokens/tokens.go b/internal/tokens/tokens.go
new file mode 100644
index 0000000..44dc047
--- /dev/null
+++ b/internal/tokens/tokens.go
@@ -0,0 +1,116 @@
+// Package tokens caches per-agent Fabric sessions with a short TTL so
+// guild access tokens stay fresh. openclaw's plugin documented the
+// failure mode: long-lived sockets survive auto-reconnect at the TCP
+// layer but the JWT captured at original CONNECT goes stale ~15min in,
+// silently breaking auth at the application layer (subscribe-to-room
+// emits go to /dev/null). TTL refresh by re-login per agent fixes it.
+package tokens
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
+)
+
+// DefaultTTL matches openclaw's TOKEN_TTL_MS (8 min). Guild tokens are
+// ~15 min so refreshing every 8 keeps a comfortable margin.
+const DefaultTTL = 8 * time.Minute
+
+// LoginFunc is the per-agent re-login callback. Cache calls this when
+// the cached session expires.
+type LoginFunc func(ctx context.Context, agentID string) (*fabric.Session, error)
+
+// Cache wraps a per-agent session cache. Thread-safe.
+type Cache struct {
+	TTL   time.Duration
+	Login LoginFunc
+
+	mu      sync.Mutex
+	entries map[string]*entry
+}
+
+type entry struct {
+	session *fabric.Session
+	at      time.Time
+}
+
+// New constructs a Cache. ttl ≤ 0 → DefaultTTL.
+func New(ttl time.Duration, login LoginFunc) *Cache {
+	if ttl <= 0 {
+		ttl = DefaultTTL
+	}
+	return &Cache{TTL: ttl, Login: login, entries: map[string]*entry{}}
+}
+
+// Get returns the cached session if fresh, otherwise re-logs in. err
+// from Login bubbles up; the stale entry is NOT served on Login failure
+// (caller may retry).
+func (c *Cache) Get(ctx context.Context, agentID string) (*fabric.Session, error) {
+	c.mu.Lock()
+	e, ok := c.entries[agentID]
+	c.mu.Unlock()
+	if ok && time.Since(e.at) < c.TTL {
+		return e.session, nil
+	}
+	if c.Login == nil {
+		return nil, errors.New("tokens: no login func configured")
+	}
+	fresh, err := c.Login(ctx, agentID)
+	if err != nil {
+		return nil, err
+	}
+	c.mu.Lock()
+	c.entries[agentID] = &entry{session: fresh, at: time.Now()}
+	c.mu.Unlock()
+	return fresh, nil
+}
+
+// Invalidate drops the cached entry for agentID. Next Get re-logs in.
+// Useful when an HTTP call returns 401 (cached token rejected).
+func (c *Cache) Invalidate(agentID string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	delete(c.entries, agentID)
+}
+
+// Peek returns the cached session without TTL check or re-login.
+// Returns nil if nothing cached.
+func (c *Cache) Peek(agentID string) *fabric.Session {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if e, ok := c.entries[agentID]; ok {
+		return e.session
+	}
+	return nil
+}
+
+// GuildToken is a convenience that returns a fresh guild access token
+// for (agentID, guildNodeID). Falls through to Login if cache is stale
+// or the guildNodeID isn't in the cached session.
+func (c *Cache) GuildToken(ctx context.Context, agentID, guildNodeID string) (string, error) {
+	sess, err := c.Get(ctx, agentID)
+	if err != nil {
+		return "", err
+	}
+	for _, t := range sess.GuildAccessTokens {
+		if t.GuildNodeID == guildNodeID {
+			return t.Token, nil
+		}
+	}
+	// Cached session doesn't have this guild — could be a stale
+	// session list. Invalidate + retry once.
+	c.Invalidate(agentID)
+	sess, err = c.Get(ctx, agentID)
+	if err != nil {
+		return "", err
+	}
+	for _, t := range sess.GuildAccessTokens {
+		if t.GuildNodeID == guildNodeID {
+			return t.Token, nil
+		}
+	}
+	return "", errors.New("tokens: agent has no access to guild " + guildNodeID)
+}
diff --git a/internal/tokens/tokens_test.go b/internal/tokens/tokens_test.go
new file mode 100644
index 0000000..00de4cd
--- /dev/null
+++ b/internal/tokens/tokens_test.go
@@ -0,0 +1,130 @@
+package tokens
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
+)
+
+func fakeSession(guildNodes ...string) *fabric.Session {
+	s := &fabric.Session{User: fabric.SessionUser{ID: "u1", Email: "u@x"}}
+	for _, g := range guildNodes {
+		s.Guilds = append(s.Guilds, fabric.GuildInfo{NodeID: g, Endpoint: "http://" + g})
+		s.GuildAccessTokens = append(s.GuildAccessTokens, fabric.GuildAccessToken{
+			GuildNodeID: g, Token: "tok-" + g,
+		})
+	}
+	return s
+}
+
+func TestGetFirstLogsIn(t *testing.T) {
+	var calls atomic.Int32
+	c := New(0, func(context.Context, string) (*fabric.Session, error) {
+		calls.Add(1)
+		return fakeSession("g1"), nil
+	})
+	s, err := c.Get(context.Background(), "alice")
+	if err != nil || s.User.ID != "u1" {
+		t.Fatalf("get err=%v", err)
+	}
+	if calls.Load() != 1 {
+		t.Errorf("calls = %d", calls.Load())
+	}
+}
+
+func TestGetWithinTTLReusesCached(t *testing.T) {
+	var calls atomic.Int32
+	c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
+		calls.Add(1)
+		return fakeSession("g1"), nil
+	})
+	c.Get(context.Background(), "alice")
+	c.Get(context.Background(), "alice")
+	c.Get(context.Background(), "alice")
+	if calls.Load() != 1 {
+		t.Errorf("calls = %d (TTL fresh)", calls.Load())
+	}
+}
+
+func TestGetAfterTTLReLogs(t *testing.T) {
+	var calls atomic.Int32
+	c := New(10*time.Millisecond, func(context.Context, string) (*fabric.Session, error) {
+		calls.Add(1)
+		return fakeSession("g1"), nil
+	})
+	c.Get(context.Background(), "alice")
+	time.Sleep(20 * time.Millisecond)
+	c.Get(context.Background(), "alice")
+	if calls.Load() != 2 {
+		t.Errorf("calls = %d, want 2 after TTL expiry", calls.Load())
+	}
+}
+
+func TestInvalidateForcesReLogin(t *testing.T) {
+	var calls atomic.Int32
+	c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
+		calls.Add(1)
+		return fakeSession("g1"), nil
+	})
+	c.Get(context.Background(), "alice")
+	c.Invalidate("alice")
+	c.Get(context.Background(), "alice")
+	if calls.Load() != 2 {
+		t.Errorf("calls = %d, want 2 after Invalidate", calls.Load())
+	}
+}
+
+func TestLoginErrorBubbles(t *testing.T) {
+	sentinel := errors.New("boom")
+	c := New(0, func(context.Context, string) (*fabric.Session, error) {
+		return nil, sentinel
+	})
+	_, err := c.Get(context.Background(), "alice")
+	if !errors.Is(err, sentinel) {
+		t.Errorf("err = %v", err)
+	}
+}
+
+func TestGuildTokenHappy(t *testing.T) {
+	c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
+		return fakeSession("g1", "g2"), nil
+	})
+	tok, err := c.GuildToken(context.Background(), "alice", "g2")
+	if err != nil || tok != "tok-g2" {
+		t.Errorf("token=%q err=%v", tok, err)
+	}
+}
+
+func TestGuildTokenMissingGuildRetriesThenErrors(t *testing.T) {
+	var calls atomic.Int32
+	c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
+		calls.Add(1)
+		return fakeSession("g1"), nil
+	})
+	_, err := c.GuildToken(context.Background(), "alice", "missing")
+	if err == nil {
+		t.Fatal("expected error")
+	}
+	// First Get + post-invalidate Get = 2 logins
+	if calls.Load() != 2 {
+		t.Errorf("calls = %d, want 2 (initial + retry)", calls.Load())
+	}
+}
+
+func TestPeekDoesNotLogin(t *testing.T) {
+	var calls atomic.Int32
+	c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
+		calls.Add(1)
+		return fakeSession("g1"), nil
+	})
+	if c.Peek("alice") != nil {
+		t.Errorf("Peek on empty should be nil")
+	}
+	if calls.Load() != 0 {
+		t.Errorf("Peek should not call Login")
+	}
+}