// Package telemetry collects host + Plexum-agent metrics for the // HarborForge Monitor. Snapshot is read on demand (Monitor bridge // queries) or pushed (Calendar heartbeat), so the collector keeps no // background goroutine — every call re-reads /proc, sm.State, etc. // // Cross-platform note: Linux is the only platform Plexum t3-class // deployments run on; we read /proc/* directly rather than pull in a // dependency. package telemetry import ( "bufio" "fmt" "os" "runtime" "strconv" "strings" "time" ) // Snapshot is the JSON payload the Monitor bridge serves + the // Calendar heartbeat embeds. Field names mirror what // HarborForge.OpenclawPlugin emits so the backend doesn't need // per-plugin parsers. type Snapshot struct { Identifier string `json:"identifier"` Platform string `json:"platform"` Hostname string `json:"hostname"` UptimeSecs uint64 `json:"uptime"` Memory MemoryInfo `json:"memory"` Swap SwapInfo `json:"swap"` Load LoadInfo `json:"load"` Disk DiskInfo `json:"disk"` CPU CPUInfo `json:"cpu"` Agents []AgentInfo `json:"agents"` PluginInfo PluginInfo `json:"plugin"` CapturedAt time.Time `json:"captured_at"` HostMetadata map[string]string `json:"host_metadata,omitempty"` } // SwapInfo is the system swap usage. Zeroes when swap isn't configured. type SwapInfo struct { Total uint64 `json:"total"` Free uint64 `json:"free"` Used uint64 `json:"used"` UsedPercent float64 `json:"used_percent"` } // CPUInfo holds the most recent CPU usage estimate. UsedPercent is // computed across one sample interval (see Collect's cpu helper). type CPUInfo struct { UsedPercent float64 `json:"used_percent"` } // MemoryInfo mirrors OpenclawPlugin's memory shape. type MemoryInfo struct { Total uint64 `json:"total"` // bytes Free uint64 `json:"free"` // bytes Used uint64 `json:"used"` // bytes UsedPercent float64 `json:"used_percent"` // 0–100 } // LoadInfo is Linux loadavg as a flat triple. 
type LoadInfo struct {
	One     float64 `json:"one"`
	Five    float64 `json:"five"`
	Fifteen float64 `json:"fifteen"`
}

// DiskInfo describes usage of a single filesystem; the collector fills
// it in for the root filesystem.
type DiskInfo struct {
	Path        string  `json:"path"`
	Total       uint64  `json:"total"`
	Free        uint64  `json:"free"`
	Used        uint64  `json:"used"`
	UsedPercent float64 `json:"used_percent"`
}

// AgentInfo is the per-agent summary shown on the dashboard. It closely
// mirrors the schema HF expects; State carries Plexum's
// idle/working/busy/offline value unchanged.
type AgentInfo struct {
	ID    string `json:"id"`
	Model string `json:"model"`
	State string `json:"state"`
}

// PluginInfo tells the dashboard which plugin is reporting telemetry,
// so the operator can see where the numbers come from.
type PluginInfo struct {
	Name    string `json:"name"`
	Version string `json:"version"`
	// Backend names the agent backend, e.g. "plexum".
	Backend string `json:"backend"`
}

// CollectOpts wires the collector to host-side state. Identifier comes
// from the resolved config; the hostname is not part of the options and
// is read from the OS by Collect.
type CollectOpts struct {
	Identifier string
	Version    string

	// AgentLister is resolved by the caller (the plugin uses HostAPI to
	// walk agents). It may be nil.
	AgentLister func() []AgentInfo

	// SampleCPU asks Collect to take a 1-second CPU sample. Cheap
	// on-demand paths (status endpoint, bridge serve) leave it false;
	// the slow push loop sets it to true.
	SampleCPU bool
}

// Collect produces a fresh snapshot from /proc plus the supplied
// AgentLister. With SampleCPU=true it takes a 1-second CPU sample (two
// reads of /proc/stat with a sleep in between); otherwise CPU usage
// stays zero. Set it to true on the slow push loop and to false on the
// cheap on-demand status endpoint.
func Collect(opts CollectOpts) Snapshot { now := time.Now().UTC() host, _ := os.Hostname() mem, swap := readMemAndSwap() load := readLoadAvg() disk := readDiskRoot() cpu := CPUInfo{} if opts.SampleCPU { cpu.UsedPercent = sampleCPUPercent(time.Second) } var agents []AgentInfo if opts.AgentLister != nil { agents = opts.AgentLister() } return Snapshot{ Identifier: opts.Identifier, Platform: runtime.GOOS, Hostname: host, UptimeSecs: readUptime(), Memory: mem, Swap: swap, Load: load, Disk: disk, CPU: cpu, Agents: agents, PluginInfo: PluginInfo{ Name: "harbor-forge", Version: opts.Version, Backend: "plexum", }, CapturedAt: now, } } // ---- /proc helpers ---- func readMemAndSwap() (MemoryInfo, SwapInfo) { f, err := os.Open("/proc/meminfo") if err != nil { return MemoryInfo{}, SwapInfo{} } defer f.Close() fields := map[string]uint64{} sc := bufio.NewScanner(f) for sc.Scan() { line := sc.Text() i := strings.IndexByte(line, ':') if i < 0 { continue } key := strings.TrimSpace(line[:i]) rest := strings.TrimSpace(line[i+1:]) // rest format: "1234 kB" parts := strings.Fields(rest) if len(parts) == 0 { continue } v, err := strconv.ParseUint(parts[0], 10, 64) if err != nil { continue } // All MemInfo values are in KB; convert to bytes. 
fields[key] = v * 1024 } mem := buildMemInfo(fields) swap := buildSwapInfo(fields) return mem, swap } func buildMemInfo(fields map[string]uint64) MemoryInfo { total := fields["MemTotal"] free := fields["MemAvailable"] if free == 0 { free = fields["MemFree"] + fields["Buffers"] + fields["Cached"] } used := total - free var pct float64 if total > 0 { pct = float64(used) / float64(total) * 100 } return MemoryInfo{Total: total, Free: free, Used: used, UsedPercent: pct} } func buildSwapInfo(fields map[string]uint64) SwapInfo { total := fields["SwapTotal"] free := fields["SwapFree"] if total == 0 { return SwapInfo{} } used := total - free pct := float64(used) / float64(total) * 100 return SwapInfo{Total: total, Free: free, Used: used, UsedPercent: pct} } // sampleCPUPercent computes overall CPU usage across one sample // interval. Two reads of /proc/stat's aggregate "cpu" line, derive // busy-time delta as (1 - idle/total). Returns 0 on read failure. func sampleCPUPercent(interval time.Duration) float64 { total1, idle1, ok := readCPUStat() if !ok { return 0 } time.Sleep(interval) total2, idle2, ok := readCPUStat() if !ok || total2 <= total1 { return 0 } totalDelta := total2 - total1 idleDelta := idle2 - idle1 if idleDelta > totalDelta { return 0 } return float64(totalDelta-idleDelta) / float64(totalDelta) * 100 } func readCPUStat() (total, idle uint64, ok bool) { f, err := os.Open("/proc/stat") if err != nil { return 0, 0, false } defer f.Close() sc := bufio.NewScanner(f) if !sc.Scan() { return 0, 0, false } parts := strings.Fields(sc.Text()) if len(parts) < 5 || parts[0] != "cpu" { return 0, 0, false } for i := 1; i < len(parts); i++ { v, err := strconv.ParseUint(parts[i], 10, 64) if err != nil { return 0, 0, false } total += v // idle is the 4th column (parts[4]); iowait (parts[5]) is also // idle-ish but we count it as busy to match gopsutil's default. 
if i == 4 { idle = v } } return total, idle, true } func readLoadAvg() LoadInfo { raw, err := os.ReadFile("/proc/loadavg") if err != nil { return LoadInfo{} } parts := strings.Fields(string(raw)) if len(parts) < 3 { return LoadInfo{} } one, _ := strconv.ParseFloat(parts[0], 64) five, _ := strconv.ParseFloat(parts[1], 64) fifteen, _ := strconv.ParseFloat(parts[2], 64) return LoadInfo{One: one, Five: five, Fifteen: fifteen} } func readUptime() uint64 { raw, err := os.ReadFile("/proc/uptime") if err != nil { return 0 } parts := strings.Fields(string(raw)) if len(parts) == 0 { return 0 } f, _ := strconv.ParseFloat(parts[0], 64) return uint64(f) } // readDiskRoot uses syscall.Statfs on "/" — we keep it inline to // avoid pulling in another package. func readDiskRoot() DiskInfo { var st diskStat if err := statfs("/", &st); err != nil { return DiskInfo{Path: "/"} } total := st.blockSize * st.blocks free := st.blockSize * st.bavail used := total - free var pct float64 if total > 0 { pct = float64(used) / float64(total) * 100 } return DiskInfo{ Path: "/", Total: total, Free: free, Used: used, UsedPercent: pct, } } // FormatBytes is a small helper for human-readable Status output. func FormatBytes(b uint64) string { switch { case b >= 1<<40: return fmt.Sprintf("%.2fTiB", float64(b)/(1<<40)) case b >= 1<<30: return fmt.Sprintf("%.2fGiB", float64(b)/(1<<30)) case b >= 1<<20: return fmt.Sprintf("%.2fMiB", float64(b)/(1<<20)) case b >= 1<<10: return fmt.Sprintf("%.2fKiB", float64(b)/(1<<10)) default: return fmt.Sprintf("%dB", b) } }