From 7911cc63207d7bd84e8dddbc7e6d7ede7e7613fc Mon Sep 17 00:00:00 2001 From: hzhang Date: Mon, 1 Jun 2026 08:40:17 +0100 Subject: [PATCH] =?UTF-8?q?feat(presence):=20F-5b=20presence-sync=20?= =?UTF-8?q?=E2=80=94=20mirror=20sm.Machine=20into=20Fabric?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an internal/presence package that ticks every 30s (configurable via presence_interval_seconds), reads each bound agent's sm.Machine state through host.ReadAgentState, maps to Fabric's 6-status enum, and PUTs diffs to /api/agents/:userId/presence on every guild the agent belongs to. Semantic mapping (the part flagged "needed" in the prior README): idle → idle working → on_call busy → busy offline → offline exhausted/unknown reserved for backend-side fallbacks; we don't push. Tick is mutex-guarded (avoids the upsert race the openclaw incident called out in agent-presence.service.ts) and diff-gated so writes are sparse. Token-cache invalidation on PUT failure handles guild JWT rotation. fabric.Client gains SetAgentPresence helper. README marks F-5b ✅. --- README.md | 15 +- cmd/plexum-fabric-channel-plugin/main.go | 23 ++- internal/fabric/client.go | 17 ++ internal/presence/presence.go | 220 +++++++++++++++++++++++ internal/presence/presence_test.go | 21 +++ 5 files changed, 291 insertions(+), 5 deletions(-) create mode 100644 internal/presence/presence.go create mode 100644 internal/presence/presence_test.go diff --git a/README.md b/README.md index 82bc52d..2657cee 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ real channel members. Port of `Fabric.OpenclawPlugin` — delivered in phases. 
| **F-6** | attachments → temp-dir + footer | ✅ | | **F-7** | sub-discussion KV + create-*-channel + discussion-complete | ✅ | | **F-8** | coalesce parity | ⏭ no-op — Plexum doesn't segment deliver | -| **F-5b** | presence-sync | ⏭ deferred — Plexum state machine ≠ HF semantics | +| **F-5b** | presence-sync | ✅ | ### Agent-facing tools (16) @@ -53,10 +53,21 @@ Then: `~/.plexum/plugins/plexum-fabric-channel/config.json`: ```json { - "center_api_base": "http://localhost:7001/api" + "center_api_base": "http://localhost:7001/api", + "commands_sync_key": "...", + "presence_interval_seconds": 30 } ``` + `presence_interval_seconds` (F-5b) controls how often the plugin + polls each bound agent's `sm.Machine` state via the host + `plexum/host/agent-state/read` RPC and pushes diffs to + `PUT /api/agents/:userId/presence` on every guild the agent belongs + to. Defaults to 30s; set to 0 to fall back to default. The mapping + is `idle→idle`, `working→on_call`, `busy→busy`, `offline→offline` + (Fabric's `exhausted`/`unknown` are reserved for backend-side + fallbacks; we don't push them). Diff-gated so writes are sparse. + 4. 
**Bind a Plexum channel name to a Fabric channel** at `~/.plexum/channels/.json`: ```json diff --git a/cmd/plexum-fabric-channel-plugin/main.go b/cmd/plexum-fabric-channel-plugin/main.go index 401a63c..8bbc7ee 100644 --- a/cmd/plexum-fabric-channel-plugin/main.go +++ b/cmd/plexum-fabric-channel-plugin/main.go @@ -29,6 +29,7 @@ import ( "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/inbound" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/presence" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/subdisc" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tools" @@ -43,9 +44,10 @@ import ( // "sync_commands": ["new","stop"] // optional override; defaults to ["new","stop"] // } type HostConfig struct { - CenterAPIBase string `json:"center_api_base"` - CommandsSyncKey string `json:"commands_sync_key,omitempty"` - SyncCommands []string `json:"sync_commands,omitempty"` + CenterAPIBase string `json:"center_api_base"` + CommandsSyncKey string `json:"commands_sync_key,omitempty"` + SyncCommands []string `json:"sync_commands,omitempty"` + PresenceIntervalSeconds int `json:"presence_interval_seconds,omitempty"` // F-5b; 0 → default 30s } type fabricPlugin struct { @@ -237,6 +239,21 @@ func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error { }() host.Log("info", "fabric inbound supervisor started", map[string]any{"agents": sup.AgentIDs, "bindings": len(p.bindings)}) + + // F-5b: presence-sync. Shares the inbound supervisor's context + // so it dies together. Tick interval honours config override. 
+		interval := time.Duration(p.cfg.PresenceIntervalSeconds) * time.Second
+		ps := &presence.Sync{
+			Host:       host,
+			Client:     p.client,
+			Tokens:     p.tokens,
+			Identities: p.identities,
+			Bindings:   p.bindings,
+			Interval:   interval,
+		}
+		ps.Start(ctxBg)
+		host.Log("info", "fabric presence-sync started",
+			map[string]any{"interval": ps.Interval.String()})
 	}
 
 	return nil
diff --git a/internal/fabric/client.go b/internal/fabric/client.go
index c7b2a1b..88d9867 100644
--- a/internal/fabric/client.go
+++ b/internal/fabric/client.go
@@ -325,6 +325,23 @@ func (c *Client) RemoveCanvas(ctx context.Context, guildEndpoint, guildToken, ch
 	return err
 }
 
+// SetAgentPresence pushes one agent's presence to the guild. Body
+// shape mirrors PUT /api/agents/:userId/presence:
+//
+//	{ "status": "idle"|"on_call"|"busy"|"exhausted"|"offline"|"unknown",
+//	  "source": "<origin label>" }
+//
+// userID is the agent's Fabric Center user UUID (not the Plexum agent
+// id). The guild-side ApiKeyGuard accepts the per-guild access JWT —
+// caller must already hold a fresh one.
+func (c *Client) SetAgentPresence(ctx context.Context, guildEndpoint, guildToken, userID, status, source string) error {
+	body := map[string]any{"status": status, "source": source}
+	_, err := c.do(ctx, http.MethodPut,
+		guildEndpoint+"/api/agents/"+url.PathEscape(userID)+"/presence",
+		guildToken, body, nil)
+	return err
+}
+
 // SyncCommands PUTs the agent's slash-command catalog onto the guild
 // (idempotent full replace). Needs the guild's commands-sync key, which
 // the operator sources from the guild config.
diff --git a/internal/presence/presence.go b/internal/presence/presence.go
new file mode 100644
index 0000000..7664537
--- /dev/null
+++ b/internal/presence/presence.go
@@ -0,0 +1,220 @@
+// Package presence is the F-5b presence-sync loop. Every tick it
+// queries the host for each bound agent's sm.Machine state, maps to
+// Fabric's 6-status enum, and PUTs diffs to each guild the agent
+// belongs to.
+//
+// Semantic mapping (sm.State → fabric presence) lives in StateToFabric:
+//
+//	idle    → "idle"    (waiting for work)
+//	working → "on_call" (running a single turn; available to receive)
+//	busy    → "busy"    (workflow stacked; do not deliver announce)
+//	offline → "offline" (process down / never booted / rate-limited)
+//
+// "exhausted" + "unknown" are reserved for backend-side fallbacks; we
+// never push them — they'd require info Plexum doesn't track.
+//
+// Tick model mirrors openclaw's Fabric.OpenclawPlugin.presence-sync.ts:
+// 30s ticker, inflight mutex (serial ticks may span multiple seconds
+// over many agents + guilds; overlapping ticks cause the backend
+// upsert race described in agent-presence.service.ts), and diff-gating
+// so we only PUT when state actually changed.
+package presence
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
+
+	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
+	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
+	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity"
+	"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens"
+)
+
+// DefaultInterval matches openclaw's 30s cadence. State transitions are
+// rare so any longer interval is fine — Fabric backends treat missing
+// rows as 'unknown' (not-busy) so a delayed update can't deliver a
+// message wrongly, only fail to suppress one.
+const DefaultInterval = 30 * time.Second
+
+// Sync drives the presence-sync loop. One instance per plugin process;
+// owns the lastStatus diff cache + the inflight mutex.
+type Sync struct {
+	Host       plugin.HostAPI
+	Client     *fabric.Client
+	Tokens     *tokens.Cache
+	Identities *identity.Registry
+	Bindings   []config.FabricBinding
+	Interval   time.Duration // ≤0 → DefaultInterval
+
+	mu         sync.Mutex
+	lastStatus map[string]string // agent_id + "|" + guild_node_id → last successfully pushed fabric status
+	inflight   bool              // true while a tick is running; guarded by mu
+}
+
+// Start launches the ticker goroutine and returns; the loop runs until
+// ctx cancels. Fires one tick immediately so initial state lands fast.
+func (s *Sync) Start(ctx context.Context) {
+	if s.Interval <= 0 {
+		s.Interval = DefaultInterval
+	}
+	if s.lastStatus == nil {
+		s.lastStatus = map[string]string{}
+	}
+	go func() {
+		t := time.NewTicker(s.Interval)
+		defer t.Stop()
+		s.tick(ctx)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-t.C:
+				s.tick(ctx)
+			}
+		}
+	}()
+}
+
+// tick runs one presence-sync pass. Skips if a previous tick is still
+// running (drop overlapping ticks — see openclaw incident note).
+func (s *Sync) tick(ctx context.Context) {
+	s.mu.Lock()
+	if s.inflight {
+		s.mu.Unlock()
+		return
+	}
+	s.inflight = true
+	s.mu.Unlock()
+	defer func() {
+		s.mu.Lock()
+		s.inflight = false
+		s.mu.Unlock()
+	}()
+
+	// Dedupe (agent_id, guild_node_id) pairs across bindings —
+	// multiple channels in the same guild for the same agent → one PUT.
+	type target struct {
+		agentID     string
+		guildNodeID string
+	}
+	seen := map[target]struct{}{}
+	for _, b := range s.Bindings {
+		key := target{b.AgentID, b.FabricGuildNodeID}
+		if _, ok := seen[key]; ok {
+			continue
+		}
+		seen[key] = struct{}{}
+		s.syncOne(ctx, b)
+	}
+}
+
+func (s *Sync) syncOne(ctx context.Context, b config.FabricBinding) {
+	entry := s.Identities.Lookup(b.AgentID)
+	if entry == nil || !entry.Enabled {
+		return
+	}
+	if entry.FabricUserID == "" {
+		// Without the userId we can't PUT. Operator should re-register
+		// to populate it; log at debug (repeats every tick) and move on.
+		s.Host.Log("debug", "fabric presence-sync: no fabric_user_id",
+			map[string]any{"agent": b.AgentID})
+		return
+	}
+
+	snap, err := s.Host.ReadAgentState(ctx, b.AgentID)
+	if err != nil {
+		s.Host.Log("warn", "fabric presence-sync: ReadAgentState failed",
+			map[string]any{"agent": b.AgentID, "err": err.Error()})
+		return
+	}
+	status := StateToFabric(snap.State)
+	if status == "" {
+		return // unmappable state → skip
+	}
+
+	cacheKey := b.AgentID + "|" + b.FabricGuildNodeID
+	s.mu.Lock()
+	prev, ok := s.lastStatus[cacheKey]
+	s.mu.Unlock()
+	if ok && prev == status {
+		return // no change
+	}
+
+	guildToken, err := s.Tokens.GuildToken(ctx, b.AgentID, b.FabricGuildNodeID)
+	if err != nil {
+		s.Host.Log("warn", "fabric presence-sync: guild token",
+			map[string]any{"agent": b.AgentID, "guild": b.FabricGuildNodeID, "err": err.Error()})
+		return
+	}
+	guildEndpoint, err := s.resolveGuildEndpoint(ctx, b.AgentID, b.FabricGuildNodeID)
+	if err != nil {
+		s.Host.Log("warn", "fabric presence-sync: guild endpoint",
+			map[string]any{"agent": b.AgentID, "guild": b.FabricGuildNodeID, "err": err.Error()})
+		return
+	}
+
+	if err := s.Client.SetAgentPresence(ctx, guildEndpoint, guildToken,
+		entry.FabricUserID, status, "plexum-host"); err != nil {
+		s.Host.Log("warn", "fabric presence-sync: PUT failed",
+			map[string]any{"agent": b.AgentID, "status": status, "err": err.Error()})
+		// Presumably a rotated token (401); NOTE: every PUT error invalidates, not just 401, so the next tick re-logs in.
+		s.Tokens.Invalidate(b.AgentID)
+		return
+	}
+	s.mu.Lock()
+	s.lastStatus[cacheKey] = status
+	s.mu.Unlock()
+	s.Host.Log("info", "fabric presence-sync",
+		map[string]any{"agent": b.AgentID, "guild": b.FabricGuildNodeID, "status": status})
+}
+
+// resolveGuildEndpoint walks the cached session's guilds list to find
+// the endpoint URL matching guildNodeID.
+func (s *Sync) resolveGuildEndpoint(ctx context.Context, agentID, guildNodeID string) (string, error) {
+	sess, err := s.Tokens.Get(ctx, agentID)
+	if err != nil {
+		return "", err
+	}
+	for _, g := range sess.Guilds {
+		if g.NodeID == guildNodeID {
+			return g.Endpoint, nil
+		}
+	}
+	return "", errGuildNotInSession
+}
+
+// errGuildNotInSession is the sentinel for resolveGuildEndpoint misses; the caller logs + skips.
+// Nothing is cached on a miss, so the lookup repeats next tick (likely operator config mismatch).
+var errGuildNotInSession = sentinel("agent session has no matching guild")
+
+type sentinel string
+
+func (s sentinel) Error() string { return string(s) }
+
+// StateToFabric maps a Plexum sm.State string to the Fabric agent
+// presence enum. Empty return = unmappable (caller skips).
+//
+//	idle    → "idle"
+//	working → "on_call"
+//	busy    → "busy"
+//	offline → "offline"
+//
+// Plexum's working == "in a turn, still able to receive" (notifications
+// merge with the in-flight turn). HF's "on_call" carried the same
+// "active but receptive" meaning, so the mapping is direct.
+func StateToFabric(plexumState string) string {
+	switch plexumState {
+	case "idle":
+		return "idle"
+	case "working":
+		return "on_call"
+	case "busy":
+		return "busy"
+	case "offline":
+		return "offline"
+	}
+	return ""
+}
diff --git a/internal/presence/presence_test.go b/internal/presence/presence_test.go
new file mode 100644
index 0000000..0bb9b40
--- /dev/null
+++ b/internal/presence/presence_test.go
@@ -0,0 +1,21 @@
+package presence
+
+import "testing"
+
+func TestStateToFabricMapping(t *testing.T) {
+	cases := []struct {
+		in, want string
+	}{
+		{"idle", "idle"},
+		{"working", "on_call"},
+		{"busy", "busy"},
+		{"offline", "offline"},
+		{"", ""},
+		{"weird", ""},
+	}
+	for _, c := range cases {
+		if got := StateToFabric(c.in); got != c.want {
+			t.Errorf("StateToFabric(%q) = %q, want %q", c.in, got, c.want)
+		}
+	}
+}