First successful push emits an info-level "monitor push started" so
operators can confirm the loop wired up correctly. Subsequent
successes log every 60 cycles ("monitor push heartbeat") so the
journal stays quiet but still proves the loop isn't dead. Errors
already log at warn; this fills the success-side gap so a silent
journal can't hide a "no successes, no errors" pathology.
239 lines
6.9 KiB
Go
239 lines
6.9 KiB
Go
// Pusher periodically uploads system telemetry to the HarborForge
|
|
// backend's /monitor/server/heartbeat endpoint. Replaces the standalone
|
|
// `harborforge-monitor` daemon — the plugin's lifecycle (host gateway
|
|
// start/stop) bounds the heartbeat loop, so no separate process needs to
|
|
// supervise it.
|
|
//
|
|
// Wire shape mirrors HarborForge.Monitor's `telemetry.Payload`
|
|
// (flat `cpu_pct/mem_pct/...` fields + `X-API-Key` header). The
|
|
// translation from internal `telemetry.Snapshot` to that shape lives
|
|
// in buildPayload; HF backend stays unchanged.
|
|
|
|
package monitor
|
|
|
|
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"net/http"
	"strings"
	"sync"
	"time"

	"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
)
|
|
|
|
// PushPayload is the wire shape POSTed to /monitor/server/heartbeat —
// 1:1 with HarborForge.Monitor's `telemetry.Payload`.
//
// encoding/json emits struct fields in declaration order, so the field
// order below is also the key order of the JSON body; keep it stable.
//
// Field notes (values as filled in by buildPayload):
//   - Identifier: copied from the snapshot; always sent.
//   - PluginVersion: omitted when empty (omitempty).
//   - Agents: one {"id","model","state"} object per agent; no omitempty,
//     so the key is always present.
//   - NginxInstalled / NginxSites: always sent; currently false and [].
//   - CPUPct/MemPct/DiskPct/SwapPct: percentages rounded to one decimal.
//     omitempty means an exact 0 reading is dropped rather than sent as 0.
//   - LoadAvg: the snapshot's [One, Five, Fifteen] load values rounded to
//     two decimals.
//   - UptimeSeconds: the snapshot's UptimeSecs; omitted when 0.
type PushPayload struct {
	Identifier     string    `json:"identifier"`
	PluginVersion  string    `json:"plugin_version,omitempty"`
	Agents         []any     `json:"agents"`
	NginxInstalled bool      `json:"nginx_installed"`
	NginxSites     []string  `json:"nginx_sites"`
	CPUPct         float64   `json:"cpu_pct,omitempty"`
	MemPct         float64   `json:"mem_pct,omitempty"`
	DiskPct        float64   `json:"disk_pct,omitempty"`
	SwapPct        float64   `json:"swap_pct,omitempty"`
	LoadAvg        []float64 `json:"load_avg,omitempty"`
	UptimeSeconds  uint64    `json:"uptime_seconds,omitempty"`
}
|
|
|
|
// PusherConfig is the operator-supplied tuning for a Pusher.
type PusherConfig struct {
	// BackendURL is the HarborForge API base, e.g. https://hf-api.hangman-lab.top.
	// Run trims trailing slashes and appends /monitor/server/heartbeat.
	// An empty value disables pushing (Run logs a warning and returns nil).
	BackendURL string
	// APIKey is sent as the X-API-Key header on every push. An empty value
	// disables pushing, like an empty BackendURL.
	APIKey string
	// Interval is the delay between pushes; NewPusher replaces values <= 0
	// with 30s.
	Interval time.Duration
}
|
|
|
|
// Pusher runs the periodic POST loop. One per plugin process.
|
|
type Pusher struct {
|
|
cfg PusherConfig
|
|
collect func() telemetry.Snapshot
|
|
log LogFunc
|
|
http *http.Client
|
|
|
|
// stats — for the monitor_telemetry tool / status surfacing.
|
|
mu sync.RWMutex
|
|
lastSentAt time.Time
|
|
lastStatus int
|
|
lastErr string
|
|
successHits uint64
|
|
errHits uint64
|
|
}
|
|
|
|
// NewPusher constructs the loop runner. collect must be a snapshot
|
|
// producer (caller usually wires it to telemetry.Collect with
|
|
// SampleCPU=true).
|
|
func NewPusher(cfg PusherConfig, collect func() telemetry.Snapshot, log LogFunc) *Pusher {
|
|
if cfg.Interval <= 0 {
|
|
cfg.Interval = 30 * time.Second
|
|
}
|
|
if log == nil {
|
|
log = func(string, string, map[string]any) {}
|
|
}
|
|
return &Pusher{
|
|
cfg: cfg,
|
|
collect: collect,
|
|
log: log,
|
|
http: &http.Client{Timeout: 15 * time.Second},
|
|
}
|
|
}
|
|
|
|
// Run drives the push loop until ctx is cancelled. Returns ctx.Err().
|
|
// First push happens immediately so the backend sees this claw alive
|
|
// without waiting an interval.
|
|
func (p *Pusher) Run(ctx context.Context) error {
|
|
if p.cfg.BackendURL == "" {
|
|
p.log("warn", "monitor push disabled (empty backendURL)", nil)
|
|
return nil
|
|
}
|
|
if p.cfg.APIKey == "" {
|
|
p.log("warn", "monitor push disabled (empty apiKey)", nil)
|
|
return nil
|
|
}
|
|
url := strings.TrimRight(p.cfg.BackendURL, "/") + "/monitor/server/heartbeat"
|
|
|
|
tick := time.NewTicker(p.cfg.Interval)
|
|
defer tick.Stop()
|
|
|
|
p.pushOnce(ctx, url)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-tick.C:
|
|
p.pushOnce(ctx, url)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Pusher) pushOnce(ctx context.Context, url string) {
|
|
snap := p.collect()
|
|
body, err := json.Marshal(buildPayload(snap))
|
|
if err != nil {
|
|
p.recordErr("marshal: " + err.Error())
|
|
return
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
|
if err != nil {
|
|
p.recordErr("build req: " + err.Error())
|
|
return
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("X-API-Key", p.cfg.APIKey)
|
|
res, err := p.http.Do(req)
|
|
if err != nil {
|
|
p.recordErr("send: " + err.Error())
|
|
p.log("warn", "monitor push failed", map[string]any{"err": err.Error()})
|
|
return
|
|
}
|
|
defer res.Body.Close()
|
|
raw, _ := io.ReadAll(res.Body)
|
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
|
p.recordErr(fmt.Sprintf("%d: %s", res.StatusCode, truncate(raw, 200)))
|
|
p.log("warn", "monitor push non-2xx", map[string]any{
|
|
"status": res.StatusCode, "body": truncate(raw, 200),
|
|
})
|
|
return
|
|
}
|
|
p.recordOK(res.StatusCode)
|
|
}
|
|
|
|
// PushStats is a point-in-time copy of a Pusher's push state, returned by
// Pusher.Stats for diagnostics (the harborforge_monitor_telemetry tool
// surfaces it).
type PushStats struct {
	// LastSentAt is the UTC time of the most recent recorded attempt,
	// successful or not; zero before the first attempt.
	LastSentAt time.Time
	// LastStatus holds the 2xx HTTP status stored by recordOK; it is 0
	// until the first successful push.
	LastStatus int
	// LastErr is the message from the most recent failure; a successful
	// push clears it to "".
	LastErr string
	// SuccessCount is the number of pushes that received a 2xx response.
	SuccessCount uint64
	// ErrorCount is the number of attempts recorded as failures.
	ErrorCount uint64
}
|
|
|
|
func (p *Pusher) Stats() PushStats {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
return PushStats{
|
|
LastSentAt: p.lastSentAt,
|
|
LastStatus: p.lastStatus,
|
|
LastErr: p.lastErr,
|
|
SuccessCount: p.successHits,
|
|
ErrorCount: p.errHits,
|
|
}
|
|
}
|
|
|
|
func (p *Pusher) recordOK(status int) {
|
|
p.mu.Lock()
|
|
wasFirst := p.successHits == 0
|
|
p.lastSentAt = time.Now().UTC()
|
|
p.lastStatus = status
|
|
p.lastErr = ""
|
|
p.successHits++
|
|
count := p.successHits
|
|
p.mu.Unlock()
|
|
|
|
// First success is an operator signal that the push loop is live;
|
|
// log it loud so the journal carries proof. Subsequent successes
|
|
// log on a slow heartbeat (every 60 cycles) so the journal stays
|
|
// quiet but still proves the loop hasn't drifted into "0 successes
|
|
// but no errors either" territory.
|
|
if wasFirst {
|
|
p.log("info", "monitor push started", map[string]any{"status": status})
|
|
} else if count%60 == 0 {
|
|
p.log("info", "monitor push heartbeat", map[string]any{
|
|
"successes": count, "status": status,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (p *Pusher) recordErr(msg string) {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
p.lastSentAt = time.Now().UTC()
|
|
p.lastErr = msg
|
|
p.errHits++
|
|
}
|
|
|
|
// buildPayload translates the internal Snapshot into the flat
|
|
// PushPayload shape the backend expects. agents is passed through as
|
|
// []any (one entry per agent — id/model/state preserved).
|
|
func buildPayload(snap telemetry.Snapshot) PushPayload {
|
|
agents := make([]any, 0, len(snap.Agents))
|
|
for _, a := range snap.Agents {
|
|
agents = append(agents, map[string]any{
|
|
"id": a.ID,
|
|
"model": a.Model,
|
|
"state": a.State,
|
|
})
|
|
}
|
|
return PushPayload{
|
|
Identifier: snap.Identifier,
|
|
PluginVersion: snap.PluginInfo.Version,
|
|
Agents: agents,
|
|
// nginx detection is independent monitor's responsibility today;
|
|
// HF plugin leaves it blank rather than rediscovering nginx
|
|
// state. Operators that need it can keep the standalone monitor
|
|
// alongside or wait for a follow-up commit.
|
|
NginxInstalled: false,
|
|
NginxSites: []string{},
|
|
CPUPct: round1(snap.CPU.UsedPercent),
|
|
MemPct: round1(snap.Memory.UsedPercent),
|
|
DiskPct: round1(snap.Disk.UsedPercent),
|
|
SwapPct: round1(snap.Swap.UsedPercent),
|
|
LoadAvg: []float64{round2(snap.Load.One), round2(snap.Load.Five), round2(snap.Load.Fifteen)},
|
|
UptimeSeconds: snap.UptimeSecs,
|
|
}
|
|
}
|
|
|
|
// round1 rounds v to one decimal place, half away from zero. NaN and ±Inf
// yield 0: encoding/json refuses to marshal them, and converting them to
// int64 produced garbage values.
func round1(v float64) float64 { return roundScaled(v, 10) }

// round2 rounds v to two decimal places, with the same rules as round1.
func round2(v float64) float64 { return roundScaled(v, 100) }

// roundScaled rounds v to the nearest multiple of 1/scale using
// math.Round. Unlike the int64(v*scale+0.5) truncation trick, it rounds
// negative values correctly and cannot overflow int64. NaN and ±Inf
// map to 0.
func roundScaled(v, scale float64) float64 {
	if math.IsNaN(v) || math.IsInf(v, 0) {
		return 0
	}
	return math.Round(v*scale) / scale
}
|
|
|
|
// truncate renders at most n bytes of b as a string and appends "…" when
// anything was cut. The cut point is moved back to a UTF-8 rune boundary,
// so a multi-byte character is never split into invalid bytes. A negative
// n is treated as 0 instead of panicking.
func truncate(b []byte, n int) string {
	if n < 0 {
		n = 0
	}
	if len(b) <= n {
		return string(b)
	}
	// b[cut] is the first byte dropped. While it is a UTF-8 continuation
	// byte (10xxxxxx), its rune started earlier, so back up and drop that
	// whole rune. A rune has at most 3 continuation bytes, so give up after
	// 3 steps on malformed input.
	cut := n
	for cut > 0 && n-cut < 3 && b[cut]&0xC0 == 0x80 {
		cut--
	}
	return string(b[:cut]) + "…"
}
|