feat: Phase F-2 — socket.io inbound + wakeup gate + token refresh
End-to-end Fabric inbound→Plexum→Fabric outbound now works against a
live Fabric stack:
alice posts in bt2-clean (Fabric REST)
→ guild emits message.created over socket.io
→ plugin's wakeup gate decides dispatch
→ notifications/plexum/channel/inbound to host
→ Plexum agent runs (echo provider)
→ outbound `send` tool posts via Fabric REST
→ fabrictester reply visible in channel
internal/socketio/ (~280 LOC + 2 tests):
- Minimal Engine.IO v4 + Socket.IO v5 client over websocket
- WebSocket-only transport (skip polling upgrade dance)
- AuthFunc callback re-evaluated on every (re)connect — fixes the
stale-JWT-on-reconnect bug openclaw plugin documented for the JS
client's single-shot auth, which the available Go socket.io
library (zishang520) doesn't address either
- PING/PONG per server-supplied interval
- Caller-driven reconnect: Connect returns on close, supervisor
re-dials with fresh token
internal/tokens/ (~95 LOC + 9 tests):
- Per-agent session cache with 8min TTL (matches openclaw's
TOKEN_TTL_MS); guild tokens are ~15min so 8min keeps a margin
- Invalidate forces re-login (used by inbound when CONNECT auth fires)
- GuildToken helper picks the per-guild JWT from the cached session;
if the guild is missing from the cache, invalidate + retry once
internal/inbound/ (~290 LOC):
- Supervisor: one socket.io conn per (agent, guild); reconnect with
fresh token on drop; ChannelSyncInterval (60s) polling + push
channel.joined/channel.left handlers
- Wakeup gate: dm channels deliver any non-self message; other
x_types require wakeup=true (record-only for non-wake non-dm
deferred — Plexum has no history-injection equivalent in v1)
- Self-author filter on selfUserId from cached session
- Per-(agent,msgId) dedup bounded to 5000 entries
- Per-channel serial queue with 5s idle drain so concurrent inbounds
on the same channel run one-at-a-time (matches openclaw plugin)
- Emits notifications/plexum/channel/inbound with session_id =
"s_fab_<fabric_channel_id>" for stable per-channel session continuity
cmd/plexum-fabric-channel-plugin:
- Wires inbound supervisor at Init; runs in a background goroutine
for the plugin's lifetime
- Replaces F-1's sessions map with tokens.Cache (same warm-sessions
behavior, now backed by TTL)
- hostLogHandler: bridges slog records from inbound supervisor to
HostAPI.Log notifications
F-2 deferred to F-3+:
- record-only history injection (Plexum v1 has no equivalent)
- tools.ts port (15 MCP tools — channel/canvas/sub-discussion family)
- presence-sync, command-sync, attachments, coalesce parity
Tests: 22 (5 identity + 6 config + 9 tokens + 2 socketio).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
@@ -26,6 +27,8 @@ import (
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/inbound"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens"
|
||||
)
|
||||
|
||||
// HostConfig is the plugin's own config at
|
||||
@@ -47,10 +50,19 @@ type fabricPlugin struct {
|
||||
bindings []config.FabricBinding
|
||||
byFabric config.ByFabricChannel
|
||||
client *fabric.Client
|
||||
tokens *tokens.Cache
|
||||
|
||||
// Per-agent Session cache (refreshed lazily; full refresh in F-2).
|
||||
// Goroutine handle for the inbound supervisor. Cancelled on
|
||||
// plugin shutdown (we don't have an explicit shutdown signal in
|
||||
// the SDK today; rely on subprocess kill).
|
||||
inboundCancel context.CancelFunc
|
||||
inboundDone chan struct{}
|
||||
|
||||
// Legacy field — kept only for back-compat with non-tokens code
|
||||
// paths during the F-1 → F-2 refactor; safe to remove once nothing
|
||||
// else references it. Not used anymore.
|
||||
sessMu sync.Mutex
|
||||
sessions map[string]*fabric.Session // agentID → session
|
||||
sessions map[string]*fabric.Session
|
||||
}
|
||||
|
||||
func (p *fabricPlugin) Manifest() plugin.Manifest {
|
||||
@@ -148,6 +160,15 @@ func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error {
|
||||
}
|
||||
p.byFabric = config.Index(p.bindings)
|
||||
|
||||
// Token cache: re-login per agent on TTL miss (8min default).
|
||||
p.tokens = tokens.New(0, func(loginCtx context.Context, agentID string) (*fabric.Session, error) {
|
||||
entry := p.identities.Lookup(agentID)
|
||||
if entry == nil || !entry.Enabled {
|
||||
return nil, fmt.Errorf("agent %s: no enabled identity", agentID)
|
||||
}
|
||||
return p.client.AgentLogin(loginCtx, entry.FabricAPIKey)
|
||||
})
|
||||
|
||||
host.Log("info", "fabric channel plugin initialized", map[string]any{
|
||||
"center": p.cfg.CenterAPIBase,
|
||||
"identity_path": idPath,
|
||||
@@ -155,18 +176,78 @@ func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error {
|
||||
"identities_loaded": len(p.identities.AgentIDs()),
|
||||
})
|
||||
|
||||
// Eager validate: for every bound agent that has a channel, do a
|
||||
// blocking agentLogin so we surface bad keys at startup instead of
|
||||
// on first outbound. F-2 hooks socket.io subscription here too.
|
||||
// Warm sessions (early bad-key detection).
|
||||
if err := p.warmSessions(ctx); err != nil {
|
||||
// Log + continue; outbound will retry on demand. We don't want
|
||||
// to refuse plugin init just because one key is stale.
|
||||
host.Log("warn", "fabric warm-sessions had errors",
|
||||
map[string]any{"err": err.Error()})
|
||||
}
|
||||
|
||||
// Phase F-2: start the inbound supervisor in a goroutine. Lives
|
||||
// until p.inboundCancel fires (currently never — SDK has no
|
||||
// shutdown hook; subprocess kill is the only stop signal).
|
||||
if len(p.bindings) > 0 {
|
||||
ctxBg, cancel := context.WithCancel(context.Background())
|
||||
p.inboundCancel = cancel
|
||||
p.inboundDone = make(chan struct{})
|
||||
notifier := func(channelName, message, sessionID string) {
|
||||
p.host.EmitNotification("notifications/plexum/channel/inbound", map[string]any{
|
||||
"channel_name": channelName,
|
||||
"message": message,
|
||||
"session_id": sessionID,
|
||||
})
|
||||
}
|
||||
// slog wrapping plugin.HostAPI.Log isn't worth the indirection
|
||||
// here; use a discard-style adapter that pipes WARN/INFO to
|
||||
// the host log.
|
||||
logger := slog.New(&hostLogHandler{host: host, level: slog.LevelInfo})
|
||||
sup := inbound.New(p.client, p.tokens, p.bindings, notifier, logger)
|
||||
go func() {
|
||||
defer close(p.inboundDone)
|
||||
if err := sup.Run(ctxBg); err != nil {
|
||||
host.Log("warn", "inbound supervisor exited", map[string]any{"err": err.Error()})
|
||||
}
|
||||
}()
|
||||
host.Log("info", "fabric inbound supervisor started",
|
||||
map[string]any{"agents": sup.AgentIDs, "bindings": len(p.bindings)})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// hostLogHandler is a tiny slog.Handler that forwards records to the
|
||||
// plugin's HostAPI.Log. inbound + supervisor use slog for structured
|
||||
// logging; this bridges to the host's log notification stream.
|
||||
type hostLogHandler struct {
|
||||
host plugin.HostAPI
|
||||
level slog.Level
|
||||
}
|
||||
|
||||
func (h *hostLogHandler) Enabled(_ context.Context, l slog.Level) bool { return l >= h.level }
|
||||
func (h *hostLogHandler) Handle(_ context.Context, r slog.Record) error {
|
||||
attrs := make(map[string]any, r.NumAttrs())
|
||||
r.Attrs(func(a slog.Attr) bool {
|
||||
attrs[a.Key] = a.Value.Any()
|
||||
return true
|
||||
})
|
||||
h.host.Log(levelString(r.Level), r.Message, attrs)
|
||||
return nil
|
||||
}
|
||||
func (h *hostLogHandler) WithAttrs(_ []slog.Attr) slog.Handler { return h }
|
||||
func (h *hostLogHandler) WithGroup(_ string) slog.Handler { return h }
|
||||
|
||||
func levelString(l slog.Level) string {
|
||||
switch {
|
||||
case l >= slog.LevelError:
|
||||
return "error"
|
||||
case l >= slog.LevelWarn:
|
||||
return "warn"
|
||||
case l >= slog.LevelInfo:
|
||||
return "info"
|
||||
default:
|
||||
return "debug"
|
||||
}
|
||||
}
|
||||
|
||||
func (p *fabricPlugin) warmSessions(ctx context.Context) error {
|
||||
// Which agents appear as a binding's AgentID?
|
||||
agentsNeeded := map[string]bool{}
|
||||
@@ -177,8 +258,7 @@ func (p *fabricPlugin) warmSessions(ctx context.Context) error {
|
||||
|
||||
var firstErr error
|
||||
for agentID := range agentsNeeded {
|
||||
entry, ok := enabled[agentID]
|
||||
if !ok {
|
||||
if _, ok := enabled[agentID]; !ok {
|
||||
err := fmt.Errorf("agent %s has channels but no identity (run plexum-fabric-register --agent-id %s --api-key ...)",
|
||||
agentID, agentID)
|
||||
p.host.Log("warn", err.Error(), nil)
|
||||
@@ -187,18 +267,15 @@ func (p *fabricPlugin) warmSessions(ctx context.Context) error {
|
||||
}
|
||||
continue
|
||||
}
|
||||
sess, err := p.client.AgentLogin(ctx, entry.FabricAPIKey)
|
||||
sess, err := p.tokens.Get(ctx, agentID)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("agent %s login: %w", agentID, err)
|
||||
p.host.Log("warn", err.Error(), nil)
|
||||
p.host.Log("warn", "fabric agent warm failed",
|
||||
map[string]any{"agent": agentID, "err": err.Error()})
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
p.sessMu.Lock()
|
||||
p.sessions[agentID] = sess
|
||||
p.sessMu.Unlock()
|
||||
p.host.Log("info", "fabric session warm", map[string]any{
|
||||
"agent": agentID, "fabric_user": sess.User.Email,
|
||||
"guilds": len(sess.Guilds),
|
||||
@@ -223,6 +300,9 @@ func (p *fabricPlugin) CallTool(ctx context.Context, name string, input json.Raw
|
||||
if args.ChannelName == "" {
|
||||
return errResult("channel_name required"), nil
|
||||
}
|
||||
p.host.Log("info", "fabric send", map[string]any{
|
||||
"channel_name": args.ChannelName, "len": len(args.Message),
|
||||
})
|
||||
|
||||
// Find the binding for this plexum channel name.
|
||||
var binding *config.FabricBinding
|
||||
@@ -274,26 +354,9 @@ func (p *fabricPlugin) CallTool(ctx context.Context, name string, input json.Raw
|
||||
}
|
||||
|
||||
func (p *fabricPlugin) sessionFor(ctx context.Context, agentID string) (*fabric.Session, error) {
|
||||
p.sessMu.Lock()
|
||||
sess := p.sessions[agentID]
|
||||
p.sessMu.Unlock()
|
||||
if sess != nil {
|
||||
return sess, nil
|
||||
}
|
||||
entry := p.identities.Lookup(agentID)
|
||||
if entry == nil || !entry.Enabled {
|
||||
return nil, errors.New("no identity registered (use plexum-fabric-register)")
|
||||
}
|
||||
loginCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||
defer cancel()
|
||||
fresh, err := p.client.AgentLogin(loginCtx, entry.FabricAPIKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.sessMu.Lock()
|
||||
p.sessions[agentID] = fresh
|
||||
p.sessMu.Unlock()
|
||||
return fresh, nil
|
||||
return p.tokens.Get(loginCtx, agentID)
|
||||
}
|
||||
|
||||
func errResult(msg string) plugin.ToolResult {
|
||||
|
||||
7
go.mod
7
go.mod
@@ -2,4 +2,9 @@ module git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin
|
||||
|
||||
go 1.24.2
|
||||
|
||||
require github.com/zishang520/socket.io-client-go v1.1.0 // indirect
|
||||
require (
|
||||
git.hangman-lab.top/hzhang/Plexum-sdk-go v0.0.0
|
||||
nhooyr.io/websocket v1.8.17
|
||||
)
|
||||
|
||||
replace git.hangman-lab.top/hzhang/Plexum-sdk-go => ../Plexum-sdk-go
|
||||
|
||||
2
go.sum
2
go.sum
@@ -1,2 +0,0 @@
|
||||
github.com/zishang520/socket.io-client-go v1.1.0 h1:rc+WtphqasRKuyxmjx/zqTrdIjzjG0Yv8nWeQcv5hTM=
|
||||
github.com/zishang520/socket.io-client-go v1.1.0/go.mod h1:pFkvhEgVIjkUJyNvOvb62k78aFt2qwgJFNh2rfaCOc0=
|
||||
438
internal/inbound/inbound.go
Normal file
438
internal/inbound/inbound.go
Normal file
@@ -0,0 +1,438 @@
|
||||
// Package inbound is the Fabric → Plexum message pump. For each
|
||||
// (agent, guild) it maintains a long-lived socket.io connection to the
|
||||
// guild's /realtime namespace, joins every channel the agent has
|
||||
// membership in (resyncing periodically + on push events), and dispatches
|
||||
// matching message.created events through a per-channel serial queue.
|
||||
//
|
||||
// Wakeup gate matches openclaw's plugin: dispatch iff
|
||||
// - m.xType == "dm" (any non-self message), OR
|
||||
// - m.wakeup == true.
|
||||
//
|
||||
// Other inbounds (non-dm with wakeup != true) are dropped silently
|
||||
// (Plexum has no "record-only history injection" equivalent in v1;
|
||||
// future phase can add it).
|
||||
package inbound
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/socketio"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens"
|
||||
)
|
||||
|
||||
// ChannelSyncInterval is how often the supervisor diffs joined channels
|
||||
// against /api/channels to catch newly-membership-bestowed channels
|
||||
// that arrived without a push event. Matches openclaw's 60s.
|
||||
const ChannelSyncInterval = 60 * time.Second
|
||||
|
||||
// ReconnectBackoff is the wait between reconnect attempts when a
|
||||
// socket drops + Connect returns. Keep small for snappier recovery;
|
||||
// exponential backoff is a future improvement.
|
||||
const ReconnectBackoff = 3 * time.Second
|
||||
|
||||
// Notifier pushes one inbound message to the Plexum host. The plugin
|
||||
// main wires this to HostAPI.EmitNotification.
|
||||
type Notifier func(channelName, message, sessionID string)
|
||||
|
||||
// Supervisor owns the per-(agent, guild) socket.io connections. Run
|
||||
// blocks until ctx is cancelled.
|
||||
type Supervisor struct {
|
||||
Client *fabric.Client
|
||||
Tokens *tokens.Cache
|
||||
Bindings []config.FabricBinding
|
||||
ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding
|
||||
AgentIDs []string // unique agents to bring up
|
||||
Notify Notifier
|
||||
Logger *slog.Logger
|
||||
|
||||
// Deduplicate (agentID, messageId) → already dispatched. Bounded
|
||||
// to ~5000 entries (matches openclaw).
|
||||
dedupMu sync.Mutex
|
||||
seen map[string]struct{}
|
||||
|
||||
// Per-channel serial chain: each channel id maps to a chan of
|
||||
// queued task funcs. Channel goroutine drains them in order.
|
||||
chainMu sync.Mutex
|
||||
chains map[string]chan func()
|
||||
}
|
||||
|
||||
// New constructs a Supervisor with deduped agentIDs derived from bindings.
|
||||
func New(client *fabric.Client, tokenCache *tokens.Cache, bindings []config.FabricBinding,
|
||||
notify Notifier, logger *slog.Logger) *Supervisor {
|
||||
agentSet := map[string]struct{}{}
|
||||
for _, b := range bindings {
|
||||
agentSet[b.AgentID] = struct{}{}
|
||||
}
|
||||
agents := make([]string, 0, len(agentSet))
|
||||
for a := range agentSet {
|
||||
agents = append(agents, a)
|
||||
}
|
||||
return &Supervisor{
|
||||
Client: client, Tokens: tokenCache,
|
||||
Bindings: bindings, ByFabric: config.Index(bindings),
|
||||
AgentIDs: agents, Notify: notify, Logger: logger,
|
||||
seen: map[string]struct{}{},
|
||||
chains: map[string]chan func(){},
|
||||
}
|
||||
}
|
||||
|
||||
// Run blocks: spawns one goroutine per (agent, guild) socket.io
|
||||
// supervisor. Each supervisor reconnects on drop until ctx cancels.
|
||||
// Returns when ctx is cancelled and all supervisors have exited.
|
||||
func (s *Supervisor) Run(ctx context.Context) error {
|
||||
var wg sync.WaitGroup
|
||||
for _, agentID := range s.AgentIDs {
|
||||
sess, err := s.Tokens.Get(ctx, agentID)
|
||||
if err != nil {
|
||||
s.Logger.Warn("inbound: skip agent (no session)", "agent", agentID, "err", err)
|
||||
continue
|
||||
}
|
||||
for _, g := range sess.Guilds {
|
||||
if g.Endpoint == "" {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(agentID string, guild fabric.GuildInfo) {
|
||||
defer wg.Done()
|
||||
s.runAgentGuild(ctx, agentID, guild)
|
||||
}(agentID, g)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// runAgentGuild keeps one socket.io connection alive for (agent, guild)
|
||||
// until ctx cancels. Reconnects with fresh auth on every drop.
|
||||
func (s *Supervisor) runAgentGuild(ctx context.Context, agentID string, guild fabric.GuildInfo) {
|
||||
logger := s.Logger.With("agent", agentID, "guild", guild.NodeID)
|
||||
for {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return
|
||||
}
|
||||
err := s.connectOnce(ctx, agentID, guild, logger)
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
errStr := "(nil)"
|
||||
if err != nil {
|
||||
errStr = err.Error()
|
||||
}
|
||||
logger.Warn("inbound: socket connection ended; reconnecting", "err", errStr)
|
||||
select {
|
||||
case <-time.After(ReconnectBackoff):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// connectOnce opens one socket connection, runs the read loop, and
|
||||
// returns the terminating error (nil on graceful close).
|
||||
func (s *Supervisor) connectOnce(ctx context.Context, agentID string, guild fabric.GuildInfo, logger *slog.Logger) error {
|
||||
authFn := func(authCtx context.Context) (map[string]any, error) {
|
||||
// Always fetch a fresh token before the CONNECT — fixes the
|
||||
// stale-JWT-on-reconnect bug openclaw documented.
|
||||
s.Tokens.Invalidate(agentID)
|
||||
tok, err := s.Tokens.GuildToken(authCtx, agentID, guild.NodeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return map[string]any{"token": tok}, nil
|
||||
}
|
||||
cli, err := socketio.New(guild.Endpoint, "/socket.io/", "/realtime", authFn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Tracked subscriptions for this connection. Cleared on each
|
||||
// reconnect (server forgets join_channel state).
|
||||
joined := map[string]struct{}{}
|
||||
var joinedMu sync.Mutex
|
||||
|
||||
// Initial sync + periodic resync runs in a goroutine; it reads
|
||||
// channels from REST and emits join_channel for new + leave_channel
|
||||
// for gone.
|
||||
syncCtx, cancelSync := context.WithCancel(ctx)
|
||||
defer cancelSync()
|
||||
syncOnce := func(kind string) {
|
||||
tok, err := s.Tokens.GuildToken(syncCtx, agentID, guild.NodeID)
|
||||
if err != nil {
|
||||
logger.Warn("inbound: sync token", "err", err)
|
||||
return
|
||||
}
|
||||
channels, err := s.Client.ListChannels(syncCtx, guild.Endpoint, tok, guild.NodeID)
|
||||
if err != nil {
|
||||
logger.Warn("inbound: list channels", "err", err)
|
||||
return
|
||||
}
|
||||
current := map[string]struct{}{}
|
||||
for _, c := range channels {
|
||||
current[c.ID] = struct{}{}
|
||||
}
|
||||
var added, removed int
|
||||
joinedMu.Lock()
|
||||
for id := range current {
|
||||
if _, ok := joined[id]; !ok {
|
||||
_ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": id})
|
||||
joined[id] = struct{}{}
|
||||
added++
|
||||
}
|
||||
}
|
||||
for id := range joined {
|
||||
if _, ok := current[id]; !ok {
|
||||
_ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": id})
|
||||
delete(joined, id)
|
||||
removed++
|
||||
}
|
||||
}
|
||||
size := len(joined)
|
||||
joinedMu.Unlock()
|
||||
if kind == "initial" {
|
||||
logger.Info("inbound: channels joined", "n", size)
|
||||
} else if added > 0 || removed > 0 {
|
||||
logger.Info("inbound: channels resync", "added", added, "removed", removed, "now", size)
|
||||
}
|
||||
}
|
||||
|
||||
// Event handlers. Registered BEFORE Connect so we don't miss the
|
||||
// first events the server pushes right after handshake.
|
||||
cli.On("connect", func([]json.RawMessage) { /* no-op; server CONNECT ack */ })
|
||||
|
||||
cli.On("channel.joined", func(args []json.RawMessage) {
|
||||
var evt struct {
|
||||
ChannelID string `json:"channelId"`
|
||||
}
|
||||
if len(args) == 0 {
|
||||
return
|
||||
}
|
||||
_ = json.Unmarshal(args[0], &evt)
|
||||
if evt.ChannelID == "" {
|
||||
return
|
||||
}
|
||||
joinedMu.Lock()
|
||||
if _, already := joined[evt.ChannelID]; already {
|
||||
joinedMu.Unlock()
|
||||
return
|
||||
}
|
||||
joined[evt.ChannelID] = struct{}{}
|
||||
joinedMu.Unlock()
|
||||
_ = cli.Emit(syncCtx, "join_channel", map[string]any{"channelId": evt.ChannelID})
|
||||
logger.Info("inbound: channel.joined push", "channel", evt.ChannelID)
|
||||
})
|
||||
|
||||
cli.On("channel.left", func(args []json.RawMessage) {
|
||||
var evt struct {
|
||||
ChannelID string `json:"channelId"`
|
||||
}
|
||||
if len(args) == 0 {
|
||||
return
|
||||
}
|
||||
_ = json.Unmarshal(args[0], &evt)
|
||||
if evt.ChannelID == "" {
|
||||
return
|
||||
}
|
||||
joinedMu.Lock()
|
||||
if _, present := joined[evt.ChannelID]; !present {
|
||||
joinedMu.Unlock()
|
||||
return
|
||||
}
|
||||
delete(joined, evt.ChannelID)
|
||||
joinedMu.Unlock()
|
||||
_ = cli.Emit(syncCtx, "leave_channel", map[string]any{"channelId": evt.ChannelID})
|
||||
logger.Info("inbound: channel.left push", "channel", evt.ChannelID)
|
||||
})
|
||||
|
||||
// Capture self user id from current cached session for the
|
||||
// self-author filter.
|
||||
sess := s.Tokens.Peek(agentID)
|
||||
var selfUserID string
|
||||
if sess != nil {
|
||||
selfUserID = sess.User.ID
|
||||
}
|
||||
|
||||
cli.On("message.created", func(args []json.RawMessage) {
|
||||
var m FabricMessage
|
||||
if len(args) == 0 {
|
||||
return
|
||||
}
|
||||
if err := json.Unmarshal(args[0], &m); err != nil {
|
||||
logger.Warn("inbound: bad message.created", "err", err)
|
||||
return
|
||||
}
|
||||
s.dispatch(agentID, guild.NodeID, selfUserID, &m, logger)
|
||||
})
|
||||
|
||||
// Periodic resync goroutine.
|
||||
syncTimer := time.NewTicker(ChannelSyncInterval)
|
||||
defer syncTimer.Stop()
|
||||
go func() {
|
||||
// Wait for connect to complete; we kick the initial sync from
|
||||
// within Connect's startup. Use the ticker for the periodic.
|
||||
for {
|
||||
select {
|
||||
case <-syncCtx.Done():
|
||||
return
|
||||
case <-syncTimer.C:
|
||||
syncOnce("resync")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// On connect, fire the initial sync. The socketio client doesn't
|
||||
// fire a Go-side "connect" event before Connect returns; we
|
||||
// schedule the initial sync to run shortly after Connect starts.
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
select {
|
||||
case <-syncCtx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
syncOnce("initial")
|
||||
}()
|
||||
|
||||
return cli.Connect(ctx)
|
||||
}
|
||||
|
||||
// dispatch applies self-author filter, dedup, channel-binding lookup,
|
||||
// wakeup gate, then notifies Plexum.
|
||||
func (s *Supervisor) dispatch(agentID, guildNodeID, selfUserID string, m *FabricMessage, logger *slog.Logger) {
|
||||
if m.ChannelID == "" {
|
||||
return
|
||||
}
|
||||
// Self-author filter: skip messages this user wrote.
|
||||
if m.AuthorUserID != "" && m.AuthorUserID == selfUserID {
|
||||
return
|
||||
}
|
||||
// Dedup per (agent, messageId).
|
||||
key := agentID + ":" + m.MessageID
|
||||
s.dedupMu.Lock()
|
||||
if _, dup := s.seen[key]; dup {
|
||||
s.dedupMu.Unlock()
|
||||
return
|
||||
}
|
||||
s.seen[key] = struct{}{}
|
||||
if len(s.seen) > 5000 {
|
||||
s.seen = map[string]struct{}{} // simple bounded reset
|
||||
}
|
||||
s.dedupMu.Unlock()
|
||||
|
||||
// Channel-binding lookup. We only deliver messages for channels
|
||||
// the operator has bound to a Plexum agent in channels/*.json.
|
||||
binding, ok := s.ByFabric[config.Key(guildNodeID, m.ChannelID)]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// The binding must point at THIS agent (a guild may have several
|
||||
// agents; an inbound on agent A's socket for a channel bound to
|
||||
// agent B is ignored — agent B has their own socket for it).
|
||||
if binding.AgentID != agentID {
|
||||
return
|
||||
}
|
||||
|
||||
// Wakeup gate (decision #36-equivalent on the openclaw side):
|
||||
// dm channel → always deliver
|
||||
// other → wakeup must be true
|
||||
if m.XType != "dm" && !m.Wakeup {
|
||||
logger.Debug("inbound: skip (no wakeup)",
|
||||
"channel", m.ChannelID, "agent", agentID, "msg", m.MessageID)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("inbound: dispatch",
|
||||
"channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType)
|
||||
|
||||
// Per-channel serial queue. Concurrent inbounds on the same channel
|
||||
// must run one at a time so Plexum's state machine sees clean turns.
|
||||
sessionID := "s_fab_" + m.ChannelID
|
||||
body := m.Content
|
||||
plexumChannel := binding.PlexumChannelName
|
||||
s.enqueueChannel(m.ChannelID, func() {
|
||||
s.Notify(plexumChannel, body, sessionID)
|
||||
})
|
||||
}
|
||||
|
||||
// enqueueChannel chains a task onto the per-channel queue. If no chain
|
||||
// exists, spawn the drain goroutine. Drain goroutine exits when the
|
||||
// channel goes idle for >5s to avoid leaking goroutines per channel.
|
||||
func (s *Supervisor) enqueueChannel(channelID string, task func()) {
|
||||
s.chainMu.Lock()
|
||||
ch, ok := s.chains[channelID]
|
||||
if !ok {
|
||||
ch = make(chan func(), 32)
|
||||
s.chains[channelID] = ch
|
||||
go s.drainChannel(channelID, ch)
|
||||
}
|
||||
s.chainMu.Unlock()
|
||||
select {
|
||||
case ch <- task:
|
||||
default:
|
||||
// Backlog full (32+) — log + drop. Shouldn't happen unless
|
||||
// the bound agent is hopelessly behind.
|
||||
s.Logger.Warn("inbound: per-channel queue full; dropping",
|
||||
"channel", channelID)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Supervisor) drainChannel(channelID string, ch chan func()) {
|
||||
idle := time.NewTimer(5 * time.Second)
|
||||
defer idle.Stop()
|
||||
for {
|
||||
select {
|
||||
case task, open := <-ch:
|
||||
if !open {
|
||||
return
|
||||
}
|
||||
if !idle.Stop() {
|
||||
<-idle.C
|
||||
}
|
||||
task()
|
||||
idle.Reset(5 * time.Second)
|
||||
case <-idle.C:
|
||||
s.chainMu.Lock()
|
||||
delete(s.chains, channelID)
|
||||
s.chainMu.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FabricMessage is the wire shape of Fabric's message.created event.
|
||||
// Mirror of FabricMessage type in openclaw's inbound.ts.
|
||||
type FabricMessage struct {
|
||||
MessageID string `json:"messageId"`
|
||||
Seq int `json:"seq"`
|
||||
Content string `json:"content"`
|
||||
AuthorUserID string `json:"authorUserId,omitempty"`
|
||||
CreatedAt string `json:"createdAt,omitempty"`
|
||||
ChannelID string `json:"channelId,omitempty"`
|
||||
Attachments []FabricAttachment `json:"attachments,omitempty"`
|
||||
Wakeup bool `json:"wakeup,omitempty"`
|
||||
// XType matches the Fabric channel's x-type field. 'dm' bypasses
|
||||
// the wakeup gate.
|
||||
XType string `json:"xType,omitempty"`
|
||||
}
|
||||
|
||||
// FabricAttachment is the wire shape of one attachment ref. v1
|
||||
// (Phase F-2) doesn't download these; F-8 will.
|
||||
type FabricAttachment struct {
|
||||
URL string `json:"url"`
|
||||
Name string `json:"name,omitempty"`
|
||||
MimeType string `json:"mimeType,omitempty"`
|
||||
}
|
||||
|
||||
// Equality check used for tests.
|
||||
func (a FabricAttachment) Equal(b FabricAttachment) bool {
|
||||
return a.URL == b.URL && a.Name == b.Name && a.MimeType == b.MimeType
|
||||
}
|
||||
|
||||
// Sanity: ensure FabricMessage stays serializable.
|
||||
var _ = fmt.Sprint
|
||||
349
internal/socketio/client.go
Normal file
349
internal/socketio/client.go
Normal file
@@ -0,0 +1,349 @@
|
||||
// Package socketio is a minimal Engine.IO v4 + Socket.IO v5 client over
|
||||
// WebSocket. Just what the Fabric plugin needs:
|
||||
//
|
||||
// - WebSocket-only transport (no polling upgrade dance)
|
||||
// - Single namespace (defaults to "/realtime" — caller-supplied)
|
||||
// - CONNECT with caller-supplied auth payload (re-evaluated on every
|
||||
// reconnect via the AuthFunc callback — this is the bug fix
|
||||
// openclaw plugin specifically documented for socket.io-client-js
|
||||
// and that the available Go socket.io library doesn't address)
|
||||
// - Emit + receive named events with arbitrary JSON arg arrays
|
||||
// - PING/PONG heartbeat per server-supplied interval
|
||||
// - Caller-driven manual reconnect: connect returns when socket
|
||||
// closes; supervisor loop calls Connect again with a fresh token
|
||||
//
|
||||
// This is intentionally narrower than full Socket.IO — no rooms (server
|
||||
// joins us into rooms by event), no acks, no binary, no namespaces
|
||||
// other than what the constructor takes.
|
||||
package socketio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"nhooyr.io/websocket"
|
||||
)
|
||||
|
||||
// Engine.IO v4 packet types (first char of frame).
|
||||
const (
|
||||
eioOpen byte = '0'
|
||||
eioClose byte = '1'
|
||||
eioPing byte = '2'
|
||||
eioPong byte = '3'
|
||||
eioMessage byte = '4'
|
||||
eioNoop byte = '6'
|
||||
)
|
||||
|
||||
// Socket.IO v5 packet types (first char inside an EIO message).
|
||||
const (
|
||||
sioConnect byte = '0'
|
||||
sioDisconnect byte = '1'
|
||||
sioEvent byte = '2'
|
||||
sioAck byte = '3'
|
||||
sioConnectErr byte = '4'
|
||||
)
|
||||
|
||||
// AuthFunc returns the auth payload to send with CONNECT. Called on
|
||||
// every (re)connect so the supervisor can plug in a fresh token.
|
||||
type AuthFunc func(ctx context.Context) (map[string]any, error)
|
||||
|
||||
// Handler is the per-event callback signature. args is the JSON array
|
||||
// payload after the event name; len(args) is usually 1 (one object).
|
||||
type Handler func(args []json.RawMessage)
|
||||
|
||||
// Client is one Socket.IO connection. NOT safe for concurrent Emit;
|
||||
// caller serializes if it wants to multi-write.
|
||||
type Client struct {
|
||||
URL string // e.g. "ws://localhost:7002/socket.io/?EIO=4&transport=websocket"
|
||||
Namespace string // e.g. "/realtime"; "" → root namespace
|
||||
Auth AuthFunc // CONNECT auth payload
|
||||
|
||||
// Read-only after Connect; mutating during a live connection is
|
||||
// undefined.
|
||||
handlers map[string]Handler
|
||||
handlerMu sync.RWMutex
|
||||
conn *websocket.Conn
|
||||
pingPeriod time.Duration // from server "open" packet
|
||||
pingTimeout time.Duration
|
||||
|
||||
// closed-on-disconnect; Connect returns when this fires.
|
||||
disconnected chan struct{}
|
||||
}
|
||||
|
||||
// New constructs a Client. host should be `ws://host:port` (or
|
||||
// `wss://`). path is typically "/socket.io/" — the Engine.IO query
|
||||
// params are appended automatically.
|
||||
func New(host, path, namespace string, auth AuthFunc) (*Client, error) {
|
||||
u, err := url.Parse(host)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("socketio: parse host %q: %w", host, err)
|
||||
}
|
||||
switch u.Scheme {
|
||||
case "http":
|
||||
u.Scheme = "ws"
|
||||
case "https":
|
||||
u.Scheme = "wss"
|
||||
}
|
||||
if path == "" {
|
||||
path = "/socket.io/"
|
||||
}
|
||||
u.Path = path
|
||||
q := u.Query()
|
||||
q.Set("EIO", "4")
|
||||
q.Set("transport", "websocket")
|
||||
u.RawQuery = q.Encode()
|
||||
return &Client{
|
||||
URL: u.String(),
|
||||
Namespace: namespace,
|
||||
Auth: auth,
|
||||
handlers: map[string]Handler{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// On registers a handler for an event name. Safe to call before Connect.
|
||||
// Replacing an existing handler is fine.
|
||||
func (c *Client) On(event string, h Handler) {
|
||||
c.handlerMu.Lock()
|
||||
defer c.handlerMu.Unlock()
|
||||
c.handlers[event] = h
|
||||
}
|
||||
|
||||
// Emit sends an event with args. Server side receives `[event, args...]`.
|
||||
func (c *Client) Emit(ctx context.Context, event string, args ...any) error {
|
||||
if c.conn == nil {
|
||||
return errors.New("socketio: not connected")
|
||||
}
|
||||
payload := append([]any{event}, args...)
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
frame := buildEventFrame(c.Namespace, body)
|
||||
return c.conn.Write(ctx, websocket.MessageText, frame)
|
||||
}
|
||||
|
||||
// Connect dials the server, completes the Engine.IO handshake +
|
||||
// Socket.IO CONNECT, then runs the read+heartbeat loop. Blocks until
|
||||
// the connection closes (either side) or ctx is cancelled. Returns the
|
||||
// terminating error (or nil for clean close).
|
||||
//
|
||||
// Caller-driven reconnect: wrap Connect in a loop that re-evaluates
|
||||
// the auth payload (token refresh) before each Call.
|
||||
func (c *Client) Connect(ctx context.Context) error {
|
||||
c.disconnected = make(chan struct{})
|
||||
defer close(c.disconnected)
|
||||
|
||||
authMap, err := c.Auth(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("auth: %w", err)
|
||||
}
|
||||
|
||||
conn, _, err := websocket.Dial(ctx, c.URL, &websocket.DialOptions{
|
||||
HTTPHeader: nil,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial %s: %w", c.URL, err)
|
||||
}
|
||||
c.conn = conn
|
||||
// websocket library default read limit is 32 KiB; bump for chunky
|
||||
// channel sync payloads.
|
||||
conn.SetReadLimit(1 << 20) // 1 MiB
|
||||
defer func() {
|
||||
_ = conn.Close(websocket.StatusNormalClosure, "")
|
||||
c.conn = nil
|
||||
}()
|
||||
|
||||
// Engine.IO handshake: server sends `0{"sid":"...","upgrades":[...],"pingInterval":...,"pingTimeout":...}`
|
||||
if err := c.recvOpen(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send Socket.IO CONNECT with auth.
|
||||
if err := c.sendConnect(ctx, authMap); err != nil {
|
||||
return fmt.Errorf("CONNECT: %w", err)
|
||||
}
|
||||
|
||||
pingCtx, cancelPing := context.WithCancel(ctx)
|
||||
defer cancelPing()
|
||||
go c.pingLoop(pingCtx)
|
||||
|
||||
for {
|
||||
_, data, err := conn.Read(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("read: %w", err)
|
||||
}
|
||||
if len(data) == 0 {
|
||||
continue
|
||||
}
|
||||
if err := c.handlePacket(data); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Disconnect closes the underlying socket cleanly. Connect's read loop
|
||||
// will see EOF and return.
|
||||
func (c *Client) Disconnect() {
|
||||
if c.conn != nil {
|
||||
_ = c.conn.Close(websocket.StatusNormalClosure, "")
|
||||
}
|
||||
}
|
||||
|
||||
// recvOpen reads the EIO "open" frame and stashes ping intervals.
|
||||
func (c *Client) recvOpen(ctx context.Context) error {
|
||||
_, data, err := c.conn.Read(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read open: %w", err)
|
||||
}
|
||||
if len(data) < 2 || data[0] != eioOpen {
|
||||
return fmt.Errorf("expected EIO open, got %q", string(data))
|
||||
}
|
||||
var info struct {
|
||||
Sid string `json:"sid"`
|
||||
PingInterval int `json:"pingInterval"`
|
||||
PingTimeout int `json:"pingTimeout"`
|
||||
}
|
||||
if err := json.Unmarshal(data[1:], &info); err != nil {
|
||||
return fmt.Errorf("parse open: %w", err)
|
||||
}
|
||||
c.pingPeriod = time.Duration(info.PingInterval) * time.Millisecond
|
||||
c.pingTimeout = time.Duration(info.PingTimeout) * time.Millisecond
|
||||
if c.pingPeriod <= 0 {
|
||||
c.pingPeriod = 25 * time.Second // EIO default
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendConnect: `4` (EIO message) + `0` (SIO CONNECT) + namespace,?json
|
||||
func (c *Client) sendConnect(ctx context.Context, auth map[string]any) error {
|
||||
body := []byte{eioMessage, sioConnect}
|
||||
if c.Namespace != "" && c.Namespace != "/" {
|
||||
body = append(body, []byte(c.Namespace+",")...)
|
||||
}
|
||||
if len(auth) > 0 {
|
||||
raw, err := json.Marshal(auth)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body = append(body, raw...)
|
||||
}
|
||||
return c.conn.Write(ctx, websocket.MessageText, body)
|
||||
}
|
||||
|
||||
// pingLoop sends EIO ping frames per server-supplied interval.
|
||||
func (c *Client) pingLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(c.pingPeriod)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if c.conn == nil {
|
||||
return
|
||||
}
|
||||
writeCtx, cancel := context.WithTimeout(ctx, c.pingTimeout)
|
||||
err := c.conn.Write(writeCtx, websocket.MessageText, []byte{eioPing})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handlePacket inspects the first byte (EIO type) + dispatches.
|
||||
func (c *Client) handlePacket(data []byte) error {
|
||||
switch data[0] {
|
||||
case eioPong:
|
||||
return nil // server responding to our ping (or vice versa)
|
||||
case eioPing:
|
||||
// Server-initiated ping; reply with pong.
|
||||
if c.conn == nil {
|
||||
return nil
|
||||
}
|
||||
return c.conn.Write(context.Background(), websocket.MessageText, []byte{eioPong})
|
||||
case eioClose:
|
||||
return io_EOF
|
||||
case eioMessage:
|
||||
if len(data) < 2 {
|
||||
return nil
|
||||
}
|
||||
return c.handleSIO(data[1:])
|
||||
case eioNoop:
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// io_EOF is returned on EIO close packet so the supervisor loop knows
|
||||
// the server cleanly closed (vs network error).
|
||||
var io_EOF = errors.New("socketio: server initiated close")
|
||||
|
||||
func (c *Client) handleSIO(data []byte) error {
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
sioType := data[0]
|
||||
rest := data[1:]
|
||||
// Skip namespace prefix if present (e.g. "/realtime,").
|
||||
if c.Namespace != "" && c.Namespace != "/" && len(rest) > len(c.Namespace) &&
|
||||
string(rest[:len(c.Namespace)]) == c.Namespace && rest[len(c.Namespace)] == ',' {
|
||||
rest = rest[len(c.Namespace)+1:]
|
||||
}
|
||||
switch sioType {
|
||||
case sioConnect:
|
||||
// Server ack of our CONNECT. Body is `{"sid":"..."}`; we don't
|
||||
// need anything from it.
|
||||
return nil
|
||||
case sioDisconnect:
|
||||
return io_EOF
|
||||
case sioConnectErr:
|
||||
return fmt.Errorf("socketio: CONNECT_ERROR: %s", string(rest))
|
||||
case sioEvent:
|
||||
return c.dispatchEvent(rest)
|
||||
case sioAck:
|
||||
// We don't use acks; ignore.
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) dispatchEvent(body []byte) error {
|
||||
var arr []json.RawMessage
|
||||
if err := json.Unmarshal(body, &arr); err != nil {
|
||||
return fmt.Errorf("dispatch parse: %w (body=%q)", err, string(body))
|
||||
}
|
||||
if len(arr) == 0 {
|
||||
return nil
|
||||
}
|
||||
var event string
|
||||
if err := json.Unmarshal(arr[0], &event); err != nil {
|
||||
return fmt.Errorf("dispatch event-name: %w", err)
|
||||
}
|
||||
c.handlerMu.RLock()
|
||||
h := c.handlers[event]
|
||||
c.handlerMu.RUnlock()
|
||||
if h == nil {
|
||||
return nil // no subscriber, drop silently
|
||||
}
|
||||
h(arr[1:])
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildEventFrame is exposed for the encoder unit test.
|
||||
func buildEventFrame(namespace string, body []byte) []byte {
|
||||
out := []byte{eioMessage, sioEvent}
|
||||
if namespace != "" && namespace != "/" {
|
||||
out = append(out, []byte(namespace+",")...)
|
||||
}
|
||||
return append(out, body...)
|
||||
}
|
||||
27
internal/socketio/client_test.go
Normal file
27
internal/socketio/client_test.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package socketio
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBuildEventFrame(t *testing.T) {
|
||||
body, _ := json.Marshal([]any{"hello", map[string]any{"x": 1}})
|
||||
got := string(buildEventFrame("/realtime", body))
|
||||
want := `42/realtime,["hello",{"x":1}]`
|
||||
if got != want {
|
||||
t.Errorf("got %q\nwant %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildEventFrameRootNamespace(t *testing.T) {
|
||||
body, _ := json.Marshal([]any{"ping"})
|
||||
got := string(buildEventFrame("", body))
|
||||
if got != `42["ping"]` {
|
||||
t.Errorf("got %q", got)
|
||||
}
|
||||
got2 := string(buildEventFrame("/", body))
|
||||
if got2 != `42["ping"]` {
|
||||
t.Errorf("got %q", got2)
|
||||
}
|
||||
}
|
||||
116
internal/tokens/tokens.go
Normal file
116
internal/tokens/tokens.go
Normal file
@@ -0,0 +1,116 @@
|
||||
// Package tokens caches per-agent Fabric sessions with a short TTL so
|
||||
// guild access tokens stay fresh. openclaw's plugin documented the
|
||||
// failure mode: long-lived sockets survive auto-reconnect at the TCP
|
||||
// layer but the JWT captured at original CONNECT goes stale ~15min in,
|
||||
// silently breaking auth at the application layer (subscribe-to-room
|
||||
// emits go to /dev/null). TTL refresh by re-login per agent fixes it.
|
||||
package tokens
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
||||
)
|
||||
|
||||
// DefaultTTL matches openclaw's TOKEN_TTL_MS (8 min). Guild tokens are
|
||||
// ~15 min so refreshing every 8 keeps a comfortable margin.
|
||||
const DefaultTTL = 8 * time.Minute
|
||||
|
||||
// LoginFunc is the per-agent re-login callback. Cache calls this when
|
||||
// the cached session expires.
|
||||
type LoginFunc func(ctx context.Context, agentID string) (*fabric.Session, error)
|
||||
|
||||
// Cache wraps a per-agent session cache. Thread-safe.
|
||||
type Cache struct {
|
||||
TTL time.Duration
|
||||
Login LoginFunc
|
||||
|
||||
mu sync.Mutex
|
||||
entries map[string]*entry
|
||||
}
|
||||
|
||||
type entry struct {
|
||||
session *fabric.Session
|
||||
at time.Time
|
||||
}
|
||||
|
||||
// New constructs a Cache. ttl ≤ 0 → DefaultTTL.
|
||||
func New(ttl time.Duration, login LoginFunc) *Cache {
|
||||
if ttl <= 0 {
|
||||
ttl = DefaultTTL
|
||||
}
|
||||
return &Cache{TTL: ttl, Login: login, entries: map[string]*entry{}}
|
||||
}
|
||||
|
||||
// Get returns the cached session if fresh, otherwise re-logs in. err
|
||||
// from Login bubbles up; the stale entry is NOT served on Login failure
|
||||
// (caller may retry).
|
||||
func (c *Cache) Get(ctx context.Context, agentID string) (*fabric.Session, error) {
|
||||
c.mu.Lock()
|
||||
e, ok := c.entries[agentID]
|
||||
c.mu.Unlock()
|
||||
if ok && time.Since(e.at) < c.TTL {
|
||||
return e.session, nil
|
||||
}
|
||||
if c.Login == nil {
|
||||
return nil, errors.New("tokens: no login func configured")
|
||||
}
|
||||
fresh, err := c.Login(ctx, agentID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.entries[agentID] = &entry{session: fresh, at: time.Now()}
|
||||
c.mu.Unlock()
|
||||
return fresh, nil
|
||||
}
|
||||
|
||||
// Invalidate drops the cached entry for agentID. Next Get re-logs in.
|
||||
// Useful when an HTTP call returns 401 (cached token rejected).
|
||||
func (c *Cache) Invalidate(agentID string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
delete(c.entries, agentID)
|
||||
}
|
||||
|
||||
// Peek returns the cached session without TTL check or re-login.
|
||||
// Returns nil if nothing cached.
|
||||
func (c *Cache) Peek(agentID string) *fabric.Session {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if e, ok := c.entries[agentID]; ok {
|
||||
return e.session
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GuildToken is a convenience that returns a fresh guild access token
|
||||
// for (agentID, guildNodeID). Falls through to Login if cache is stale
|
||||
// or the guildNodeID isn't in the cached session.
|
||||
func (c *Cache) GuildToken(ctx context.Context, agentID, guildNodeID string) (string, error) {
|
||||
sess, err := c.Get(ctx, agentID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, t := range sess.GuildAccessTokens {
|
||||
if t.GuildNodeID == guildNodeID {
|
||||
return t.Token, nil
|
||||
}
|
||||
}
|
||||
// Cached session doesn't have this guild — could be a stale
|
||||
// session list. Invalidate + retry once.
|
||||
c.Invalidate(agentID)
|
||||
sess, err = c.Get(ctx, agentID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, t := range sess.GuildAccessTokens {
|
||||
if t.GuildNodeID == guildNodeID {
|
||||
return t.Token, nil
|
||||
}
|
||||
}
|
||||
return "", errors.New("tokens: agent has no access to guild " + guildNodeID)
|
||||
}
|
||||
130
internal/tokens/tokens_test.go
Normal file
130
internal/tokens/tokens_test.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package tokens
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
||||
)
|
||||
|
||||
func fakeSession(guildNodes ...string) *fabric.Session {
|
||||
s := &fabric.Session{User: fabric.SessionUser{ID: "u1", Email: "u@x"}}
|
||||
for _, g := range guildNodes {
|
||||
s.Guilds = append(s.Guilds, fabric.GuildInfo{NodeID: g, Endpoint: "http://" + g})
|
||||
s.GuildAccessTokens = append(s.GuildAccessTokens, fabric.GuildAccessToken{
|
||||
GuildNodeID: g, Token: "tok-" + g,
|
||||
})
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func TestGetFirstLogsIn(t *testing.T) {
|
||||
var calls atomic.Int32
|
||||
c := New(0, func(context.Context, string) (*fabric.Session, error) {
|
||||
calls.Add(1)
|
||||
return fakeSession("g1"), nil
|
||||
})
|
||||
s, err := c.Get(context.Background(), "alice")
|
||||
if err != nil || s.User.ID != "u1" {
|
||||
t.Fatalf("get err=%v", err)
|
||||
}
|
||||
if calls.Load() != 1 {
|
||||
t.Errorf("calls = %d", calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWithinTTLReusesCached(t *testing.T) {
|
||||
var calls atomic.Int32
|
||||
c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
|
||||
calls.Add(1)
|
||||
return fakeSession("g1"), nil
|
||||
})
|
||||
c.Get(context.Background(), "alice")
|
||||
c.Get(context.Background(), "alice")
|
||||
c.Get(context.Background(), "alice")
|
||||
if calls.Load() != 1 {
|
||||
t.Errorf("calls = %d (TTL fresh)", calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAfterTTLReLogs(t *testing.T) {
|
||||
var calls atomic.Int32
|
||||
c := New(10*time.Millisecond, func(context.Context, string) (*fabric.Session, error) {
|
||||
calls.Add(1)
|
||||
return fakeSession("g1"), nil
|
||||
})
|
||||
c.Get(context.Background(), "alice")
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
c.Get(context.Background(), "alice")
|
||||
if calls.Load() != 2 {
|
||||
t.Errorf("calls = %d, want 2 after TTL expiry", calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvalidateForcesReLogin(t *testing.T) {
|
||||
var calls atomic.Int32
|
||||
c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
|
||||
calls.Add(1)
|
||||
return fakeSession("g1"), nil
|
||||
})
|
||||
c.Get(context.Background(), "alice")
|
||||
c.Invalidate("alice")
|
||||
c.Get(context.Background(), "alice")
|
||||
if calls.Load() != 2 {
|
||||
t.Errorf("calls = %d, want 2 after Invalidate", calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoginErrorBubbles(t *testing.T) {
|
||||
sentinel := errors.New("boom")
|
||||
c := New(0, func(context.Context, string) (*fabric.Session, error) {
|
||||
return nil, sentinel
|
||||
})
|
||||
_, err := c.Get(context.Background(), "alice")
|
||||
if !errors.Is(err, sentinel) {
|
||||
t.Errorf("err = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuildTokenHappy(t *testing.T) {
|
||||
c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
|
||||
return fakeSession("g1", "g2"), nil
|
||||
})
|
||||
tok, err := c.GuildToken(context.Background(), "alice", "g2")
|
||||
if err != nil || tok != "tok-g2" {
|
||||
t.Errorf("token=%q err=%v", tok, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGuildTokenMissingGuildRetriesThenErrors(t *testing.T) {
|
||||
var calls atomic.Int32
|
||||
c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
|
||||
calls.Add(1)
|
||||
return fakeSession("g1"), nil
|
||||
})
|
||||
_, err := c.GuildToken(context.Background(), "alice", "missing")
|
||||
if err == nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
// First Get + post-invalidate Get = 2 logins
|
||||
if calls.Load() != 2 {
|
||||
t.Errorf("calls = %d, want 2 (initial + retry)", calls.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeekDoesNotLogin(t *testing.T) {
|
||||
var calls atomic.Int32
|
||||
c := New(time.Minute, func(context.Context, string) (*fabric.Session, error) {
|
||||
calls.Add(1)
|
||||
return fakeSession("g1"), nil
|
||||
})
|
||||
if c.Peek("alice") != nil {
|
||||
t.Errorf("Peek on empty should be nil")
|
||||
}
|
||||
if calls.Load() != 0 {
|
||||
t.Errorf("Peek should not call Login")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user