Compare commits

...

4 Commits

Author SHA1 Message Date
046a7753e6 log monitor push start + slow heartbeat
First successful push emits an info-level "monitor push started" so
operators can confirm the loop wired up correctly. Subsequent
successes log every 60 cycles ("monitor push heartbeat") so the
journal stays quiet but still proves the loop isn't dead. Errors
already log at warn; this fills the success-side gap so a silent
journal can't hide a "no successes, no errors" pathology.
2026-06-03 13:13:33 +01:00
6e3ad669f8 feat(monitor): active push loop replacing standalone monitor
Adds a periodic POST loop to <backend>/monitor/server/heartbeat so
HF plugin can take over the standalone harborforge-monitor daemon's
job — same X-API-Key header, same flat telemetry shape (cpu_pct /
mem_pct / disk_pct / swap_pct / load_avg / uptime_seconds /
plugin_version / agents[]). HF backend stays unchanged.

Config: monitor_push_enabled (default false; opt-in to avoid surprise
heartbeats from existing deployments), monitor_push_interval_seconds
(default 30), reuses apiKey for the X-API-Key header. Lift the
container's HF_MONITER_API_KEY into config.apiKey, flip
monitor_push_enabled true, then docker rm -f the container — DB
last_seen_at keeps advancing under the plugin's loop.

Collector grew swap + cpu sampling (two reads of /proc/stat over a
1-second window when SampleCPU=true). Bridge endpoint stays cheap
(SampleCPU=false on demand); push loop is the only caller paying the
sampling cost.

E2E in sim: monitor_push_enabled=true + apiKey from injected
MonitoredServer row → server_states.last_seen_at advances exactly
every interval_seconds (10s configured, 10s observed). cpu/mem/disk/
swap_pct all populate correctly.
2026-06-03 13:04:51 +01:00
472cecd771 feat: read agent_id from ctx (SDK now plumbs it)
Plexum-sdk-go now propagates the caller agent id via
`_meta.agent_id` on tools/call. AgentIDFromCtx prefers
plugin.AgentIDFromContext(ctx); falls back to the
single-active-calendar-slot heuristic for host-driven dispatch
paths (channel manager, CLI plugin-call) that lack ctx.

Drops bestEffortAgentID — the inline closure does the same thing
without the dead-Slot-iterate noise.
2026-06-03 12:54:57 +01:00
bc1ab7b6ea fix: snake_case SlotStatus + scheduler debug logs
Two issues found while end-to-end testing against a running
harborforge-backend:

  - SlotStatus enum values: backend stores snake_case
    ("not_started" / "ongoing" / …), not the camelCase the
    OpenClaw plugin's TypeScript types.ts misled the initial
    drop into using. Heartbeat responses came back with
    Slot.Status="not_started" which the scheduler never matched
    against SlotStatus("NotStarted"), so dispatchSlot never
    fired. Aligned with backend's actual enum string values
    (verified via heartbeat response shape).

  - Added info-level logs at slot selection + dispatchSlot
    entry + WakeAgent fire/result so operators can see the
    plugin's decision chain in production without enabling
    debug. Cheap (~one tick per agent per heartbeat interval).

E2E in sim: backend returns slots=1 → selection chosen=true →
dispatch enter → WakeAgent enqueued ok → backend slot ongoing
→ next heartbeat returns slots=0.
2026-06-03 11:42:18 +01:00
9 changed files with 499 additions and 68 deletions

View File

@@ -18,9 +18,19 @@ submodule of the HarborForge umbrella repo.
## What it does ## What it does
- **Monitor bridge** — HTTP server on `127.0.0.1:<monitor_port>` that - **Monitor push loop** — when `monitor_push_enabled: true`, posts a
responds to `/telemetry` with a Snapshot the HarborForge.Monitor flat telemetry payload (cpu/mem/disk/swap/load + per-agent state)
binary expects (system metrics + every Plexum agent's sm-state) to `<backendUrl>/monitor/server/heartbeat` every
`monitor_push_interval_seconds`. This replaces the standalone
`harborforge-monitor` daemon — the plugin's lifecycle (gateway
start/stop) bounds the loop, so a separate supervisor isn't needed.
Use the same `apiKey` value the standalone monitor's
`HF_MONITER_API_KEY` carried.
- **Monitor bridge** (optional) — HTTP server on
`127.0.0.1:<monitor_port>` that responds to `/telemetry` with a
Snapshot. Useful when the standalone monitor is still present and
you want it to enrich its push payload from the plugin's view of
agents. Disable by setting `monitor_port: 0`.
- **Calendar scheduler** — heartbeats `<backendUrl>/calendar/agent/ - **Calendar scheduler** — heartbeats `<backendUrl>/calendar/agent/
heartbeat` every interval, receives any TimeSlots due to fire, and heartbeat` every interval, receives any TimeSlots due to fire, and
dispatches them through `HostAPI.WakeAgent` (state-aware queue dispatches them through `HostAPI.WakeAgent` (state-aware queue
@@ -64,15 +74,24 @@ And configure at `~/.plexum/plugins/harbor-forge/config.json`:
```json ```json
{ {
"backendUrl": "https://monitor.hangman-lab.top", "backendUrl": "https://hf-api.hangman-lab.top",
"identifier": "server-t3", "identifier": "server-t3",
"apiKey": "g1_xxx", "apiKey": "g1_xxx",
"monitor_port": 9100, "monitor_push_enabled": true,
"monitor_push_interval_seconds": 30,
"monitor_port": 0,
"calendar_enabled": true, "calendar_enabled": true,
"calendar_heartbeat_interval_seconds": 30 "calendar_heartbeat_interval_seconds": 30
} }
``` ```
Replacing the standalone `harborforge-monitor` container: lift the
container's `HF_MONITER_API_KEY` into `apiKey`, set
`monitor_push_enabled: true`, then `docker rm -f harborforge-monitor`
once you've confirmed the plugin's pushes are landing (the backend's
`server_states.last_seen_at` should keep advancing without the
container running).
Restart the host (`systemctl --user restart plexum`) and verify: Restart the host (`systemctl --user restart plexum`) and verify:
```bash ```bash

View File

@@ -41,6 +41,7 @@ type harborForgePlugin struct {
host sdkplugin.HostAPI host sdkplugin.HostAPI
cfg hfcfg.Resolved cfg hfcfg.Resolved
bridge *monitor.Bridge bridge *monitor.Bridge
pusher *monitor.Pusher
sched *calendar.Scheduler sched *calendar.Scheduler
deps tools.Deps deps tools.Deps
cancelBg context.CancelFunc cancelBg context.CancelFunc
@@ -70,6 +71,7 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
"backend": p.cfg.BackendURL, "backend": p.cfg.BackendURL,
"identifier": p.cfg.Identifier, "identifier": p.cfg.Identifier,
"monitor_port": p.cfg.MonitorPort, "monitor_port": p.cfg.MonitorPort,
"monitor_push_enabled": p.cfg.MonitorPushEnabled,
"calendar_enabled": p.cfg.CalendarEnabled, "calendar_enabled": p.cfg.CalendarEnabled,
}) })
@@ -79,15 +81,22 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
// Listers + collectors capture bgCtx (not Init ctx) — Init returns // Listers + collectors capture bgCtx (not Init ctx) — Init returns
// once MCP initialize completes, but the plugin process lives on // once MCP initialize completes, but the plugin process lives on
// and so do the goroutines + closures we registered. // and so do the goroutines + closures we registered.
collect := func() telemetry.Snapshot { makeCollector := func(sampleCPU bool) func() telemetry.Snapshot {
return func() telemetry.Snapshot {
return telemetry.Collect(telemetry.CollectOpts{ return telemetry.Collect(telemetry.CollectOpts{
Identifier: p.cfg.Identifier, Identifier: p.cfg.Identifier,
Version: Version, Version: Version,
SampleCPU: sampleCPU,
AgentLister: func() []telemetry.AgentInfo { AgentLister: func() []telemetry.AgentInfo {
return p.listAgents(bgCtx, profileRoot) return p.listAgents(bgCtx, profileRoot)
}, },
}) })
} }
}
// Bridge serves on-demand reads; cheap, no CPU sampling.
collect := makeCollector(false)
// Pusher runs the slow push loop; CPU sampling fine here.
collectForPush := makeCollector(true)
p.bridge = monitor.New(p.cfg.MonitorPort, collect, p.bridge = monitor.New(p.cfg.MonitorPort, collect,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) }) func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
@@ -96,6 +105,25 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()}) host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()})
} }
// Active push loop — replaces the standalone harborforge-monitor
// container. Off by default; operator opts in via
// monitor_push_enabled + apiKey.
p.pusher = monitor.NewPusher(monitor.PusherConfig{
BackendURL: p.cfg.BackendURL,
APIKey: p.cfg.APIKey,
Interval: time.Duration(p.cfg.MonitorPushIntervalSeconds) * time.Second,
}, collectForPush,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
if p.cfg.MonitorPushEnabled {
p.wg.Add(1)
go func() {
defer p.wg.Done()
if err := p.pusher.Run(bgCtx); err != nil && !errors.Is(err, context.Canceled) {
host.Log("warn", "monitor pusher exited", map[string]any{"err": err.Error()})
}
}()
}
calBackend := p.cfg.CalendarBackendURL calBackend := p.cfg.CalendarBackendURL
if calBackend == "" { if calBackend == "" {
calBackend = p.cfg.BackendURL calBackend = p.cfg.BackendURL
@@ -128,18 +156,20 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
Version: Version, Version: Version,
Collect: collect, Collect: collect,
Bridge: p.bridge, Bridge: p.bridge,
Pusher: p.pusher,
Scheduler: p.sched, Scheduler: p.sched,
Host: host, Host: host,
AgentIDFromCtx: func(ctx context.Context) string { AgentIDFromCtx: func(ctx context.Context) string {
// Plexum stashes the calling agent id on the host-side // Host attaches the caller agent id via tools/call
// context (via WithAgent) before dispatching tool calls. // `_meta.agent_id`; SDK unpacks it into ctx.
// We can't directly import internal/agentloop from a if id := sdkplugin.AgentIDFromContext(ctx); id != "" {
// plugin, so we rely on PLEXUM_TOOL_AGENT_ID env-style return id
// (set per-call by host when we add that wiring) or fall }
// back to the only-active-agent heuristic. v1: prefer the // Fallback for host paths that don't carry an agent
// only-active wake-target (deterministic in single-agent // (channel-driven, CLI plugin-call). When a single
// HF deployments). // calendar slot is active we can deterministically
return p.bestEffortAgentID() // attribute the call to that slot's owner.
return p.sched.SingleActiveAgentID()
}, },
} }
@@ -209,28 +239,6 @@ func mapStateToCalendar(s string) calendar.AgentStatusValue {
return calendar.AgentStatusOffline return calendar.AgentStatusOffline
} }
// bestEffortAgentID is a v1 stop-gap for tools that need the calling
// agent's id but don't have it on the ctx (Plexum SDK doesn't yet
// expose this — TODO upstream). v1: if exactly one agent has an
// active calendar slot we return it; otherwise empty. The calendar
// tools (the only ones that need agent context) usually fire when
// exactly one slot is active.
func (p *harborForgePlugin) bestEffortAgentID() string {
sch := p.sched.Status()
if len(sch.Active) != 1 {
return ""
}
// We don't track AgentID on Slot directly — the scheduler keeps
// activeByAgentID. Iterate to find the one.
for _, a := range sch.Active {
// Slot is shared between agents only via the scheduler's maps;
// here we have just the Slot struct without owner.
_ = a
}
// Fallback to scheduler's helper:
return p.sched.SingleActiveAgentID()
}
func manifestFromDisk() sdkplugin.Manifest { func manifestFromDisk() sdkplugin.Manifest {
// Bundled manifest.json is the authoritative shape; the binary // Bundled manifest.json is the authoritative shape; the binary
// version reads it next to itself to avoid hand-syncing two // version reads it next to itself to avoid hand-syncing two

View File

@@ -158,6 +158,9 @@ func (s *Scheduler) tickForAgent(ctx context.Context, agent ReportableAgent, now
chosen = slot chosen = slot
} }
} }
s.host.Log("info", "calendar slot selection", map[string]any{
"agent": agent.ID, "available": len(resp.Slots), "chosen": chosen != nil,
})
if chosen != nil { if chosen != nil {
s.dispatchSlot(ctx, agent.ID, *chosen) s.dispatchSlot(ctx, agent.ID, *chosen)
} }
@@ -173,14 +176,19 @@ func (s *Scheduler) tickForAgent(ctx context.Context, agent ReportableAgent, now
// transition immediately. // transition immediately.
func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot) { func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot) {
ident := slot.SlotIdent() ident := slot.SlotIdent()
s.host.Log("info", "calendar dispatchSlot enter", map[string]any{
"agent": agentID, "slot_ident": ident,
})
s.mu.Lock() s.mu.Lock()
if _, dup := s.activeBySlotIdent[ident]; dup { if _, dup := s.activeBySlotIdent[ident]; dup {
s.mu.Unlock() s.mu.Unlock()
s.host.Log("info", "calendar dispatchSlot skipped (already active)", map[string]any{"slot": ident})
return return
} }
if _, agentBusy := s.activeByAgentID[agentID]; agentBusy { if _, agentBusy := s.activeByAgentID[agentID]; agentBusy {
// Don't pick up another slot until the current one resolves. // Don't pick up another slot until the current one resolves.
s.mu.Unlock() s.mu.Unlock()
s.host.Log("info", "calendar dispatchSlot skipped (agent has active slot)", map[string]any{"agent": agentID})
return return
} }
now := time.Now().UTC() now := time.Now().UTC()
@@ -191,12 +199,21 @@ func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot)
message := buildWakeMessage(slot) message := buildWakeMessage(slot)
source := "calendar:" + ident source := "calendar:" + ident
s.host.Log("info", "calendar firing WakeAgent", map[string]any{
"agent": agentID, "slot": ident, "source": source, "msg_len": len(message),
})
if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{ if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{
AgentID: agentID, Message: message, Source: source, AgentID: agentID, Message: message, Source: source,
}); err != nil { }); err != nil {
s.host.Log("warn", "calendar WakeAgent failed", map[string]any{
"agent": agentID, "err": err.Error(),
})
s.resolveLocally(ident, agentID, SlotAborted, "", "wake failed: "+err.Error()) s.resolveLocally(ident, agentID, SlotAborted, "", "wake failed: "+err.Error())
return return
} }
s.host.Log("info", "calendar WakeAgent enqueued ok", map[string]any{
"agent": agentID, "slot": ident,
})
// Mark Ongoing on the backend. // Mark Ongoing on the backend.
update := SlotAgentUpdate{ update := SlotAgentUpdate{
Status: SlotOngoing, StartedAt: now.Format("15:04:05"), Status: SlotOngoing, StartedAt: now.Format("15:04:05"),

View File

@@ -8,16 +8,18 @@ package calendar
import "time" import "time"
// SlotStatus enumerates the lifecycle. String values match backend's // SlotStatus enumerates the lifecycle. String values match backend's
// SlotStatus enum verbatim (camelCase as stored in DB). // SlotStatus enum verbatim (snake_case — verified via heartbeat
// response shape against running harborforge-backend).
type SlotStatus string type SlotStatus string
const ( const (
SlotNotStarted SlotStatus = "NotStarted" SlotNotStarted SlotStatus = "not_started"
SlotOngoing SlotStatus = "Ongoing" SlotOngoing SlotStatus = "ongoing"
SlotFinished SlotStatus = "Finished" SlotFinished SlotStatus = "finished"
SlotAborted SlotStatus = "Aborted" SlotAborted SlotStatus = "aborted"
SlotDeferred SlotStatus = "Deferred" SlotDeferred SlotStatus = "deferred"
SlotPaused SlotStatus = "Paused" SlotPaused SlotStatus = "paused"
SlotSkipped SlotStatus = "skipped"
) )
// SlotType: work vs on_call. Affects whether the agent flips to busy. // SlotType: work vs on_call. Affects whether the agent flips to busy.

View File

@@ -35,6 +35,17 @@ type Config struct {
// server listens on. Zero/missing disables the bridge entirely. // server listens on. Zero/missing disables the bridge entirely.
MonitorPort int `json:"monitor_port,omitempty"` MonitorPort int `json:"monitor_port,omitempty"`
// MonitorPushEnabled toggles the active push loop that uploads
// system telemetry to BackendURL /monitor/server/heartbeat. Lets
// HF plugin replace the standalone harborforge-monitor container.
// nil (unset) defaults to false; operators must opt in explicitly
// since they need to provision APIKey too.
MonitorPushEnabled *bool `json:"monitor_push_enabled,omitempty"`
// MonitorPushIntervalSeconds — defaults to 30s when ≤0. Mirrors
// the standalone monitor's HF_MONITER_REPORT_INTERVAL knob.
MonitorPushIntervalSeconds int `json:"monitor_push_interval_seconds,omitempty"`
// CalendarHeartbeatIntervalSeconds — defaults to 30s when ≤0. // CalendarHeartbeatIntervalSeconds — defaults to 30s when ≤0.
CalendarHeartbeatIntervalSeconds int `json:"calendar_heartbeat_interval_seconds,omitempty"` CalendarHeartbeatIntervalSeconds int `json:"calendar_heartbeat_interval_seconds,omitempty"`
@@ -57,6 +68,8 @@ type Resolved struct {
Identifier string Identifier string
APIKey string APIKey string
MonitorPort int MonitorPort int
MonitorPushEnabled bool
MonitorPushIntervalSeconds int
CalendarEnabled bool CalendarEnabled bool
CalendarHeartbeatIntervalSeconds int CalendarHeartbeatIntervalSeconds int
CalendarBackendURL string CalendarBackendURL string
@@ -104,6 +117,8 @@ func Resolve(c Config) Resolved {
Identifier: c.Identifier, Identifier: c.Identifier,
APIKey: c.APIKey, APIKey: c.APIKey,
MonitorPort: c.MonitorPort, MonitorPort: c.MonitorPort,
MonitorPushEnabled: false,
MonitorPushIntervalSeconds: 30,
CalendarEnabled: true, CalendarEnabled: true,
CalendarHeartbeatIntervalSeconds: 30, CalendarHeartbeatIntervalSeconds: 30,
CalendarBackendURL: c.CalendarBackendURL, CalendarBackendURL: c.CalendarBackendURL,
@@ -127,6 +142,12 @@ func Resolve(c Config) Resolved {
if c.CalendarHeartbeatIntervalSeconds > 0 { if c.CalendarHeartbeatIntervalSeconds > 0 {
out.CalendarHeartbeatIntervalSeconds = c.CalendarHeartbeatIntervalSeconds out.CalendarHeartbeatIntervalSeconds = c.CalendarHeartbeatIntervalSeconds
} }
if c.MonitorPushEnabled != nil {
out.MonitorPushEnabled = *c.MonitorPushEnabled
}
if c.MonitorPushIntervalSeconds > 0 {
out.MonitorPushIntervalSeconds = c.MonitorPushIntervalSeconds
}
if c.RestartPollIntervalSeconds > 0 { if c.RestartPollIntervalSeconds > 0 {
out.RestartPollIntervalSeconds = c.RestartPollIntervalSeconds out.RestartPollIntervalSeconds = c.RestartPollIntervalSeconds
} }

238
internal/monitor/pusher.go Normal file
View File

@@ -0,0 +1,238 @@
// Pusher periodically uploads system telemetry to the HarborForge
// backend's /monitor/server/heartbeat endpoint. Replaces the standalone
// `harborforge-monitor` daemon — the plugin's lifecycle (host gateway
// start/stop) bounds the heartbeat loop, so no separate process need
// supervise it.
//
// Wire shape mirrors HarborForge.Monitor's `telemetry.Payload`
// (flat `cpu_pct/mem_pct/...` fields + `X-API-Key` header). The
// translation from internal `telemetry.Snapshot` to that shape lives
// in buildPayload; HF backend stays unchanged.
package monitor
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
)
// PushPayload is the wire shape POSTed to /monitor/server/heartbeat —
// 1:1 with HarborForge.Monitor's `telemetry.Payload`.
type PushPayload struct {
Identifier string `json:"identifier"`
PluginVersion string `json:"plugin_version,omitempty"`
Agents []any `json:"agents"`
NginxInstalled bool `json:"nginx_installed"`
NginxSites []string `json:"nginx_sites"`
CPUPct float64 `json:"cpu_pct,omitempty"`
MemPct float64 `json:"mem_pct,omitempty"`
DiskPct float64 `json:"disk_pct,omitempty"`
SwapPct float64 `json:"swap_pct,omitempty"`
LoadAvg []float64 `json:"load_avg,omitempty"`
UptimeSeconds uint64 `json:"uptime_seconds,omitempty"`
}
// PusherConfig is the operator-supplied tuning.
type PusherConfig struct {
BackendURL string // e.g. https://hf-api.hangman-lab.top
APIKey string // sent as X-API-Key
Interval time.Duration // default 30s when <=0
}
// Pusher runs the periodic POST loop. One per plugin process.
type Pusher struct {
cfg PusherConfig
collect func() telemetry.Snapshot
log LogFunc
http *http.Client
// stats — for the monitor_telemetry tool / status surfacing.
mu sync.RWMutex
lastSentAt time.Time
lastStatus int
lastErr string
successHits uint64
errHits uint64
}
// NewPusher constructs the loop runner. collect must be a snapshot
// producer (caller usually wires it to telemetry.Collect with
// SampleCPU=true).
func NewPusher(cfg PusherConfig, collect func() telemetry.Snapshot, log LogFunc) *Pusher {
if cfg.Interval <= 0 {
cfg.Interval = 30 * time.Second
}
if log == nil {
log = func(string, string, map[string]any) {}
}
return &Pusher{
cfg: cfg,
collect: collect,
log: log,
http: &http.Client{Timeout: 15 * time.Second},
}
}
// Run drives the push loop until ctx is cancelled. Returns ctx.Err().
// First push happens immediately so the backend sees this claw alive
// without waiting an interval.
func (p *Pusher) Run(ctx context.Context) error {
if p.cfg.BackendURL == "" {
p.log("warn", "monitor push disabled (empty backendURL)", nil)
return nil
}
if p.cfg.APIKey == "" {
p.log("warn", "monitor push disabled (empty apiKey)", nil)
return nil
}
url := strings.TrimRight(p.cfg.BackendURL, "/") + "/monitor/server/heartbeat"
tick := time.NewTicker(p.cfg.Interval)
defer tick.Stop()
p.pushOnce(ctx, url)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
p.pushOnce(ctx, url)
}
}
}
func (p *Pusher) pushOnce(ctx context.Context, url string) {
snap := p.collect()
body, err := json.Marshal(buildPayload(snap))
if err != nil {
p.recordErr("marshal: " + err.Error())
return
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
p.recordErr("build req: " + err.Error())
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-API-Key", p.cfg.APIKey)
res, err := p.http.Do(req)
if err != nil {
p.recordErr("send: " + err.Error())
p.log("warn", "monitor push failed", map[string]any{"err": err.Error()})
return
}
defer res.Body.Close()
raw, _ := io.ReadAll(res.Body)
if res.StatusCode < 200 || res.StatusCode >= 300 {
p.recordErr(fmt.Sprintf("%d: %s", res.StatusCode, truncate(raw, 200)))
p.log("warn", "monitor push non-2xx", map[string]any{
"status": res.StatusCode, "body": truncate(raw, 200),
})
return
}
p.recordOK(res.StatusCode)
}
// Stats exposes a copy of the latest push state for diagnostics
// (harborforge_monitor_telemetry tool surfaces this).
type PushStats struct {
LastSentAt time.Time
LastStatus int
LastErr string
SuccessCount uint64
ErrorCount uint64
}
func (p *Pusher) Stats() PushStats {
p.mu.RLock()
defer p.mu.RUnlock()
return PushStats{
LastSentAt: p.lastSentAt,
LastStatus: p.lastStatus,
LastErr: p.lastErr,
SuccessCount: p.successHits,
ErrorCount: p.errHits,
}
}
func (p *Pusher) recordOK(status int) {
p.mu.Lock()
wasFirst := p.successHits == 0
p.lastSentAt = time.Now().UTC()
p.lastStatus = status
p.lastErr = ""
p.successHits++
count := p.successHits
p.mu.Unlock()
// First success is an operator signal that the push loop is live;
// log it loud so the journal carries proof. Subsequent successes
// log on a slow heartbeat (every 60 cycles) so the journal stays
// quiet but still proves the loop hasn't drifted into "0 successes
// but no errors either" territory.
if wasFirst {
p.log("info", "monitor push started", map[string]any{"status": status})
} else if count%60 == 0 {
p.log("info", "monitor push heartbeat", map[string]any{
"successes": count, "status": status,
})
}
}
func (p *Pusher) recordErr(msg string) {
p.mu.Lock()
defer p.mu.Unlock()
p.lastSentAt = time.Now().UTC()
p.lastErr = msg
p.errHits++
}
// buildPayload translates the internal Snapshot into the flat
// PushPayload shape the backend expects. agents is passed through as
// []any (one entry per agent — id/model/state preserved).
func buildPayload(snap telemetry.Snapshot) PushPayload {
agents := make([]any, 0, len(snap.Agents))
for _, a := range snap.Agents {
agents = append(agents, map[string]any{
"id": a.ID,
"model": a.Model,
"state": a.State,
})
}
return PushPayload{
Identifier: snap.Identifier,
PluginVersion: snap.PluginInfo.Version,
Agents: agents,
// nginx detection is independent monitor's responsibility today;
// HF plugin leaves it blank rather than rediscovering nginx
// state. Operators that need it can keep the standalone monitor
// alongside or wait for a follow-up commit.
NginxInstalled: false,
NginxSites: []string{},
CPUPct: round1(snap.CPU.UsedPercent),
MemPct: round1(snap.Memory.UsedPercent),
DiskPct: round1(snap.Disk.UsedPercent),
SwapPct: round1(snap.Swap.UsedPercent),
LoadAvg: []float64{round2(snap.Load.One), round2(snap.Load.Five), round2(snap.Load.Fifteen)},
UptimeSeconds: snap.UptimeSecs,
}
}
func round1(v float64) float64 { return float64(int64(v*10+0.5)) / 10 }
func round2(v float64) float64 { return float64(int64(v*100+0.5)) / 100 }
func truncate(b []byte, n int) string {
if len(b) <= n {
return string(b)
}
return string(b[:n]) + "…"
}

View File

@@ -29,14 +29,30 @@ type Snapshot struct {
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
UptimeSecs uint64 `json:"uptime"` UptimeSecs uint64 `json:"uptime"`
Memory MemoryInfo `json:"memory"` Memory MemoryInfo `json:"memory"`
Swap SwapInfo `json:"swap"`
Load LoadInfo `json:"load"` Load LoadInfo `json:"load"`
Disk DiskInfo `json:"disk"` Disk DiskInfo `json:"disk"`
CPU CPUInfo `json:"cpu"`
Agents []AgentInfo `json:"agents"` Agents []AgentInfo `json:"agents"`
PluginInfo PluginInfo `json:"plugin"` PluginInfo PluginInfo `json:"plugin"`
CapturedAt time.Time `json:"captured_at"` CapturedAt time.Time `json:"captured_at"`
HostMetadata map[string]string `json:"host_metadata,omitempty"` HostMetadata map[string]string `json:"host_metadata,omitempty"`
} }
// SwapInfo is the system swap usage. Zeroes when swap isn't configured.
type SwapInfo struct {
Total uint64 `json:"total"`
Free uint64 `json:"free"`
Used uint64 `json:"used"`
UsedPercent float64 `json:"used_percent"`
}
// CPUInfo holds the most recent CPU usage estimate. UsedPercent is
// computed across one sample interval (see Collect's cpu helper).
type CPUInfo struct {
UsedPercent float64 `json:"used_percent"`
}
// MemoryInfo mirrors OpenclawPlugin's memory shape. // MemoryInfo mirrors OpenclawPlugin's memory shape.
type MemoryInfo struct { type MemoryInfo struct {
Total uint64 `json:"total"` // bytes Total uint64 `json:"total"` // bytes
@@ -84,15 +100,27 @@ type CollectOpts struct {
Identifier string Identifier string
Version string Version string
AgentLister func() []AgentInfo // resolved by the caller (plugin uses HostAPI to walk agents) AgentLister func() []AgentInfo // resolved by the caller (plugin uses HostAPI to walk agents)
// SampleCPU asks Collect to take a 1-second CPU sample. Off-path
// (status endpoint, bridge serve) leave false to keep calls cheap;
// the slow push loop sets it true.
SampleCPU bool
} }
// Collect produces a fresh snapshot from /proc + the supplied AgentLister. // Collect produces a fresh snapshot from /proc + the supplied AgentLister.
// SampleCPU=true takes a 1-second CPU sample (two reads of /proc/stat
// with a sleep between); otherwise CPU usage stays zero. Set true on
// the slow push loop, false on the cheap on-demand status endpoint.
func Collect(opts CollectOpts) Snapshot { func Collect(opts CollectOpts) Snapshot {
now := time.Now().UTC() now := time.Now().UTC()
host, _ := os.Hostname() host, _ := os.Hostname()
mem := readMemInfo() mem, swap := readMemAndSwap()
load := readLoadAvg() load := readLoadAvg()
disk := readDiskRoot() disk := readDiskRoot()
cpu := CPUInfo{}
if opts.SampleCPU {
cpu.UsedPercent = sampleCPUPercent(time.Second)
}
var agents []AgentInfo var agents []AgentInfo
if opts.AgentLister != nil { if opts.AgentLister != nil {
agents = opts.AgentLister() agents = opts.AgentLister()
@@ -103,8 +131,10 @@ func Collect(opts CollectOpts) Snapshot {
Hostname: host, Hostname: host,
UptimeSecs: readUptime(), UptimeSecs: readUptime(),
Memory: mem, Memory: mem,
Swap: swap,
Load: load, Load: load,
Disk: disk, Disk: disk,
CPU: cpu,
Agents: agents, Agents: agents,
PluginInfo: PluginInfo{ PluginInfo: PluginInfo{
Name: "harbor-forge", Name: "harbor-forge",
@@ -117,10 +147,10 @@ func Collect(opts CollectOpts) Snapshot {
// ---- /proc helpers ---- // ---- /proc helpers ----
func readMemInfo() MemoryInfo { func readMemAndSwap() (MemoryInfo, SwapInfo) {
f, err := os.Open("/proc/meminfo") f, err := os.Open("/proc/meminfo")
if err != nil { if err != nil {
return MemoryInfo{} return MemoryInfo{}, SwapInfo{}
} }
defer f.Close() defer f.Close()
fields := map[string]uint64{} fields := map[string]uint64{}
@@ -145,6 +175,12 @@ func readMemInfo() MemoryInfo {
// All MemInfo values are in KB; convert to bytes. // All MemInfo values are in KB; convert to bytes.
fields[key] = v * 1024 fields[key] = v * 1024
} }
mem := buildMemInfo(fields)
swap := buildSwapInfo(fields)
return mem, swap
}
func buildMemInfo(fields map[string]uint64) MemoryInfo {
total := fields["MemTotal"] total := fields["MemTotal"]
free := fields["MemAvailable"] free := fields["MemAvailable"]
if free == 0 { if free == 0 {
@@ -158,6 +194,67 @@ func readMemInfo() MemoryInfo {
return MemoryInfo{Total: total, Free: free, Used: used, UsedPercent: pct} return MemoryInfo{Total: total, Free: free, Used: used, UsedPercent: pct}
} }
func buildSwapInfo(fields map[string]uint64) SwapInfo {
total := fields["SwapTotal"]
free := fields["SwapFree"]
if total == 0 {
return SwapInfo{}
}
used := total - free
pct := float64(used) / float64(total) * 100
return SwapInfo{Total: total, Free: free, Used: used, UsedPercent: pct}
}
// sampleCPUPercent computes overall CPU usage across one sample
// interval. Two reads of /proc/stat's aggregate "cpu" line, derive
// busy-time delta as (1 - idle/total). Returns 0 on read failure.
func sampleCPUPercent(interval time.Duration) float64 {
total1, idle1, ok := readCPUStat()
if !ok {
return 0
}
time.Sleep(interval)
total2, idle2, ok := readCPUStat()
if !ok || total2 <= total1 {
return 0
}
totalDelta := total2 - total1
idleDelta := idle2 - idle1
if idleDelta > totalDelta {
return 0
}
return float64(totalDelta-idleDelta) / float64(totalDelta) * 100
}
func readCPUStat() (total, idle uint64, ok bool) {
f, err := os.Open("/proc/stat")
if err != nil {
return 0, 0, false
}
defer f.Close()
sc := bufio.NewScanner(f)
if !sc.Scan() {
return 0, 0, false
}
parts := strings.Fields(sc.Text())
if len(parts) < 5 || parts[0] != "cpu" {
return 0, 0, false
}
for i := 1; i < len(parts); i++ {
v, err := strconv.ParseUint(parts[i], 10, 64)
if err != nil {
return 0, 0, false
}
total += v
// idle is the 4th column (parts[4]); iowait (parts[5]) is also
// idle-ish but we count it as busy to match gopsutil's default.
if i == 4 {
idle = v
}
}
return total, idle, true
}
func readLoadAvg() LoadInfo { func readLoadAvg() LoadInfo {
raw, err := os.ReadFile("/proc/loadavg") raw, err := os.ReadFile("/proc/loadavg")
if err != nil { if err != nil {

View File

@@ -27,6 +27,7 @@ type Deps struct {
Version string Version string
Collect func() telemetry.Snapshot Collect func() telemetry.Snapshot
Bridge *monitor.Bridge Bridge *monitor.Bridge
Pusher *monitor.Pusher
Scheduler *calendar.Scheduler Scheduler *calendar.Scheduler
Host sdkplugin.HostAPI Host sdkplugin.HostAPI
@@ -89,11 +90,32 @@ func toolStatus(deps Deps) (sdkplugin.ToolResult, error) {
"queries": bs.Queries, "queries": bs.Queries,
"last_query": bs.LastQuery, "last_query": bs.LastQuery,
}, },
"monitor_push": monitorPushSummary(deps),
"calendar": sch, "calendar": sch,
} }
return jsonResult(out) return jsonResult(out)
} }
// monitorPushSummary returns the pusher's last-known state in the same
// JSON layout the status/monitor_telemetry tools surface. Nil-safe: if
// no pusher is wired (testing, push disabled), reports enabled=false.
func monitorPushSummary(deps Deps) map[string]any {
out := map[string]any{
"enabled": deps.Config.MonitorPushEnabled,
"interval_seconds": deps.Config.MonitorPushIntervalSeconds,
"endpoint": deps.Config.BackendURL + "/monitor/server/heartbeat",
}
if deps.Pusher != nil {
st := deps.Pusher.Stats()
out["last_sent_at"] = st.LastSentAt
out["last_status"] = st.LastStatus
out["last_err"] = st.LastErr
out["success_count"] = st.SuccessCount
out["error_count"] = st.ErrorCount
}
return out
}
func toolTelemetry(deps Deps) (sdkplugin.ToolResult, error) { func toolTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
return jsonResult(deps.Collect()) return jsonResult(deps.Collect())
} }
@@ -101,11 +123,14 @@ func toolTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
func toolMonitorTelemetry(deps Deps) (sdkplugin.ToolResult, error) { func toolMonitorTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
bs := deps.Bridge.Stats() bs := deps.Bridge.Stats()
return jsonResult(map[string]any{ return jsonResult(map[string]any{
"bridge": map[string]any{
"port": bs.Port, "port": bs.Port,
"listening": bs.Listening, "listening": bs.Listening,
"queries": bs.Queries, "queries": bs.Queries,
"last_query": bs.LastQuery, "last_query": bs.LastQuery,
"last_snapshot": bs.LastSnap, "last_snapshot": bs.LastSnap,
},
"push": monitorPushSummary(deps),
}) })
} }

View File

@@ -37,12 +37,16 @@ Next steps:
"harbor-forge" "harbor-forge"
2. Write ${PLUGIN_DIR}/config.json — sample: 2. Write ${PLUGIN_DIR}/config.json — sample:
{ {
"backendUrl": "https://monitor.hangman-lab.top", "backendUrl": "https://hf-api.hangman-lab.top",
"identifier": "server-t3", "identifier": "server-t3",
"apiKey": "g1_xxx", "apiKey": "<copy from HF_MONITER_API_KEY>",
"monitor_port": 9100, "monitor_push_enabled": true,
"monitor_push_interval_seconds": 30,
"monitor_port": 0,
"calendar_enabled": true, "calendar_enabled": true,
"calendar_heartbeat_interval_seconds": 30 "calendar_heartbeat_interval_seconds": 30
} }
3. Restart the host: systemctl --user restart plexum 3. Restart the host: systemctl --user restart plexum
4. Verify push is landing (DB last_seen_at advancing) and then
remove the standalone harborforge-monitor container.
EOF EOF