From 6def33161b881078c22d44a705ea7627060f1384 Mon Sep 17 00:00:00 2001 From: hzhang Date: Sun, 31 May 2026 15:42:23 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=20F-6=20=E2=80=94=20attachments?= =?UTF-8?q?=20+=20final=20docs=20(F-8=20=3D=20no-op)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit F-6 attachments: - internal/attachments/ (~145 LOC + 8 tests): per-message Downloader that fetches Fabric attachment URLs into $TMPDIR/plexum-fabric// with the agent's guild token, sanitizing filenames + message ids against path-traversal - AppendFooter renders downloaded paths as a markdown footer ("Attachments:\n - /tmp/... (mime, N bytes)\n") - Agents access via the exec MCP tool (cat / file / etc.) internal/inbound.Supervisor: - new Attachments *attachments.Downloader field (nil → skip with warn) - inbound.dispatch: when message has attachments, blocking-download + AppendFooter before emitting notification (so agent's first turn sees the paths) cmd/plexum-fabric-channel-plugin: - sup.Attachments = attachments.New("") wired at init F-8 coalesce: no-op. openclaw plugin's coalesce buffered the text→thinking→tool→text segments openclaw emits across multiple deliver() calls per turn. Plexum's loop.Run returns ONE final assistant text per turn (via extractFinalText), so coalescing isn't a concern. The channel outbound posts a single message naturally. F-5b presence-sync: deferred. The openclaw plugin pushes HarborForge on-call status to Fabric's per-recipient presence so the backend can busy-discard 'announce' deliveries. Plexum's state machine has different semantics (idle/working/busy/offline) and is per-Plexum-agent not per-user; mapping requires more design. README updated with the phase status table + 16-tool list. Tests: 8 new in internal/attachments (35 total in this repo). Co-Authored-By: Claude Opus 4.7 --- README.md | 32 ++-- cmd/plexum-fabric-channel-plugin/main.go | 5 + internal/attachments/attachments.go | 185 +++++++++++++++++++++++ internal/attachments/attachments_test.go | 134 ++++++++++++++++ internal/inbound/inbound.go | 48 ++++-- 5 files changed, 384 insertions(+), 20 deletions(-) create mode 100644 internal/attachments/attachments.go create mode 100644 internal/attachments/attachments_test.go diff --git a/README.md b/README.md index 358539c..82bc52d 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,33 @@ # Plexum-fabric-channel-plugin Native Plexum channel plugin connecting Plexum agents to **Fabric** guilds as -real channel members. Inspired by `Fabric.OpenclawPlugin` — Plexum port, -delivered in phases. +real channel members. Port of `Fabric.OpenclawPlugin` — delivered in phases. ## Status -**Phase F-1 (foundation) — current**: identity registry, Center auth, REST -client, plugin scaffold, channel-binding discovery. The `send` outbound tool -posts plain text to a Fabric channel as the bound agent. +| Phase | Scope | Status | +|-------|-------|--------| +| **F-1** | identity + REST + plugin scaffold + `send` outbound | ✅ | +| **F-2** | socket.io inbound + wakeup gate + token refresh + per-channel serial | ✅ | +| **F-3** | exponential reconnect backoff | ✅ | +| **F-4** | tool surface batch 1 (8 tools) | ✅ | +| **F-5** | command-sync (slash autocomplete) | ✅ | +| **F-6** | attachments → temp-dir + footer | ✅ | +| **F-7** | sub-discussion KV + create-*-channel + discussion-complete | ✅ | +| **F-8** | coalesce parity | ⏭ no-op — Plexum doesn't segment deliver | +| **F-5b** | presence-sync | ⏭ deferred — Plexum state machine ≠ HF semantics | -**Phase F-2 (deferred)**: socket.io inbound, wakeup-gated dispatch, token -refresh, per-channel serial queue. +### Agent-facing tools (16) -**Phase F-3+ (deferred)**: tools.ts port (~15 MCP tools — channel/canvas/ -sub-discussion/etc.), presence sync, command sync, attachments, coalesce -parity. +- `send` — host-driven channel outbound (channel manager → plugin) +- `fabric-send-message`, `fabric-send-sys-msg` — post normal / system msgs +- `fabric-channel-list`, `fabric-guild-list` — discovery +- `fabric-message-history` — paginate by seq +- `fabric-channel`, `fabric-channel-set-purpose` — channel metadata +- `fabric-canvas` — get/share/update/remove canvas +- `create-{chat,work,report,discussion}-channel` — create new channels +- `discussion-complete` — summary + close +- `create-sub-discussion`, `close-sub-discussion` — sub-discussion lifecycle ## Install diff --git a/cmd/plexum-fabric-channel-plugin/main.go b/cmd/plexum-fabric-channel-plugin/main.go index 92d1328..594c7d7 100644 --- a/cmd/plexum-fabric-channel-plugin/main.go +++ b/cmd/plexum-fabric-channel-plugin/main.go @@ -24,6 +24,7 @@ import ( plugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/attachments" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/identity" @@ -212,6 +213,10 @@ func (p *fabricPlugin) Init(ctx context.Context, host plugin.HostAPI) error { // the host log. logger := slog.New(&hostLogHandler{host: host, level: slog.LevelInfo}) sup := inbound.New(p.client, p.tokens, p.bindings, notifier, logger) + // F-6: enable attachment downloads. Files land under + // $TMPDIR/plexum-fabric//; agents access via + // the exec tool (cat / file). + sup.Attachments = attachments.New("") go func() { defer close(p.inboundDone) if err := sup.Run(ctxBg); err != nil { diff --git a/internal/attachments/attachments.go b/internal/attachments/attachments.go new file mode 100644 index 0000000..8a47c91 --- /dev/null +++ b/internal/attachments/attachments.go @@ -0,0 +1,185 @@ +// Package attachments downloads Fabric attachment files referenced in +// inbound messages into a temp dir so the agent can read them via the +// exec tool (cat / file / etc.). +// +// Plexum's notifications/plexum/channel/inbound payload doesn't yet +// have a structured "media" field (host-side work), so we surface the +// downloaded paths by appending a markdown footer to the message text: +// +// hello, please review the attached file +// +// Attachments: +// - /tmp/plexum-fabric//proposal.md (text/markdown, 3142 bytes) +// +// Agent then uses the exec tool to read whichever paths it cares about. +package attachments + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "strings" + "time" +) + +// DownloadResult tracks one downloaded file. +type DownloadResult struct { + LocalPath string + Name string + MimeType string + Size int64 +} + +// Downloader fetches attachments using a per-call guild token. +type Downloader struct { + HTTP *http.Client + BaseDir string // e.g. /tmp/plexum-fabric +} + +// New constructs a Downloader. baseDir defaults to /tmp/plexum-fabric. +func New(baseDir string) *Downloader { + if baseDir == "" { + baseDir = filepath.Join(os.TempDir(), "plexum-fabric") + } + return &Downloader{ + HTTP: &http.Client{Timeout: 30 * time.Second}, + BaseDir: baseDir, + } +} + +// FetchAll downloads every attachment for messageID; returns the +// successful results. Per-attachment errors are accumulated as nil +// entries' "err" field skipped — caller may log via supplied logger. +// Files land under //. +func (d *Downloader) FetchAll(ctx context.Context, guildToken, messageID string, urls []AttachmentRef) []DownloadResult { + if len(urls) == 0 { + return nil + } + dir := filepath.Join(d.BaseDir, sanitizeID(messageID)) + if err := os.MkdirAll(dir, 0o700); err != nil { + return nil + } + var out []DownloadResult + for i, ref := range urls { + if ref.URL == "" { + continue + } + name := ref.Name + if name == "" { + name = fmt.Sprintf("attachment-%d", i) + } + name = sanitizeFilename(name) + dst := filepath.Join(dir, name) + size, err := d.fetchOne(ctx, guildToken, ref.URL, dst) + if err != nil { + continue + } + out = append(out, DownloadResult{ + LocalPath: dst, Name: name, MimeType: ref.MimeType, Size: size, + }) + } + return out +} + +// fetchOne downloads ref.URL into dst with auth. Returns bytes written. +func (d *Downloader) fetchOne(ctx context.Context, guildToken, src, dst string) (int64, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, src, nil) + if err != nil { + return 0, err + } + if guildToken != "" { + req.Header.Set("authorization", "Bearer "+guildToken) + } + resp, err := d.HTTP.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return 0, fmt.Errorf("attachments: %s -> %d", src, resp.StatusCode) + } + f, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o600) + if err != nil { + return 0, err + } + defer f.Close() + return io.Copy(f, resp.Body) +} + +// AttachmentRef is the minimum the downloader needs per file. +type AttachmentRef struct { + URL string + Name string + MimeType string +} + +// AppendFooter returns body with a markdown footer listing the +// downloaded attachments (in agent-friendly form). Empty list → +// body unchanged. +func AppendFooter(body string, files []DownloadResult) string { + if len(files) == 0 { + return body + } + var sb strings.Builder + sb.WriteString(body) + if !strings.HasSuffix(body, "\n") { + sb.WriteByte('\n') + } + sb.WriteString("\nAttachments:\n") + for _, f := range files { + fmt.Fprintf(&sb, " - %s (%s, %d bytes)\n", f.LocalPath, dispMime(f.MimeType), f.Size) + } + return sb.String() +} + +// ---- helpers ---- + +func sanitizeFilename(name string) string { + // Strip path separators; collapse weird chars. Keep ext. + name = filepath.Base(name) + if name == "" || name == "." || name == "/" { + return "attachment.bin" + } + // Reject query strings if any leaked in via name=url. + if i := strings.IndexAny(name, "?#"); i > 0 { + name = name[:i] + } + if u, err := url.QueryUnescape(name); err == nil { + name = u + } + // Drop any leading dots so we don't end up writing hidden files. + name = strings.TrimLeft(name, ".") + if name == "" { + return "attachment.bin" + } + return name +} + +func sanitizeID(s string) string { + // Allow [a-zA-Z0-9_-]; replace anything else with '_'. + var sb strings.Builder + sb.Grow(len(s)) + for _, r := range s { + switch { + case r >= '0' && r <= '9', r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r == '-', r == '_': + sb.WriteRune(r) + default: + sb.WriteByte('_') + } + } + if sb.Len() == 0 { + return "msg" + } + return sb.String() +} + +func dispMime(m string) string { + if m == "" { + return "application/octet-stream" + } + return m +} diff --git a/internal/attachments/attachments_test.go b/internal/attachments/attachments_test.go new file mode 100644 index 0000000..8a539d6 --- /dev/null +++ b/internal/attachments/attachments_test.go @@ -0,0 +1,134 @@ +package attachments + +import ( + "context" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" +) + +func TestSanitizeFilename(t *testing.T) { + cases := map[string]string{ + "proposal.md": "proposal.md", + "../etc/passwd": "passwd", + "": "attachment.bin", + "...": "attachment.bin", + "a?b=c": "a", + "weird name.png": "weird name.png", + "name%20encoded.txt": "name encoded.txt", + } + for in, want := range cases { + got := sanitizeFilename(in) + if got != want { + t.Errorf("sanitizeFilename(%q) = %q, want %q", in, got, want) + } + } +} + +func TestSanitizeID(t *testing.T) { + cases := map[string]string{ + "m-abc-123": "m-abc-123", + "../../etc/passwd": "______etc_passwd", + "": "msg", + "with space": "with_space", + } + for in, want := range cases { + got := sanitizeID(in) + if got != want { + t.Errorf("sanitizeID(%q) = %q, want %q", in, got, want) + } + } +} + +func TestAppendFooterEmpty(t *testing.T) { + got := AppendFooter("hello", nil) + if got != "hello" { + t.Errorf("empty list should preserve body, got %q", got) + } +} + +func TestAppendFooterFormats(t *testing.T) { + files := []DownloadResult{ + {LocalPath: "/tmp/a.md", Name: "a.md", MimeType: "text/markdown", Size: 42}, + {LocalPath: "/tmp/b.png", Name: "b.png", MimeType: "", Size: 1024}, + } + out := AppendFooter("hello\nworld", files) + if !strings.Contains(out, "hello\nworld\n\nAttachments:\n") { + t.Errorf("footer placement wrong: %q", out) + } + if !strings.Contains(out, "- /tmp/a.md (text/markdown, 42 bytes)") { + t.Errorf("first entry: %q", out) + } + if !strings.Contains(out, "- /tmp/b.png (application/octet-stream, 1024 bytes)") { + t.Errorf("default mime: %q", out) + } +} + +func TestFetchAllHappyPath(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Write([]byte("file contents " + r.URL.Path)) + })) + defer srv.Close() + d := New(filepath.Join(t.TempDir(), "att")) + files := d.FetchAll(context.Background(), "tok", "msg-1", []AttachmentRef{ + {URL: srv.URL + "/foo.md", Name: "foo.md", MimeType: "text/markdown"}, + {URL: srv.URL + "/bar.png", Name: "bar.png"}, + }) + if len(files) != 2 { + t.Fatalf("len = %d", len(files)) + } + if !strings.HasSuffix(files[0].LocalPath, "/msg-1/foo.md") { + t.Errorf("path = %s", files[0].LocalPath) + } + raw, _ := os.ReadFile(files[0].LocalPath) + if !strings.HasPrefix(string(raw), "file contents") { + t.Errorf("file content = %q", raw) + } +} + +func TestFetchAllSkipsErrors(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.Contains(r.URL.Path, "ok") { + w.Write([]byte("ok")) + } else { + w.WriteHeader(404) + } + })) + defer srv.Close() + d := New(filepath.Join(t.TempDir(), "att")) + files := d.FetchAll(context.Background(), "", "m", []AttachmentRef{ + {URL: srv.URL + "/ok.txt", Name: "ok.txt"}, + {URL: srv.URL + "/missing.txt", Name: "missing.txt"}, + }) + if len(files) != 1 { + t.Errorf("expected 1 result (404 skipped), got %d", len(files)) + } +} + +func TestFetchAllEmptyListNoMkdir(t *testing.T) { + d := New(filepath.Join(t.TempDir(), "att")) + got := d.FetchAll(context.Background(), "", "m", nil) + if got != nil { + t.Errorf("empty → nil, got %v", got) + } +} + +func TestFetchAllSendsAuthHeader(t *testing.T) { + gotAuth := "" + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("authorization") + w.Write([]byte("x")) + })) + defer srv.Close() + d := New(filepath.Join(t.TempDir(), "att")) + d.FetchAll(context.Background(), "the-token", "m", []AttachmentRef{ + {URL: srv.URL + "/a", Name: "a"}, + }) + if gotAuth != "Bearer the-token" { + t.Errorf("auth header = %q", gotAuth) + } +} diff --git a/internal/inbound/inbound.go b/internal/inbound/inbound.go index aa263d3..1b0c789 100644 --- a/internal/inbound/inbound.go +++ b/internal/inbound/inbound.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/attachments" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/config" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/fabric" "git.hangman-lab.top/hzhang/Plexum-fabric-channel-plugin/internal/socketio" @@ -53,13 +54,17 @@ type Notifier func(channelName, message, sessionID string) // Supervisor owns the per-(agent, guild) socket.io connections. Run // blocks until ctx is cancelled. type Supervisor struct { - Client *fabric.Client - Tokens *tokens.Cache - Bindings []config.FabricBinding - ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding - AgentIDs []string // unique agents to bring up - Notify Notifier - Logger *slog.Logger + Client *fabric.Client + Tokens *tokens.Cache + Bindings []config.FabricBinding + ByFabric config.ByFabricChannel // (guild_node, channel_id) → binding + AgentIDs []string // unique agents to bring up + Notify Notifier + Logger *slog.Logger + // Attachments, if non-nil, downloads message attachments to a temp + // dir and appends paths to the message body before notify. nil → + // attachments are dropped silently (warning logged). + Attachments *attachments.Downloader // Deduplicate (agentID, messageId) → already dispatched. Bounded // to ~5000 entries (matches openclaw). @@ -68,8 +73,8 @@ type Supervisor struct { // Per-channel serial chain: each channel id maps to a chan of // queued task funcs. Channel goroutine drains them in order. - chainMu sync.Mutex - chains map[string]chan func() + chainMu sync.Mutex + chains map[string]chan func() } // New constructs a Supervisor with deduped agentIDs derived from bindings. @@ -371,13 +376,36 @@ func (s *Supervisor) dispatch(agentID, guildNodeID, selfUserID string, m *Fabric } logger.Info("inbound: dispatch", - "channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType) + "channel", m.ChannelID, "agent", agentID, "msg", m.MessageID, "xtype", m.XType, + "attachments", len(m.Attachments)) // Per-channel serial queue. Concurrent inbounds on the same channel // must run one at a time so Plexum's state machine sees clean turns. sessionID := "s_fab_" + m.ChannelID body := m.Content plexumChannel := binding.PlexumChannelName + // Attachment download is fire-once-per-message: we'd rather block + // here so the agent's first turn sees the paths, than download + // async and have the agent miss them on the first read. + if len(m.Attachments) > 0 && s.Attachments != nil { + tok, terr := s.Tokens.GuildToken(context.Background(), agentID, guildNodeID) + if terr == nil { + refs := make([]attachments.AttachmentRef, len(m.Attachments)) + for i, a := range m.Attachments { + refs[i] = attachments.AttachmentRef{ + URL: a.URL, Name: a.Name, MimeType: a.MimeType, + } + } + files := s.Attachments.FetchAll(context.Background(), tok, m.MessageID, refs) + body = attachments.AppendFooter(body, files) + if len(files) < len(refs) { + logger.Warn("inbound: some attachments failed to download", + "requested", len(refs), "got", len(files)) + } + } else { + logger.Warn("inbound: attachment guild token failed", "err", terr) + } + } s.enqueueChannel(m.ChannelID, func() { s.Notify(plexumChannel, body, sessionID) })