Files
HarborForge.PlexumPlugin/internal/monitor/pusher.go
hzhang 6e3ad669f8 feat(monitor): active push loop replacing standalone monitor
Adds a periodic POST loop to <backend>/monitor/server/heartbeat so
the HF plugin can take over the standalone harborforge-monitor daemon's
job — same X-API-Key header, same flat telemetry shape (cpu_pct /
mem_pct / disk_pct / swap_pct / load_avg / uptime_seconds /
plugin_version / agents[]). The HF backend stays unchanged.

Config: monitor_push_enabled (default false; opt-in to avoid surprise
heartbeats from existing deployments), monitor_push_interval_seconds
(default 30), reuses apiKey for the X-API-Key header. Lift the
container's HF_MONITER_API_KEY into config.apiKey, flip
monitor_push_enabled true, then docker rm -f the container — DB
last_seen_at keeps advancing under the plugin's loop.

Collector grew swap + cpu sampling (two reads of /proc/stat over a
1-second window when SampleCPU=true). Bridge endpoint stays cheap
(SampleCPU=false on demand); push loop is the only caller paying the
sampling cost.

E2E in sim: monitor_push_enabled=true + apiKey from injected
MonitoredServer row → server_states.last_seen_at advances exactly
every interval_seconds (10s configured, 10s observed). cpu/mem/disk/
swap_pct all populate correctly.
2026-06-03 13:04:51 +01:00

224 lines
6.4 KiB
Go

// Pusher periodically uploads system telemetry to the HarborForge
// backend's /monitor/server/heartbeat endpoint. Replaces the standalone
// `harborforge-monitor` daemon — the plugin's lifecycle (host gateway
// start/stop) bounds the heartbeat loop, so no separate process need
// supervise it.
//
// Wire shape mirrors HarborForge.Monitor's `telemetry.Payload`
// (flat `cpu_pct/mem_pct/...` fields + `X-API-Key` header). The
// translation from internal `telemetry.Snapshot` to that shape lives
// in buildPayload; HF backend stays unchanged.
package monitor
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"net/http"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
)
// PushPayload is the wire shape POSTed to /monitor/server/heartbeat —
// 1:1 with HarborForge.Monitor's `telemetry.Payload`.
//
// Field declaration order is also the key order json.Marshal emits, so
// keep it stable. The metric fields are tagged `omitempty`, which makes
// encoding/json drop a zero value entirely (an idle 0.0 cpu_pct, an empty
// load_avg, a 0 uptime) instead of sending it.
// NOTE(review): confirm the backend treats a missing metric like 0.
type PushPayload struct {
	Identifier     string    `json:"identifier"`
	PluginVersion  string    `json:"plugin_version,omitempty"`
	Agents         []any     `json:"agents"`          // a nil slice encodes as JSON null; pass an empty slice for []
	NginxInstalled bool      `json:"nginx_installed"`
	NginxSites     []string  `json:"nginx_sites"`     // a nil slice encodes as JSON null; pass an empty slice for []
	CPUPct         float64   `json:"cpu_pct,omitempty"`
	MemPct         float64   `json:"mem_pct,omitempty"`
	DiskPct        float64   `json:"disk_pct,omitempty"`
	SwapPct        float64   `json:"swap_pct,omitempty"`
	LoadAvg        []float64 `json:"load_avg,omitempty"`
	UptimeSeconds  uint64    `json:"uptime_seconds,omitempty"`
}
// PusherConfig is the operator-supplied tuning.
//
// An empty BackendURL or APIKey disables pushing: Run logs a warning and
// returns without contacting the backend.
type PusherConfig struct {
	BackendURL string        // base URL, e.g. https://hf-api.hangman-lab.top; trailing "/" is trimmed before /monitor/server/heartbeat is appended
	APIKey     string        // sent as X-API-Key
	Interval   time.Duration // default 30s when <=0
}
// Pusher runs the periodic POST loop. One per plugin process.
// Construct it with NewPusher, which fills in the defaults the loop
// relies on (a positive interval, a non-nil log, an HTTP client with a
// timeout).
type Pusher struct {
	cfg     PusherConfig
	collect func() telemetry.Snapshot // produces the snapshot uploaded on each push
	log     LogFunc
	http    *http.Client // shared across pushes so connections can be reused

	// stats — for the monitor_telemetry tool / status surfacing.
	// mu guards every field below; the push loop writes them while
	// Stats may read them from another goroutine.
	mu          sync.RWMutex
	lastSentAt  time.Time // time of the latest attempt, success or failure
	lastStatus  int
	lastErr     string // cleared on success
	successHits uint64
	errHits     uint64
}
// NewPusher builds a Pusher ready for Run.
//
// collect is invoked once per push to produce the snapshot that gets
// uploaded; callers typically bind it to telemetry.Collect with
// SampleCPU=true so CPU usage is sampled. A non-positive cfg.Interval
// falls back to 30 seconds, and a nil log is replaced by a no-op. Uploads
// go through an HTTP client with a 15-second timeout.
func NewPusher(cfg PusherConfig, collect func() telemetry.Snapshot, log LogFunc) *Pusher {
	p := &Pusher{
		cfg:     cfg,
		collect: collect,
		log:     log,
		http:    &http.Client{Timeout: 15 * time.Second},
	}
	// Defaults are applied to the Pusher's own copy of the config.
	if p.cfg.Interval <= 0 {
		p.cfg.Interval = 30 * time.Second
	}
	if p.log == nil {
		p.log = func(string, string, map[string]any) {}
	}
	return p
}
// Run drives the push loop until ctx is cancelled, then returns ctx.Err().
// The first push happens immediately so the backend sees this claw alive
// without waiting a full interval; later pushes follow the ticker.
//
// Run returns nil right away, after logging a warning, when pushing is
// disabled: an empty BackendURL, an empty APIKey, or no snapshot
// collector. If ctx is already done on entry, Run returns ctx.Err()
// without pushing. A failed push never stops the loop: it is recorded in
// the stats and the next tick simply tries again.
func (p *Pusher) Run(ctx context.Context) error {
	if p.cfg.BackendURL == "" {
		p.log("warn", "monitor push disabled (empty backendURL)", nil)
		return nil
	}
	if p.cfg.APIKey == "" {
		p.log("warn", "monitor push disabled (empty apiKey)", nil)
		return nil
	}
	if p.collect == nil {
		// pushOnce would panic calling a nil collector; treat it as disabled.
		p.log("warn", "monitor push disabled (nil collector)", nil)
		return nil
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	// NewPusher normalises the interval, but a Pusher built any other way
	// could carry a non-positive one, and time.NewTicker panics on that.
	interval := p.cfg.Interval
	if interval <= 0 {
		interval = 30 * time.Second
	}
	endpoint := strings.TrimRight(p.cfg.BackendURL, "/") + "/monitor/server/heartbeat"

	// time.Ticker drops ticks for a slow receiver, so a push that overruns
	// the interval does not trigger a burst of catch-up pushes.
	tick := time.NewTicker(interval)
	defer tick.Stop()

	p.pushOnce(ctx, endpoint)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
			p.pushOnce(ctx, endpoint)
		}
	}
}
// pushOnce collects one snapshot, POSTs it to url and records the outcome
// in the Pusher's stats. Failures are recorded and logged but never
// returned: Run just tries again on the next tick.
func (p *Pusher) pushOnce(ctx context.Context, url string) {
	// Only a short prefix of the reply is ever surfaced (truncated to 200
	// bytes below). Capping the read bounds memory if the backend
	// misbehaves, while still draining ordinary small bodies so the
	// keep-alive connection can be reused.
	const maxBody = 64 << 10

	snap := p.collect()
	body, err := json.Marshal(buildPayload(snap))
	if err != nil {
		p.recordErr("marshal: " + err.Error())
		p.log("warn", "monitor push failed", map[string]any{"err": err.Error()})
		return
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		p.recordErr("build req: " + err.Error())
		p.log("warn", "monitor push failed", map[string]any{"err": err.Error()})
		return
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-API-Key", p.cfg.APIKey)

	res, err := p.http.Do(req)
	if err != nil {
		if ctx.Err() != nil {
			// Shutdown cancelled the in-flight request. That is not a push
			// failure worth counting or warning about.
			return
		}
		p.recordErr("send: " + err.Error())
		p.log("warn", "monitor push failed", map[string]any{"err": err.Error()})
		return
	}
	defer res.Body.Close()

	// A read error only degrades the diagnostic text; the status code
	// decides success either way.
	raw, _ := io.ReadAll(io.LimitReader(res.Body, maxBody))
	if res.StatusCode < 200 || res.StatusCode >= 300 {
		snippet := truncate(raw, 200)
		p.recordFailure(res.StatusCode, fmt.Sprintf("%d: %s", res.StatusCode, snippet))
		p.log("warn", "monitor push non-2xx", map[string]any{
			"status": res.StatusCode, "body": snippet,
		})
		return
	}
	p.recordOK(res.StatusCode)
}

// PushStats is a point-in-time copy of the Pusher's push bookkeeping, for
// diagnostics (harborforge_monitor_telemetry tool surfaces this).
type PushStats struct {
	LastSentAt   time.Time // UTC time of the latest attempt, success or failure; zero before the first
	LastStatus   int       // HTTP status of the latest attempt; 0 if no response was received
	LastErr      string    // empty after a successful push
	SuccessCount uint64
	ErrorCount   uint64
}

// Stats returns a copy of the latest push state. It takes the stats lock,
// so it may be called while Run is active on another goroutine.
func (p *Pusher) Stats() PushStats {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return PushStats{
		LastSentAt:   p.lastSentAt,
		LastStatus:   p.lastStatus,
		LastErr:      p.lastErr,
		SuccessCount: p.successHits,
		ErrorCount:   p.errHits,
	}
}

// recordOK records a successful push answered with the given 2xx status.
func (p *Pusher) recordOK(status int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.lastSentAt = time.Now().UTC()
	p.lastStatus = status
	p.lastErr = ""
	p.successHits++
}

// recordErr records a failed attempt that got no HTTP response at all
// (marshal, request construction or transport failure).
func (p *Pusher) recordErr(msg string) {
	p.recordFailure(0, msg)
}

// recordFailure records a failed attempt. status is the HTTP status the
// backend answered with, or 0 when no response was received. It always
// overwrites lastStatus, so a stale success code never sits next to a
// fresh error message.
func (p *Pusher) recordFailure(status int, msg string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.lastSentAt = time.Now().UTC()
	p.lastStatus = status
	p.lastErr = msg
	p.errHits++
}
// buildPayload flattens an internal telemetry.Snapshot into the
// PushPayload wire shape the backend expects.
//
// Each agent becomes a map carrying its "id", "model" and "state".
// Percentages are rounded to one decimal place and the 1/5/15-minute load
// averages to two. Agents and NginxSites are always non-nil, so they
// encode as [] rather than null.
func buildPayload(snap telemetry.Snapshot) PushPayload {
	payload := PushPayload{
		Identifier:    snap.Identifier,
		PluginVersion: snap.PluginInfo.Version,
		Agents:        make([]any, 0, len(snap.Agents)),
		// nginx detection is independent monitor's responsibility today;
		// HF plugin leaves it blank rather than rediscovering nginx
		// state. Operators that need it can keep the standalone monitor
		// alongside or wait for a follow-up commit.
		NginxInstalled: false,
		NginxSites:     []string{},
		CPUPct:         round1(snap.CPU.UsedPercent),
		MemPct:         round1(snap.Memory.UsedPercent),
		DiskPct:        round1(snap.Disk.UsedPercent),
		SwapPct:        round1(snap.Swap.UsedPercent),
		LoadAvg: []float64{
			round2(snap.Load.One),
			round2(snap.Load.Five),
			round2(snap.Load.Fifteen),
		},
		UptimeSeconds: snap.UptimeSecs,
	}
	for _, agent := range snap.Agents {
		entry := map[string]any{
			"id":    agent.ID,
			"model": agent.Model,
			"state": agent.State,
		}
		payload.Agents = append(payload.Agents, entry)
	}
	return payload
}
// round1 rounds v to one decimal place, half away from zero.
// NaN, ±Inf, or a value too large to scale yields 0. The old int64
// conversion turned those into an implementation-defined number that was
// then sent on the wire.
func round1(v float64) float64 { return roundScaled(v, 10) }

// round2 rounds v to two decimal places, with the same rules as round1.
func round2(v float64) float64 { return roundScaled(v, 100) }

// roundScaled rounds v to the nearest multiple of 1/scale using
// math.Round, which avoids the float edge cases and the wrong rounding
// of negative values that truncating v*scale+0.5 has.
func roundScaled(v, scale float64) float64 {
	scaled := v * scale
	if math.IsNaN(scaled) || math.IsInf(scaled, 0) {
		return 0
	}
	return math.Round(scaled) / scale
}
// truncate renders at most n bytes of b as a string and appends "…" when
// anything was cut off. The cut point moves back (by at most
// utf8.UTFMax-1 bytes) to a rune boundary, so a multi-byte UTF-8 sequence
// is never split into invalid text. A negative n is treated as 0.
func truncate(b []byte, n int) string {
	if n < 0 {
		n = 0
	}
	if len(b) <= n {
		return string(b)
	}
	cut := n // len(b) > n, so b[cut] is in range
	for i := 0; i < utf8.UTFMax-1 && cut > 0 && !utf8.RuneStart(b[cut]); i++ {
		cut--
	}
	return string(b[:cut]) + "…"
}