feat: Phase F-6 — attachments + final docs (F-8 = no-op)
F-6 attachments:
- internal/attachments/ (~145 LOC + 8 tests): per-message Downloader
that fetches Fabric attachment URLs into $TMPDIR/plexum-fabric/<msg-id>/
with the agent's guild token, sanitizing filenames + message ids
against path-traversal
- AppendFooter renders downloaded paths as a markdown footer
("Attachments:\n - /tmp/... (mime, N bytes)\n")
- Agents access via the exec MCP tool (cat / file / etc.)
internal/inbound.Supervisor:
- new Attachments *attachments.Downloader field (nil → skip with warn)
- inbound.dispatch: when message has attachments, blocking-download +
AppendFooter before emitting notification (so agent's first turn
sees the paths)
cmd/plexum-fabric-channel-plugin:
- sup.Attachments = attachments.New("") wired at init
F-8 coalesce: no-op. openclaw plugin's coalesce buffered the
text→thinking→tool→text segments openclaw emits across multiple
deliver() calls per turn. Plexum's loop.Run returns ONE final assistant
text per turn (via extractFinalText), so coalescing isn't a concern.
The channel outbound posts a single message naturally.
F-5b presence-sync: deferred. The openclaw plugin pushes HarborForge
on-call status to Fabric's per-recipient presence so the backend can
busy-discard 'announce' deliveries. Plexum's state machine has
different semantics (idle/working/busy/offline) and is per-Plexum-agent
not per-user; mapping requires more design.
README updated with the phase status table + 16-tool list.
Tests: 8 new in internal/attachments (35 total in this repo).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
32
README.md
32
README.md
@@ -1,21 +1,33 @@
|
||||
# Plexum-fabric-channel-plugin
|
||||
|
||||
Native Plexum channel plugin connecting Plexum agents to **Fabric** guilds as
|
||||
real channel members. Inspired by `Fabric.OpenclawPlugin` — Plexum port,
|
||||
delivered in phases.
|
||||
real channel members. Port of `Fabric.OpenclawPlugin` — delivered in phases.
|
||||
|
||||
## Status
|
||||
|
||||
**Phase F-1 (foundation) — current**: identity registry, Center auth, REST
|
||||
client, plugin scaffold, channel-binding discovery. The `send` outbound tool
|
||||
posts plain text to a Fabric channel as the bound agent.
|
||||
| Phase | Scope | Status |
|
||||
|-------|-------|--------|
|
||||
| **F-1** | identity + REST + plugin scaffold + `send` outbound | ✅ |
|
||||
| **F-2** | socket.io inbound + wakeup gate + token refresh + per-channel serial | ✅ |
|
||||
| **F-3** | exponential reconnect backoff | ✅ |
|
||||
| **F-4** | tool surface batch 1 (8 tools) | ✅ |
|
||||
| **F-5** | command-sync (slash autocomplete) | ✅ |
|
||||
| **F-6** | attachments → temp-dir + footer | ✅ |
|
||||
| **F-7** | sub-discussion KV + create-*-channel + discussion-complete | ✅ |
|
||||
| **F-8** | coalesce parity | ⏭ no-op — Plexum doesn't segment deliver |
|
||||
| **F-5b** | presence-sync | ⏭ deferred — Plexum state machine ≠ HF semantics |
|
||||
|
||||
**Phase F-2 (deferred)**: socket.io inbound, wakeup-gated dispatch, token
|
||||
refresh, per-channel serial queue.
|
||||
### Agent-facing tools (16)
|
||||
|
||||
**Phase F-3+ (deferred)**: tools.ts port (~15 MCP tools — channel/canvas/
|
||||
sub-discussion/etc.), presence sync, command sync, attachments, coalesce
|
||||
parity.
|
||||
- `send` — host-driven channel outbound (channel manager → plugin)
|
||||
- `fabric-send-message`, `fabric-send-sys-msg` — post normal / system msgs
|
||||
- `fabric-channel-list`, `fabric-guild-list` — discovery
|
||||
- `fabric-message-history` — paginate by seq
|
||||
- `fabric-channel`, `fabric-channel-set-purpose` — channel metadata
|
||||
- `fabric-canvas` — get/share/update/remove canvas
|
||||
- `create-{chat,work,report,discussion}-channel` — create new channels
|
||||
- `discussion-complete` — summary + close
|
||||
- `create-sub-discussion`, `close-sub-discussion` — sub-discussion lifecycle
|
||||
|
||||
## Install
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
|
||||
plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/attachments"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity"
|
||||
@@ -212,6 +213,10 @@ func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error {
|
||||
// the host log.
|
||||
logger := slog.New(&hostLogHandler{host: host, level: slog.LevelInfo})
|
||||
sup := inbound.New(p.client, p.tokens, p.bindings, notifier, logger)
|
||||
// F-6: enable attachment downloads. Files land under
|
||||
// $TMPDIR/plexum-fabric/<msg-id>/<filename>; agents access via
|
||||
// the exec tool (cat / file).
|
||||
sup.Attachments = attachments.New("")
|
||||
go func() {
|
||||
defer close(p.inboundDone)
|
||||
if err := sup.Run(ctxBg); err != nil {
|
||||
|
||||
185
internal/attachments/attachments.go
Normal file
185
internal/attachments/attachments.go
Normal file
@@ -0,0 +1,185 @@
|
||||
// Package attachments downloads Fabric attachment files referenced in
|
||||
// inbound messages into a temp dir so the agent can read them via the
|
||||
// exec tool (cat / file / etc.).
|
||||
//
|
||||
// Plexum's notifications/plexum/channel/inbound payload doesn't yet
|
||||
// have a structured "media" field (host-side work), so we surface the
|
||||
// downloaded paths by appending a markdown footer to the message text:
|
||||
//
|
||||
// hello, please review the attached file
|
||||
//
|
||||
// Attachments:
|
||||
// - /tmp/plexum-fabric/<msg-id>/proposal.md (text/markdown, 3142 bytes)
|
||||
//
|
||||
// Agent then uses the exec tool to read whichever paths it cares about.
|
||||
package attachments
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DownloadResult tracks one downloaded file.
|
||||
type DownloadResult struct {
|
||||
LocalPath string
|
||||
Name string
|
||||
MimeType string
|
||||
Size int64
|
||||
}
|
||||
|
||||
// Downloader fetches attachments using a per-call guild token.
|
||||
type Downloader struct {
|
||||
HTTP *http.Client
|
||||
BaseDir string // e.g. /tmp/plexum-fabric
|
||||
}
|
||||
|
||||
// New constructs a Downloader. baseDir defaults to /tmp/plexum-fabric.
|
||||
func New(baseDir string) *Downloader {
|
||||
if baseDir == "" {
|
||||
baseDir = filepath.Join(os.TempDir(), "plexum-fabric")
|
||||
}
|
||||
return &Downloader{
|
||||
HTTP: &http.Client{Timeout: 30 * time.Second},
|
||||
BaseDir: baseDir,
|
||||
}
|
||||
}
|
||||
|
||||
// FetchAll downloads every attachment for messageID; returns the
|
||||
// successful results. Per-attachment errors are accumulated as nil
|
||||
// entries' "err" field skipped — caller may log via supplied logger.
|
||||
// Files land under <BaseDir>/<messageID>/<filename>.
|
||||
func (d *Downloader) FetchAll(ctx context.Context, guildToken, messageID string, urls []AttachmentRef) []DownloadResult {
|
||||
if len(urls) == 0 {
|
||||
return nil
|
||||
}
|
||||
dir := filepath.Join(d.BaseDir, sanitizeID(messageID))
|
||||
if err := os.MkdirAll(dir, 0o700); err != nil {
|
||||
return nil
|
||||
}
|
||||
var out []DownloadResult
|
||||
for i, ref := range urls {
|
||||
if ref.URL == "" {
|
||||
continue
|
||||
}
|
||||
name := ref.Name
|
||||
if name == "" {
|
||||
name = fmt.Sprintf("attachment-%d", i)
|
||||
}
|
||||
name = sanitizeFilename(name)
|
||||
dst := filepath.Join(dir, name)
|
||||
size, err := d.fetchOne(ctx, guildToken, ref.URL, dst)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
out = append(out, DownloadResult{
|
||||
LocalPath: dst, Name: name, MimeType: ref.MimeType, Size: size,
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// fetchOne downloads ref.URL into dst with auth. Returns bytes written.
|
||||
func (d *Downloader) fetchOne(ctx context.Context, guildToken, src, dst string) (int64, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, src, nil)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if guildToken != "" {
|
||||
req.Header.Set("authorization", "Bearer "+guildToken)
|
||||
}
|
||||
resp, err := d.HTTP.Do(req)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return 0, fmt.Errorf("attachments: %s -> %d", src, resp.StatusCode)
|
||||
}
|
||||
f, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer f.Close()
|
||||
return io.Copy(f, resp.Body)
|
||||
}
|
||||
|
||||
// AttachmentRef is the minimum the downloader needs per file.
|
||||
type AttachmentRef struct {
|
||||
URL string
|
||||
Name string
|
||||
MimeType string
|
||||
}
|
||||
|
||||
// AppendFooter returns body with a markdown footer listing the
|
||||
// downloaded attachments (in agent-friendly form). Empty list →
|
||||
// body unchanged.
|
||||
func AppendFooter(body string, files []DownloadResult) string {
|
||||
if len(files) == 0 {
|
||||
return body
|
||||
}
|
||||
var sb strings.Builder
|
||||
sb.WriteString(body)
|
||||
if !strings.HasSuffix(body, "\n") {
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
sb.WriteString("\nAttachments:\n")
|
||||
for _, f := range files {
|
||||
fmt.Fprintf(&sb, " - %s (%s, %d bytes)\n", f.LocalPath, dispMime(f.MimeType), f.Size)
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
// ---- helpers ----
|
||||
|
||||
func sanitizeFilename(name string) string {
|
||||
// Strip path separators; collapse weird chars. Keep ext.
|
||||
name = filepath.Base(name)
|
||||
if name == "" || name == "." || name == "/" {
|
||||
return "attachment.bin"
|
||||
}
|
||||
// Reject query strings if any leaked in via name=url.
|
||||
if i := strings.IndexAny(name, "?#"); i > 0 {
|
||||
name = name[:i]
|
||||
}
|
||||
if u, err := url.QueryUnescape(name); err == nil {
|
||||
name = u
|
||||
}
|
||||
// Drop any leading dots so we don't end up writing hidden files.
|
||||
name = strings.TrimLeft(name, ".")
|
||||
if name == "" {
|
||||
return "attachment.bin"
|
||||
}
|
||||
return name
|
||||
}
|
||||
|
||||
func sanitizeID(s string) string {
|
||||
// Allow [a-zA-Z0-9_-]; replace anything else with '_'.
|
||||
var sb strings.Builder
|
||||
sb.Grow(len(s))
|
||||
for _, r := range s {
|
||||
switch {
|
||||
case r >= '0' && r <= '9', r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r == '-', r == '_':
|
||||
sb.WriteRune(r)
|
||||
default:
|
||||
sb.WriteByte('_')
|
||||
}
|
||||
}
|
||||
if sb.Len() == 0 {
|
||||
return "msg"
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func dispMime(m string) string {
|
||||
if m == "" {
|
||||
return "application/octet-stream"
|
||||
}
|
||||
return m
|
||||
}
|
||||
134
internal/attachments/attachments_test.go
Normal file
134
internal/attachments/attachments_test.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package attachments
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSanitizeFilename(t *testing.T) {
|
||||
cases := map[string]string{
|
||||
"proposal.md": "proposal.md",
|
||||
"../etc/passwd": "passwd",
|
||||
"": "attachment.bin",
|
||||
"...": "attachment.bin",
|
||||
"a?b=c": "a",
|
||||
"weird name.png": "weird name.png",
|
||||
"name%20encoded.txt": "name encoded.txt",
|
||||
}
|
||||
for in, want := range cases {
|
||||
got := sanitizeFilename(in)
|
||||
if got != want {
|
||||
t.Errorf("sanitizeFilename(%q) = %q, want %q", in, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSanitizeID(t *testing.T) {
|
||||
cases := map[string]string{
|
||||
"m-abc-123": "m-abc-123",
|
||||
"../../etc/passwd": "______etc_passwd",
|
||||
"": "msg",
|
||||
"with space": "with_space",
|
||||
}
|
||||
for in, want := range cases {
|
||||
got := sanitizeID(in)
|
||||
if got != want {
|
||||
t.Errorf("sanitizeID(%q) = %q, want %q", in, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppendFooterEmpty(t *testing.T) {
|
||||
got := AppendFooter("hello", nil)
|
||||
if got != "hello" {
|
||||
t.Errorf("empty list should preserve body, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppendFooterFormats(t *testing.T) {
|
||||
files := []DownloadResult{
|
||||
{LocalPath: "/tmp/a.md", Name: "a.md", MimeType: "text/markdown", Size: 42},
|
||||
{LocalPath: "/tmp/b.png", Name: "b.png", MimeType: "", Size: 1024},
|
||||
}
|
||||
out := AppendFooter("hello\nworld", files)
|
||||
if !strings.Contains(out, "hello\nworld\n\nAttachments:\n") {
|
||||
t.Errorf("footer placement wrong: %q", out)
|
||||
}
|
||||
if !strings.Contains(out, "- /tmp/a.md (text/markdown, 42 bytes)") {
|
||||
t.Errorf("first entry: %q", out)
|
||||
}
|
||||
if !strings.Contains(out, "- /tmp/b.png (application/octet-stream, 1024 bytes)") {
|
||||
t.Errorf("default mime: %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchAllHappyPath(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(200)
|
||||
w.Write([]byte("file contents " + r.URL.Path))
|
||||
}))
|
||||
defer srv.Close()
|
||||
d := New(filepath.Join(t.TempDir(), "att"))
|
||||
files := d.FetchAll(context.Background(), "tok", "msg-1", []AttachmentRef{
|
||||
{URL: srv.URL + "/foo.md", Name: "foo.md", MimeType: "text/markdown"},
|
||||
{URL: srv.URL + "/bar.png", Name: "bar.png"},
|
||||
})
|
||||
if len(files) != 2 {
|
||||
t.Fatalf("len = %d", len(files))
|
||||
}
|
||||
if !strings.HasSuffix(files[0].LocalPath, "/msg-1/foo.md") {
|
||||
t.Errorf("path = %s", files[0].LocalPath)
|
||||
}
|
||||
raw, _ := os.ReadFile(files[0].LocalPath)
|
||||
if !strings.HasPrefix(string(raw), "file contents") {
|
||||
t.Errorf("file content = %q", raw)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchAllSkipsErrors(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "ok") {
|
||||
w.Write([]byte("ok"))
|
||||
} else {
|
||||
w.WriteHeader(404)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
d := New(filepath.Join(t.TempDir(), "att"))
|
||||
files := d.FetchAll(context.Background(), "", "m", []AttachmentRef{
|
||||
{URL: srv.URL + "/ok.txt", Name: "ok.txt"},
|
||||
{URL: srv.URL + "/missing.txt", Name: "missing.txt"},
|
||||
})
|
||||
if len(files) != 1 {
|
||||
t.Errorf("expected 1 result (404 skipped), got %d", len(files))
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchAllEmptyListNoMkdir(t *testing.T) {
|
||||
d := New(filepath.Join(t.TempDir(), "att"))
|
||||
got := d.FetchAll(context.Background(), "", "m", nil)
|
||||
if got != nil {
|
||||
t.Errorf("empty → nil, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchAllSendsAuthHeader(t *testing.T) {
|
||||
gotAuth := ""
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
gotAuth = r.Header.Get("authorization")
|
||||
w.Write([]byte("x"))
|
||||
}))
|
||||
defer srv.Close()
|
||||
d := New(filepath.Join(t.TempDir(), "att"))
|
||||
d.FetchAll(context.Background(), "the-token", "m", []AttachmentRef{
|
||||
{URL: srv.URL + "/a", Name: "a"},
|
||||
})
|
||||
if gotAuth != "Bearer the-token" {
|
||||
t.Errorf("auth header = %q", gotAuth)
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/attachments"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric"
|
||||
"git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/socketio"
|
||||
@@ -53,13 +54,17 @@ type Notifier func(channelName, message, sessionID string)
|
||||
// Supervisor owns the per-(agent, guild) socket.io connections. Run
|
||||
// blocks until ctx is cancelled.
|
||||
type Supervisor struct {
|
||||
Client *fabric.Client
|
||||
Tokens *tokens.Cache
|
||||
Bindings []config.FabricBinding
|
||||
ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding
|
||||
AgentIDs []string // unique agents to bring up
|
||||
Notify Notifier
|
||||
Logger *slog.Logger
|
||||
Client *fabric.Client
|
||||
Tokens *tokens.Cache
|
||||
Bindings []config.FabricBinding
|
||||
ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding
|
||||
AgentIDs []string // unique agents to bring up
|
||||
Notify Notifier
|
||||
Logger *slog.Logger
|
||||
// Attachments, if non-nil, downloads message attachments to a temp
|
||||
// dir and appends paths to the message body before notify. nil →
|
||||
// attachments are dropped silently (warning logged).
|
||||
Attachments *attachments.Downloader
|
||||
|
||||
// Deduplicate (agentID, messageId) → already dispatched. Bounded
|
||||
// to ~5000 entries (matches openclaw).
|
||||
@@ -68,8 +73,8 @@ type Supervisor struct {
|
||||
|
||||
// Per-channel serial chain: each channel id maps to a chan of
|
||||
// queued task funcs. Channel goroutine drains them in order.
|
||||
chainMu sync.Mutex
|
||||
chains map[string]chan func()
|
||||
chainMu sync.Mutex
|
||||
chains map[string]chan func()
|
||||
}
|
||||
|
||||
// New constructs a Supervisor with deduped agentIDs derived from bindings.
|
||||
@@ -371,13 +376,36 @@ func (s *Supervisor) dispatch(agentID, guildNodeID, selfUserID string, m *Fabric
|
||||
}
|
||||
|
||||
logger.Info("inbound: dispatch",
|
||||
"channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType)
|
||||
"channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType,
|
||||
"attachments", len(m.Attachments))
|
||||
|
||||
// Per-channel serial queue. Concurrent inbounds on the same channel
|
||||
// must run one at a time so Plexum's state machine sees clean turns.
|
||||
sessionID := "s_fab_" + m.ChannelID
|
||||
body := m.Content
|
||||
plexumChannel := binding.PlexumChannelName
|
||||
// Attachment download is fire-once-per-message: we'd rather block
|
||||
// here so the agent's first turn sees the paths, than download
|
||||
// async and have the agent miss them on the first read.
|
||||
if len(m.Attachments) > 0 && s.Attachments != nil {
|
||||
tok, terr := s.Tokens.GuildToken(context.Background(), agentID, guildNodeID)
|
||||
if terr == nil {
|
||||
refs := make([]attachments.AttachmentRef, len(m.Attachments))
|
||||
for i, a := range m.Attachments {
|
||||
refs[i] = attachments.AttachmentRef{
|
||||
URL: a.URL, Name: a.Name, MimeType: a.MimeType,
|
||||
}
|
||||
}
|
||||
files := s.Attachments.FetchAll(context.Background(), tok, m.MessageID, refs)
|
||||
body = attachments.AppendFooter(body, files)
|
||||
if len(files) < len(refs) {
|
||||
logger.Warn("inbound: some attachments failed to download",
|
||||
"requested", len(refs), "got", len(files))
|
||||
}
|
||||
} else {
|
||||
logger.Warn("inbound: attachment guild token failed", "err", terr)
|
||||
}
|
||||
}
|
||||
s.enqueueChannel(m.ChannelID, func() {
|
||||
s.Notify(plexumChannel, body, sessionID)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user