Files
hzhang 7911cc6320 feat(presence): F-5b presence-sync — mirror sm.Machine into Fabric
Adds an internal/presence package that ticks every 30s (configurable
via presence_interval_seconds), reads each bound agent's sm.Machine
state through host.ReadAgentState, maps to Fabric's 6-status enum,
and PUTs diffs to /api/agents/:userId/presence on every guild the
agent belongs to.

Semantic mapping (the part flagged "needed" in the prior README):
  idle    → idle
  working → on_call
  busy    → busy
  offline → offline
exhausted/unknown reserved for backend-side fallbacks; we don't push.

Tick is mutex-guarded (avoids the upsert race the openclaw incident
called out in agent-presence.service.ts) and diff-gated so writes are
sparse. Token-cache invalidation on PUT failure handles guild JWT
rotation.

fabric.Client gains a SetAgentPresence helper. README marks F-5b as done.
2026-06-01 08:40:17 +01:00

221 lines
6.6 KiB
Go

// Package presence is the F-5b presence-sync loop. Every tick it
// queries the host for each bound agent's sm.Machine state, maps to
// Fabric's 6-status enum, and PUTs diffs to each guild the agent
// belongs to.
//
// Semantic mapping (sm.State → fabric presence) lives in StateToFabric:
//
// idle → "idle" (waiting for work)
// working → "on_call" (running a single turn; available to receive)
// busy → "busy" (workflow stacked; do not deliver announce)
// offline → "offline" (process down / never booted / rate-limited)
//
// "exhausted" + "unknown" are reserved for backend-side fallbacks; we
// never push them — they'd require info Plexum doesn't track.
//
// Tick model mirrors openclaw's Fabric.OpenclawPlugin.presence-sync.ts:
// 30s ticker, inflight mutex (serial ticks may span multiple seconds
// over many agents + guilds; overlapping ticks cause the backend
// upsert race described in agent-presence.service.ts), and diff-gating
// so we only PUT when state actually changed.
package presence
import (
"context"
"sync"
"time"
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens"
)
// DefaultInterval is the presence-sync cadence used when Sync.Interval
// is unset (≤0); it matches openclaw's 30s. Presence changes are
// infrequent, so a longer interval would also be acceptable: Fabric
// backends treat missing rows as 'unknown' (not-busy), so a late update
// can never cause a message to be delivered wrongly — at worst it fails
// to suppress one.
const DefaultInterval time.Duration = 30 * time.Second
// Sync drives the presence-sync loop. One instance per plugin process;
// owns the lastStatus diff cache + the inflight mutex.
//
// The exported fields are dependencies/configuration supplied by the
// caller before Start; Start fills in Interval (when ≤0) and allocates
// lastStatus if it is nil.
type Sync struct {
	Host       plugin.HostAPI         // used for Log and ReadAgentState
	Client     *fabric.Client         // issues the presence PUT via SetAgentPresence
	Tokens     *tokens.Cache          // per-agent sessions (Get) and guild tokens (GuildToken)
	Identities *identity.Registry     // agent_id → identity entry (Enabled, FabricUserID)
	Bindings   []config.FabricBinding // agent/guild pairs visited on every tick
	Interval   time.Duration          // ≤0 → DefaultInterval
	mu         sync.Mutex             // guards lastStatus and inflight
	lastStatus map[string]string      // key agentID+"|"+guildNodeID → last fabric status successfully PUT
	inflight   bool                   // true while a tick runs; a tick that finds it set is dropped
}
// Start applies defaults (Interval ≤0 → DefaultInterval; lastStatus is
// allocated if nil) and spawns the sync goroutine, returning without
// blocking. The goroutine runs one pass right away so initial presence
// lands fast, then one pass per Interval until ctx is cancelled.
//
// NOTE(review): each call spawns another loop; the inflight guard in
// tick keeps their passes from overlapping, but Start is meant to be
// called once.
func (s *Sync) Start(ctx context.Context) {
	if s.Interval <= 0 {
		s.Interval = DefaultInterval
	}
	if s.lastStatus == nil {
		s.lastStatus = make(map[string]string)
	}

	go func() {
		ticker := time.NewTicker(s.Interval)
		defer ticker.Stop()

		// Run a pass, then block until the next tick or cancellation.
		// The first iteration therefore fires immediately.
		for {
			s.tick(ctx)
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
}
// tick runs one presence-sync pass over every configured binding.
//
// If a previous pass is still running, this call returns immediately
// and the tick is dropped, because overlapping passes trigger the
// backend upsert race described in the package doc. Bindings that share
// the same (agent_id, guild_node_id) pair are synced only once per pass.
// The pass stops early once ctx is cancelled, so shutdown does not issue
// a failing request, and log a warning, for every remaining binding.
func (s *Sync) tick(ctx context.Context) {
	s.mu.Lock()
	if s.inflight {
		s.mu.Unlock()
		return
	}
	s.inflight = true
	s.mu.Unlock()
	defer func() {
		s.mu.Lock()
		s.inflight = false
		s.mu.Unlock()
	}()

	// Dedupe (agent_id, guild_node_id) pairs across bindings —
	// multiple channels in the same guild for the same agent → one PUT.
	type target struct {
		agentID     string
		guildNodeID string
	}
	seen := make(map[target]struct{}, len(s.Bindings))
	for _, b := range s.Bindings {
		if ctx.Err() != nil {
			return // shutting down; abandon the rest of this pass
		}
		key := target{agentID: b.AgentID, guildNodeID: b.FabricGuildNodeID}
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		s.syncOne(ctx, b)
	}
}
// syncOne pushes the current presence of one binding's agent to that
// binding's guild.
//
// It returns quietly in three cases:
//   - the agent has no enabled identity;
//   - the host state has no Fabric mapping;
//   - the mapped status equals the last one successfully PUT for this
//     agent+guild (diff-gating).
//
// Other failures are logged at warn and left for the next tick.
// lastStatus is only written after a successful PUT, so a failed push
// is retried automatically.
func (s *Sync) syncOne(ctx context.Context, b config.FabricBinding) {
	entry := s.Identities.Lookup(b.AgentID)
	if entry == nil || !entry.Enabled {
		return
	}
	if entry.FabricUserID == "" {
		// Without the userId we can't PUT; the operator has to
		// re-register to populate it. Logged at debug level on every
		// tick until that happens (not just once).
		s.Host.Log("debug", "fabric presence-sync: no fabric_user_id",
			map[string]any{"agent": b.AgentID})
		return
	}

	snap, err := s.Host.ReadAgentState(ctx, b.AgentID)
	if err != nil {
		s.Host.Log("warn", "fabric presence-sync: ReadAgentState failed",
			map[string]any{"agent": b.AgentID, "err": err.Error()})
		return
	}
	status := StateToFabric(snap.State)
	if status == "" {
		return // unmappable state → skip
	}

	cacheKey := b.AgentID + "|" + b.FabricGuildNodeID
	s.mu.Lock()
	prev, ok := s.lastStatus[cacheKey]
	s.mu.Unlock()
	if ok && prev == status {
		return // unchanged since the last successful PUT
	}

	guildToken, err := s.Tokens.GuildToken(ctx, b.AgentID, b.FabricGuildNodeID)
	if err != nil {
		s.Host.Log("warn", "fabric presence-sync: guild token",
			map[string]any{"agent": b.AgentID, "guild": b.FabricGuildNodeID, "err": err.Error()})
		return
	}
	guildEndpoint, err := s.resolveGuildEndpoint(ctx, b.AgentID, b.FabricGuildNodeID)
	if err != nil {
		s.Host.Log("warn", "fabric presence-sync: guild endpoint",
			map[string]any{"agent": b.AgentID, "guild": b.FabricGuildNodeID, "err": err.Error()})
		return
	}

	if err := s.Client.SetAgentPresence(ctx, guildEndpoint, guildToken,
		entry.FabricUserID, status, "plexum-host"); err != nil {
		s.Host.Log("warn", "fabric presence-sync: PUT failed",
			map[string]any{"agent": b.AgentID, "guild": b.FabricGuildNodeID, "status": status, "err": err.Error()})
		// The error isn't inspected, so any failure is treated as a
		// possible 401 from guild JWT rotation: drop the cached session
		// so the next tick logs in again. Skip this when ctx is already
		// done, because the PUT most likely failed from cancellation or
		// timeout rather than a stale token.
		if ctx.Err() == nil {
			s.Tokens.Invalidate(b.AgentID)
		}
		return
	}

	s.mu.Lock()
	s.lastStatus[cacheKey] = status
	s.mu.Unlock()
	s.Host.Log("info", "fabric presence-sync",
		map[string]any{"agent": b.AgentID, "guild": b.FabricGuildNodeID, "status": status})
}
// resolveGuildEndpoint fetches the agent's session through s.Tokens.Get
// and returns the Endpoint of the guild whose NodeID equals guildNodeID.
//
// Errors from Get are returned unchanged. If the session lists no such
// guild, errGuildNotInSession is returned.
func (s *Sync) resolveGuildEndpoint(ctx context.Context, agentID, guildNodeID string) (string, error) {
	session, err := s.Tokens.Get(ctx, agentID)
	if err != nil {
		return "", err
	}

	for i := range session.Guilds {
		if guild := session.Guilds[i]; guild.NodeID == guildNodeID {
			return guild.Endpoint, nil
		}
	}
	return "", errGuildNotInSession
}
// sentinel is a string-backed error type. Its values are comparable, so
// a sentinel error can be matched with == or errors.Is.
type sentinel string

// Error implements the error interface by returning the message as-is.
func (e sentinel) Error() string {
	return string(e)
}

// errGuildNotInSession is returned by resolveGuildEndpoint when the
// agent's session has no guild with the requested node ID. Callers log
// and skip; retrying won't help because it signals an operator config
// mismatch.
var errGuildNotInSession = sentinel("agent session has no matching guild")
// StateToFabric converts a Plexum sm.State string into the Fabric
// agent-presence enum. An empty result means the state has no mapping
// and the caller should skip it.
//
//	idle    → "idle"
//	working → "on_call"
//	busy    → "busy"
//	offline → "offline"
//
// Plexum's "working" means the agent is in a turn but can still receive
// (notifications merge into the in-flight turn). HF's "on_call" carried
// the same "active but receptive" meaning, so it maps directly. The
// other three states share their names with Fabric and pass through.
// Matching is exact (case-sensitive).
func StateToFabric(plexumState string) string {
	switch plexumState {
	case "working":
		return "on_call"
	case "idle", "busy", "offline":
		return plexumState
	default:
		return ""
	}
}