Files
HarborForge.PlexumPlugin/internal/telemetry/collector.go
hzhang 754e5183f7 initial: HarborForge plugin for Plexum (port of OpenclawPlugin)
Plugin id `harbor-forge` mirrors the OpenClaw counterpart's runtime
surface on top of the Plexum SDK:

  * eager activation — Monitor bridge + Calendar scheduler boot at
    host start, before any agent turn fires
  * monitor bridge: HTTP 127.0.0.1:<monitor_port> serving /telemetry
    + /health for HarborForge.Monitor
  * calendar scheduler: heartbeats <backendUrl>/calendar/agent/heartbeat,
    dispatches returned slots via HostAPI.WakeAgent
    (state-aware queue, depth-1 replace-newest), tracks active slot
    state in-memory, terminal status pushed back to backend
  * 9 harborforge_* tools (status / telemetry / monitor_telemetry /
    calendar_{status,complete,abort,pause,resume} / restart_status)

Key differences from OpenClaw equivalent:
  * api.spawn → HostAPI.WakeAgent (new SDK primitive)
  * api.getAgentStatus → HostAPI.ReadAgentState (existing)
  * --install-monitor / --install-cli not included; Monitor + hf CLI
    deploy via the HangmanLab.Server.T3 docker compose layer

Initial drop. TODO before v1 ship:
  * tool ctx → calling-agent-id: SDK doesn't currently expose; v1
    falls back to a single-active-slot heuristic in
    main.bestEffortAgentID
  * tests for the bridge + scheduler
2026-06-03 11:11:36 +01:00

227 lines
5.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package telemetry collects host + Plexum-agent metrics for the
// HarborForge Monitor. A Snapshot is built on demand — when the Monitor
// bridge queries it or when the Calendar heartbeat embeds it — so the
// collector keeps no background goroutine; every call re-reads /proc,
// sm.State, etc.
//
// Cross-platform note: Linux is the only platform Plexum t3-class
// deployments run on; we read /proc/* directly rather than pull in a
// dependency.
package telemetry
import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"strings"
"time"
)
// Snapshot is one point-in-time telemetry sample. The Monitor bridge
// serves it as JSON and the Calendar heartbeat embeds it. The JSON keys
// deliberately match the payload HarborForge.OpenclawPlugin emits, so the
// backend can parse both plugins with a single schema.
type Snapshot struct {
	Identifier   string            `json:"identifier"`
	Platform     string            `json:"platform"` // runtime.GOOS of the reporting process
	Hostname     string            `json:"hostname"`
	UptimeSecs   uint64            `json:"uptime"` // whole seconds since boot
	Memory       MemoryInfo        `json:"memory"`
	Load         LoadInfo          `json:"load"`
	Disk         DiskInfo          `json:"disk"` // root filesystem only
	Agents       []AgentInfo       `json:"agents"`
	PluginInfo   PluginInfo        `json:"plugin"`
	CapturedAt   time.Time         `json:"captured_at"` // Collect stamps this in UTC
	HostMetadata map[string]string `json:"host_metadata,omitempty"` // optional; Collect leaves it unset
}
// MemoryInfo mirrors OpenclawPlugin's memory shape. All sizes are in
// bytes. Free is the memory considered available to new work, and
// Used is Total minus Free.
type MemoryInfo struct {
	Total       uint64  `json:"total"`        // bytes
	Free        uint64  `json:"free"`         // bytes
	Used        uint64  `json:"used"`         // bytes
	UsedPercent float64 `json:"used_percent"` // 0-100 (Used / Total * 100)
}
// LoadInfo holds the three system load averages from /proc/loadavg.
// They are stored as named fields rather than as an array.
type LoadInfo struct {
	One     float64 `json:"one"`     // 1-minute average
	Five    float64 `json:"five"`    // 5-minute average
	Fifteen float64 `json:"fifteen"` // 15-minute average
}
// DiskInfo describes capacity of the root ("/") filesystem. Byte counts
// are block size times block count. UsedPercent is Used/Total scaled to
// the range 0-100.
type DiskInfo struct {
	Path        string  `json:"path"`
	Total       uint64  `json:"total"`        // bytes
	Free        uint64  `json:"free"`         // bytes
	Used        uint64  `json:"used"`         // bytes
	UsedPercent float64 `json:"used_percent"` // 0-100
}
// AgentInfo is the dashboard row for a single Plexum agent, shaped after
// the schema HarborForge expects. State is meant to carry Plexum's
// idle/working/busy/offline state through unchanged. The rows themselves
// come from CollectOpts.AgentLister.
type AgentInfo struct {
	ID    string `json:"id"`
	Model string `json:"model"`
	State string `json:"state"`
}
// PluginInfo names the plugin that produced a snapshot, so an operator
// looking at the dashboard can tell who is reporting telemetry. Collect
// always fills Name with "harbor-forge" and Backend with "plexum".
type PluginInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
	Backend string `json:"backend"` // "plexum"
}
// CollectOpts wires Collect to caller-side state. Identifier and Version
// are copied verbatim into the Snapshot. The hostname is NOT configured
// here: Collect reads it itself via os.Hostname.
type CollectOpts struct {
	Identifier string // echoed as Snapshot.Identifier (from the resolved config)
	Version    string // echoed as Snapshot.PluginInfo.Version

	// AgentLister enumerates agents for Snapshot.Agents. The caller
	// resolves it (the plugin walks agents via HostAPI). A nil lister
	// leaves Snapshot.Agents nil.
	AgentLister func() []AgentInfo
}
// Collect assembles a fresh Snapshot from /proc and the optional
// opts.AgentLister. It never fails: each /proc reader degrades to a zero
// value on error, and an unresolvable hostname is reported as "".
// CapturedAt is taken first, in UTC, before any reads are performed.
func Collect(opts CollectOpts) Snapshot {
	snap := Snapshot{CapturedAt: time.Now().UTC()}
	snap.Identifier = opts.Identifier
	snap.Platform = runtime.GOOS
	snap.Hostname, _ = os.Hostname() // best effort; "" on failure

	snap.Memory = readMemInfo()
	snap.Load = readLoadAvg()
	snap.Disk = readDiskRoot()

	// NOTE(review): with no lister, Agents stays nil and encodes as JSON
	// null rather than []; confirm the backend accepts that.
	if lister := opts.AgentLister; lister != nil {
		snap.Agents = lister()
	}

	snap.UptimeSecs = readUptime()
	snap.PluginInfo = PluginInfo{
		Name:    "harbor-forge",
		Version: opts.Version,
		Backend: "plexum",
	}
	return snap
}
// ---- /proc helpers ----
// readMemInfo reads /proc/meminfo and summarises it as a MemoryInfo.
// Any read failure yields the zero value, which is best effort and
// consistent with the other /proc helpers.
func readMemInfo() MemoryInfo {
	raw, err := os.ReadFile("/proc/meminfo")
	if err != nil {
		return MemoryInfo{}
	}
	return parseMemInfo(string(raw))
}

// parseMemInfo converts the text of /proc/meminfo into a MemoryInfo.
//
// Free is MemAvailable when the kernel reports that key. Otherwise it
// falls back to the classic MemFree+Buffers+Cached approximation. The
// fallback is keyed on the key's *presence*, not on a zero value: a
// reported MemAvailable of 0 under heavy pressure is a real reading and
// must not be replaced by a larger estimate.
func parseMemInfo(raw string) MemoryInfo {
	fields := make(map[string]uint64)
	sc := bufio.NewScanner(strings.NewReader(raw))
	for sc.Scan() {
		line := sc.Text()
		i := strings.IndexByte(line, ':')
		if i < 0 {
			continue
		}
		key := strings.TrimSpace(line[:i])
		// Value format is "1234 kB" for sizes, or a bare count
		// (e.g. HugePages_Total).
		parts := strings.Fields(line[i+1:])
		if len(parts) == 0 {
			continue
		}
		v, err := strconv.ParseUint(parts[0], 10, 64)
		if err != nil {
			continue
		}
		// Only kB-suffixed values are sizes; bare counts stay as-is.
		if len(parts) > 1 && parts[1] == "kB" {
			v *= 1024
		}
		fields[key] = v
	}
	// sc.Err() is deliberately ignored: a strings.Reader cannot fail, and
	// bufio.ErrTooLong only truncates parsing, which is acceptable for
	// best-effort telemetry.

	total := fields["MemTotal"]
	free, ok := fields["MemAvailable"]
	if !ok {
		free = fields["MemFree"] + fields["Buffers"] + fields["Cached"]
	}
	// Clamp so Used cannot underflow when MemTotal is missing or the
	// fallback estimate overshoots. This keeps Used+Free == Total.
	if free > total {
		free = total
	}
	used := total - free

	var pct float64
	if total > 0 {
		pct = float64(used) / float64(total) * 100
	}
	return MemoryInfo{Total: total, Free: free, Used: used, UsedPercent: pct}
}
// readLoadAvg returns the 1-, 5- and 15-minute load averages from the
// first three columns of /proc/loadavg. A missing file or one with fewer
// than three columns yields the zero LoadInfo. A column that fails to
// parse is reported as 0.
func readLoadAvg() LoadInfo {
	var li LoadInfo
	raw, err := os.ReadFile("/proc/loadavg")
	if err != nil {
		return li
	}
	cols := strings.Fields(string(raw))
	if len(cols) < 3 {
		return li
	}
	dests := [...]*float64{&li.One, &li.Five, &li.Fifteen}
	for i, dst := range dests {
		*dst, _ = strconv.ParseFloat(cols[i], 64)
	}
	return li
}
// readUptime returns the system uptime in whole seconds: the first field
// of /proc/uptime, truncated toward zero. It returns 0 when the file is
// unreadable or empty.
func readUptime() uint64 {
	raw, err := os.ReadFile("/proc/uptime")
	if err != nil {
		return 0
	}
	if cols := strings.Fields(string(raw)); len(cols) > 0 {
		secs, _ := strconv.ParseFloat(cols[0], 64)
		return uint64(secs)
	}
	return 0
}
// readDiskRoot reports capacity of the filesystem mounted at "/". It
// delegates to the statfs helper and diskStat type, which are defined
// outside this file (presumably a thin Linux wrapper over syscall.Statfs;
// confirm). On any statfs error only Path is populated.
//
// NOTE(review): Free is derived from bavail. Assuming it mirrors
// f_bavail (blocks available to unprivileged users), Used also counts
// root-reserved blocks and may read higher than df's "Used" column.
func readDiskRoot() DiskInfo {
	var st diskStat
	if err := statfs("/", &st); err != nil {
		return DiskInfo{Path: "/"}
	}
	total := st.blockSize * st.blocks
	free := st.blockSize * st.bavail
	// Defensive: never let a bogus statfs result underflow Used.
	if free > total {
		free = total
	}
	used := total - free
	var pct float64
	if total > 0 {
		pct = float64(used) / float64(total) * 100
	}
	return DiskInfo{
		Path:        "/",
		Total:       total,
		Free:        free,
		Used:        used,
		UsedPercent: pct,
	}
}
// FormatBytes renders b with binary (IEC) units for human-readable status
// output. Values of 1 KiB and above use two decimals and the largest unit
// that fits, capped at TiB. Smaller values are printed as a plain byte
// count.
func FormatBytes(b uint64) string {
	scales := [...]struct {
		shift  uint
		format string
	}{
		{40, "%.2fTiB"},
		{30, "%.2fGiB"},
		{20, "%.2fMiB"},
		{10, "%.2fKiB"},
	}
	for _, s := range scales {
		if unit := uint64(1) << s.shift; b >= unit {
			return fmt.Sprintf(s.format, float64(b)/float64(unit))
		}
	}
	return fmt.Sprintf("%dB", b)
}