End-to-end Fabric inbound→Plexum→Fabric outbound now works against a
live Fabric stack:
alice posts in bt2-clean (Fabric REST)
→ guild emits message.created over socket.io
→ plugin's wakeup gate decides dispatch
→ notifications/plexum/channel/inbound to host
→ Plexum agent runs (echo provider)
→ outbound `send` tool posts via Fabric REST
→ fabrictester reply visible in channel
internal/socketio/ (~280 LOC + 2 tests):
- Minimal Engine.IO v4 + Socket.IO v5 client over websocket
- WebSocket-only transport (skip polling upgrade dance)
- AuthFunc callback re-evaluated on every (re)connect — fixes the
stale-JWT-on-reconnect bug that the openclaw plugin documented for
the JS client's single-shot auth, and that the available Go
socket.io library (zishang520) doesn't address either
- PING/PONG per server-supplied interval
- Caller-driven reconnect: Connect returns on close, supervisor
re-dials with fresh token
internal/tokens/ (~95 LOC + 9 tests):
- Per-agent session cache with 8min TTL (matches openclaw's
TOKEN_TTL_MS); guild tokens are ~15min so 8min keeps a margin
- Invalidate forces re-login (used by inbound when CONNECT auth fires)
- GuildToken helper picks the per-guild JWT from the cached session;
if the guild is missing from the cache, invalidate + retry once
internal/inbound/ (~290 LOC):
- Supervisor: one socket.io conn per (agent, guild); reconnect with
fresh token on drop; ChannelSyncInterval (60s) polling + push
channel.joined/channel.left handlers
- Wakeup gate: dm channels deliver any non-self message; other
x_types require wakeup=true (record-only for non-wake non-dm
deferred — Plexum has no history-injection equivalent in v1)
- Self-author filter on selfUserId from cached session
- Per-(agent,msgId) dedup bounded to 5000 entries
- Per-channel serial queue with 5s idle drain so concurrent inbounds
on the same channel run one-at-a-time (matches openclaw plugin)
- Emits notifications/plexum/channel/inbound with session_id =
"s_fab_<fabric_channel_id>" for stable per-channel session continuity
cmd/plexum-fabric-channel-plugin:
- Wires inbound supervisor at Init; runs in a background goroutine
for the plugin's lifetime
- Replaces F-1's sessions map with tokens.Cache (same warm-sessions
behavior, now backed by TTL)
- hostLogHandler: bridges slog records from inbound supervisor to
HostAPI.Log notifications
F-2 deferred to F-3+:
- record-only history injection (Plexum v1 has no equivalent)
- tools.ts port (15 MCP tools — channel/canvas/sub-discussion family)
- presence-sync, command-sync, attachments, coalesce parity
Tests: 22 (5 identity + 6 config + 9 tokens + 2 socketio).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
117 lines
3.3 KiB
Go
117 lines
3.3 KiB
Go
// Package tokens caches per-agent Fabric sessions with a short TTL so
|
|
// guild access tokens stay fresh. openclaw's plugin documented the
|
|
// failure mode: long-lived sockets survive auto-reconnect at the TCP
|
|
// layer but the JWT captured at original CONNECT goes stale ~15min in,
|
|
// silently breaking auth at the application layer (subscribe-to-room
|
|
// emits go to /dev/null). TTL refresh by re-login per agent fixes it.
|
|
package tokens
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
|
)
|
|
|
|
// DefaultTTL is the session lifetime used when no positive TTL is
// supplied. It mirrors openclaw's TOKEN_TTL_MS (8 min); guild tokens
// last ~15 min, so an 8-minute refresh leaves a comfortable margin.
const DefaultTTL time.Duration = 8 * time.Minute
|
|
|
|
// LoginFunc is the per-agent re-login callback. Cache calls this when
|
|
// the cached session expires.
|
|
type LoginFunc func(ctx context.Context, agentID string) (*fabric.Session, error)
|
|
|
|
// Cache wraps a per-agent session cache. Thread-safe.
|
|
type Cache struct {
|
|
TTL time.Duration
|
|
Login LoginFunc
|
|
|
|
mu sync.Mutex
|
|
entries map[string]*entry
|
|
}
|
|
|
|
type entry struct {
|
|
session *fabric.Session
|
|
at time.Time
|
|
}
|
|
|
|
// New constructs a Cache. ttl ≤ 0 → DefaultTTL.
|
|
func New(ttl time.Duration, login LoginFunc) *Cache {
|
|
if ttl <= 0 {
|
|
ttl = DefaultTTL
|
|
}
|
|
return &Cache{TTL: ttl, Login: login, entries: map[string]*entry{}}
|
|
}
|
|
|
|
// Get returns the cached session if fresh, otherwise re-logs in. err
|
|
// from Login bubbles up; the stale entry is NOT served on Login failure
|
|
// (caller may retry).
|
|
func (c *Cache) Get(ctx context.Context, agentID string) (*fabric.Session, error) {
|
|
c.mu.Lock()
|
|
e, ok := c.entries[agentID]
|
|
c.mu.Unlock()
|
|
if ok && time.Since(e.at) < c.TTL {
|
|
return e.session, nil
|
|
}
|
|
if c.Login == nil {
|
|
return nil, errors.New("tokens: no login func configured")
|
|
}
|
|
fresh, err := c.Login(ctx, agentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.mu.Lock()
|
|
c.entries[agentID] = &entry{session: fresh, at: time.Now()}
|
|
c.mu.Unlock()
|
|
return fresh, nil
|
|
}
|
|
|
|
// Invalidate drops the cached entry for agentID. Next Get re-logs in.
|
|
// Useful when an HTTP call returns 401 (cached token rejected).
|
|
func (c *Cache) Invalidate(agentID string) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
delete(c.entries, agentID)
|
|
}
|
|
|
|
// Peek returns the cached session without TTL check or re-login.
|
|
// Returns nil if nothing cached.
|
|
func (c *Cache) Peek(agentID string) *fabric.Session {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if e, ok := c.entries[agentID]; ok {
|
|
return e.session
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GuildToken is a convenience that returns a fresh guild access token
|
|
// for (agentID, guildNodeID). Falls through to Login if cache is stale
|
|
// or the guildNodeID isn't in the cached session.
|
|
func (c *Cache) GuildToken(ctx context.Context, agentID, guildNodeID string) (string, error) {
|
|
sess, err := c.Get(ctx, agentID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
for _, t := range sess.GuildAccessTokens {
|
|
if t.GuildNodeID == guildNodeID {
|
|
return t.Token, nil
|
|
}
|
|
}
|
|
// Cached session doesn't have this guild — could be a stale
|
|
// session list. Invalidate + retry once.
|
|
c.Invalidate(agentID)
|
|
sess, err = c.Get(ctx, agentID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
for _, t := range sess.GuildAccessTokens {
|
|
if t.GuildNodeID == guildNodeID {
|
|
return t.Token, nil
|
|
}
|
|
}
|
|
return "", errors.New("tokens: agent has no access to guild " + guildNodeID)
|
|
}
|