Files
Plexum-fabric-channel-plugin/cmd/plexum-fabric-channel-plugin/main.go
hzhang d6bea46d00 feat: Phase F-3 + F-4 — exp backoff + agent tool surface (batch 1)
F-3 refinements:
- internal/inbound: replace fixed 3s reconnect wait with exponential
  backoff (1s → 60s, ×2, reset when prior session lasted >30s); proxy
  for "healthy" vs "flapping" and avoids hot reconnect loops when the
  server is sick

F-4 agent tool surface (port of openclaw plugin's tools.ts):
- internal/tools/tools.go (~370 LOC): Registry binds Deps {Client,
  Tokens, Identities} and exposes 8 agent-facing tools:
    fabric-send-message     post a normal message to any channel
    fabric-send-sys-msg     post a kind=sys message (bypasses turn engine)
    fabric-channel-list     list channels visible in a guild
    fabric-guild-list       list guilds the agent is in
    fabric-message-history  paginate channel messages by seq
    fabric-channel-set-purpose  PATCH the channel's purpose
    fabric-channel          fetch metadata + members for one channel
    fabric-canvas           get/share/update/remove channel canvas
- internal/tools/contracts.go: static ToolContract list — kept in sync
  with install.sh's manifest emitter
- Every agent-scoped tool requires agent_id in input args (Plexum SDK
  doesn't propagate calling agent id through CallTool today)
- guild_node_id defaults to agent's first guild for fabric-send-message

internal/fabric/client.go: new REST methods needed by tools —
PostSystemMessage, CreateChannel, CloseChannel, JoinChannel,
LeaveChannel, SetChannelPurpose, GetCanvas, ShareCanvas, UpdateCanvas,
RemoveCanvas, SyncCommands.

cmd/plexum-fabric-channel-plugin/main.go:
- Manifest declares the tool surface via tools.New(...).Contracts()
- CallTool dispatches "send" to handleSend (outbound for channel
  manager), everything else to tools.Registry.Handler(name)

scripts/install.sh:
- Manifest tools[] now lists all 9 tools with schemas — matches what
  internal/tools/contracts.go advertises

Live verified against running Fabric stack:
  $ plexum plugin-call fabric-guild-list '{"agent_id":"fabrictester"}'
    → "guilds for agent fabrictester (1): test-guild2 @ http://localhost:7003"
  $ plexum plugin-call fabric-channel-list '{...,"guild_node_id":"test-guild2"}'
    → 2 channels listed
  $ plexum plugin-call fabric-message-history '{...,"limit":5}'
    → 5 messages with timestamps + authors

F-5+ deferred:
- create-{chat,work,report,discussion}-channel (batch 2)
- sub-discussion family (state store + 3 tools)
- presence-sync + command-sync
- attachments

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-31 15:35:39 +01:00

369 lines
11 KiB
Go

// plexum-fabric-channel-plugin is the Plexum channel plugin that
// connects Plexum agents to a Fabric guild as members.
//
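// Implemented in this file: identity load, channel config discovery
// (the plugin advertises channels via its manifest and reads
// channels/<name>.json for the Plexum-channel → Fabric-channel
// mapping), the agentLogin handshake behind a per-agent token cache
// that logs in again on a miss, the `send` outbound tool (posts via
// Fabric REST), the inbound supervisor (Phase F-2), and the
// agent-facing tool surface ported from the openclaw plugin's
// tools.ts, including channel canvas (Phase F-4).
//
// Still deferred (F-5+): create-*-channel tools, sub-discussion,
// presence sync, command sync, attachments. Wakeup gating was listed
// as deferred in F-1; its status is not visible from this file.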
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"sync"
"time"
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/inbound"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens"
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tools"
)
// HostConfig is the plugin's own config, decoded from JSON at
// <profile>/plugins/plexum-fabric-channel/config.json (Init builds the
// path from config.PluginName). Example:
//
//	{
//	  "center_api_base": "http://localhost:7001/api"
//	}
//
// The file is optional: Init tolerates a missing file and then applies
// the default below.
type HostConfig struct {
// CenterAPIBase is the base URL handed to fabric.New. When it is empty
// after loading, Init substitutes "http://localhost:7001/api".
CenterAPIBase string `json:"center_api_base"`
}
// fabricPlugin is the plugin implementation handed to plugin.Serve in
// main. main constructs the zero value; Init populates the fields.
type fabricPlugin struct {
// host is the HostAPI received in Init; used for logging and for
// emitting inbound notifications.
host plugin.HostAPI
// cfgPath is the location of the plugin-private config.json.
cfgPath string
// cfg is the decoded plugin-private config with defaults applied.
cfg HostConfig
// identities is the registry opened from the profile's identity file.
identities *identity.Registry
// bindings are the channel bindings loaded from <profile>/channels.
bindings []config.FabricBinding
// byFabric is config.Index(bindings). It is not read in the code
// visible here.
byFabric config.ByFabricChannel
// client is the Fabric client built from cfg.CenterAPIBase.
client *fabric.Client
// tokens caches per-agent sessions; on a miss it logs the agent in
// again through client using the identity's API key.
tokens *tokens.Cache
// tools is the agent-facing tool registry that CallTool delegates to.
tools *tools.Registry
// inboundCancel cancels the context the inbound supervisor runs under,
// and inboundDone is closed when the supervisor goroutine returns.
// Nothing visible in this file calls inboundCancel: the SDK has no
// explicit shutdown signal today, so subprocess kill is the only stop.
inboundCancel context.CancelFunc
inboundDone chan struct{}
// Legacy fields kept for back-compat with non-tokens code paths during
// the F-1 → F-2 refactor. sessions is still initialised in Init but is
// never read in the code visible here; sessMu is not referenced at all.
// Safe to remove once nothing else references them.
sessMu sync.Mutex
sessions map[string]*fabric.Session
}
// Manifest describes the plugin to the host: its name, version, lazy
// activation, executable name, and the channel and tool contracts it
// offers.
func (p *fabricPlugin) Manifest() plugin.Manifest {
	// Channels are discovered from disk on every call. The tool list is
	// taken from a Registry built on empty Deps, so Contracts() is
	// expected not to need live dependencies (F-4 port of the openclaw
	// plugin's tools.ts).
	offered := plugin.Contracts{
		Channels: p.dynamicChannelContracts(),
		Tools:    tools.New(tools.Deps{}).Contracts(),
	}
	return plugin.Manifest{
		Name:       config.PluginName,
		Version:    "0.1.0",
		Activation: plugin.ActivationLazy,
		Executable: "plexum-fabric-channel-plugin",
		Contracts:  offered,
	}
}
// dynamicChannelContracts loads the channel bindings under
// <profile>/channels and returns one ChannelContract per binding, each
// routed through the "send" outbound tool. The profile root comes from
// PLEXUM_PROFILE_ROOT, falling back to ~/.plexum. A load failure yields
// nil.
func (p *fabricPlugin) dynamicChannelContracts() []plugin.ChannelContract {
	root := os.Getenv("PLEXUM_PROFILE_ROOT")
	if root == "" {
		// The error is dropped on purpose: no logger is reachable from the
		// manifest path. With no home directory this resolves to a
		// relative ".plexum".
		home, _ := os.UserHomeDir()
		root = filepath.Join(home, ".plexum")
	}

	loaded, err := config.Load(filepath.Join(root, "channels"))
	if err != nil {
		// Init loads the same directory and reports the failure there.
		return nil
	}

	contracts := make([]plugin.ChannelContract, len(loaded))
	for i := range loaded {
		contracts[i] = plugin.ChannelContract{
			Name:         loaded[i].PlexumChannelName,
			OutboundTool: "send",
		}
	}
	return contracts
}
// Init wires the plugin up against the host. In order, it:
//
//  1. resolves the profile root (PLEXUM_PROFILE_ROOT, else ~/.plexum);
//  2. reads the optional plugin-private config.json and applies the
//     default center API base;
//  3. opens the identity registry and loads the channel bindings;
//  4. builds the token cache and the agent-facing tool registry;
//  5. warms a session for every bound agent (failures are only logged);
//  6. starts the inbound supervisor goroutine when at least one binding
//     exists.
//
// It returns an error when the config file is unreadable or malformed,
// when the identity registry cannot be opened, or when the channel
// bindings cannot be loaded. A missing config file is not an error.
func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error {
	p.host = host
	p.sessions = map[string]*fabric.Session{}

	profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT")
	if profileRoot == "" {
		home, err := os.UserHomeDir()
		if err != nil {
			// Keep the historical fallback (home is "" here, so the root
			// becomes a ".plexum" relative to the working directory), but
			// surface the reason instead of silently dropping the error.
			host.Log("warn", "cannot resolve home directory; using relative .plexum profile root",
				map[string]any{"err": err.Error()})
		}
		profileRoot = filepath.Join(home, ".plexum")
	}

	// Plugin-private config. A missing file is fine; any other read
	// error, or invalid JSON, aborts initialisation.
	p.cfgPath = filepath.Join(profileRoot, "plugins", config.PluginName, "config.json")
	raw, err := os.ReadFile(p.cfgPath)
	if err != nil && !errors.Is(err, os.ErrNotExist) {
		return fmt.Errorf("read %s: %w", p.cfgPath, err)
	}
	if len(raw) > 0 {
		if err := json.Unmarshal(raw, &p.cfg); err != nil {
			return fmt.Errorf("parse %s: %w", p.cfgPath, err)
		}
	}
	if p.cfg.CenterAPIBase == "" {
		p.cfg.CenterAPIBase = "http://localhost:7001/api"
	}
	p.client = fabric.New(p.cfg.CenterAPIBase)

	// Identity.
	idPath := filepath.Join(profileRoot, identity.FileName)
	p.identities, err = identity.Open(idPath)
	if err != nil {
		return fmt.Errorf("identity: %w", err)
	}

	// Channel bindings.
	p.bindings, err = config.Load(filepath.Join(profileRoot, "channels"))
	if err != nil {
		return fmt.Errorf("channel bindings: %w", err)
	}
	p.byFabric = config.Index(p.bindings)

	// Token cache: re-login per agent on TTL miss (8min default; the 0
	// passed here presumably selects that default, confirm in tokens.New).
	p.tokens = tokens.New(0, func(loginCtx context.Context, agentID string) (*fabric.Session, error) {
		entry := p.identities.Lookup(agentID)
		if entry == nil || !entry.Enabled {
			return nil, fmt.Errorf("agent %s: no enabled identity", agentID)
		}
		return p.client.AgentLogin(loginCtx, entry.FabricAPIKey)
	})

	// Agent-facing tool surface (port of openclaw's tools.ts).
	p.tools = tools.New(tools.Deps{
		Client: p.client, Tokens: p.tokens, Identities: p.identities,
	})

	host.Log("info", "fabric channel plugin initialized", map[string]any{
		"center":            p.cfg.CenterAPIBase,
		"identity_path":     idPath,
		"channels_loaded":   len(p.bindings),
		"identities_loaded": len(p.identities.AgentIDs()),
	})

	// Warm sessions (early bad-key detection). Failures do not abort
	// initialisation; they are reported as a warning only.
	if err := p.warmSessions(ctx); err != nil {
		host.Log("warn", "fabric warm-sessions had errors",
			map[string]any{"err": err.Error()})
	}

	// Phase F-2: start the inbound supervisor in a goroutine. It runs on
	// a background context rather than ctx, and lives until
	// p.inboundCancel fires (currently never: the SDK has no shutdown
	// hook, so subprocess kill is the only stop signal).
	if len(p.bindings) > 0 {
		ctxBg, cancel := context.WithCancel(context.Background())
		p.inboundCancel = cancel
		p.inboundDone = make(chan struct{})

		// notifier relays one inbound message to the host as a
		// channel-inbound notification.
		notifier := func(channelName, message, sessionID string) {
			p.host.EmitNotification("notifications/plexum/channel/inbound", map[string]any{
				"channel_name": channelName,
				"message":      message,
				"session_id":   sessionID,
			})
		}

		// The inbound package logs through slog; hostLogHandler bridges
		// records at INFO and above to the host log.
		logger := slog.New(&hostLogHandler{host: host, level: slog.LevelInfo})
		sup := inbound.New(p.client, p.tokens, p.bindings, notifier, logger)
		go func() {
			defer close(p.inboundDone)
			if err := sup.Run(ctxBg); err != nil {
				host.Log("warn", "inbound supervisor exited", map[string]any{"err": err.Error()})
			}
		}()
		// NOTE(review): sup.AgentIDs is passed without a call; confirm it
		// is a field rather than a method value.
		host.Log("info", "fabric inbound supervisor started",
			map[string]any{"agents": sup.AgentIDs, "bindings": len(p.bindings)})
	}
	return nil
}
// hostLogHandler is a small slog.Handler that forwards records to the
// plugin's HostAPI.Log. inbound + supervisor use slog for structured
// logging; this bridges to the host's log notification stream.
//
// Attributes attached with WithAttrs and groups opened with WithGroup
// are preserved: group names are flattened into dot-separated key
// prefixes ("group.key") in the attribute map sent to the host.
type hostLogHandler struct {
	host  plugin.HostAPI
	level slog.Level
	// attrs holds attributes added through WithAttrs. Their keys already
	// carry the group prefix that was open when they were added.
	attrs []slog.Attr
	// prefix is the dot-joined chain of open groups, with a trailing dot.
	// It is prepended to the key of every attribute added afterwards.
	prefix string
}

// Enabled reports whether records at level l are forwarded: everything at
// or above the handler's configured level.
func (h *hostLogHandler) Enabled(_ context.Context, l slog.Level) bool { return l >= h.level }

// Handle flattens the handler's pre-attached attributes and the record's
// own attributes into one map and passes it to the host log with the
// record's message and mapped level. It always returns nil.
func (h *hostLogHandler) Handle(_ context.Context, r slog.Record) error {
	attrs := make(map[string]any, len(h.attrs)+r.NumAttrs())
	for _, a := range h.attrs {
		// Resolve so LogValuer values are logged as their final value.
		attrs[a.Key] = a.Value.Resolve().Any()
	}
	r.Attrs(func(a slog.Attr) bool {
		attrs[h.prefix+a.Key] = a.Value.Resolve().Any()
		return true
	})
	h.host.Log(levelString(r.Level), r.Message, attrs)
	return nil
}

// WithAttrs returns a handler that also emits as on every record. The
// receiver is left untouched; it is returned as-is when as is empty.
func (h *hostLogHandler) WithAttrs(as []slog.Attr) slog.Handler {
	if len(as) == 0 {
		return h
	}
	// Build a fresh slice so sibling handlers never share a backing array.
	merged := make([]slog.Attr, 0, len(h.attrs)+len(as))
	merged = append(merged, h.attrs...)
	for _, a := range as {
		merged = append(merged, slog.Attr{Key: h.prefix + a.Key, Value: a.Value})
	}
	clone := *h
	clone.attrs = merged
	return &clone
}

// WithGroup returns a handler that qualifies the keys of all attributes
// added afterwards with name. An empty name returns the receiver, as the
// slog.Handler contract requires.
func (h *hostLogHandler) WithGroup(name string) slog.Handler {
	if name == "" {
		return h
	}
	clone := *h
	clone.prefix = h.prefix + name + "."
	return &clone
}
// levelString maps a slog level onto the host log's level names. Levels
// between the named slog levels round down to the nearest name below
// them; anything under Info is reported as "debug".
func levelString(l slog.Level) string {
	if l < slog.LevelInfo {
		return "debug"
	}
	if l < slog.LevelWarn {
		return "info"
	}
	if l < slog.LevelError {
		return "warn"
	}
	return "error"
}
// warmSessions fetches a session through the token cache for every agent
// that appears in at least one channel binding, so bad or missing keys
// show up at startup. Every failure is logged as a warning; only the
// first one encountered is returned. Agents are visited in map order, so
// which failure counts as "first" is not deterministic.
func (p *fabricPlugin) warmSessions(ctx context.Context) error {
	// Distinct agent ids referenced by the bindings.
	wanted := make(map[string]bool)
	for i := range p.bindings {
		wanted[p.bindings[i].AgentID] = true
	}

	var firstErr error
	// remember keeps the earliest failure and ignores later ones.
	remember := func(err error) {
		if firstErr == nil {
			firstErr = err
		}
	}

	active := p.identities.EnabledEntries()
	for id := range wanted {
		if _, found := active[id]; !found {
			missing := fmt.Errorf("agent %s has channels but no identity (run plexum-fabric-register --agent-id %s --api-key ...)",
				id, id)
			p.host.Log("warn", missing.Error(), nil)
			remember(missing)
			continue
		}

		session, err := p.tokens.Get(ctx, id)
		if err != nil {
			p.host.Log("warn", "fabric agent warm failed",
				map[string]any{"agent": id, "err": err.Error()})
			remember(err)
			continue
		}

		p.host.Log("info", "fabric session warm", map[string]any{
			"agent": id, "fabric_user": session.User.Email,
			"guilds": len(session.Guilds),
		})
	}
	return firstErr
}
// CallTool routes a tool invocation by name. "send" is the channel
// outbound driven by Plexum's channel manager and is handled locally;
// any other name is looked up in the tools.Registry built during Init.
// A name the registry does not know yields an "unknown tool" error.
func (p *fabricPlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (plugin.ToolResult, error) {
	if name != "send" {
		run := p.tools.Handler(name)
		if run == nil {
			return plugin.ToolResult{}, fmt.Errorf("unknown tool: %s", name)
		}
		return run(ctx, input)
	}
	return p.handleSend(ctx, input)
}
// handleSend implements the "send" outbound tool. It resolves the Plexum
// channel name to its binding, obtains a session for the bound agent,
// picks that session's endpoint and access token for the binding's guild,
// and posts the message via Fabric REST.
//
// Malformed JSON input is returned as a Go error. Every other failure
// (missing channel_name, unknown channel, no session, no guild access,
// failed post) comes back as an IsError tool result with a nil error.
// The session_id argument is decoded but not used.
func (p *fabricPlugin) handleSend(ctx context.Context, input json.RawMessage) (plugin.ToolResult, error) {
	var req struct {
		ChannelName string `json:"channel_name"`
		SessionID   string `json:"session_id"`
		Message     string `json:"message"`
	}
	if err := json.Unmarshal(input, &req); err != nil {
		return plugin.ToolResult{}, fmt.Errorf("parse args: %w", err)
	}
	if req.ChannelName == "" {
		return errResult("channel_name required"), nil
	}
	p.host.Log("info", "fabric send", map[string]any{
		"channel_name": req.ChannelName, "len": len(req.Message),
	})

	// First binding whose Plexum channel name matches wins.
	at := -1
	for i := range p.bindings {
		if p.bindings[i].PlexumChannelName == req.ChannelName {
			at = i
			break
		}
	}
	if at < 0 {
		return errResult("unknown plexum channel: " + req.ChannelName), nil
	}
	bound := &p.bindings[at]

	sess, err := p.sessionFor(ctx, bound.AgentID)
	if err != nil {
		return errResult("session for agent " + bound.AgentID + ": " + err.Error()), nil
	}

	// The guild's endpoint and its access token live in separate lists on
	// the session; both must be present for the bound guild.
	var endpoint, token string
	for _, guild := range sess.Guilds {
		if guild.NodeID == bound.FabricGuildNodeID {
			endpoint = guild.Endpoint
			break
		}
	}
	for _, grant := range sess.GuildAccessTokens {
		if grant.GuildNodeID == bound.FabricGuildNodeID {
			token = grant.Token
			break
		}
	}
	hasAccess := endpoint != "" && token != ""
	if !hasAccess {
		return errResult(fmt.Sprintf("agent %s has no access to guild %s",
			bound.AgentID, bound.FabricGuildNodeID)), nil
	}

	postErr := p.client.PostMessage(ctx, endpoint, token,
		bound.FabricChannelID, req.Message, sess.User.ID)
	if postErr != nil {
		return errResult("post: " + postErr.Error()), nil
	}
	return plugin.NewTextResult("sent"), nil
}
// sessionFor returns the session for agentID from the token cache,
// bounding the lookup with a 15-second timeout derived from ctx.
func (p *fabricPlugin) sessionFor(ctx context.Context, agentID string) (*fabric.Session, error) {
	const lookupTimeout = 15 * time.Second

	bounded, release := context.WithTimeout(ctx, lookupTimeout)
	defer release()

	return p.tokens.Get(bounded, agentID)
}
// errResult wraps msg in a single text content block and marks the tool
// result as an error.
func errResult(msg string) plugin.ToolResult {
	text := plugin.ContentBlock{Type: "text", Text: msg}

	var res plugin.ToolResult
	res.Content = []plugin.ContentBlock{text}
	res.IsError = true
	return res
}
// main serves a zero-value fabricPlugin through the Plexum plugin SDK.
// If Serve returns an error it is written to stderr and the process
// exits with status 1.
func main() {
	err := plugin.Serve(&fabricPlugin{})
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "plexum-fabric-channel-plugin: %v\n", err)
	os.Exit(1)
}