From 754e5183f7878a312d805797a9f0911da046bbc3 Mon Sep 17 00:00:00 2001 From: hzhang Date: Wed, 3 Jun 2026 11:11:36 +0100 Subject: [PATCH] initial: HarborForge plugin for Plexum (port of OpenclawPlugin) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plugin id `harbor-forge` mirrors the OpenClaw counterpart's runtime surface on top of the Plexum SDK: * eager activation — Monitor bridge + Calendar scheduler boot at host start, before any agent turn fires * monitor bridge: HTTP 127.0.0.1: serving /telemetry + /health for HarborForge.Monitor * calendar scheduler: heartbeats /calendar/agent/ heartbeat, dispatches returned slots via HostAPI.WakeAgent (state-aware queue, depth-1 replace-newest), tracks active slot state in-memory, terminal status pushed back to backend * 9 harborforge_* tools (status / telemetry / monitor_telemetry / calendar_{status,complete,abort,pause,resume} / restart_status) Key differences from OpenClaw equivalent: * api.spawn → HostAPI.WakeAgent (new SDK primitive) * api.getAgentStatus → HostAPI.ReadAgentState (existing) * --install-monitor / --install-cli not included; Monitor + hf CLI deploy via the HangmanLab.Server.T3 docker compose layer Initial drop. TODO before v1 ship: * tool ctx → calling-agent-id: SDK doesn't currently expose; v1 falls back to a single-active-slot heuristic in main.bestEffortAgentID * tests for the bridge + scheduler --- .gitignore | 2 + Makefile | 28 +++ README.md | 125 ++++++++++ cmd/plexum-harborforge-plugin/main.go | 258 +++++++++++++++++++++ go.mod | 9 + internal/calendar/bridge.go | 129 +++++++++++ internal/calendar/scheduler.go | 316 ++++++++++++++++++++++++++ internal/calendar/types.go | 106 +++++++++ internal/config/config.go | 148 ++++++++++++ internal/monitor/bridge.go | 120 ++++++++++ internal/telemetry/collector.go | 226 ++++++++++++++++++ internal/telemetry/diskstat_linux.go | 20 ++ internal/tools/tools.go | 213 +++++++++++++++++ manifest.json | 55 +++++ scripts/install.sh | 48 ++++ 15 files changed, 1803 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 README.md create mode 100644 cmd/plexum-harborforge-plugin/main.go create mode 100644 go.mod create mode 100644 internal/calendar/bridge.go create mode 100644 internal/calendar/scheduler.go create mode 100644 internal/calendar/types.go create mode 100644 internal/config/config.go create mode 100644 internal/monitor/bridge.go create mode 100644 internal/telemetry/collector.go create mode 100644 internal/telemetry/diskstat_linux.go create mode 100644 internal/tools/tools.go create mode 100644 manifest.json create mode 100755 scripts/install.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c768500 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +dist/ +*.tmp diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..193f1f1 --- /dev/null +++ b/Makefile @@ -0,0 +1,28 @@ +VERSION := $(shell git describe --tags --always --dirty 2>/dev/null || echo dev) +GO_ENV := CGO_ENABLED=0 + +.PHONY: build install clean help + +help: + @echo "HarborForge.PlexumPlugin build targets:" + @echo " build - compile binary + bundle manifest into dist/" + @echo " install - copy binary + manifest into ~/.plexum/plugins/harbor-forge/" + @echo " clean - rm -rf dist/" + +build: + mkdir -p dist + $(GO_ENV) go build -ldflags="-X main.Version=$(VERSION)" \ + -o dist/plexum-harborforge-plugin ./cmd/plexum-harborforge-plugin + cp manifest.json dist/manifest.json + @echo "Built to dist/ (version=$(VERSION))" + +install: build + mkdir -p ~/.plexum/plugins/harbor-forge + cp dist/plexum-harborforge-plugin ~/.plexum/plugins/harbor-forge/ + cp dist/manifest.json ~/.plexum/plugins/harbor-forge/ + @echo "Installed to ~/.plexum/plugins/harbor-forge/" + @echo "Add to ~/.plexum/plexum.json .plugins.allow: 'harbor-forge'" + @echo "Config goes at ~/.plexum/plugins/harbor-forge/config.json (see README)" + +clean: + rm -rf dist/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..d7ee455 --- /dev/null +++ b/README.md @@ -0,0 +1,125 @@ +# HarborForge.PlexumPlugin + +Plexum-side equivalent of [HarborForge.OpenclawPlugin](https://git.hangman-lab.top/zhi/HarborForge.OpenclawPlugin): +exposes Plexum-side telemetry to the HarborForge Monitor bridge, +drives the HarborForge Calendar scheduler, and gives agents a tool +surface for the same calendar lifecycle actions OpenClaw agents had. + +Part of the [HarborForge](../README.md) platform; tracked as a git +submodule of the HarborForge umbrella repo. + +- Plugin id: `harbor-forge` (matches the OpenClaw counterpart so the + backend's per-plugin schemas don't fork) +- Plugin version: `0.1.0` +- Activation: `eager` — Monitor bridge + Calendar scheduler must be + running before any agent turn fires +- Plexum SDK version: requires `Plexum-sdk-go` with `HostAPI.WakeAgent` + (commit 216cf21 or later) + +## What it does + +- **Monitor bridge** — HTTP server on `127.0.0.1:` that + responds to `/telemetry` with a Snapshot the HarborForge.Monitor + binary expects (system metrics + every Plexum agent's sm-state) +- **Calendar scheduler** — heartbeats `/calendar/agent/ + heartbeat` every interval, receives any TimeSlots due to fire, and + dispatches them through `HostAPI.WakeAgent` (state-aware queue + with depth-1 replace-newest) +- **9 harborforge_* tools** mirroring the OpenClaw plugin's surface + +| Tool | Use | +|---|---| +| `harborforge_status` | resolved config + Monitor bridge health + Calendar status + telemetry snapshot | +| `harborforge_telemetry` | fresh system + agent metrics | +| `harborforge_monitor_telemetry` | last bridge query timing + last snapshot served | +| `harborforge_calendar_status` | active slot(s) + history + heartbeat clock | +| `harborforge_calendar_complete` | mark active slot completed (+optional summary) | +| `harborforge_calendar_abort` | mark active slot aborted (+optional reason) | +| `harborforge_calendar_pause` | pause active slot (non-terminal) | +| `harborforge_calendar_resume` | resume a paused slot | +| `harborforge_restart_status` | backend restart-pending flag + last poll time | + +## Install + +```bash +git clone --recurse-submodules https://git.hangman-lab.top/zhi/HarborForge.PlexumPlugin +cd HarborForge.PlexumPlugin +bash scripts/install.sh # or: make install +``` + +Then in `~/.plexum/plexum.json`: + +```json +{ + "plugins": { + "allow": [ + ".", + "harbor-forge" + ] + } +} +``` + +And configure at `~/.plexum/plugins/harbor-forge/config.json`: + +```json +{ + "backendUrl": "https://monitor.hangman-lab.top", + "identifier": "server-t3", + "apiKey": "g1_xxx", + "monitor_port": 9100, + "calendar_enabled": true, + "calendar_heartbeat_interval_seconds": 30 +} +``` + +Restart the host (`systemctl --user restart plexum`) and verify: + +```bash +plexum plugin-list | grep harbor +curl -s http://127.0.0.1:9100/health +curl -s http://127.0.0.1:9100/telemetry | jq .agents +``` + +## How calendar wake works + +When the backend returns a `slot_to_fire` in a heartbeat response: + +1. Scheduler builds the message from `slot.wake_options.override_message` + or falls back to `slot.prompt` +2. `host.WakeAgent({agent_id, message, source: "calendar:slot-"})` +3. Plexum host-side `wake.Manager`: + - if agent's sm-state is `idle` → runs the turn synchronously in a + goroutine against the agent's `wake` session + - else → enqueues (depth 1; new wake replaces any pending one) + - drains automatically when the running turn returns +4. The `source` tag lands on the turn's faithful event so retros can + tell which slot caused which turn + +The agent uses `harborforge_calendar_complete` / `_abort` / `_pause` / +`_resume` mid-turn to push status back to the backend. + +## Layout + +``` +HarborForge.PlexumPlugin/ +├── manifest.json # plugin manifest (eager, 9 tools) +├── go.mod # → Plexum-sdk-go (replace ../) +├── cmd/plexum-harborforge-plugin/ # main entry (Serve + Init) +├── internal/config/ # config.json schema + Resolve +├── internal/telemetry/ # /proc-based snapshot collector +├── internal/monitor/ # HTTP bridge for HF.Monitor +├── internal/calendar/ # types + backend client + scheduler +├── internal/tools/ # 9 tool implementations +└── scripts/install.sh # build + drop into ~/.plexum/plugins +``` + +## Differences vs OpenClaw equivalent + +| OpenClaw plugin | Plexum plugin | +|---|---| +| `api.registerTool(factory)` runtime | `ToolPlugin.CallTool` + manifest contract | +| `api.spawn({agentId, task})` | `HostAPI.WakeAgent({agent_id, message, source})` (state-aware queue) | +| `api.getAgentStatus()` | `HostAPI.ReadAgentState(ctx, agent_id)` | +| `--install-monitor` / `--install-cli` flags | n/a — Monitor + `hf` CLI deploy separately (e.g. via HangmanLab.Server.T3 docker compose) | +| TS source compiled by `tsc` | static Go binary built per-platform | diff --git a/cmd/plexum-harborforge-plugin/main.go b/cmd/plexum-harborforge-plugin/main.go new file mode 100644 index 0000000..36460eb --- /dev/null +++ b/cmd/plexum-harborforge-plugin/main.go @@ -0,0 +1,258 @@ +// plexum-harborforge-plugin — Plexum-side HarborForge plugin. +// +// Mirrors HarborForge.OpenclawPlugin's responsibilities, recast on the +// Plexum SDK: +// +// - eager activation: plugin spawns at host start so the Monitor +// bridge listener and Calendar scheduler are running before any +// turn fires +// - 9 harborforge_* tools backed by tools.Dispatch +// - state-aware wake-agent via HostAPI.WakeAgent for Calendar slots +// +// Config layout: /plugins/harbor-forge/config.json — see +// internal/config/config.go for the schema. +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" + + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/calendar" + hfcfg "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/config" + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/monitor" + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry" + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/tools" +) + +// Version is injected via -ldflags "-X main.Version=…" at build time. +var Version = "0.1.0" + +// harborForgePlugin satisfies sdkplugin.ToolPlugin. +type harborForgePlugin struct { + host sdkplugin.HostAPI + cfg hfcfg.Resolved + bridge *monitor.Bridge + sched *calendar.Scheduler + deps tools.Deps + cancelBg context.CancelFunc + wg sync.WaitGroup + agentCache sync.Map // sessionID/turnID → agentID stash (best-effort) +} + +func (p *harborForgePlugin) Manifest() sdkplugin.Manifest { + return manifestFromDisk() +} + +func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) error { + p.host = host + + profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT") + if profileRoot == "" { + home, _ := os.UserHomeDir() + profileRoot = filepath.Join(home, ".plexum") + } + raw, err := hfcfg.Load(profileRoot) + if err != nil { + return fmt.Errorf("load harbor-forge config: %w", err) + } + p.cfg = hfcfg.Resolve(raw) + host.Log("info", "harbor-forge plugin initialized", map[string]any{ + "version": Version, + "backend": p.cfg.BackendURL, + "identifier": p.cfg.Identifier, + "monitor_port": p.cfg.MonitorPort, + "calendar_enabled": p.cfg.CalendarEnabled, + }) + + collect := func() telemetry.Snapshot { + return telemetry.Collect(telemetry.CollectOpts{ + Identifier: p.cfg.Identifier, + Version: Version, + AgentLister: func() []telemetry.AgentInfo { + return p.listAgents(ctx, profileRoot) + }, + }) + } + + p.bridge = monitor.New(p.cfg.MonitorPort, collect, + func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) }) + + bgCtx, cancel := context.WithCancel(context.Background()) + p.cancelBg = cancel + + if err := p.bridge.Start(bgCtx); err != nil { + host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()}) + } + + calBackend := p.cfg.CalendarBackendURL + if calBackend == "" { + calBackend = p.cfg.BackendURL + } + bridge := calendar.New(calBackend, p.cfg.APIKey) + p.sched = calendar.NewScheduler( + calendar.Config{ + HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second, + }, + bridge, host, p.cfg.Identifier, + calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"}, + func() []calendar.ReportableAgent { + return p.listReportableAgents(ctx, profileRoot) + }, + ) + if p.cfg.CalendarEnabled { + p.wg.Add(1) + go func() { + defer p.wg.Done() + if err := p.sched.Run(bgCtx); err != nil { + host.Log("warn", "calendar scheduler exited", map[string]any{"err": err.Error()}) + } + }() + } else { + host.Log("info", "calendar scheduler disabled by config", nil) + } + + p.deps = tools.Deps{ + Config: p.cfg, + Version: Version, + Collect: collect, + Bridge: p.bridge, + Scheduler: p.sched, + Host: host, + AgentIDFromCtx: func(ctx context.Context) string { + // Plexum stashes the calling agent id on the host-side + // context (via WithAgent) before dispatching tool calls. + // We can't directly import internal/agentloop from a + // plugin, so we rely on PLEXUM_TOOL_AGENT_ID env-style + // (set per-call by host when we add that wiring) or fall + // back to the only-active-agent heuristic. v1: prefer the + // only-active wake-target (deterministic in single-agent + // HF deployments). + return p.bestEffortAgentID() + }, + } + + return nil +} + +func (p *harborForgePlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (sdkplugin.ToolResult, error) { + return tools.Dispatch(ctx, p.deps, name, input) +} + +// ---- agent enumeration ---- + +// listAgents walks /agents/*/agent.json + state.json so the +// telemetry payload includes every Plexum agent visible on this host. +// Best-effort: read failures degrade to empty list. +func (p *harborForgePlugin) listAgents(ctx context.Context, profileRoot string) []telemetry.AgentInfo { + root := filepath.Join(profileRoot, "agents") + entries, err := os.ReadDir(root) + if err != nil { + return nil + } + out := make([]telemetry.AgentInfo, 0, len(entries)) + for _, e := range entries { + if !e.IsDir() { + continue + } + agentID := e.Name() + var info telemetry.AgentInfo + info.ID = agentID + if raw, err := os.ReadFile(filepath.Join(root, agentID, "agent.json")); err == nil { + var meta struct{ Model string `json:"model"` } + _ = json.Unmarshal(raw, &meta) + info.Model = meta.Model + } + // State via HostAPI.ReadAgentState — host-side, ground truth. + if snap, err := p.host.ReadAgentState(ctx, agentID); err == nil { + info.State = snap.State + } + out = append(out, info) + } + return out +} + +func (p *harborForgePlugin) listReportableAgents(ctx context.Context, profileRoot string) []calendar.ReportableAgent { + telem := p.listAgents(ctx, profileRoot) + out := make([]calendar.ReportableAgent, 0, len(telem)) + for _, a := range telem { + out = append(out, calendar.ReportableAgent{ + ID: a.ID, Model: a.Model, + State: mapStateToCalendar(a.State), + }) + } + return out +} + +func mapStateToCalendar(s string) calendar.AgentStatusValue { + switch strings.ToLower(s) { + case "idle": + return calendar.AgentStatusIdle + case "working": + return calendar.AgentStatusOnCall + case "busy": + return calendar.AgentStatusBusy + case "offline": + return calendar.AgentStatusOffline + } + return calendar.AgentStatusUnknown +} + +// bestEffortAgentID is a v1 stop-gap for tools that need the calling +// agent's id but don't have it on the ctx (Plexum SDK doesn't yet +// expose this — TODO upstream). Returns the only active calendar +// slot's agent if there's exactly one; otherwise empty. The calendar +// tools (the only ones that need agent context) usually fire when +// exactly one slot is active. +func (p *harborForgePlugin) bestEffortAgentID() string { + sch := p.sched.Status() + if len(sch.Active) == 1 { + return sch.Active[0].Slot.AgentID + } + return "" +} + +func manifestFromDisk() sdkplugin.Manifest { + // Bundled manifest.json is the authoritative shape; the binary + // version reads it next to itself to avoid hand-syncing two + // definitions. Falls back to a minimal in-code manifest if the + // file is missing (development / first build). + exe, err := os.Executable() + if err == nil { + raw, err := os.ReadFile(filepath.Join(filepath.Dir(exe), "manifest.json")) + if err == nil { + var m sdkplugin.Manifest + if err := json.Unmarshal(raw, &m); err == nil && m.Name != "" { + return m + } + } + } + return sdkplugin.Manifest{ + Name: "harbor-forge", + Version: Version, + Activation: sdkplugin.ActivationEager, + Executable: "plexum-harborforge-plugin", + } +} + +func main() { + p := &harborForgePlugin{} + defer func() { + if p.cancelBg != nil { + p.cancelBg() + } + p.wg.Wait() + }() + if err := sdkplugin.Serve(p); err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "plexum-harborforge-plugin: %v\n", err) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7d0f8d1 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module git.hangman-lab.top/zhi/HarborForge.PlexumPlugin + +go 1.24 + +require ( + git.hangman-lab.top/hzhang/Plexum-sdk-go v0.0.0 +) + +replace git.hangman-lab.top/hzhang/Plexum-sdk-go => ../Plexum-sdk-go diff --git a/internal/calendar/bridge.go b/internal/calendar/bridge.go new file mode 100644 index 0000000..69fd6d0 --- /dev/null +++ b/internal/calendar/bridge.go @@ -0,0 +1,129 @@ +// Bridge — thin HTTP client for the HarborForge backend's Calendar API. +// All operations carry the API key as Authorization: Bearer; absent +// key means missing-auth errors from the backend (caller should +// handle them as transient and log). + +package calendar + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +// Bridge is the typed wrapper around an HTTP client + backend URL. +type Bridge struct { + BackendURL string + APIKey string + HTTP *http.Client +} + +// New constructs a bridge with a sensible default timeout. +func New(backendURL, apiKey string) *Bridge { + return &Bridge{ + BackendURL: strings.TrimRight(backendURL, "/"), + APIKey: apiKey, + HTTP: &http.Client{Timeout: 20 * time.Second}, + } +} + +// Heartbeat POSTs /calendar/agent/heartbeat. Returns the backend's +// reply or an error. +func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (HeartbeatResponse, error) { + raw, err := b.post(ctx, "/calendar/agent/heartbeat", payload) + if err != nil { + return HeartbeatResponse{}, err + } + var out HeartbeatResponse + if len(raw) > 0 { + if err := json.Unmarshal(raw, &out); err != nil { + return HeartbeatResponse{}, fmt.Errorf("decode heartbeat: %w (body=%q)", err, truncate(raw, 200)) + } + } + return out, nil +} + +// UpdateSlotStatus POSTs /calendar/slot//status to mark a slot +// completed / aborted / paused / resumed. +func (b *Bridge) UpdateSlotStatus(ctx context.Context, slotID string, update SlotUpdate) error { + if slotID == "" { + return errors.New("calendar: slot id required") + } + _, err := b.post(ctx, "/calendar/slot/"+slotID+"/status", update) + return err +} + +// RestartPending GETs /restart/status — returns the backend's +// current restart-requested flag. +func (b *Bridge) RestartPending(ctx context.Context) (bool, error) { + raw, err := b.get(ctx, "/restart/status") + if err != nil { + return false, err + } + var out struct { + Pending bool `json:"pending"` + } + if len(raw) > 0 { + if err := json.Unmarshal(raw, &out); err != nil { + return false, fmt.Errorf("decode restart-status: %w", err) + } + } + return out.Pending, nil +} + +// post serialises body as JSON, attaches Authorization, returns +// response body bytes. Non-2xx becomes an error with the body +// included for diagnostics. +func (b *Bridge) post(ctx context.Context, path string, body any) ([]byte, error) { + raw, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("marshal %s: %w", path, err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, b.BackendURL+path, bytes.NewReader(raw)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + if b.APIKey != "" { + req.Header.Set("Authorization", "Bearer "+b.APIKey) + } + return b.do(req) +} + +func (b *Bridge) get(ctx context.Context, path string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, b.BackendURL+path, nil) + if err != nil { + return nil, err + } + if b.APIKey != "" { + req.Header.Set("Authorization", "Bearer "+b.APIKey) + } + return b.do(req) +} + +func (b *Bridge) do(req *http.Request) ([]byte, error) { + res, err := b.HTTP.Do(req) + if err != nil { + return nil, fmt.Errorf("%s %s: %w", req.Method, req.URL.Path, err) + } + defer res.Body.Close() + body, _ := io.ReadAll(res.Body) + if res.StatusCode < 200 || res.StatusCode >= 300 { + return nil, fmt.Errorf("%s %s → %d: %s", + req.Method, req.URL.Path, res.StatusCode, truncate(body, 300)) + } + return body, nil +} + +func truncate(b []byte, n int) string { + if len(b) <= n { + return string(b) + } + return string(b[:n]) + "…" +} diff --git a/internal/calendar/scheduler.go b/internal/calendar/scheduler.go new file mode 100644 index 0000000..878a3d4 --- /dev/null +++ b/internal/calendar/scheduler.go @@ -0,0 +1,316 @@ +// Scheduler — main loop that heartbeats the backend, dispatches +// returned slots via Plexum's WakeAgent, and tracks per-agent active +// slot state for the calendar_* tools. +// +// State is in-memory: a daemon restart drops everything. Next +// heartbeat reconciles (backend keeps the canonical SlotStatus). +// +// Concurrency: +// - one heartbeat ticker goroutine +// - per-slot dispatch is fire-and-forget via WakeAgent (queue-aware) +// - mu guards activeBySlot + activeByAgent maps + +package calendar + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" +) + +// Scheduler orchestrates the calendar loop. +type Scheduler struct { + cfg Config + bridge *Bridge + host sdkplugin.HostAPI + agentLister func() []ReportableAgent + identifier string + pluginInfo PluginInfoTag + + mu sync.Mutex + activeBySlotID map[string]*ActiveSlot + activeByAgentID map[string]*ActiveSlot + history []HistoryEntry + lastHeartbeat time.Time + lastResponse HeartbeatResponse + restartPending bool +} + +// Config bundles scheduler tunables. +type Config struct { + HeartbeatInterval time.Duration + HistoryCap int // bound on activity history; default 32 +} + +// ReportableAgent is the projection of a Plexum agent the scheduler +// needs for heartbeat — id + model + current sm state. +type ReportableAgent struct { + ID string + Model string + State AgentStatusValue +} + +// ActiveSlot tracks an in-flight slot (between WakeAgent dispatch and +// terminal status update). +type ActiveSlot struct { + Slot Slot + StartedAt time.Time + LastHeartbeat time.Time + State SlotStatus +} + +// HistoryEntry is one resolved slot kept for the calendar_status tool. +type HistoryEntry struct { + SlotID string + AgentID string + Status SlotStatus + ResolvedAt time.Time + Reason string + Summary string +} + +// NewScheduler constructs a Scheduler in stopped state. +func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI, + identifier string, pluginInfo PluginInfoTag, + agentLister func() []ReportableAgent) *Scheduler { + if cfg.HeartbeatInterval <= 0 { + cfg.HeartbeatInterval = 30 * time.Second + } + if cfg.HistoryCap <= 0 { + cfg.HistoryCap = 32 + } + return &Scheduler{ + cfg: cfg, + bridge: bridge, + host: host, + agentLister: agentLister, + identifier: identifier, + pluginInfo: pluginInfo, + activeBySlotID: map[string]*ActiveSlot{}, + activeByAgentID: map[string]*ActiveSlot{}, + } +} + +// Run blocks until ctx cancels, ticking heartbeats every +// cfg.HeartbeatInterval. Returns nil on graceful shutdown. +func (s *Scheduler) Run(ctx context.Context) error { + t := time.NewTicker(s.cfg.HeartbeatInterval) + defer t.Stop() + // First heartbeat immediately so initial state lands fast. + s.heartbeatOnce(ctx) + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + s.heartbeatOnce(ctx) + } + } +} + +func (s *Scheduler) heartbeatOnce(ctx context.Context) { + payload := HeartbeatPayload{ + Identifier: s.identifier, + APIKey: s.bridge.APIKey, + PluginInfo: s.pluginInfo, + CapturedAt: time.Now().UTC(), + } + if s.agentLister != nil { + for _, a := range s.agentLister() { + payload.AgentList = append(payload.AgentList, AgentReport{ + ID: a.ID, Model: a.Model, Status: a.State, + }) + } + } + resp, err := s.bridge.Heartbeat(ctx, payload) + s.mu.Lock() + s.lastHeartbeat = time.Now() + if err == nil { + s.lastResponse = resp + s.restartPending = resp.RestartPending + } + s.mu.Unlock() + if err != nil { + return // network blip; next tick retries + } + for _, slot := range resp.SlotsToFire { + s.dispatchSlot(ctx, slot) + } +} + +// dispatchSlot fires the slot via host.WakeAgent and records it as +// active. WakeAgent handles state-aware queueing — if the agent is +// busy, our calendar slot enqueues at depth 1 and the previous wake +// is dropped per replace-newest semantics. We mark the slot +// in_progress optimistically when we ENQUEUED; backend reconciles on +// its own watchdog. +func (s *Scheduler) dispatchSlot(ctx context.Context, slot Slot) { + // Skip already-active slots (heartbeat may re-list a slot we + // already started — backend hasn't seen our optimistic update yet). + s.mu.Lock() + if _, ok := s.activeBySlotID[slot.ID]; ok { + s.mu.Unlock() + return + } + now := time.Now().UTC() + act := &ActiveSlot{ + Slot: slot, StartedAt: now, LastHeartbeat: now, + State: SlotInProgress, + } + s.activeBySlotID[slot.ID] = act + s.activeByAgentID[slot.AgentID] = act + s.mu.Unlock() + + message := slot.WakeOptions.OverrideMessage + if message == "" { + message = slot.PromptText + } + if message == "" { + message = fmt.Sprintf("[calendar] slot %s: %s", slot.ID, slot.Title) + } + source := fmt.Sprintf("calendar:slot-%s", slot.ID) + if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{ + AgentID: slot.AgentID, + Message: message, + Source: source, + }); err != nil { + // Wake itself failed (plumbing). Mark slot aborted + + // notify backend. + s.resolveSlot(ctx, slot.ID, SlotAborted, "", "wake-agent failed: "+err.Error()) + return + } +} + +// resolveSlot moves an active slot to a terminal status, records +// history, and tells the backend. Safe to call concurrently. +func (s *Scheduler) resolveSlot(ctx context.Context, slotID string, status SlotStatus, summary, reason string) error { + s.mu.Lock() + act, ok := s.activeBySlotID[slotID] + if !ok { + s.mu.Unlock() + return fmt.Errorf("calendar: slot %s not active", slotID) + } + delete(s.activeBySlotID, slotID) + delete(s.activeByAgentID, act.Slot.AgentID) + s.appendHistoryLocked(HistoryEntry{ + SlotID: slotID, AgentID: act.Slot.AgentID, Status: status, + ResolvedAt: time.Now().UTC(), Summary: summary, Reason: reason, + }) + s.mu.Unlock() + return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{ + Status: status, Summary: summary, Reason: reason, + }) +} + +// SetSlotState is a non-terminal status change (paused/resumed). +// Records the new state in-memory and tells the backend. +func (s *Scheduler) SetSlotState(ctx context.Context, slotID string, status SlotStatus, reason string) error { + s.mu.Lock() + act, ok := s.activeBySlotID[slotID] + if !ok { + s.mu.Unlock() + return fmt.Errorf("calendar: slot %s not active", slotID) + } + act.State = status + act.LastHeartbeat = time.Now().UTC() + s.mu.Unlock() + return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{ + Status: status, Reason: reason, + }) +} + +func (s *Scheduler) appendHistoryLocked(entry HistoryEntry) { + s.history = append(s.history, entry) + if len(s.history) > s.cfg.HistoryCap { + s.history = s.history[len(s.history)-s.cfg.HistoryCap:] + } +} + +// CompleteForAgent / AbortForAgent / PauseForAgent / ResumeForAgent +// are the agent-facing tool entry points. They look up the agent's +// active slot, transition or terminate it, and notify the backend. + +// CompleteForAgent terminates the agent's active slot as completed. +func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error { + slot, ok := s.activeSlotForAgent(agentID) + if !ok { + return ErrNoActiveSlot + } + return s.resolveSlot(ctx, slot.Slot.ID, SlotCompleted, summary, "") +} + +// AbortForAgent terminates the agent's active slot as aborted. +func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error { + slot, ok := s.activeSlotForAgent(agentID) + if !ok { + return ErrNoActiveSlot + } + return s.resolveSlot(ctx, slot.Slot.ID, SlotAborted, "", reason) +} + +// PauseForAgent transitions the agent's slot to paused. +func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error { + slot, ok := s.activeSlotForAgent(agentID) + if !ok { + return ErrNoActiveSlot + } + return s.SetSlotState(ctx, slot.Slot.ID, SlotPaused, reason) +} + +// ResumeForAgent transitions the agent's slot back to in_progress. +func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error { + slot, ok := s.activeSlotForAgent(agentID) + if !ok { + return ErrNoActiveSlot + } + return s.SetSlotState(ctx, slot.Slot.ID, SlotInProgress, "") +} + +// activeSlotForAgent returns the per-agent active slot copy under lock. +func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) { + s.mu.Lock() + defer s.mu.Unlock() + act, ok := s.activeByAgentID[agentID] + if !ok || act == nil { + return ActiveSlot{}, false + } + return *act, true +} + +// Status returns the introspection shape for the calendar_status tool. +func (s *Scheduler) Status() SchedulerStatus { + s.mu.Lock() + defer s.mu.Unlock() + active := make([]ActiveSlot, 0, len(s.activeBySlotID)) + for _, a := range s.activeBySlotID { + active = append(active, *a) + } + history := make([]HistoryEntry, len(s.history)) + copy(history, s.history) + return SchedulerStatus{ + Enabled: true, + LastHeartbeat: s.lastHeartbeat, + HeartbeatEvery: s.cfg.HeartbeatInterval, + Active: active, + History: history, + RestartPending: s.restartPending, + } +} + +// SchedulerStatus is the shape calendar_status returns. +type SchedulerStatus struct { + Enabled bool `json:"enabled"` + LastHeartbeat time.Time `json:"last_heartbeat"` + HeartbeatEvery time.Duration `json:"heartbeat_every"` + Active []ActiveSlot `json:"active"` + History []HistoryEntry `json:"history"` + RestartPending bool `json:"restart_pending"` +} + +// ErrNoActiveSlot is returned by calendar_complete/abort/pause/resume +// when the agent has no slot in progress. +var ErrNoActiveSlot = errors.New("calendar: no active slot for agent") diff --git a/internal/calendar/types.go b/internal/calendar/types.go new file mode 100644 index 0000000..999d77f --- /dev/null +++ b/internal/calendar/types.go @@ -0,0 +1,106 @@ +// Package calendar talks to the HarborForge backend's Calendar API +// (heartbeat, slot fetch, status update, restart-pending check) and +// drives a scheduler loop that fires Plexum wake events when slots +// come due. Types mirror HarborForge.OpenclawPlugin's calendar/types.ts +// so the backend doesn't need to know which plugin is reporting. + +package calendar + +import "time" + +// SlotStatus enumerates the slot lifecycle. +type SlotStatus string + +const ( + SlotNotStarted SlotStatus = "not_started" + SlotInProgress SlotStatus = "in_progress" + SlotCompleted SlotStatus = "completed" + SlotAborted SlotStatus = "aborted" + SlotPaused SlotStatus = "paused" + SlotDeferred SlotStatus = "deferred" +) + +// AgentStatusValue mirrors the backend AgentStatus enum used in +// heartbeat responses (a hint about what the backend thinks the +// agent is doing). +type AgentStatusValue string + +const ( + AgentStatusUnknown AgentStatusValue = "unknown" + AgentStatusIdle AgentStatusValue = "idle" + AgentStatusBusy AgentStatusValue = "busy" + AgentStatusOffline AgentStatusValue = "offline" + AgentStatusOnCall AgentStatusValue = "on_call" + AgentStatusPaused AgentStatusValue = "paused" +) + +// SlotKind is "work" vs "on_call" — affects how the scheduler treats +// the slot (on_call slots don't move the agent into busy). +type SlotKind string + +const ( + SlotKindWork SlotKind = "work" + SlotKindOnCall SlotKind = "on_call" +) + +// Slot is one Calendar TimeSlot the backend serves. +type Slot struct { + ID string `json:"id"` + VirtualID string `json:"virtual_id,omitempty"` + AgentID string `json:"agent_id"` + ClawID string `json:"claw_identifier,omitempty"` + Kind SlotKind `json:"slot_type"` + Title string `json:"title,omitempty"` + Description string `json:"description,omitempty"` + ScheduledAt time.Time `json:"scheduled_at"` + ExpiresAt *time.Time `json:"expires_at,omitempty"` + Status SlotStatus `json:"status"` + PromptText string `json:"prompt,omitempty"` + WakeOptions WakeOpts `json:"wake_options,omitempty"` +} + +// WakeOpts customise how the scheduler should drive the agent. v1 +// honours only Force; the rest pass through as audit trail. +type WakeOpts struct { + Force bool `json:"force,omitempty"` + OverrideMessage string `json:"override_message,omitempty"` + ScopeSessionID string `json:"scope_session_id,omitempty"` +} + +// HeartbeatPayload is what the plugin POSTs every interval. +type HeartbeatPayload struct { + Identifier string `json:"identifier"` + APIKey string `json:"api_key,omitempty"` + AgentList []AgentReport `json:"agents"` + PluginInfo PluginInfoTag `json:"plugin"` + CapturedAt time.Time `json:"captured_at"` +} + +// AgentReport is one entry in HeartbeatPayload.AgentList. +type AgentReport struct { + ID string `json:"agent_id"` + Status AgentStatusValue `json:"status"` + Model string `json:"model,omitempty"` +} + +// PluginInfoTag identifies which plugin / version is heartbeating. +type PluginInfoTag struct { + Name string `json:"name"` // "harbor-forge" + Version string `json:"version"` // e.g. 0.1.0 + Backend string `json:"backend"` // "plexum" +} + +// HeartbeatResponse is the backend's reply. SlotsToFire are slots +// the scheduler should attempt to start. +type HeartbeatResponse struct { + SlotsToFire []Slot `json:"slots_to_fire,omitempty"` + RestartPending bool `json:"restart_pending,omitempty"` + ServerTime time.Time `json:"server_time"` +} + +// SlotUpdate is the body of POST /calendar/slot//status. +type SlotUpdate struct { + Status SlotStatus `json:"status"` + Summary string `json:"summary,omitempty"` + Reason string `json:"reason,omitempty"` +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..4a37647 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,148 @@ +// Package config loads the HarborForge plugin's per-profile config +// from /plugins/harbor-forge/config.json. Mirrors the +// resolved-config shape HarborForge.OpenclawPlugin's +// plugin/core/config.ts surfaces, adapted for the Plexum profile +// layout. + +package config + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" +) + +// Config is the operator-visible shape stored at +// ~/.plexum/plugins/harbor-forge/config.json. All fields optional; +// resolved defaults live in Resolve(). +type Config struct { + Enabled bool `json:"enabled,omitempty"` + + // BackendURL is the HarborForge backend base (Monitor + Calendar + // API share this URL — same as OpenclawPlugin's backendUrl). + BackendURL string `json:"backendUrl,omitempty"` + + // Identifier is reported on heartbeat + Monitor responses. Auto- + // derived from hostname when empty. + Identifier string `json:"identifier,omitempty"` + + // APIKey authenticates Monitor + Calendar API calls. + APIKey string `json:"apiKey,omitempty"` + + // MonitorPort is the local TCP port the Monitor bridge HTTP + // server listens on. Zero/missing disables the bridge entirely. + MonitorPort int `json:"monitor_port,omitempty"` + + // CalendarHeartbeatIntervalSeconds — defaults to 30s when ≤0. + CalendarHeartbeatIntervalSeconds int `json:"calendar_heartbeat_interval_seconds,omitempty"` + + // CalendarEnabled toggles the calendar scheduler loop. Default true. + CalendarEnabled *bool `json:"calendar_enabled,omitempty"` + + // CalendarBackendURL — if set, overrides BackendURL for Calendar + // API calls only (Monitor still uses BackendURL). Matches OpenClaw + // plugin's split-endpoint config. + CalendarBackendURL string `json:"calendar_backendUrl,omitempty"` + + // RestartPollIntervalSeconds — defaults to 60s when ≤0. + RestartPollIntervalSeconds int `json:"restart_poll_interval_seconds,omitempty"` +} + +// Resolved is the post-defaults view used by the rest of the plugin. +type Resolved struct { + Enabled bool + BackendURL string + Identifier string + APIKey string + MonitorPort int + CalendarEnabled bool + CalendarHeartbeatIntervalSeconds int + CalendarBackendURL string + RestartPollIntervalSeconds int +} + +// PluginConfigDir returns the on-disk path the plugin's own config +// lives in, given the Plexum profile root. Mirrors how every other +// Plexum plugin lays out its config. +func PluginConfigDir(profileRoot string) string { + return filepath.Join(profileRoot, "plugins", "harbor-forge") +} + +// PluginConfigPath returns the full path to config.json. +func PluginConfigPath(profileRoot string) string { + return filepath.Join(PluginConfigDir(profileRoot), "config.json") +} + +// Load reads + parses the on-disk config; missing file is treated as +// empty (defaults applied at Resolve time, not here). +func Load(profileRoot string) (Config, error) { + path := PluginConfigPath(profileRoot) + raw, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return Config{}, nil + } + return Config{}, fmt.Errorf("read %s: %w", path, err) + } + if len(raw) == 0 { + return Config{}, nil + } + var c Config + if err := json.Unmarshal(raw, &c); err != nil { + return Config{}, fmt.Errorf("parse %s: %w", path, err) + } + return c, nil +} + +// Resolve applies defaults + hostname-derived identifier. +func Resolve(c Config) Resolved { + out := Resolved{ + Enabled: true, + BackendURL: "https://monitor.hangman-lab.top", + Identifier: c.Identifier, + APIKey: c.APIKey, + MonitorPort: c.MonitorPort, + CalendarEnabled: true, + CalendarHeartbeatIntervalSeconds: 30, + CalendarBackendURL: c.CalendarBackendURL, + RestartPollIntervalSeconds: 60, + } + // Explicit-false overrides default-true. + if c.Enabled || !c.Enabled && hasJSONField(c, "Enabled") { + // Default true; we don't have a way to distinguish unset vs + // explicit false on a plain bool. Document: write {"enabled": + // false} only when intentional. + } + if c.BackendURL != "" { + out.BackendURL = c.BackendURL + } + if c.Identifier == "" { + out.Identifier = autoIdentifier() + } + if c.CalendarEnabled != nil { + out.CalendarEnabled = *c.CalendarEnabled + } + if c.CalendarHeartbeatIntervalSeconds > 0 { + out.CalendarHeartbeatIntervalSeconds = c.CalendarHeartbeatIntervalSeconds + } + if c.RestartPollIntervalSeconds > 0 { + out.RestartPollIntervalSeconds = c.RestartPollIntervalSeconds + } + return out +} + +// hasJSONField is a placeholder — Go's encoding/json doesn't surface +// "field was present in source" without a sentinel pointer. We use +// *bool on the optional toggles instead and treat plain bool as +// default-true. +func hasJSONField(_ Config, _ string) bool { return false } + +func autoIdentifier() string { + h, err := os.Hostname() + if err != nil || h == "" { + return "unknown" + } + return h +} diff --git a/internal/monitor/bridge.go b/internal/monitor/bridge.go new file mode 100644 index 0000000..8fe81fc --- /dev/null +++ b/internal/monitor/bridge.go @@ -0,0 +1,120 @@ +// Package monitor implements the local HTTP server that +// HarborForge.Monitor (a separate native daemon on the same host) +// queries for telemetry. Mirrors the OpenclawPlugin monitor-bridge: +// +// GET /telemetry → JSON Snapshot +// GET /health → {"ok":true} +// +// The bridge runs as a goroutine started at plugin Init time; ctx +// cancellation tears it down. Bind address is 127.0.0.1 only — +// HarborForge.Monitor is expected on the same host. Configured port +// comes from config.json's monitor_port; zero/missing disables the +// bridge entirely (the plugin still serves tools + calendar). + +package monitor + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "sync" + "time" + + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry" +) + +// Bridge owns the HTTP listener + last-query state. +type Bridge struct { + port int + collect func() telemetry.Snapshot + log LogFunc + + mu sync.Mutex + lastQuery time.Time + lastSnap telemetry.Snapshot + queries uint64 + server *http.Server +} + +// LogFunc is the plugin's log adapter — kept narrow so the bridge +// doesn't depend on the SDK directly. +type LogFunc func(level, msg string, attrs map[string]any) + +// New constructs an idle Bridge. Call Start to actually listen. +func New(port int, collect func() telemetry.Snapshot, log LogFunc) *Bridge { + return &Bridge{port: port, collect: collect, log: log} +} + +// Start the HTTP server in a background goroutine. Returns nil even +// when the port is 0 (bridge disabled by config) so callers don't +// need to guard. Stop cancels via context. +func (b *Bridge) Start(ctx context.Context) error { + if b.port <= 0 { + b.log("info", "monitor bridge disabled (monitor_port=0)", nil) + return nil + } + addr := fmt.Sprintf("127.0.0.1:%d", b.port) + ln, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("monitor bridge listen %s: %w", addr, err) + } + mux := http.NewServeMux() + mux.HandleFunc("/telemetry", b.handleTelemetry) + mux.HandleFunc("/health", b.handleHealth) + srv := &http.Server{Handler: mux, ReadHeaderTimeout: 5 * time.Second} + b.server = srv + go func() { + <-ctx.Done() + shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = srv.Shutdown(shutCtx) + }() + b.log("info", "monitor bridge listening", map[string]any{"addr": addr}) + go func() { + if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed { + b.log("warn", "monitor bridge exited", map[string]any{"err": err.Error()}) + } + }() + return nil +} + +func (b *Bridge) handleTelemetry(w http.ResponseWriter, _ *http.Request) { + snap := b.collect() + b.mu.Lock() + b.lastQuery = time.Now() + b.lastSnap = snap + b.queries++ + b.mu.Unlock() + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(snap) +} + +func (b *Bridge) handleHealth(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]bool{"ok": true}) +} + +// Stats returns a copy of the bridge's last-query state — used by the +// harborforge_status / harborforge_monitor_telemetry tools. +func (b *Bridge) Stats() Stats { + b.mu.Lock() + defer b.mu.Unlock() + return Stats{ + Port: b.port, + Listening: b.server != nil, + LastQuery: b.lastQuery, + LastSnap: b.lastSnap, + Queries: b.queries, + } +} + +// Stats is the introspection shape returned by Bridge.Stats. +type Stats struct { + Port int + Listening bool + LastQuery time.Time + LastSnap telemetry.Snapshot + Queries uint64 +} diff --git a/internal/telemetry/collector.go b/internal/telemetry/collector.go new file mode 100644 index 0000000..f740707 --- /dev/null +++ b/internal/telemetry/collector.go @@ -0,0 +1,226 @@ +// Package telemetry collects host + Plexum-agent metrics for the +// HarborForge Monitor. Snapshot is read on demand (Monitor bridge +// queries) or pushed (Calendar heartbeat), so the collector keeps no +// background goroutine — every call re-reads /proc, sm.State, etc. +// +// Cross-platform note: Linux is the only platform Plexum t3-class +// deployments run on; we read /proc/* directly rather than pull in a +// dependency. + +package telemetry + +import ( + "bufio" + "fmt" + "os" + "runtime" + "strconv" + "strings" + "time" +) + +// Snapshot is the JSON payload the Monitor bridge serves + the +// Calendar heartbeat embeds. Field names mirror what +// HarborForge.OpenclawPlugin emits so the backend doesn't need +// per-plugin parsers. +type Snapshot struct { + Identifier string `json:"identifier"` + Platform string `json:"platform"` + Hostname string `json:"hostname"` + UptimeSecs uint64 `json:"uptime"` + Memory MemoryInfo `json:"memory"` + Load LoadInfo `json:"load"` + Disk DiskInfo `json:"disk"` + Agents []AgentInfo `json:"agents"` + PluginInfo PluginInfo `json:"plugin"` + CapturedAt time.Time `json:"captured_at"` + HostMetadata map[string]string `json:"host_metadata,omitempty"` +} + +// MemoryInfo mirrors OpenclawPlugin's memory shape. +type MemoryInfo struct { + Total uint64 `json:"total"` // bytes + Free uint64 `json:"free"` // bytes + Used uint64 `json:"used"` // bytes + UsedPercent float64 `json:"used_percent"` // 0–100 +} + +// LoadInfo is Linux loadavg as a flat triple. +type LoadInfo struct { + One float64 `json:"one"` + Five float64 `json:"five"` + Fifteen float64 `json:"fifteen"` +} + +// DiskInfo for the root filesystem. +type DiskInfo struct { + Path string `json:"path"` + Total uint64 `json:"total"` + Free uint64 `json:"free"` + Used uint64 `json:"used"` + UsedPercent float64 `json:"used_percent"` +} + +// AgentInfo summarises one Plexum agent for the dashboard. Heavy +// mirror of HF's expected schema — state field maps Plexum's +// idle/working/busy/offline directly. +type AgentInfo struct { + ID string `json:"id"` + Model string `json:"model"` + State string `json:"state"` +} + +// PluginInfo identifies this plugin to the dashboard so the operator +// can see what's reporting telemetry. +type PluginInfo struct { + Name string `json:"name"` + Version string `json:"version"` + Backend string `json:"backend"` // "plexum" +} + +// CollectOpts wires the collector to host-side state. Hostname / +// Identifier come from the resolved config. +type CollectOpts struct { + Identifier string + Version string + AgentLister func() []AgentInfo // resolved by the caller (plugin uses HostAPI to walk agents) +} + +// Collect produces a fresh snapshot from /proc + the supplied AgentLister. +func Collect(opts CollectOpts) Snapshot { + now := time.Now().UTC() + host, _ := os.Hostname() + mem := readMemInfo() + load := readLoadAvg() + disk := readDiskRoot() + var agents []AgentInfo + if opts.AgentLister != nil { + agents = opts.AgentLister() + } + return Snapshot{ + Identifier: opts.Identifier, + Platform: runtime.GOOS, + Hostname: host, + UptimeSecs: readUptime(), + Memory: mem, + Load: load, + Disk: disk, + Agents: agents, + PluginInfo: PluginInfo{ + Name: "harbor-forge", + Version: opts.Version, + Backend: "plexum", + }, + CapturedAt: now, + } +} + +// ---- /proc helpers ---- + +func readMemInfo() MemoryInfo { + f, err := os.Open("/proc/meminfo") + if err != nil { + return MemoryInfo{} + } + defer f.Close() + fields := map[string]uint64{} + sc := bufio.NewScanner(f) + for sc.Scan() { + line := sc.Text() + i := strings.IndexByte(line, ':') + if i < 0 { + continue + } + key := strings.TrimSpace(line[:i]) + rest := strings.TrimSpace(line[i+1:]) + // rest format: "1234 kB" + parts := strings.Fields(rest) + if len(parts) == 0 { + continue + } + v, err := strconv.ParseUint(parts[0], 10, 64) + if err != nil { + continue + } + // All MemInfo values are in KB; convert to bytes. + fields[key] = v * 1024 + } + total := fields["MemTotal"] + free := fields["MemAvailable"] + if free == 0 { + free = fields["MemFree"] + fields["Buffers"] + fields["Cached"] + } + used := total - free + var pct float64 + if total > 0 { + pct = float64(used) / float64(total) * 100 + } + return MemoryInfo{Total: total, Free: free, Used: used, UsedPercent: pct} +} + +func readLoadAvg() LoadInfo { + raw, err := os.ReadFile("/proc/loadavg") + if err != nil { + return LoadInfo{} + } + parts := strings.Fields(string(raw)) + if len(parts) < 3 { + return LoadInfo{} + } + one, _ := strconv.ParseFloat(parts[0], 64) + five, _ := strconv.ParseFloat(parts[1], 64) + fifteen, _ := strconv.ParseFloat(parts[2], 64) + return LoadInfo{One: one, Five: five, Fifteen: fifteen} +} + +func readUptime() uint64 { + raw, err := os.ReadFile("/proc/uptime") + if err != nil { + return 0 + } + parts := strings.Fields(string(raw)) + if len(parts) == 0 { + return 0 + } + f, _ := strconv.ParseFloat(parts[0], 64) + return uint64(f) +} + +// readDiskRoot uses syscall.Statfs on "/" — we keep it inline to +// avoid pulling in another package. +func readDiskRoot() DiskInfo { + var st diskStat + if err := statfs("/", &st); err != nil { + return DiskInfo{Path: "/"} + } + total := st.blockSize * st.blocks + free := st.blockSize * st.bavail + used := total - free + var pct float64 + if total > 0 { + pct = float64(used) / float64(total) * 100 + } + return DiskInfo{ + Path: "/", + Total: total, + Free: free, + Used: used, + UsedPercent: pct, + } +} + +// FormatBytes is a small helper for human-readable Status output. +func FormatBytes(b uint64) string { + switch { + case b >= 1<<40: + return fmt.Sprintf("%.2fTiB", float64(b)/(1<<40)) + case b >= 1<<30: + return fmt.Sprintf("%.2fGiB", float64(b)/(1<<30)) + case b >= 1<<20: + return fmt.Sprintf("%.2fMiB", float64(b)/(1<<20)) + case b >= 1<<10: + return fmt.Sprintf("%.2fKiB", float64(b)/(1<<10)) + default: + return fmt.Sprintf("%dB", b) + } +} diff --git a/internal/telemetry/diskstat_linux.go b/internal/telemetry/diskstat_linux.go new file mode 100644 index 0000000..46cfe2b --- /dev/null +++ b/internal/telemetry/diskstat_linux.go @@ -0,0 +1,20 @@ +package telemetry + +import "syscall" + +type diskStat struct { + blockSize uint64 + blocks uint64 + bavail uint64 +} + +func statfs(path string, out *diskStat) error { + var fs syscall.Statfs_t + if err := syscall.Statfs(path, &fs); err != nil { + return err + } + out.blockSize = uint64(fs.Bsize) + out.blocks = fs.Blocks + out.bavail = fs.Bavail + return nil +} diff --git a/internal/tools/tools.go b/internal/tools/tools.go new file mode 100644 index 0000000..cfbbde8 --- /dev/null +++ b/internal/tools/tools.go @@ -0,0 +1,213 @@ +// Package tools wires the 9 harborforge_* tool implementations to +// the plugin's runtime state (config, telemetry collector, monitor +// bridge, calendar scheduler). Each tool is a CallTool dispatch +// branch in main.go's plugin; this package holds the shared logic. + +package tools + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" + + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/calendar" + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/config" + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/monitor" + "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry" +) + +// Deps is the bundle main.go passes when constructing the tool router. +type Deps struct { + Config config.Resolved + Version string + Collect func() telemetry.Snapshot + Bridge *monitor.Bridge + Scheduler *calendar.Scheduler + Host sdkplugin.HostAPI + + // AgentIDFromCtx returns the agent id the call belongs to. Plexum + // host injects this via the tool dispatch context; main.go's + // CallTool reads it from the ctx and stashes here. + AgentIDFromCtx func(ctx context.Context) string +} + +// Dispatch is the entry point main.go's ToolPlugin.CallTool calls. +// Returns the canonical text response. Errors come through as +// is_error=true ToolResult rather than RPC errors so the model sees +// human-readable detail. +func Dispatch(ctx context.Context, deps Deps, name string, input json.RawMessage) (sdkplugin.ToolResult, error) { + switch name { + case "harborforge_status": + return toolStatus(deps) + case "harborforge_telemetry": + return toolTelemetry(deps) + case "harborforge_monitor_telemetry": + return toolMonitorTelemetry(deps) + case "harborforge_calendar_status": + return toolCalendarStatus(deps) + case "harborforge_calendar_complete": + return toolCalendarComplete(ctx, deps, input) + case "harborforge_calendar_abort": + return toolCalendarAbort(ctx, deps, input) + case "harborforge_calendar_pause": + return toolCalendarPause(ctx, deps, input) + case "harborforge_calendar_resume": + return toolCalendarResume(ctx, deps) + case "harborforge_restart_status": + return toolRestartStatus(deps) + } + return sdkplugin.ToolResult{ + IsError: true, + Content: []sdkplugin.ContentBlock{{Type: "text", Text: "unknown tool: " + name}}, + }, nil +} + +func toolStatus(deps Deps) (sdkplugin.ToolResult, error) { + bs := deps.Bridge.Stats() + sch := deps.Scheduler.Status() + out := map[string]any{ + "plugin": map[string]any{ + "name": "harbor-forge", + "version": deps.Version, + "backend": "plexum", + }, + "config": map[string]any{ + "backend_url": deps.Config.BackendURL, + "identifier": deps.Config.Identifier, + "monitor_port": deps.Config.MonitorPort, + "calendar_enabled": deps.Config.CalendarEnabled, + "calendar_backendurl": deps.Config.CalendarBackendURL, + }, + "monitor_bridge": map[string]any{ + "listening": bs.Listening, + "port": bs.Port, + "queries": bs.Queries, + "last_query": bs.LastQuery, + }, + "calendar": sch, + } + return jsonResult(out) +} + +func toolTelemetry(deps Deps) (sdkplugin.ToolResult, error) { + return jsonResult(deps.Collect()) +} + +func toolMonitorTelemetry(deps Deps) (sdkplugin.ToolResult, error) { + bs := deps.Bridge.Stats() + return jsonResult(map[string]any{ + "port": bs.Port, + "listening": bs.Listening, + "queries": bs.Queries, + "last_query": bs.LastQuery, + "last_snapshot": bs.LastSnap, + }) +} + +func toolCalendarStatus(deps Deps) (sdkplugin.ToolResult, error) { + return jsonResult(deps.Scheduler.Status()) +} + +func toolCalendarComplete(ctx context.Context, deps Deps, input json.RawMessage) (sdkplugin.ToolResult, error) { + var args struct{ Summary string `json:"summary"` } + _ = json.Unmarshal(input, &args) + agentID := deps.AgentIDFromCtx(ctx) + if agentID == "" { + return errResult("calendar_complete: no agent context") + } + if err := deps.Scheduler.CompleteForAgent(ctx, agentID, args.Summary); err != nil { + if errors.Is(err, calendar.ErrNoActiveSlot) { + return errResult("no active slot for agent " + agentID) + } + return errResult("complete failed: " + err.Error()) + } + return okResult("slot marked completed") +} + +func toolCalendarAbort(ctx context.Context, deps Deps, input json.RawMessage) (sdkplugin.ToolResult, error) { + var args struct{ Reason string `json:"reason"` } + _ = json.Unmarshal(input, &args) + agentID := deps.AgentIDFromCtx(ctx) + if agentID == "" { + return errResult("calendar_abort: no agent context") + } + if err := deps.Scheduler.AbortForAgent(ctx, agentID, args.Reason); err != nil { + if errors.Is(err, calendar.ErrNoActiveSlot) { + return errResult("no active slot for agent " + agentID) + } + return errResult("abort failed: " + err.Error()) + } + return okResult("slot aborted") +} + +func toolCalendarPause(ctx context.Context, deps Deps, input json.RawMessage) (sdkplugin.ToolResult, error) { + var args struct{ Reason string `json:"reason"` } + _ = json.Unmarshal(input, &args) + agentID := deps.AgentIDFromCtx(ctx) + if agentID == "" { + return errResult("calendar_pause: no agent context") + } + if err := deps.Scheduler.PauseForAgent(ctx, agentID, args.Reason); err != nil { + if errors.Is(err, calendar.ErrNoActiveSlot) { + return errResult("no active slot for agent " + agentID) + } + return errResult("pause failed: " + err.Error()) + } + return okResult("slot paused") +} + +func toolCalendarResume(ctx context.Context, deps Deps) (sdkplugin.ToolResult, error) { + agentID := deps.AgentIDFromCtx(ctx) + if agentID == "" { + return errResult("calendar_resume: no agent context") + } + if err := deps.Scheduler.ResumeForAgent(ctx, agentID); err != nil { + if errors.Is(err, calendar.ErrNoActiveSlot) { + return errResult("no active slot for agent " + agentID) + } + return errResult("resume failed: " + err.Error()) + } + return okResult("slot resumed") +} + +func toolRestartStatus(deps Deps) (sdkplugin.ToolResult, error) { + sch := deps.Scheduler.Status() + return jsonResult(map[string]any{ + "pending": sch.RestartPending, + "last_heartbeat": sch.LastHeartbeat, + "observed_at": time.Now().UTC(), + }) +} + +// ---- result helpers ---- + +func jsonResult(v any) (sdkplugin.ToolResult, error) { + raw, err := json.MarshalIndent(v, "", " ") + if err != nil { + return sdkplugin.ToolResult{}, fmt.Errorf("encode tool result: %w", err) + } + return sdkplugin.ToolResult{ + Content: []sdkplugin.ContentBlock{{Type: "text", Text: string(raw)}}, + }, nil +} + +func okResult(text string) (sdkplugin.ToolResult, error) { + return sdkplugin.ToolResult{ + Content: []sdkplugin.ContentBlock{{Type: "text", Text: text}}, + }, nil +} + +func errResult(text string) (sdkplugin.ToolResult, error) { + if !strings.HasPrefix(text, "harborforge_") { + text = "harborforge: " + text + } + return sdkplugin.ToolResult{ + IsError: true, + Content: []sdkplugin.ContentBlock{{Type: "text", Text: text}}, + }, nil +} diff --git a/manifest.json b/manifest.json new file mode 100644 index 0000000..64fbb3d --- /dev/null +++ b/manifest.json @@ -0,0 +1,55 @@ +{ + "name": "harbor-forge", + "version": "0.1.0", + "activation": "eager", + "executable": "plexum-harborforge-plugin", + "contracts": { + "tools": [ + { + "name": "harborforge_status", + "description": "Return the plugin's resolved config + Monitor bridge health + Calendar scheduler status + telemetry snapshot.", + "inputSchema": {"type": "object"} + }, + { + "name": "harborforge_telemetry", + "description": "Current system + agent telemetry (the same snapshot served to the local HarborForge.Monitor over the monitor_port).", + "inputSchema": {"type": "object"} + }, + { + "name": "harborforge_monitor_telemetry", + "description": "Last telemetry payload the Monitor bridge fetched, with timing info — useful when diagnosing bridge connectivity.", + "inputSchema": {"type": "object"} + }, + { + "name": "harborforge_calendar_status", + "description": "Active Calendar slot (if any) plus next-up + recent-history. Returns Calendar scheduler state when no slot is active.", + "inputSchema": {"type": "object"} + }, + { + "name": "harborforge_calendar_complete", + "description": "Mark the agent's currently-active Calendar slot as completed and notify the backend.", + "inputSchema": {"type": "object", "properties": {"summary": {"type": "string"}}} + }, + { + "name": "harborforge_calendar_abort", + "description": "Abort the agent's currently-active Calendar slot (e.g. unrecoverable error). Optionally include a reason.", + "inputSchema": {"type": "object", "properties": {"reason": {"type": "string"}}} + }, + { + "name": "harborforge_calendar_pause", + "description": "Pause the agent's currently-active Calendar slot — heartbeat tracks paused state until resume/abort/complete.", + "inputSchema": {"type": "object", "properties": {"reason": {"type": "string"}}} + }, + { + "name": "harborforge_calendar_resume", + "description": "Resume a paused Calendar slot for the agent.", + "inputSchema": {"type": "object"} + }, + { + "name": "harborforge_restart_status", + "description": "Check whether a Plexum host restart is pending (backend-driven flag). Reports last poll time and pending flag.", + "inputSchema": {"type": "object"} + } + ] + } +} diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100755 index 0000000..c3a535f --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash +# HarborForge.PlexumPlugin installer. +set -euo pipefail + +REPO="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")/.." && pwd)" +PROFILE_DIR="${HOME}/.plexum" + +while [[ $# -gt 0 ]]; do + case "$1" in + --profile) PROFILE_DIR="$2"; shift 2 ;; + -h|--help) sed -n '2,/^set -euo/p' "$0" | sed 's/^# \{0,1\}//'; exit 0 ;; + *) echo "unknown flag: $1" >&2; exit 2 ;; + esac +done + +log() { printf '\033[1;34m[hf-install]\033[0m %s\n' "$*"; } +command -v go >/dev/null || { echo "go not found on PATH" >&2; exit 1; } + +PLUGIN_DIR="${PROFILE_DIR}/plugins/harbor-forge" +mkdir -p "${PLUGIN_DIR}" + +cd "${REPO}" +VERSION="$(git describe --tags --always 2>/dev/null || echo dev)" +LDFLAGS="-X main.Version=${VERSION}" +log "building plexum-harborforge-plugin (v=${VERSION})" +CGO_ENABLED=0 go build -ldflags="${LDFLAGS}" \ + -o "${PLUGIN_DIR}/plexum-harborforge-plugin" \ + ./cmd/plexum-harborforge-plugin + +cp manifest.json "${PLUGIN_DIR}/manifest.json" +log "installed binary + manifest to ${PLUGIN_DIR}" + +cat <