// plexum-fabric-channel-plugin is the Plexum channel plugin that // connects Plexum agents to a Fabric guild as members. // // F-1 (current): identity load, channel config discovery, REST send, // agentLogin handshake. Plugin advertises channels via manifest + // reads channels/.json for the Plexum-channel → Fabric-channel // mapping. The `send` outbound tool posts via Fabric REST. // // F-2+ (deferred): socket.io inbound, wakeup gating, token refresh, // presence sync, sub-discussion, MCP tool surface (~15 tools from // the openclaw plugin's tools.ts), attachments, channel canvas. package main import ( "context" "encoding/json" "errors" "fmt" "log/slog" "os" "path/filepath" "sync" "time" plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/attachments" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/inbound" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/presence" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/subdisc" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tokens" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/tools" ) // HostConfig is the plugin's own config at // /plugins/plexum-fabric-channel/config.json: // // { // "center_api_base": "http://localhost:7001/api", // "commands_sync_key": "...", // F-5: enables /command autocomplete in Fabric // "sync_commands": ["new","stop"] // optional override; defaults to ["new","stop"] // } type HostConfig struct { CenterAPIBase string `json:"center_api_base"` CommandsSyncKey string `json:"commands_sync_key,omitempty"` SyncCommands []string `json:"sync_commands,omitempty"` PresenceIntervalSeconds int `json:"presence_interval_seconds,omitempty"` // F-5b; 0 → default 30s } type fabricPlugin struct { host plugin.HostAPI cfgPath string cfg HostConfig identities *identity.Registry bindings []config.FabricBinding byFabric config.ByFabricChannel client *fabric.Client tokens *tokens.Cache tools *tools.Registry // Goroutine handle for the inbound supervisor. Cancelled on // plugin shutdown (we don't have an explicit shutdown signal in // the SDK today; rely on subprocess kill). inboundCancel context.CancelFunc inboundDone chan struct{} // Legacy field — kept only for back-compat with non-tokens code // paths during the F-1 → F-2 refactor; safe to remove once nothing // else references it. Not used anymore. sessMu sync.Mutex sessions map[string]*fabric.Session } func (p *fabricPlugin) Manifest() plugin.Manifest { channels := p.dynamicChannelContracts() // Tool surface = the agent-facing tool family (port of openclaw // plugin's tools.ts, F-4). The "send" outbound is also there for // the host's channel manager. return plugin.Manifest{ Name: config.PluginName, Version: "0.1.0", Activation: plugin.ActivationLazy, Executable: "plexum-fabric-channel-plugin", Contracts: plugin.Contracts{ Channels: channels, Tools: tools.New(tools.Deps{}).Contracts(), }, } } func (p *fabricPlugin) dynamicChannelContracts() []plugin.ChannelContract { // Read channels/*.json from /channels and surface every // `plugin: plexum-fabric-channel` entry as a ChannelContract. profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT") if profileRoot == "" { home, _ := os.UserHomeDir() profileRoot = filepath.Join(home, ".plexum") } bindings, err := config.Load(filepath.Join(profileRoot, "channels")) if err != nil { // Logged later in Init; manifest call can't itself reach a logger. return nil } out := make([]plugin.ChannelContract, 0, len(bindings)) for _, b := range bindings { out = append(out, plugin.ChannelContract{ Name: b.PlexumChannelName, OutboundTool: "send", }) } return out } func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error { p.host = host p.sessions = map[string]*fabric.Session{} profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT") if profileRoot == "" { home, _ := os.UserHomeDir() profileRoot = filepath.Join(home, ".plexum") } // Plugin-private config. p.cfgPath = filepath.Join(profileRoot, "plugins", config.PluginName, "config.json") raw, err := os.ReadFile(p.cfgPath) if err != nil && !errors.Is(err, os.ErrNotExist) { return fmt.Errorf("read %s: %w", p.cfgPath, err) } if len(raw) > 0 { if err := json.Unmarshal(raw, &p.cfg); err != nil { return fmt.Errorf("parse %s: %w", p.cfgPath, err) } } if p.cfg.CenterAPIBase == "" { p.cfg.CenterAPIBase = "http://localhost:7001/api" } p.client = fabric.New(p.cfg.CenterAPIBase) // Identity. idPath := filepath.Join(profileRoot, identity.FileName) p.identities, err = identity.Open(idPath) if err != nil { return fmt.Errorf("identity: %w", err) } // Channel bindings. p.bindings, err = config.Load(filepath.Join(profileRoot, "channels")) if err != nil { return fmt.Errorf("channel bindings: %w", err) } p.byFabric = config.Index(p.bindings) // Token cache: re-login per agent on TTL miss (8min default). p.tokens = tokens.New(0, func(loginCtx context.Context, agentID string) (*fabric.Session, error) { entry := p.identities.Lookup(agentID) if entry == nil || !entry.Enabled { return nil, fmt.Errorf("agent %s: no enabled identity", agentID) } return p.client.AgentLogin(loginCtx, entry.FabricAPIKey) }) // Sub-discussion KV (F-7); persists across restarts. subStore, err := subdisc.Open(subdisc.DefaultPath()) if err != nil { host.Log("warn", "fabric subdisc open", map[string]any{"err": err.Error()}) } // Agent-facing tool surface (port of openclaw's tools.ts). p.tools = tools.New(tools.Deps{ Client: p.client, Tokens: p.tokens, Identities: p.identities, SubDisc: subStore, }) host.Log("info", "fabric channel plugin initialized", map[string]any{ "center": p.cfg.CenterAPIBase, "identity_path": idPath, "channels_loaded": len(p.bindings), "identities_loaded": len(p.identities.AgentIDs()), }) // Warm sessions (early bad-key detection). if err := p.warmSessions(ctx); err != nil { host.Log("warn", "fabric warm-sessions had errors", map[string]any{"err": err.Error()}) } // F-5: push slash command catalog to every guild the bound agents // belong to. Cosmetic — gives Fabric frontend "/command" autocomplete. // Requires commands_sync_key matching the guild's // FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY; silently skip when unset. if p.cfg.CommandsSyncKey != "" { p.syncCommandsToGuilds(ctx) } // Phase F-2: start the inbound supervisor in a goroutine. Lives // until p.inboundCancel fires (currently never — SDK has no // shutdown hook; subprocess kill is the only stop signal). if len(p.bindings) > 0 { ctxBg, cancel := context.WithCancel(context.Background()) p.inboundCancel = cancel p.inboundDone = make(chan struct{}) notifier := func(channelName, message, sessionID string, media []inbound.MediaItem) { payload := map[string]any{ "channel_name": channelName, "message": message, "session_id": sessionID, } if len(media) > 0 { mediaWire := make([]map[string]any, 0, len(media)) for _, m := range media { mediaWire = append(mediaWire, map[string]any{ "path": m.Path, "media_type": m.MediaType, "name": m.Name, }) } payload["media"] = mediaWire } p.host.EmitNotification("notifications/plexum/channel/inbound", payload) } // slog wrapping plugin.HostAPI.Log isn't worth the indirection // here; use a discard-style adapter that pipes WARN/INFO to // the host log. logger := slog.New(&hostLogHandler{host: host, level: slog.LevelInfo}) sup := inbound.New(p.client, p.tokens, p.bindings, notifier, logger) // F-6: enable attachment downloads. Files land under // $TMPDIR/plexum-fabric//; agents access via // the exec tool (cat / file). sup.Attachments = attachments.New("") go func() { defer close(p.inboundDone) if err := sup.Run(ctxBg); err != nil { host.Log("warn", "inbound supervisor exited", map[string]any{"err": err.Error()}) } }() host.Log("info", "fabric inbound supervisor started", map[string]any{"agents": sup.AgentIDs, "bindings": len(p.bindings)}) // F-5b: presence-sync. Shares the inbound supervisor's context // so it dies together. Tick interval honours config override. interval := time.Duration(p.cfg.PresenceIntervalSeconds) * time.Second ps := &presence.Sync{ Host: host, Client: p.client, Tokens: p.tokens, Identities: p.identities, Bindings: p.bindings, Interval: interval, } ps.Start(ctxBg) host.Log("info", "fabric presence-sync started", map[string]any{"interval": ps.Interval.String()}) } return nil } // hostLogHandler is a tiny slog.Handler that forwards records to the // plugin's HostAPI.Log. inbound + supervisor use slog for structured // logging; this bridges to the host's log notification stream. type hostLogHandler struct { host plugin.HostAPI level slog.Level } func (h *hostLogHandler) Enabled(_ context.Context, l slog.Level) bool { return l >= h.level } func (h *hostLogHandler) Handle(_ context.Context, r slog.Record) error { attrs := make(map[string]any, r.NumAttrs()) r.Attrs(func(a slog.Attr) bool { attrs[a.Key] = a.Value.Any() return true }) h.host.Log(levelString(r.Level), r.Message, attrs) return nil } func (h *hostLogHandler) WithAttrs(_ []slog.Attr) slog.Handler { return h } func (h *hostLogHandler) WithGroup(_ string) slog.Handler { return h } func levelString(l slog.Level) string { switch { case l >= slog.LevelError: return "error" case l >= slog.LevelWarn: return "warn" case l >= slog.LevelInfo: return "info" default: return "debug" } } func (p *fabricPlugin) warmSessions(ctx context.Context) error { // Which agents appear as a binding's AgentID? agentsNeeded := map[string]bool{} for _, b := range p.bindings { agentsNeeded[b.AgentID] = true } enabled := p.identities.EnabledEntries() var firstErr error for agentID := range agentsNeeded { if _, ok := enabled[agentID]; !ok { err := fmt.Errorf("agent %s has channels but no identity (run plexum-fabric-register --agent-id %s --api-key ...)", agentID, agentID) p.host.Log("warn", err.Error(), nil) if firstErr == nil { firstErr = err } continue } sess, err := p.tokens.Get(ctx, agentID) if err != nil { p.host.Log("warn", "fabric agent warm failed", map[string]any{"agent": agentID, "err": err.Error()}) if firstErr == nil { firstErr = err } continue } p.host.Log("info", "fabric session warm", map[string]any{ "agent": agentID, "fabric_user": sess.User.Email, "guilds": len(sess.Guilds), }) } return firstErr } // CallTool dispatches by tool name. "send" is the host-side channel // outbound (driven by Plexum's channel manager); everything else // delegates to the tools.Registry built at init time. func (p *fabricPlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (plugin.ToolResult, error) { if name == "send" { return p.handleSend(ctx, input) } if handler := p.tools.Handler(name); handler != nil { return handler(ctx, input) } return plugin.ToolResult{}, fmt.Errorf("unknown tool: %s", name) } // handleSend implements the "send" outbound tool: posts the assistant // reply via Fabric REST as the agent bound to the Plexum channel name. func (p *fabricPlugin) handleSend(ctx context.Context, input json.RawMessage) (plugin.ToolResult, error) { var args struct { ChannelName string `json:"channel_name"` SessionID string `json:"session_id"` Message string `json:"message"` } if err := json.Unmarshal(input, &args); err != nil { return plugin.ToolResult{}, fmt.Errorf("parse args: %w", err) } if args.ChannelName == "" { return errResult("channel_name required"), nil } p.host.Log("info", "fabric send", map[string]any{ "channel_name": args.ChannelName, "len": len(args.Message), }) var binding *config.FabricBinding for i := range p.bindings { if p.bindings[i].PlexumChannelName == args.ChannelName { binding = &p.bindings[i] break } } if binding == nil { return errResult("unknown plexum channel: " + args.ChannelName), nil } sess, err := p.sessionFor(ctx, binding.AgentID) if err != nil { return errResult("session for agent " + binding.AgentID + ": " + err.Error()), nil } var endpoint, token string for _, g := range sess.Guilds { if g.NodeID == binding.FabricGuildNodeID { endpoint = g.Endpoint break } } for _, t := range sess.GuildAccessTokens { if t.GuildNodeID == binding.FabricGuildNodeID { token = t.Token break } } if endpoint == "" || token == "" { return errResult(fmt.Sprintf("agent %s has no access to guild %s", binding.AgentID, binding.FabricGuildNodeID)), nil } if err := p.client.PostMessage(ctx, endpoint, token, binding.FabricChannelID, args.Message, sess.User.ID); err != nil { return errResult("post: " + err.Error()), nil } return plugin.NewTextResult("sent"), nil } func (p *fabricPlugin) sessionFor(ctx context.Context, agentID string) (*fabric.Session, error) { loginCtx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() return p.tokens.Get(loginCtx, agentID) } // syncCommandsToGuilds PUTs the slash-command catalog to every unique // (agent, guild) the plugin knows about. Best-effort — failure on any // one guild is logged but doesn't block other guilds. func (p *fabricPlugin) syncCommandsToGuilds(ctx context.Context) { commands := p.cfg.SyncCommands if len(commands) == 0 { commands = []string{"new", "stop"} // host-side intercepted slashes } wire := make([]any, 0, len(commands)) for _, c := range commands { wire = append(wire, map[string]any{ "name": c, "description": "Plexum host-intercepted slash: /" + c, }) } seen := map[string]bool{} for _, b := range p.bindings { sess, err := p.tokens.Get(ctx, b.AgentID) if err != nil { continue } for _, g := range sess.Guilds { key := b.AgentID + "/" + g.NodeID if seen[key] { continue } seen[key] = true tok, err := p.tokens.GuildToken(ctx, b.AgentID, g.NodeID) if err != nil { continue } if err := p.client.SyncCommands(ctx, g.Endpoint, tok, wire, p.cfg.CommandsSyncKey); err != nil { p.host.Log("warn", "fabric commands sync failed", map[string]any{"guild": g.NodeID, "err": err.Error()}) } else { p.host.Log("info", "fabric commands synced", map[string]any{"guild": g.NodeID, "n": len(commands)}) } } } } func errResult(msg string) plugin.ToolResult { return plugin.ToolResult{ Content: []plugin.ContentBlock{{Type: "text", Text: msg}}, IsError: true, } } func main() { if err := plugin.Serve(&fabricPlugin{}); err != nil { fmt.Fprintf(os.Stderr, "plexum-fabric-channel-plugin: %v\n", err) os.Exit(1) } }