Plugin id `harbor-forge` mirrors the OpenClaw counterpart's runtime
surface on top of the Plexum SDK:
* eager activation — Monitor bridge + Calendar scheduler boot at
host start, before any agent turn fires
* monitor bridge: HTTP 127.0.0.1:<monitor_port> serving /telemetry
+ /health for HarborForge.Monitor
* calendar scheduler: heartbeats <backendUrl>/calendar/agent/heartbeat,
dispatches returned slots via HostAPI.WakeAgent (state-aware queue,
depth-1 replace-newest), tracks active slot state in-memory, and
pushes terminal status back to the backend
* 9 harborforge_* tools (status / telemetry / monitor_telemetry /
calendar_{status,complete,abort,pause,resume} / restart_status)
Key differences from OpenClaw equivalent:
* api.spawn → HostAPI.WakeAgent (new SDK primitive)
* api.getAgentStatus → HostAPI.ReadAgentState (existing)
* --install-monitor / --install-cli not included; Monitor + hf CLI
deploy via the HangmanLab.Server.T3 docker compose layer
Initial drop. TODO before v1 ship:
* tool ctx → calling-agent-id: the SDK doesn't currently expose the
calling agent's id to tools; v1 falls back to a single-active-slot
heuristic in main.bestEffortAgentID
* tests for the bridge + scheduler
121 lines
3.4 KiB
Go
121 lines
3.4 KiB
Go
// Package monitor implements the local HTTP server that
// HarborForge.Monitor (a separate native daemon on the same host)
// queries for telemetry. Mirrors the OpenclawPlugin monitor-bridge:
//
//	GET /telemetry  → JSON Snapshot
//	GET /health     → {"ok":true}
//
// The bridge runs as a goroutine started at plugin Init time; ctx
// cancellation tears it down. Bind address is 127.0.0.1 only —
// HarborForge.Monitor is expected on the same host. Configured port
// comes from config.json's monitor_port; zero/missing disables the
// bridge entirely (the plugin still serves tools + calendar).
package monitor
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
|
|
)
|
|
|
|
// Bridge owns the HTTP listener + last-query state.
|
|
type Bridge struct {
|
|
port int
|
|
collect func() telemetry.Snapshot
|
|
log LogFunc
|
|
|
|
mu sync.Mutex
|
|
lastQuery time.Time
|
|
lastSnap telemetry.Snapshot
|
|
queries uint64
|
|
server *http.Server
|
|
}
|
|
|
|
// LogFunc is the plugin's log adapter — kept narrow so the bridge
// doesn't depend on the SDK directly.
//
// level is a severity label (the bridge itself emits "info" and
// "warn"), msg is the human-readable message, and attrs carries
// optional structured fields; attrs may be nil.
type LogFunc func(level, msg string, attrs map[string]any)
|
|
|
|
// New constructs an idle Bridge. Call Start to actually listen.
|
|
func New(port int, collect func() telemetry.Snapshot, log LogFunc) *Bridge {
|
|
return &Bridge{port: port, collect: collect, log: log}
|
|
}
|
|
|
|
// Start the HTTP server in a background goroutine. Returns nil even
|
|
// when the port is 0 (bridge disabled by config) so callers don't
|
|
// need to guard. Stop cancels via context.
|
|
func (b *Bridge) Start(ctx context.Context) error {
|
|
if b.port <= 0 {
|
|
b.log("info", "monitor bridge disabled (monitor_port=0)", nil)
|
|
return nil
|
|
}
|
|
addr := fmt.Sprintf("127.0.0.1:%d", b.port)
|
|
ln, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
return fmt.Errorf("monitor bridge listen %s: %w", addr, err)
|
|
}
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/telemetry", b.handleTelemetry)
|
|
mux.HandleFunc("/health", b.handleHealth)
|
|
srv := &http.Server{Handler: mux, ReadHeaderTimeout: 5 * time.Second}
|
|
b.server = srv
|
|
go func() {
|
|
<-ctx.Done()
|
|
shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
_ = srv.Shutdown(shutCtx)
|
|
}()
|
|
b.log("info", "monitor bridge listening", map[string]any{"addr": addr})
|
|
go func() {
|
|
if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
|
|
b.log("warn", "monitor bridge exited", map[string]any{"err": err.Error()})
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (b *Bridge) handleTelemetry(w http.ResponseWriter, _ *http.Request) {
|
|
snap := b.collect()
|
|
b.mu.Lock()
|
|
b.lastQuery = time.Now()
|
|
b.lastSnap = snap
|
|
b.queries++
|
|
b.mu.Unlock()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(snap)
|
|
}
|
|
|
|
func (b *Bridge) handleHealth(w http.ResponseWriter, _ *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": true})
|
|
}
|
|
|
|
// Stats returns a copy of the bridge's last-query state — used by the
|
|
// harborforge_status / harborforge_monitor_telemetry tools.
|
|
func (b *Bridge) Stats() Stats {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return Stats{
|
|
Port: b.port,
|
|
Listening: b.server != nil,
|
|
LastQuery: b.lastQuery,
|
|
LastSnap: b.lastSnap,
|
|
Queries: b.queries,
|
|
}
|
|
}
|
|
|
|
// Stats is the introspection shape returned by Bridge.Stats.
|
|
type Stats struct {
|
|
Port int
|
|
Listening bool
|
|
LastQuery time.Time
|
|
LastSnap telemetry.Snapshot
|
|
Queries uint64
|
|
}
|