From f8d43ae70eff8f1d98b0ada5cb2ac763e89060c6 Mon Sep 17 00:00:00 2001 From: hzhang Date: Sun, 31 May 2026 15:13:34 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=20F-1=20=E2=80=94=20Plexum-fabric?= =?UTF-8?q?-channel-plugin=20foundation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports the foundation of Fabric.OpenclawPlugin to a native Plexum channel plugin (Go). F-2+ phases (socket.io inbound, wakeup gate, tools, presence, etc.) follow. Layout: internal/identity/ — fabric-identity.json registry (agent → API key) internal/fabric/ — REST client (Center auth + Guild messaging) internal/config/ — channels/.json fabric extension parser cmd/plexum-fabric-register/ — agent registration CLI cmd/plexum-fabric-channel-plugin/— Plexum SDK plugin entry scripts/install.sh — build + install + manifest generator Plugin behavior (F-1): - Reads /channels/*.json, filters plugin=plexum-fabric-channel, builds (plexum-channel-name → fabric channel-id) index - Validates each bound agent's API key against Center at init (warmSessions); logs warning but doesn't refuse init on bad keys - `send` MCP tool: POST plain text to the bound Fabric channel as the agent user; selects guild endpoint+token from cached session - Manifest channels[] is generated by install.sh from current channels/*.json — re-run with --reset-manifest after adding bindings - Plugin-private config at /plugins/plexum-fabric-channel/config.json (center_api_base, default http://localhost:7001/api) Live smoke verified: - plexum-fabric-register against running Fabric Center (port 7001): validated fak_..., wrote identity file with user_id + email captured Tests: identity (5) + config (6) = 11 unit tests. F-2 will hook socket.io for inbound + wakeup gating + token refresh. Co-Authored-By: Claude Opus 4.7 --- README.md | 85 +++++++ cmd/plexum-fabric-channel-plugin/main.go | 311 +++++++++++++++++++++++ cmd/plexum-fabric-register/main.go | 91 +++++++ go.mod | 5 + go.sum | 2 + internal/config/config.go | 111 ++++++++ internal/config/config_test.go | 92 +++++++ internal/fabric/client.go | 274 ++++++++++++++++++++ internal/identity/identity.go | 157 ++++++++++++ internal/identity/identity_test.go | 88 +++++++ scripts/install.sh | 115 +++++++++ 11 files changed, 1331 insertions(+) create mode 100644 README.md create mode 100644 cmd/plexum-fabric-channel-plugin/main.go create mode 100644 cmd/plexum-fabric-register/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/config/config.go create mode 100644 internal/config/config_test.go create mode 100644 internal/fabric/client.go create mode 100644 internal/identity/identity.go create mode 100644 internal/identity/identity_test.go create mode 100755 scripts/install.sh diff --git a/README.md b/README.md new file mode 100644 index 0000000..358539c --- /dev/null +++ b/README.md @@ -0,0 +1,85 @@ +# Plexum-fabric-channel-plugin + +Native Plexum channel plugin connecting Plexum agents to **Fabric** guilds as +real channel members. Inspired by `Fabric.OpenclawPlugin` — Plexum port, +delivered in phases. + +## Status + +**Phase F-1 (foundation) — current**: identity registry, Center auth, REST +client, plugin scaffold, channel-binding discovery. The `send` outbound tool +posts plain text to a Fabric channel as the bound agent. + +**Phase F-2 (deferred)**: socket.io inbound, wakeup-gated dispatch, token +refresh, per-channel serial queue. + +**Phase F-3+ (deferred)**: tools.ts port (~15 MCP tools — channel/canvas/ +sub-discussion/etc.), presence sync, command sync, attachments, coalesce +parity. + +## Install + +```bash +cd ~/Plexum-fabric-channel-plugin +./scripts/install.sh # build + install plugin + register CLI +``` + +Then: + +1. **Mint a Center API key** for each Plexum agent that should speak in Fabric: + ```bash + docker exec fabric-backend-center node dist/cli.js user apikey --email + ``` + +2. **Register the key** with Plexum: + ```bash + plexum-fabric-register --agent-id alice --api-key fak_xxxx + # writes ~/.plexum/fabric-identity.json + ``` + +3. **Configure plugin-level settings** at + `~/.plexum/plugins/plexum-fabric-channel/config.json`: + ```json + { + "center_api_base": "http://localhost:7001/api" + } + ``` + +4. **Bind a Plexum channel name to a Fabric channel** at + `~/.plexum/channels/.json`: + ```json + { + "agent_id": "alice", + "plugin": "plexum-fabric-channel", + "fabric": { + "guild_node_id": "test-guild1", + "channel_id": "ch_xxxxxxxxx" + } + } + ``` + +5. **Allow the plugin** in `~/.plexum/plexum.json`: + ```json + {"plugins": {"allow": ["plexum-fabric-channel"]}} + ``` + +6. **Restart the gateway**: `systemctl --user restart plexum` + +## What you get in F-1 + +- Plugin loads at gateway start, validates every bound agent's API key + against Center via `agentLogin`, and warms a session per agent. +- `send` MCP tool posts plain text to the bound Fabric channel as the + agent user. +- No inbound yet — Fabric → Plexum routing arrives in F-2. + +## Build manually + +```bash +go build -o bin/plexum-fabric-channel-plugin ./cmd/plexum-fabric-channel-plugin +go build -o bin/plexum-fabric-register ./cmd/plexum-fabric-register +``` + +## License + +Same as Plexum. diff --git a/cmd/plexum-fabric-channel-plugin/main.go b/cmd/plexum-fabric-channel-plugin/main.go new file mode 100644 index 0000000..b62e725 --- /dev/null +++ b/cmd/plexum-fabric-channel-plugin/main.go @@ -0,0 +1,311 @@ +// plexum-fabric-channel-plugin is the Plexum channel plugin that +// connects Plexum agents to a Fabric guild as members. +// +// F-1 (current): identity load, channel config discovery, REST send, +// agentLogin handshake. Plugin advertises channels via manifest + +// reads channels/.json for the Plexum-channel → Fabric-channel +// mapping. The `send` outbound tool posts via Fabric REST. +// +// F-2+ (deferred): socket.io inbound, wakeup gating, token refresh, +// presence sync, sub-discussion, MCP tool surface (~15 tools from +// the openclaw plugin's tools.ts), attachments, channel canvas. +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" + + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity" +) + +// HostConfig is the plugin's own config at +// /plugins/plexum-fabric-channel/config.json: +// +// { +// "center_api_base": "http://localhost:7001/api" +// } +type HostConfig struct { + CenterAPIBase string `json:"center_api_base"` +} + +type fabricPlugin struct { + host plugin.HostAPI + + cfgPath string + cfg HostConfig + identities *identity.Registry + bindings []config.FabricBinding + byFabric config.ByFabricChannel + client *fabric.Client + + // Per-agent Session cache (refreshed lazily; full refresh in F-2). + sessMu sync.Mutex + sessions map[string]*fabric.Session // agentID → session +} + +func (p *fabricPlugin) Manifest() plugin.Manifest { + // Manifest channels are populated dynamically from channels/*.json + // at startup: the operator adds a channels/.json + restarts + // the gateway, and the matching ChannelContract entry surfaces here. + // Both halves needed because Plexum's host registry reads the + // manifest's channel names too. + channels := p.dynamicChannelContracts() + return plugin.Manifest{ + Name: config.PluginName, + Version: "0.1.0", + Activation: plugin.ActivationLazy, + Executable: "plexum-fabric-channel-plugin", + Contracts: plugin.Contracts{ + Channels: channels, + Tools: []plugin.ToolContract{ + { + Name: "send", + Description: "Post a plain-text message to the bound Fabric channel as the agent user.", + InputSchema: json.RawMessage(`{ + "type": "object", + "properties": { + "channel_name": {"type": "string"}, + "session_id": {"type": "string"}, + "message": {"type": "string"} + }, + "required": ["channel_name", "message"] + }`), + }, + }, + }, + } +} + +func (p *fabricPlugin) dynamicChannelContracts() []plugin.ChannelContract { + // Read channels/*.json from /channels and surface every + // `plugin: plexum-fabric-channel` entry as a ChannelContract. + profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT") + if profileRoot == "" { + home, _ := os.UserHomeDir() + profileRoot = filepath.Join(home, ".plexum") + } + bindings, err := config.Load(filepath.Join(profileRoot, "channels")) + if err != nil { + // Logged later in Init; manifest call can't itself reach a logger. + return nil + } + out := make([]plugin.ChannelContract, 0, len(bindings)) + for _, b := range bindings { + out = append(out, plugin.ChannelContract{ + Name: b.PlexumChannelName, OutboundTool: "send", + }) + } + return out +} + +func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error { + p.host = host + p.sessions = map[string]*fabric.Session{} + + profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT") + if profileRoot == "" { + home, _ := os.UserHomeDir() + profileRoot = filepath.Join(home, ".plexum") + } + + // Plugin-private config. + p.cfgPath = filepath.Join(profileRoot, "plugins", config.PluginName, "config.json") + raw, err := os.ReadFile(p.cfgPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("read %s: %w", p.cfgPath, err) + } + if len(raw) > 0 { + if err := json.Unmarshal(raw, &p.cfg); err != nil { + return fmt.Errorf("parse %s: %w", p.cfgPath, err) + } + } + if p.cfg.CenterAPIBase == "" { + p.cfg.CenterAPIBase = "http://localhost:7001/api" + } + p.client = fabric.New(p.cfg.CenterAPIBase) + + // Identity. + idPath := filepath.Join(profileRoot, identity.FileName) + p.identities, err = identity.Open(idPath) + if err != nil { + return fmt.Errorf("identity: %w", err) + } + + // Channel bindings. + p.bindings, err = config.Load(filepath.Join(profileRoot, "channels")) + if err != nil { + return fmt.Errorf("channel bindings: %w", err) + } + p.byFabric = config.Index(p.bindings) + + host.Log("info", "fabric channel plugin initialized", map[string]any{ + "center": p.cfg.CenterAPIBase, + "identity_path": idPath, + "channels_loaded": len(p.bindings), + "identities_loaded": len(p.identities.AgentIDs()), + }) + + // Eager validate: for every bound agent that has a channel, do a + // blocking agentLogin so we surface bad keys at startup instead of + // on first outbound. F-2 hooks socket.io subscription here too. + if err := p.warmSessions(ctx); err != nil { + // Log + continue; outbound will retry on demand. We don't want + // to refuse plugin init just because one key is stale. + host.Log("warn", "fabric warm-sessions had errors", + map[string]any{"err": err.Error()}) + } + return nil +} + +func (p *fabricPlugin) warmSessions(ctx context.Context) error { + // Which agents appear as a binding's AgentID? + agentsNeeded := map[string]bool{} + for _, b := range p.bindings { + agentsNeeded[b.AgentID] = true + } + enabled := p.identities.EnabledEntries() + + var firstErr error + for agentID := range agentsNeeded { + entry, ok := enabled[agentID] + if !ok { + err := fmt.Errorf("agent %s has channels but no identity (run plexum-fabric-register --agent-id %s --api-key ...)", + agentID, agentID) + p.host.Log("warn", err.Error(), nil) + if firstErr == nil { + firstErr = err + } + continue + } + sess, err := p.client.AgentLogin(ctx, entry.FabricAPIKey) + if err != nil { + err = fmt.Errorf("agent %s login: %w", agentID, err) + p.host.Log("warn", err.Error(), nil) + if firstErr == nil { + firstErr = err + } + continue + } + p.sessMu.Lock() + p.sessions[agentID] = sess + p.sessMu.Unlock() + p.host.Log("info", "fabric session warm", map[string]any{ + "agent": agentID, "fabric_user": sess.User.Email, + "guilds": len(sess.Guilds), + }) + } + return firstErr +} + +// CallTool handles the "send" outbound tool. +func (p *fabricPlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (plugin.ToolResult, error) { + if name != "send" { + return plugin.ToolResult{}, fmt.Errorf("unknown tool: %s", name) + } + var args struct { + ChannelName string `json:"channel_name"` + SessionID string `json:"session_id"` + Message string `json:"message"` + } + if err := json.Unmarshal(input, &args); err != nil { + return plugin.ToolResult{}, fmt.Errorf("parse args: %w", err) + } + if args.ChannelName == "" { + return errResult("channel_name required"), nil + } + + // Find the binding for this plexum channel name. + var binding *config.FabricBinding + for i := range p.bindings { + if p.bindings[i].PlexumChannelName == args.ChannelName { + binding = &p.bindings[i] + break + } + } + if binding == nil { + return errResult("unknown plexum channel: " + args.ChannelName), nil + } + + // Resolve the bound agent's session (may need refresh — F-2 will + // add a proper TTL + background refresh; for F-1 we re-login lazily + // if the cache is empty). + sess, err := p.sessionFor(ctx, binding.AgentID) + if err != nil { + return errResult("session for agent " + binding.AgentID + ": " + err.Error()), nil + } + + // Pick the guild endpoint + token for the target guild_node_id. + var ( + endpoint string + token string + ) + for _, g := range sess.Guilds { + if g.NodeID == binding.FabricGuildNodeID { + endpoint = g.Endpoint + break + } + } + for _, t := range sess.GuildAccessTokens { + if t.GuildNodeID == binding.FabricGuildNodeID { + token = t.Token + break + } + } + if endpoint == "" || token == "" { + return errResult(fmt.Sprintf("agent %s has no access to guild %s", + binding.AgentID, binding.FabricGuildNodeID)), nil + } + + if err := p.client.PostMessage(ctx, endpoint, token, + binding.FabricChannelID, args.Message, sess.User.ID); err != nil { + return errResult("post: " + err.Error()), nil + } + return plugin.NewTextResult("sent"), nil +} + +func (p *fabricPlugin) sessionFor(ctx context.Context, agentID string) (*fabric.Session, error) { + p.sessMu.Lock() + sess := p.sessions[agentID] + p.sessMu.Unlock() + if sess != nil { + return sess, nil + } + entry := p.identities.Lookup(agentID) + if entry == nil || !entry.Enabled { + return nil, errors.New("no identity registered (use plexum-fabric-register)") + } + loginCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + fresh, err := p.client.AgentLogin(loginCtx, entry.FabricAPIKey) + if err != nil { + return nil, err + } + p.sessMu.Lock() + p.sessions[agentID] = fresh + p.sessMu.Unlock() + return fresh, nil +} + +func errResult(msg string) plugin.ToolResult { + return plugin.ToolResult{ + Content: []plugin.ContentBlock{{Type: "text", Text: msg}}, + IsError: true, + } +} + +func main() { + if err := plugin.Serve(&fabricPlugin{}); err != nil { + fmt.Fprintf(os.Stderr, "plexum-fabric-channel-plugin: %v\n", err) + os.Exit(1) + } +} diff --git a/cmd/plexum-fabric-register/main.go b/cmd/plexum-fabric-register/main.go new file mode 100644 index 0000000..0f7c97d --- /dev/null +++ b/cmd/plexum-fabric-register/main.go @@ -0,0 +1,91 @@ +// Command plexum-fabric-register binds a Plexum agent to a Fabric +// Center API key. Equivalent of openclaw's `fabric-register` script. +// +// Usage: +// +// plexum-fabric-register --api-key fak_... +// # agent id from $AGENT_ID env (set by exec MCP tool) +// plexum-fabric-register --agent-id alice --api-key fak_... +// +// Flags: +// --api-key Required. The Center-issued agent API key. +// --agent-id Optional when $AGENT_ID is set. +// --center Optional. Defaults to ${FABRIC_CENTER_API_BASE} or +// http://localhost:7001/api. +// --identity-file Optional path to fabric-identity.json (default +// ${PLEXUM_PROFILE_ROOT}/fabric-identity.json). +// +// Writes the (agent, api key, user id, email) tuple to identity file +// after validating the key against Center via /auth/agent/login. +package main + +import ( + "context" + "flag" + "fmt" + "os" + "time" + + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity" +) + +func main() { + apiKey := flag.String("api-key", "", "Fabric Center API key (fak_...; required)") + agentID := flag.String("agent-id", "", "agent id (defaults to $AGENT_ID)") + centerBase := flag.String("center", "", "Center API base URL (default $FABRIC_CENTER_API_BASE or http://localhost:7001/api)") + identityFile := flag.String("identity-file", "", "identity registry path (default $PLEXUM_PROFILE_ROOT/fabric-identity.json)") + flag.Parse() + + if *apiKey == "" { + fatalf("--api-key is required") + } + if *agentID == "" { + *agentID = os.Getenv("AGENT_ID") + } + if *agentID == "" { + fatalf("--agent-id required (or set AGENT_ID env)") + } + if *centerBase == "" { + *centerBase = os.Getenv("FABRIC_CENTER_API_BASE") + } + if *centerBase == "" { + *centerBase = "http://localhost:7001/api" + } + if *identityFile == "" { + *identityFile = identity.DefaultPath() + } + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + client := fabric.New(*centerBase) + sess, err := client.AgentLogin(ctx, *apiKey) + if err != nil { + fatalf("validate key against %s: %v", *centerBase, err) + } + + reg, err := identity.Open(*identityFile) + if err != nil { + fatalf("open identity: %v", err) + } + reg.Set(*agentID, &identity.Entry{ + FabricAPIKey: *apiKey, + FabricUserID: sess.User.ID, + FabricEmail: sess.User.Email, + Enabled: true, + }) + if err := reg.Save(); err != nil { + fatalf("save identity: %v", err) + } + + fmt.Printf("registered agent %s as fabric user %s (%s); %d guilds\n", + *agentID, sess.User.Email, sess.User.ID, len(sess.Guilds)) + fmt.Printf("identity file: %s\n", *identityFile) + fmt.Println("restart the plexum gateway to pick up the new identity") +} + +func fatalf(format string, args ...any) { + fmt.Fprintf(os.Stderr, "plexum-fabric-register: "+format+"\n", args...) + os.Exit(1) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..50a2e38 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin + +go 1.24.2 + +require github.com/zishang520/socket.io-client-go v1.1.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..353f9b6 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/zishang520/socket.io-client-go v1.1.0 h1:rc+WtphqasRKuyxmjx/zqTrdIjzjG0Yv8nWeQcv5hTM= +github.com/zishang520/socket.io-client-go v1.1.0/go.mod h1:pFkvhEgVIjkUJyNvOvb62k78aFt2qwgJFNh2rfaCOc0= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..5f243ee --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,111 @@ +// Package config parses the per-channel binding files at +// /channels/.json. Plexum's channel.Registry already +// parses the {agent_id, plugin} core; this package additionally pulls +// out the plugin-specific `fabric` extension block describing which +// Fabric (guild, channelId) the Plexum channel name maps to. +package config + +import ( + "encoding/json" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "strings" +) + +// PluginName is what manifest.json declares as the plugin name; we +// only consume channels/*.json entries with this value in `plugin`. +const PluginName = "plexum-fabric-channel" + +// FabricBinding is one Plexum-channel ↔ Fabric-channel mapping. Built +// by Load from channels/.json files. +type FabricBinding struct { + // PlexumChannelName: the basename of channels/.json (no .json). + // Also matches the ChannelContract.Name the plugin manifest advertises. + PlexumChannelName string + // AgentID this channel routes to (also recorded in Plexum's channel + // registry; we re-read here for plugin-internal convenience). + AgentID string + // FabricGuildNodeID — which Fabric guild owns the channel. + FabricGuildNodeID string + // FabricChannelID — the channel id within that guild. + FabricChannelID string +} + +// On-disk shape; we ignore fields outside the `fabric` block. +type wireConfig struct { + AgentID string `json:"agent_id"` + Plugin string `json:"plugin"` + Fabric struct { + GuildNodeID string `json:"guild_node_id"` + ChannelID string `json:"channel_id"` + } `json:"fabric"` +} + +// Load returns all FabricBindings discovered under channelsDir. Files +// whose `plugin` field doesn't match PluginName are silently skipped +// (other channel plugins coexist in the same dir). Files with our +// plugin name but missing fabric.{guild_node_id, channel_id} are +// errors — we won't silently route nowhere. +func Load(channelsDir string) ([]FabricBinding, error) { + entries, err := os.ReadDir(channelsDir) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return nil, nil + } + return nil, fmt.Errorf("fabric/config: read %s: %w", channelsDir, err) + } + var out []FabricBinding + for _, e := range entries { + if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { + continue + } + path := filepath.Join(channelsDir, e.Name()) + raw, rerr := os.ReadFile(path) + if rerr != nil { + return nil, fmt.Errorf("fabric/config: read %s: %w", path, rerr) + } + var w wireConfig + if jerr := json.Unmarshal(raw, &w); jerr != nil { + return nil, fmt.Errorf("fabric/config: parse %s: %w", path, jerr) + } + if w.Plugin != PluginName { + continue + } + name := strings.TrimSuffix(e.Name(), ".json") + if w.AgentID == "" { + return nil, fmt.Errorf("fabric/config: %s missing agent_id", path) + } + if w.Fabric.GuildNodeID == "" || w.Fabric.ChannelID == "" { + return nil, fmt.Errorf("fabric/config: %s missing fabric.{guild_node_id, channel_id}", path) + } + out = append(out, FabricBinding{ + PlexumChannelName: name, + AgentID: w.AgentID, + FabricGuildNodeID: w.Fabric.GuildNodeID, + FabricChannelID: w.Fabric.ChannelID, + }) + } + return out, nil +} + +// ByFabricChannel indexes bindings by (guild_node_id, channel_id) for +// fast inbound lookup. Plugin builds this map once at startup. +type ByFabricChannel map[string]*FabricBinding + +// Key composes the index key. +func Key(guildNodeID, channelID string) string { + return guildNodeID + "/" + channelID +} + +// Index returns a ready-to-query ByFabricChannel. +func Index(bindings []FabricBinding) ByFabricChannel { + out := make(ByFabricChannel, len(bindings)) + for i := range bindings { + b := bindings[i] + out[Key(b.FabricGuildNodeID, b.FabricChannelID)] = &b + } + return out +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..a85f754 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,92 @@ +package config + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func write(t *testing.T, dir, name, content string) { + t.Helper() + if err := os.WriteFile(filepath.Join(dir, name), []byte(content), 0o600); err != nil { + t.Fatal(err) + } +} + +func TestLoadHappyPath(t *testing.T) { + dir := t.TempDir() + write(t, dir, "team-x.json", `{ + "agent_id": "alice", + "plugin": "plexum-fabric-channel", + "fabric": {"guild_node_id": "gn_1", "channel_id": "ch_x"} + }`) + write(t, dir, "team-y.json", `{ + "agent_id": "bob", + "plugin": "plexum-fabric-channel", + "fabric": {"guild_node_id": "gn_2", "channel_id": "ch_y"} + }`) + got, err := Load(dir) + if err != nil { + t.Fatal(err) + } + if len(got) != 2 { + t.Fatalf("len = %d", len(got)) + } +} + +func TestLoadSkipsOtherPlugins(t *testing.T) { + dir := t.TempDir() + write(t, dir, "mine.json", `{"agent_id":"a","plugin":"plexum-fabric-channel","fabric":{"guild_node_id":"g","channel_id":"c"}}`) + write(t, dir, "other.json", `{"agent_id":"a","plugin":"another-plugin"}`) + write(t, dir, "no-plugin.json", `{"agent_id":"a"}`) + got, err := Load(dir) + if err != nil { + t.Fatal(err) + } + if len(got) != 1 || got[0].PlexumChannelName != "mine" { + t.Errorf("got = %+v", got) + } +} + +func TestLoadErrorsOnMissingFabricFields(t *testing.T) { + dir := t.TempDir() + write(t, dir, "broken.json", `{"agent_id":"a","plugin":"plexum-fabric-channel"}`) + _, err := Load(dir) + if err == nil || !strings.Contains(err.Error(), "fabric") { + t.Errorf("err = %v", err) + } +} + +func TestLoadErrorsOnMissingAgentID(t *testing.T) { + dir := t.TempDir() + write(t, dir, "broken.json", `{"plugin":"plexum-fabric-channel","fabric":{"guild_node_id":"g","channel_id":"c"}}`) + _, err := Load(dir) + if err == nil || !strings.Contains(err.Error(), "agent_id") { + t.Errorf("err = %v", err) + } +} + +func TestLoadMissingDirEmpty(t *testing.T) { + got, err := Load(filepath.Join(t.TempDir(), "nope")) + if err != nil || got != nil { + t.Errorf("missing dir: err=%v got=%v", err, got) + } +} + +func TestIndex(t *testing.T) { + bindings := []FabricBinding{ + {PlexumChannelName: "a", AgentID: "u", FabricGuildNodeID: "g1", FabricChannelID: "c1"}, + {PlexumChannelName: "b", AgentID: "u", FabricGuildNodeID: "g1", FabricChannelID: "c2"}, + } + idx := Index(bindings) + if idx[Key("g1", "c1")] == nil || idx[Key("g1", "c1")].PlexumChannelName != "a" { + t.Errorf("idx miss for c1") + } + if idx[Key("g1", "c2")] == nil || idx[Key("g1", "c2")].PlexumChannelName != "b" { + t.Errorf("idx miss for c2") + } + if idx[Key("g1", "ghost")] != nil { + t.Errorf("ghost entry") + } +} diff --git a/internal/fabric/client.go b/internal/fabric/client.go new file mode 100644 index 0000000..208296a --- /dev/null +++ b/internal/fabric/client.go @@ -0,0 +1,274 @@ +// Package fabric is a thin Go port of Fabric.OpenclawPlugin's +// fabric-client.ts — Center auth + Guild REST. v0.1 covers what +// F-1 needs (auth/login, refresh, me/guilds, postMessage, listChannels, +// listMessages, channelMembers); the canvas / commands / sub-discussion +// surfaces arrive in later phases as their tools land. +package fabric + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "time" +) + +// Session is what /auth/agent/login returns. +type Session struct { + AccessToken string `json:"accessToken"` + RefreshToken string `json:"refreshToken"` + User SessionUser `json:"user"` + Guilds []GuildInfo `json:"guilds"` + GuildAccessTokens []GuildAccessToken `json:"guildAccessTokens"` +} + +// SessionUser is the user metadata baked into the session. +type SessionUser struct { + ID string `json:"id"` + Email string `json:"email"` + Name string `json:"name"` +} + +// GuildInfo describes one guild this user belongs to. +type GuildInfo struct { + NodeID string `json:"nodeId"` + Name string `json:"name"` + Endpoint string `json:"endpoint"` + Status string `json:"status"` + Purpose *string `json:"purpose,omitempty"` +} + +// GuildAccessToken pairs a per-guild short-lived JWT with the guild node. +type GuildAccessToken struct { + GuildNodeID string `json:"guildNodeId"` + Token string `json:"token"` +} + +// Client is a thin wrapper around net/http.Client. +type Client struct { + CenterAPIBase string // e.g. "http://localhost:7001/api" + HTTP *http.Client +} + +// New constructs a Client with a sensible default http client (30s timeout). +func New(centerAPIBase string) *Client { + return &Client{ + CenterAPIBase: centerAPIBase, + HTTP: &http.Client{Timeout: 30 * time.Second}, + } +} + +// ---- low-level helpers ---- + +func (c *Client) do(ctx context.Context, method, url string, auth string, body any, extraHeaders map[string]string) ([]byte, error) { + var reader io.Reader + if body != nil { + raw, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("fabric: marshal: %w", err) + } + reader = bytes.NewReader(raw) + } + req, err := http.NewRequestWithContext(ctx, method, url, reader) + if err != nil { + return nil, err + } + if body != nil { + req.Header.Set("content-type", "application/json") + } + if auth != "" { + req.Header.Set("authorization", "Bearer "+auth) + } + for k, v := range extraHeaders { + req.Header.Set(k, v) + } + resp, err := c.HTTP.Do(req) + if err != nil { + return nil, fmt.Errorf("fabric: %s %s: %w", method, url, err) + } + defer resp.Body.Close() + raw, _ := io.ReadAll(resp.Body) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, fmt.Errorf("fabric: %s %s -> %d: %s", method, url, resp.StatusCode, string(raw)) + } + return raw, nil +} + +func (c *Client) postJSON(ctx context.Context, url string, body any, auth string, out any) error { + raw, err := c.do(ctx, http.MethodPost, url, auth, body, nil) + if err != nil { + return err + } + if out == nil || len(raw) == 0 { + return nil + } + return json.Unmarshal(raw, out) +} + +func (c *Client) getJSON(ctx context.Context, url, auth string, out any) error { + raw, err := c.do(ctx, http.MethodGet, url, auth, nil, nil) + if err != nil { + return err + } + if out == nil || len(raw) == 0 { + return nil + } + return json.Unmarshal(raw, out) +} + +// ---- Center: auth ---- + +// AgentLogin exchanges an API key for a fresh session + guild tokens. +func (c *Client) AgentLogin(ctx context.Context, apiKey string) (*Session, error) { + var s Session + if err := c.postJSON(ctx, c.CenterAPIBase+"/auth/agent/login", + map[string]string{"apiKey": apiKey}, "", &s); err != nil { + return nil, err + } + return &s, nil +} + +// Refresh trades a refresh token for a fresh access token (guild +// tokens are re-fetched separately via MeGuilds). +func (c *Client) Refresh(ctx context.Context, refreshToken string) (*RefreshResponse, error) { + var out RefreshResponse + if err := c.postJSON(ctx, c.CenterAPIBase+"/auth/refresh", + map[string]string{"refreshToken": refreshToken}, "", &out); err != nil { + return nil, err + } + return &out, nil +} + +// RefreshResponse is the shape returned by /auth/refresh. +type RefreshResponse struct { + AccessToken string `json:"accessToken"` + RefreshToken string `json:"refreshToken"` +} + +// MeGuilds returns the calling user's guild list + fresh per-guild tokens. +func (c *Client) MeGuilds(ctx context.Context, accessToken string) (*MeGuildsResponse, error) { + var out MeGuildsResponse + if err := c.getJSON(ctx, c.CenterAPIBase+"/auth/me/guilds", accessToken, &out); err != nil { + return nil, err + } + return &out, nil +} + +// MeGuildsResponse subset of Session. +type MeGuildsResponse struct { + Guilds []GuildInfo `json:"guilds"` + GuildAccessTokens []GuildAccessToken `json:"guildAccessTokens"` +} + +// ---- Guild: messaging ---- + +// PostMessage posts plain content to a channel as authorUserID. +func (c *Client) PostMessage(ctx context.Context, guildEndpoint, guildToken, channelID, content, authorUserID string) error { + _, err := c.do(ctx, http.MethodPost, + guildEndpoint+"/api/channels/"+url.PathEscape(channelID)+"/messages", + guildToken, + map[string]string{"content": content, "authorUserId": authorUserID}, + nil) + return err +} + +// ChannelMembers lists members of a channel. +type ChannelMember struct { + UserID string `json:"userId"` + Bypass bool `json:"bypass,omitempty"` +} + +func (c *Client) ChannelMembers(ctx context.Context, guildEndpoint, guildToken, channelID string) ([]ChannelMember, error) { + var out []ChannelMember + if err := c.getJSON(ctx, + guildEndpoint+"/api/channels/"+url.PathEscape(channelID)+"/members", + guildToken, &out); err != nil { + return nil, err + } + return out, nil +} + +// ---- Guild: channel discovery + history ---- + +// Channel is the wire shape returned by /api/channels list/get. +type Channel struct { + ID string `json:"id"` + GuildID string `json:"guildId"` + Name string `json:"name"` + XType string `json:"xType"` + Kind string `json:"kind"` + IsPublic bool `json:"isPublic"` + Closed bool `json:"closed"` + LastSeq int `json:"lastSeq"` + CreatedAt string `json:"createdAt"` + Purpose *string `json:"purpose,omitempty"` +} + +// ListChannels lists all channels in a guild visible to the calling user. +func (c *Client) ListChannels(ctx context.Context, guildEndpoint, guildToken, guildNodeID string) ([]Channel, error) { + var out []Channel + u := guildEndpoint + "/api/channels?guildId=" + url.QueryEscape(guildNodeID) + if err := c.getJSON(ctx, u, guildToken, &out); err != nil { + return nil, err + } + return out, nil +} + +// Message is the wire shape of one message in history. +type Message struct { + MessageID string `json:"messageId"` + Seq int `json:"seq"` + Content string `json:"content"` + AuthorUserID string `json:"authorUserId"` + CreatedAt string `json:"createdAt"` + EditedAt *string `json:"editedAt"` + DeletedAt *string `json:"deletedAt"` + IsDeleted bool `json:"isDeleted"` +} + +// MessagePage wraps a window of messages + pagination metadata. +type MessagePage struct { + Items []Message `json:"items"` + Page struct { + SeqFrom int `json:"seqFrom"` + SeqTo int `json:"seqTo"` + Limit int `json:"limit"` + Returned int `json:"returned"` + HasMore bool `json:"hasMore"` + NextExpectedSeq int `json:"nextExpectedSeq"` + HighestCommittedSeq int `json:"highestCommittedSeq"` + } `json:"page"` +} + +// ListMessages fetches a window of messages by seq. +func (c *Client) ListMessages(ctx context.Context, guildEndpoint, guildToken, channelID string, opts ListMessagesOpts) (*MessagePage, error) { + qs := url.Values{} + if opts.SeqFrom > 0 { + qs.Set("seq_from", fmt.Sprint(opts.SeqFrom)) + } + if opts.SeqTo > 0 { + qs.Set("seq_to", fmt.Sprint(opts.SeqTo)) + } + if opts.Limit > 0 { + qs.Set("limit", fmt.Sprint(opts.Limit)) + } + u := guildEndpoint + "/api/channels/" + url.PathEscape(channelID) + "/messages" + if encoded := qs.Encode(); encoded != "" { + u += "?" + encoded + } + var out MessagePage + if err := c.getJSON(ctx, u, guildToken, &out); err != nil { + return nil, err + } + return &out, nil +} + +// ListMessagesOpts is the optional paging window. +type ListMessagesOpts struct { + SeqFrom int + SeqTo int + Limit int +} diff --git a/internal/identity/identity.go b/internal/identity/identity.go new file mode 100644 index 0000000..818dedc --- /dev/null +++ b/internal/identity/identity.go @@ -0,0 +1,157 @@ +// Package identity manages the per-agent Fabric API key registry at +// /fabric-identity.json. Format mirrors openclaw's +// fabric-identity.json so existing operator muscle memory transfers: +// +// { +// "agents": { +// "": { +// "fabric_api_key": "fak_...", +// "fabric_user_id": "u_...", // optional, recorded on register +// "fabric_email": "...", // optional +// "enabled": true +// } +// } +// } +// +// `plexum-fabric-register` writes here; the plugin reads from here at +// startup (and rereads on SIGHUP — future work). +package identity + +import ( + "encoding/json" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "sort" + "sync" +) + +// FileName is the basename under /. +const FileName = "fabric-identity.json" + +// Entry is one agent's identity binding. +type Entry struct { + FabricAPIKey string `json:"fabric_api_key"` + FabricUserID string `json:"fabric_user_id,omitempty"` + FabricEmail string `json:"fabric_email,omitempty"` + Enabled bool `json:"enabled"` +} + +// Registry wraps the JSON file. Thread-safe. +type Registry struct { + mu sync.Mutex + path string + data map[string]*Entry +} + +// Open loads (or creates an empty) registry at the given absolute path. +func Open(path string) (*Registry, error) { + r := &Registry{path: path, data: map[string]*Entry{}} + raw, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return r, nil + } + return nil, fmt.Errorf("identity: read %s: %w", path, err) + } + if len(raw) == 0 { + return r, nil + } + var wire struct { + Agents map[string]*Entry `json:"agents"` + } + if err := json.Unmarshal(raw, &wire); err != nil { + return nil, fmt.Errorf("identity: parse %s: %w", path, err) + } + if wire.Agents != nil { + r.data = wire.Agents + } + return r, nil +} + +// Lookup returns the entry for agentID (nil if missing). +func (r *Registry) Lookup(agentID string) *Entry { + r.mu.Lock() + defer r.mu.Unlock() + return r.data[agentID] +} + +// Set inserts/replaces the entry for agentID. Does NOT persist. +func (r *Registry) Set(agentID string, e *Entry) { + r.mu.Lock() + defer r.mu.Unlock() + r.data[agentID] = e +} + +// Delete removes agentID; returns true iff it was present. +func (r *Registry) Delete(agentID string) bool { + r.mu.Lock() + defer r.mu.Unlock() + if _, ok := r.data[agentID]; !ok { + return false + } + delete(r.data, agentID) + return true +} + +// AgentIDs returns the sorted list of registered agent ids. +func (r *Registry) AgentIDs() []string { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]string, 0, len(r.data)) + for k := range r.data { + out = append(out, k) + } + sort.Strings(out) + return out +} + +// EnabledEntries returns a copy of (agentID, entry) for entries with +// Enabled=true. Plugin uses this to decide which agents to bring up. +func (r *Registry) EnabledEntries() map[string]*Entry { + r.mu.Lock() + defer r.mu.Unlock() + out := map[string]*Entry{} + for k, v := range r.data { + if v != nil && v.Enabled { + copyE := *v + out[k] = ©E + } + } + return out +} + +// Save atomically writes the registry (tmp+rename, 0600 — API keys live +// here, treat as secrets). +func (r *Registry) Save() error { + r.mu.Lock() + defer r.mu.Unlock() + if err := os.MkdirAll(filepath.Dir(r.path), 0o755); err != nil { + return fmt.Errorf("identity: mkdir: %w", err) + } + payload := struct { + Agents map[string]*Entry `json:"agents"` + }{Agents: r.data} + data, err := json.MarshalIndent(payload, "", " ") + if err != nil { + return err + } + tmp := r.path + ".tmp" + if err := os.WriteFile(tmp, data, 0o600); err != nil { + return fmt.Errorf("identity: write tmp: %w", err) + } + return os.Rename(tmp, r.path) +} + +// DefaultPath returns the canonical path under PLEXUM_PROFILE_ROOT or +// ~/.plexum if the env var isn't set. +func DefaultPath() string { + root := os.Getenv("PLEXUM_PROFILE_ROOT") + if root == "" { + home, _ := os.UserHomeDir() + root = filepath.Join(home, ".plexum") + } + return filepath.Join(root, FileName) +} diff --git a/internal/identity/identity_test.go b/internal/identity/identity_test.go new file mode 100644 index 0000000..e06dd9e --- /dev/null +++ b/internal/identity/identity_test.go @@ -0,0 +1,88 @@ +package identity + +import ( + "os" + "path/filepath" + "testing" +) + +func TestOpenMissingFileEmpty(t *testing.T) { + r, err := Open(filepath.Join(t.TempDir(), "nope.json")) + if err != nil { + t.Fatal(err) + } + if len(r.AgentIDs()) != 0 { + t.Errorf("expected empty registry") + } +} + +func TestSetSaveReload(t *testing.T) { + path := filepath.Join(t.TempDir(), "id.json") + r, _ := Open(path) + r.Set("alice", &Entry{FabricAPIKey: "fak_alice", FabricEmail: "a@x", Enabled: true}) + r.Set("bob", &Entry{FabricAPIKey: "fak_bob", Enabled: false}) + if err := r.Save(); err != nil { + t.Fatal(err) + } + + r2, err := Open(path) + if err != nil { + t.Fatal(err) + } + a := r2.Lookup("alice") + if a == nil || a.FabricAPIKey != "fak_alice" || !a.Enabled { + t.Errorf("alice = %+v", a) + } + b := r2.Lookup("bob") + if b == nil || b.Enabled { + t.Errorf("bob = %+v", b) + } + + st, err := os.Stat(path) + if err != nil { + t.Fatal(err) + } + if st.Mode().Perm() != 0o600 { + t.Errorf("perms = %o, want 0600", st.Mode().Perm()) + } +} + +func TestEnabledEntriesFiltersDisabled(t *testing.T) { + path := filepath.Join(t.TempDir(), "id.json") + r, _ := Open(path) + r.Set("a", &Entry{FabricAPIKey: "x", Enabled: true}) + r.Set("b", &Entry{FabricAPIKey: "y", Enabled: false}) + r.Set("c", &Entry{FabricAPIKey: "z", Enabled: true}) + out := r.EnabledEntries() + if len(out) != 2 || out["a"] == nil || out["c"] == nil { + t.Errorf("EnabledEntries = %+v", out) + } + if out["b"] != nil { + t.Errorf("disabled should be filtered") + } +} + +func TestDelete(t *testing.T) { + r, _ := Open(filepath.Join(t.TempDir(), "id.json")) + r.Set("a", &Entry{FabricAPIKey: "x", Enabled: true}) + if !r.Delete("a") { + t.Errorf("delete present should return true") + } + if r.Delete("a") { + t.Errorf("delete missing should return false") + } +} + +func TestAgentIDsSorted(t *testing.T) { + r, _ := Open(filepath.Join(t.TempDir(), "id.json")) + for _, k := range []string{"z", "a", "m"} { + r.Set(k, &Entry{FabricAPIKey: "x", Enabled: true}) + } + ids := r.AgentIDs() + want := []string{"a", "m", "z"} + for i := range want { + if ids[i] != want[i] { + t.Errorf("ids[%d] = %q, want %q", i, ids[i], want[i]) + } + } +} diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100755 index 0000000..367458d --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,115 @@ +#!/usr/bin/env bash +# Plexum-fabric-channel-plugin local installer. +# +# Builds + installs: +# ~/.plexum/plugins/plexum-fabric-channel/plexum-fabric-channel-plugin +# ~/.plexum/plugins/plexum-fabric-channel/manifest.json (initial; empty channels list) +# ~/.local/bin/plexum-fabric-register (CLI for binding agents) +# +# Re-runnable: rebuilds binaries; overwrites manifest only if --reset-manifest. +# Profile data (identity registry, channel configs) is never touched. +# +# Flags: +# --profile

Override profile root (default ~/.plexum) +# --reset-manifest Overwrite the manifest's channels list with one +# derived from current channels/*.json (matches what +# the plugin advertises at runtime). Useful after +# adding/removing channels. +set -euo pipefail + +REPO="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")/.." && pwd)" +PROFILE_DIR="${HOME}/.plexum" +USER_BIN="${HOME}/.local/bin" +RESET_MANIFEST=0 + +while [[ $# -gt 0 ]]; do + case "$1" in + --profile) PROFILE_DIR="$2"; shift 2 ;; + --reset-manifest) RESET_MANIFEST=1; shift ;; + -h|--help) sed -n '2,/^set -euo/p' "$0" | sed -n '/^#/p' | sed 's/^# \{0,1\}//'; exit 0 ;; + *) echo "unknown flag: $1" >&2; exit 2 ;; + esac +done + +log() { printf '\033[1;34m[fabric-install]\033[0m %s\n' "$*"; } + +command -v go >/dev/null || { echo "go not found on PATH" >&2; exit 1; } + +PLUGIN_DIR="${PROFILE_DIR}/plugins/plexum-fabric-channel" +mkdir -p "${PLUGIN_DIR}" "${USER_BIN}" + +cd "${REPO}" +VERSION="$(git describe --tags --always 2>/dev/null || echo dev)" +LDFLAGS="-X main.Version=${VERSION}" +log "building plexum-fabric-channel-plugin (v=${VERSION})" +CGO_ENABLED=0 go build -ldflags="${LDFLAGS}" \ + -o "${PLUGIN_DIR}/plexum-fabric-channel-plugin" ./cmd/plexum-fabric-channel-plugin +log "building plexum-fabric-register" +CGO_ENABLED=0 go build -ldflags="${LDFLAGS}" \ + -o "${USER_BIN}/plexum-fabric-register" ./cmd/plexum-fabric-register + +MANIFEST_PATH="${PLUGIN_DIR}/manifest.json" +if [[ ! -f "${MANIFEST_PATH}" || ${RESET_MANIFEST} -eq 1 ]]; then + log "writing initial manifest at ${MANIFEST_PATH}" + # Build channels[] from any channels/*.json that name our plugin. + CHANNELS_DIR="${PROFILE_DIR}/channels" + CHANNELS_JSON='[]' + if [[ -d "${CHANNELS_DIR}" ]]; then + CHANNELS_JSON=$(python3 -c " +import json, os, sys +out = [] +for f in sorted(os.listdir('${CHANNELS_DIR}')): + if not f.endswith('.json'): continue + try: + d = json.load(open(os.path.join('${CHANNELS_DIR}', f))) + except Exception: + continue + if d.get('plugin') == 'plexum-fabric-channel': + out.append({'name': f[:-5], 'outboundTool': 'send'}) +print(json.dumps(out)) +") + fi + cat > "${MANIFEST_PATH}" < --api-key fak_... + 2. Bind channels: write \`${PROFILE_DIR}/channels/.json\` + (see README); re-run install.sh --reset-manifest after + adding/removing channels. + 3. Allow plugin: add "plexum-fabric-channel" to \`${PROFILE_DIR}/plexum.json\` + under .plugins.allow. + 4. Restart: systemctl --user restart plexum +EOF