hzhang 0efcdfd342 feat: Phase F-2 — socket.io inbound + wakeup gate + token refresh
End-to-end Fabric inbound→Plexum→Fabric outbound now works against a
live Fabric stack:

  alice posts in bt2-clean (Fabric REST)
    → guild emits message.created over socket.io
    → plugin's wakeup gate decides dispatch
    → notifications/plexum/channel/inbound to host
    → Plexum agent runs (echo provider)
    → outbound `send` tool posts via Fabric REST
    → fabrictester reply visible in channel

internal/socketio/ (~280 LOC + 2 tests):
- Minimal Engine.IO v4 + Socket.IO v5 client over websocket
- WebSocket-only transport (skip polling upgrade dance)
- AuthFunc callback re-evaluated on every (re)connect, which fixes the
  stale-JWT-on-reconnect bug the openclaw plugin documented for the JS
  client's single-shot auth (the available Go socket.io library,
  zishang520, doesn't address it either)
- PING/PONG per server-supplied interval
- Caller-driven reconnect: Connect returns on close, supervisor
  re-dials with fresh token
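For readers new to the wire format, here is a minimal sketch of the text frames such a client exchanges. It assumes the default namespace "/", no ack ids and no binary attachments. `AuthFunc`, `connectFrame` and `handleFrame` are hypothetical names, not the internal/socketio API. Engine.IO v4 prefixes each packet with a type digit (2 = ping, 3 = pong, 4 = message), and Socket.IO v5 adds its own digit (0 = CONNECT, 2 = EVENT). A CONNECT carrying auth is therefore `40{...}`, and an event is `42["name",payload]`. Because the auth callback is called inside connectFrame, every dial carries a freshly fetched token.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// AuthFunc (hypothetical shape) is evaluated on every dial, so a
// reconnect never replays the JWT captured at the first CONNECT.
type AuthFunc func() (map[string]any, error)

// connectFrame builds the frame sent after the Engine.IO handshake:
// EIO "4" (message) + SIO "0" (CONNECT) + auth JSON, default namespace.
func connectFrame(auth AuthFunc) (string, error) {
	payload, err := auth()
	if err != nil {
		return "", err
	}
	b, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	return "40" + string(b), nil
}

// handleFrame reacts to one inbound text frame. It returns a frame to
// write back (if any) and, for SIO EVENT packets, the event name and raw
// JSON argument. Ack ids, namespaces and binary events are not handled.
func handleFrame(frame string) (reply, event string, arg json.RawMessage, err error) {
	switch {
	case frame == "2": // EIO v4: the server pings, the client answers pong
		return "3", "", nil, nil
	case strings.HasPrefix(frame, "42"): // EIO message + SIO EVENT
		var parts []json.RawMessage
		if err := json.Unmarshal([]byte(frame[2:]), &parts); err != nil {
			return "", "", nil, err
		}
		if len(parts) == 0 {
			return "", "", nil, fmt.Errorf("empty event packet")
		}
		if err := json.Unmarshal(parts[0], &event); err != nil {
			return "", "", nil, err
		}
		if len(parts) > 1 {
			arg = parts[1]
		}
		return "", event, arg, nil
	}
	// EIO open ("0{...}"), SIO CONNECT ack ("40{...}"), etc. are ignored here.
	return "", "", nil, nil
}

func main() {
	n := 0
	auth := func() (map[string]any, error) {
		n++ // stand-in for fetching a fresh guild JWT on each dial
		return map[string]any{"token": fmt.Sprintf("jwt-%d", n)}, nil
	}
	for i := 0; i < 2; i++ { // initial connect + one reconnect
		f, _ := connectFrame(auth)
		fmt.Println(f)
	}
	reply, _, _, _ := handleFrame("2")
	fmt.Println(reply)
	_, ev, arg, _ := handleFrame(`42["message.created",{"id":"m1"}]`)
	fmt.Println(ev, string(arg))
}
```

The two dials print `40{"token":"jwt-1"}` and then `40{"token":"jwt-2"}`. This is the reconnect behavior the AuthFunc bullet describes.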

internal/tokens/ (~95 LOC + 9 tests):
- Per-agent session cache with 8min TTL (matches openclaw's
  TOKEN_TTL_MS); guild tokens are ~15min so 8min keeps a margin
- Invalidate forces re-login (used by inbound when CONNECT auth fires)
- GuildToken helper picks the per-guild JWT from the cached session;
  if the guild is missing from the cache, invalidate + retry once

internal/inbound/ (~290 LOC):
- Supervisor: one socket.io conn per (agent, guild); reconnect with
  fresh token on drop; ChannelSyncInterval (60s) polling + push
  channel.joined/channel.left handlers
- Wakeup gate: dm channels deliver any non-self message; other
  x_types require wakeup=true (record-only handling of non-wake, non-dm
  messages is deferred, since Plexum v1 has no history-injection equivalent)
- Self-author filter on selfUserId from cached session
- Per-(agent,msgId) dedup bounded to 5000 entries
- Per-channel serial queue with 5s idle drain so concurrent inbounds
  on the same channel run one-at-a-time (matches openclaw plugin)
- Emits notifications/plexum/channel/inbound with session_id =
  "s_fab_<fabric_channel_id>" for stable per-channel session continuity
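The following sketch covers the gate, self-author filter, dedup and session_id rules above. The `Message` struct, its field names, the "text" x_type and the FIFO eviction order are assumptions made for illustration. The commit only fixes the rules, the 5000-entry bound and the `s_fab_` prefix. The sketch runs on a single goroutine, so a dedup set shared across goroutines would also need a mutex.

```go
package main

import "fmt"

// Message is a hypothetical, trimmed view of a message.created payload;
// Fabric's real field names may differ.
type Message struct {
	ID        string
	ChannelID string
	XType     string // channel x_type, e.g. "dm"
	AuthorID  string
	Wakeup    bool
}

// shouldDispatch drops self-authored messages, delivers any other message
// in a dm channel, and requires wakeup=true for every other x_type.
func shouldDispatch(m Message, selfUserID string) bool {
	if m.AuthorID == selfUserID {
		return false
	}
	if m.XType == "dm" {
		return true
	}
	return m.Wakeup
}

// dedup remembers at most limit (agent, msgID) keys and evicts the oldest
// when full. FIFO eviction is an assumption; only the bound is given.
type dedup struct {
	limit int
	seen  map[string]struct{}
	order []string
}

func newDedup(limit int) *dedup {
	return &dedup{limit: limit, seen: make(map[string]struct{})}
}

// firstSeen reports whether (agentID, msgID) is new and records it if so.
func (d *dedup) firstSeen(agentID, msgID string) bool {
	key := agentID + "\x00" + msgID
	if _, ok := d.seen[key]; ok {
		return false
	}
	if len(d.order) == d.limit {
		oldest := d.order[0]
		d.order = d.order[1:]
		delete(d.seen, oldest)
	}
	d.seen[key] = struct{}{}
	d.order = append(d.order, key)
	return true
}

// sessionID gives each Fabric channel one stable Plexum session.
func sessionID(fabricChannelID string) string {
	return "s_fab_" + fabricChannelID
}

func main() {
	d := newDedup(5000)
	msgs := []Message{
		{ID: "m1", ChannelID: "c1", XType: "dm", AuthorID: "alice"},
		{ID: "m1", ChannelID: "c1", XType: "dm", AuthorID: "alice"}, // redelivery
		{ID: "m2", ChannelID: "c2", XType: "text", AuthorID: "alice"}, // "text" is a made-up non-dm type
		{ID: "m3", ChannelID: "c2", XType: "text", AuthorID: "alice", Wakeup: true},
		{ID: "m4", ChannelID: "c1", XType: "dm", AuthorID: "bot"}, // self-authored
	}
	for _, m := range msgs {
		if d.firstSeen("agent-1", m.ID) && shouldDispatch(m, "bot") {
			fmt.Println("dispatch", m.ID, "as", sessionID(m.ChannelID))
		}
	}
}
```

Only m1 (as `s_fab_c1`) and m3 (as `s_fab_c2`) are dispatched. The redelivered m1, the non-wake m2 and the self-authored m4 are all dropped.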

cmd/plexum-fabric-channel-plugin:
- Wires inbound supervisor at Init; runs in a background goroutine
  for the plugin's lifetime
- Replaces F-1's sessions map with tokens.Cache (same warm-sessions
  behavior, now backed by TTL)
- hostLogHandler: bridges slog records from inbound supervisor to
  HostAPI.Log notifications

F-2 deferred to F-3+:
- record-only history injection (Plexum v1 has no equivalent)
- tools.ts port (15 MCP tools — channel/canvas/sub-discussion family)
- presence-sync, command-sync, attachments, coalesce parity

Tests: 22 (5 identity + 6 config + 9 tokens + 2 socketio).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-31 15:29:01 +01:00


// Package tokens caches per-agent Fabric sessions with a short TTL so
// guild access tokens stay fresh. openclaw's plugin documented the
// failure mode: long-lived sockets survive auto-reconnect at the TCP
// layer, but the JWT captured at the original CONNECT goes stale about
// 15 min in, silently breaking auth at the application layer
// (subscribe-to-room emits go to /dev/null). Refreshing each agent's
// session by re-login on a TTL fixes it.
package tokens

import (
	"context"
	"errors"
	"sync"
	"time"

	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
)

// DefaultTTL matches openclaw's TOKEN_TTL_MS (8 min). Guild tokens live
// ~15 min, so refreshing every 8 keeps a comfortable margin.
const DefaultTTL = 8 * time.Minute

// LoginFunc is the per-agent re-login callback. Cache calls it when the
// cached session is missing or expired.
type LoginFunc func(ctx context.Context, agentID string) (*fabric.Session, error)

// Cache is a per-agent session cache. It is safe for concurrent use;
// TTL and Login are read without the lock, so they must not be changed
// after the first call.
type Cache struct {
	TTL   time.Duration
	Login LoginFunc

	mu      sync.Mutex
	entries map[string]*entry
}

type entry struct {
	session *fabric.Session
	at      time.Time
}

// New constructs a Cache. A ttl ≤ 0 selects DefaultTTL.
func New(ttl time.Duration, login LoginFunc) *Cache {
	if ttl <= 0 {
		ttl = DefaultTTL
	}
	return &Cache{TTL: ttl, Login: login, entries: map[string]*entry{}}
}

// Get returns the cached session if it is fresh; otherwise it re-logs in.
// An error from Login is returned as-is and the stale entry is NOT served
// (the caller may retry). Concurrent Gets for the same stale agent may
// each call Login; the last result stored wins.
func (c *Cache) Get(ctx context.Context, agentID string) (*fabric.Session, error) {
	c.mu.Lock()
	e, ok := c.entries[agentID]
	c.mu.Unlock()
	if ok && time.Since(e.at) < c.TTL {
		return e.session, nil
	}
	if c.Login == nil {
		return nil, errors.New("tokens: no login func configured")
	}
	fresh, err := c.Login(ctx, agentID)
	if err != nil {
		return nil, err
	}
	if fresh == nil {
		// Never cache or hand out a nil session; callers dereference it.
		return nil, errors.New("tokens: login returned nil session")
	}
	c.mu.Lock()
	c.entries[agentID] = &entry{session: fresh, at: time.Now()}
	c.mu.Unlock()
	return fresh, nil
}

// Invalidate drops the cached entry for agentID, so the next Get re-logs
// in. Useful when an HTTP call returns 401 (cached token rejected).
func (c *Cache) Invalidate(agentID string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, agentID)
}

// Peek returns the cached session without a TTL check or re-login, or
// nil if nothing is cached.
func (c *Cache) Peek(agentID string) *fabric.Session {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.entries[agentID]; ok {
		return e.session
	}
	return nil
}

// GuildToken is a convenience that returns a fresh guild access token
// for (agentID, guildNodeID). It goes through Login if the cache is
// stale, and invalidates and retries once if the cached session does not
// list guildNodeID.
func (c *Cache) GuildToken(ctx context.Context, agentID, guildNodeID string) (string, error) {
	sess, err := c.Get(ctx, agentID)
	if err != nil {
		return "", err
	}
	if tok, ok := findGuildToken(sess, guildNodeID); ok {
		return tok, nil
	}
	// The cached session doesn't list this guild; the session's guild
	// list could be stale. Invalidate and retry once.
	c.Invalidate(agentID)
	sess, err = c.Get(ctx, agentID)
	if err != nil {
		return "", err
	}
	if tok, ok := findGuildToken(sess, guildNodeID); ok {
		return tok, nil
	}
	return "", errors.New("tokens: agent has no access to guild " + guildNodeID)
}

// findGuildToken returns the access token for guildNodeID in sess, if any.
func findGuildToken(sess *fabric.Session, guildNodeID string) (string, bool) {
	if sess == nil {
		return "", false
	}
	for _, t := range sess.GuildAccessTokens {
		if t.GuildNodeID == guildNodeID {
			return t.Token, true
		}
	}
	return "", false
}