feat: Phase F-1 — Plexum-fabric-channel-plugin foundation
Ports the foundation of Fabric.OpenclawPlugin to a native Plexum channel plugin (Go). F-2+ phases (socket.io inbound, wakeup gate, tools, presence, etc.) follow. Layout: internal/identity/ — fabric-identity.json registry (agent → API key) internal/fabric/ — REST client (Center auth + Guild messaging) internal/config/ — channels/<name>.json fabric extension parser cmd/plexum-fabric-register/ — agent registration CLI cmd/plexum-fabric-channel-plugin/— Plexum SDK plugin entry scripts/install.sh — build + install + manifest generator Plugin behavior (F-1): - Reads <profile>/channels/*.json, filters plugin=plexum-fabric-channel, builds (plexum-channel-name → fabric channel-id) index - Validates each bound agent's API key against Center at init (warmSessions); logs warning but doesn't refuse init on bad keys - `send` MCP tool: POST plain text to the bound Fabric channel as the agent user; selects guild endpoint+token from cached session - Manifest channels[] is generated by install.sh from current channels/*.json — re-run with --reset-manifest after adding bindings - Plugin-private config at <profile>/plugins/plexum-fabric-channel/config.json (center_api_base, default http://localhost:7001/api) Live smoke verified: - plexum-fabric-register against running Fabric Center (port 7001): validated fak_..., wrote identity file with user_id + email captured Tests: identity (5) + config (6) = 11 unit tests. F-2 will hook socket.io for inbound + wakeup gating + token refresh. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
311
cmd/plexum-fabric-channel-plugin/main.go
Normal file
311
cmd/plexum-fabric-channel-plugin/main.go
Normal file
@@ -0,0 +1,311 @@
|
||||
// plexum-fabric-channel-plugin is the Plexum channel plugin that
|
||||
// connects Plexum agents to a Fabric guild as members.
|
||||
//
|
||||
// F-1 (current): identity load, channel config discovery, REST send,
|
||||
// agentLogin handshake. Plugin advertises channels via manifest +
|
||||
// reads channels/<name>.json for the Plexum-channel → Fabric-channel
|
||||
// mapping. The `send` outbound tool posts via Fabric REST.
|
||||
//
|
||||
// F-2+ (deferred): socket.io inbound, wakeup gating, token refresh,
|
||||
// presence sync, sub-discussion, MCP tool surface (~15 tools from
|
||||
// the openclaw plugin's tools.ts), attachments, channel canvas.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity"
|
||||
)
|
||||
|
||||
// HostConfig is the plugin's own config at
|
||||
// <profile>/plugins/plexum-fabric-channel/config.json:
|
||||
//
|
||||
// {
|
||||
// "center_api_base": "http://localhost:7001/api"
|
||||
// }
|
||||
type HostConfig struct {
|
||||
CenterAPIBase string `json:"center_api_base"`
|
||||
}
|
||||
|
||||
type fabricPlugin struct {
|
||||
host plugin.HostAPI
|
||||
|
||||
cfgPath string
|
||||
cfg HostConfig
|
||||
identities *identity.Registry
|
||||
bindings []config.FabricBinding
|
||||
byFabric config.ByFabricChannel
|
||||
client *fabric.Client
|
||||
|
||||
// Per-agent Session cache (refreshed lazily; full refresh in F-2).
|
||||
sessMu sync.Mutex
|
||||
sessions map[string]*fabric.Session // agentID → session
|
||||
}
|
||||
|
||||
func (p *fabricPlugin) Manifest() plugin.Manifest {
|
||||
// Manifest channels are populated dynamically from channels/*.json
|
||||
// at startup: the operator adds a channels/<name>.json + restarts
|
||||
// the gateway, and the matching ChannelContract entry surfaces here.
|
||||
// Both halves needed because Plexum's host registry reads the
|
||||
// manifest's channel names too.
|
||||
channels := p.dynamicChannelContracts()
|
||||
return plugin.Manifest{
|
||||
Name: config.PluginName,
|
||||
Version: "0.1.0",
|
||||
Activation: plugin.ActivationLazy,
|
||||
Executable: "plexum-fabric-channel-plugin",
|
||||
Contracts: plugin.Contracts{
|
||||
Channels: channels,
|
||||
Tools: []plugin.ToolContract{
|
||||
{
|
||||
Name: "send",
|
||||
Description: "Post a plain-text message to the bound Fabric channel as the agent user.",
|
||||
InputSchema: json.RawMessage(`{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"channel_name": {"type": "string"},
|
||||
"session_id": {"type": "string"},
|
||||
"message": {"type": "string"}
|
||||
},
|
||||
"required": ["channel_name", "message"]
|
||||
}`),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (p *fabricPlugin) dynamicChannelContracts() []plugin.ChannelContract {
|
||||
// Read channels/*.json from <profile>/channels and surface every
|
||||
// `plugin: plexum-fabric-channel` entry as a ChannelContract.
|
||||
profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT")
|
||||
if profileRoot == "" {
|
||||
home, _ := os.UserHomeDir()
|
||||
profileRoot = filepath.Join(home, ".plexum")
|
||||
}
|
||||
bindings, err := config.Load(filepath.Join(profileRoot, "channels"))
|
||||
if err != nil {
|
||||
// Logged later in Init; manifest call can't itself reach a logger.
|
||||
return nil
|
||||
}
|
||||
out := make([]plugin.ChannelContract, 0, len(bindings))
|
||||
for _, b := range bindings {
|
||||
out = append(out, plugin.ChannelContract{
|
||||
Name: b.PlexumChannelName, OutboundTool: "send",
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error {
|
||||
p.host = host
|
||||
p.sessions = map[string]*fabric.Session{}
|
||||
|
||||
profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT")
|
||||
if profileRoot == "" {
|
||||
home, _ := os.UserHomeDir()
|
||||
profileRoot = filepath.Join(home, ".plexum")
|
||||
}
|
||||
|
||||
// Plugin-private config.
|
||||
p.cfgPath = filepath.Join(profileRoot, "plugins", config.PluginName, "config.json")
|
||||
raw, err := os.ReadFile(p.cfgPath)
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return fmt.Errorf("read %s: %w", p.cfgPath, err)
|
||||
}
|
||||
if len(raw) > 0 {
|
||||
if err := json.Unmarshal(raw, &p.cfg); err != nil {
|
||||
return fmt.Errorf("parse %s: %w", p.cfgPath, err)
|
||||
}
|
||||
}
|
||||
if p.cfg.CenterAPIBase == "" {
|
||||
p.cfg.CenterAPIBase = "http://localhost:7001/api"
|
||||
}
|
||||
p.client = fabric.New(p.cfg.CenterAPIBase)
|
||||
|
||||
// Identity.
|
||||
idPath := filepath.Join(profileRoot, identity.FileName)
|
||||
p.identities, err = identity.Open(idPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("identity: %w", err)
|
||||
}
|
||||
|
||||
// Channel bindings.
|
||||
p.bindings, err = config.Load(filepath.Join(profileRoot, "channels"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("channel bindings: %w", err)
|
||||
}
|
||||
p.byFabric = config.Index(p.bindings)
|
||||
|
||||
host.Log("info", "fabric channel plugin initialized", map[string]any{
|
||||
"center": p.cfg.CenterAPIBase,
|
||||
"identity_path": idPath,
|
||||
"channels_loaded": len(p.bindings),
|
||||
"identities_loaded": len(p.identities.AgentIDs()),
|
||||
})
|
||||
|
||||
// Eager validate: for every bound agent that has a channel, do a
|
||||
// blocking agentLogin so we surface bad keys at startup instead of
|
||||
// on first outbound. F-2 hooks socket.io subscription here too.
|
||||
if err := p.warmSessions(ctx); err != nil {
|
||||
// Log + continue; outbound will retry on demand. We don't want
|
||||
// to refuse plugin init just because one key is stale.
|
||||
host.Log("warn", "fabric warm-sessions had errors",
|
||||
map[string]any{"err": err.Error()})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *fabricPlugin) warmSessions(ctx context.Context) error {
|
||||
// Which agents appear as a binding's AgentID?
|
||||
agentsNeeded := map[string]bool{}
|
||||
for _, b := range p.bindings {
|
||||
agentsNeeded[b.AgentID] = true
|
||||
}
|
||||
enabled := p.identities.EnabledEntries()
|
||||
|
||||
var firstErr error
|
||||
for agentID := range agentsNeeded {
|
||||
entry, ok := enabled[agentID]
|
||||
if !ok {
|
||||
err := fmt.Errorf("agent %s has channels but no identity (run plexum-fabric-register --agent-id %s --api-key ...)",
|
||||
agentID, agentID)
|
||||
p.host.Log("warn", err.Error(), nil)
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
sess, err := p.client.AgentLogin(ctx, entry.FabricAPIKey)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("agent %s login: %w", agentID, err)
|
||||
p.host.Log("warn", err.Error(), nil)
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
p.sessMu.Lock()
|
||||
p.sessions[agentID] = sess
|
||||
p.sessMu.Unlock()
|
||||
p.host.Log("info", "fabric session warm", map[string]any{
|
||||
"agent": agentID, "fabric_user": sess.User.Email,
|
||||
"guilds": len(sess.Guilds),
|
||||
})
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
|
||||
// CallTool handles the "send" outbound tool.
|
||||
func (p *fabricPlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (plugin.ToolResult, error) {
|
||||
if name != "send" {
|
||||
return plugin.ToolResult{}, fmt.Errorf("unknown tool: %s", name)
|
||||
}
|
||||
var args struct {
|
||||
ChannelName string `json:"channel_name"`
|
||||
SessionID string `json:"session_id"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
if err := json.Unmarshal(input, &args); err != nil {
|
||||
return plugin.ToolResult{}, fmt.Errorf("parse args: %w", err)
|
||||
}
|
||||
if args.ChannelName == "" {
|
||||
return errResult("channel_name required"), nil
|
||||
}
|
||||
|
||||
// Find the binding for this plexum channel name.
|
||||
var binding *config.FabricBinding
|
||||
for i := range p.bindings {
|
||||
if p.bindings[i].PlexumChannelName == args.ChannelName {
|
||||
binding = &p.bindings[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if binding == nil {
|
||||
return errResult("unknown plexum channel: " + args.ChannelName), nil
|
||||
}
|
||||
|
||||
// Resolve the bound agent's session (may need refresh — F-2 will
|
||||
// add a proper TTL + background refresh; for F-1 we re-login lazily
|
||||
// if the cache is empty).
|
||||
sess, err := p.sessionFor(ctx, binding.AgentID)
|
||||
if err != nil {
|
||||
return errResult("session for agent " + binding.AgentID + ": " + err.Error()), nil
|
||||
}
|
||||
|
||||
// Pick the guild endpoint + token for the target guild_node_id.
|
||||
var (
|
||||
endpoint string
|
||||
token string
|
||||
)
|
||||
for _, g := range sess.Guilds {
|
||||
if g.NodeID == binding.FabricGuildNodeID {
|
||||
endpoint = g.Endpoint
|
||||
break
|
||||
}
|
||||
}
|
||||
for _, t := range sess.GuildAccessTokens {
|
||||
if t.GuildNodeID == binding.FabricGuildNodeID {
|
||||
token = t.Token
|
||||
break
|
||||
}
|
||||
}
|
||||
if endpoint == "" || token == "" {
|
||||
return errResult(fmt.Sprintf("agent %s has no access to guild %s",
|
||||
binding.AgentID, binding.FabricGuildNodeID)), nil
|
||||
}
|
||||
|
||||
if err := p.client.PostMessage(ctx, endpoint, token,
|
||||
binding.FabricChannelID, args.Message, sess.User.ID); err != nil {
|
||||
return errResult("post: " + err.Error()), nil
|
||||
}
|
||||
return plugin.NewTextResult("sent"), nil
|
||||
}
|
||||
|
||||
func (p *fabricPlugin) sessionFor(ctx context.Context, agentID string) (*fabric.Session, error) {
|
||||
p.sessMu.Lock()
|
||||
sess := p.sessions[agentID]
|
||||
p.sessMu.Unlock()
|
||||
if sess != nil {
|
||||
return sess, nil
|
||||
}
|
||||
entry := p.identities.Lookup(agentID)
|
||||
if entry == nil || !entry.Enabled {
|
||||
return nil, errors.New("no identity registered (use plexum-fabric-register)")
|
||||
}
|
||||
loginCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||
defer cancel()
|
||||
fresh, err := p.client.AgentLogin(loginCtx, entry.FabricAPIKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.sessMu.Lock()
|
||||
p.sessions[agentID] = fresh
|
||||
p.sessMu.Unlock()
|
||||
return fresh, nil
|
||||
}
|
||||
|
||||
func errResult(msg string) plugin.ToolResult {
|
||||
return plugin.ToolResult{
|
||||
Content: []plugin.ContentBlock{{Type: "text", Text: msg}},
|
||||
IsError: true,
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := plugin.Serve(&fabricPlugin{}); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "plexum-fabric-channel-plugin: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user