Compare commits
6 Commits
754e5183f7
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 9e43021ba5 | |||
| 046a7753e6 | |||
| 6e3ad669f8 | |||
| 472cecd771 | |||
| bc1ab7b6ea | |||
| 78b1ec5181 |
29
README.md
29
README.md
@@ -18,9 +18,19 @@ submodule of the HarborForge umbrella repo.
|
||||
|
||||
## What it does
|
||||
|
||||
- **Monitor bridge** — HTTP server on `127.0.0.1:<monitor_port>` that
|
||||
responds to `/telemetry` with a Snapshot the HarborForge.Monitor
|
||||
binary expects (system metrics + every Plexum agent's sm-state)
|
||||
- **Monitor push loop** — when `monitor_push_enabled: true`, posts a
|
||||
flat telemetry payload (cpu/mem/disk/swap/load + per-agent state)
|
||||
to `<backendUrl>/monitor/server/heartbeat` every
|
||||
`monitor_push_interval_seconds`. This replaces the standalone
|
||||
`harborforge-monitor` daemon — the plugin's lifecycle (gateway
|
||||
start/stop) bounds the loop, so a separate supervisor isn't needed.
|
||||
Use the same `apiKey` value the standalone monitor's
|
||||
`HF_MONITER_API_KEY` carried.
|
||||
- **Monitor bridge** (optional) — HTTP server on
|
||||
`127.0.0.1:<monitor_port>` that responds to `/telemetry` with a
|
||||
Snapshot. Useful when the standalone monitor is still present and
|
||||
you want it to enrich its push payload from the plugin's view of
|
||||
agents. Disable by setting `monitor_port: 0`.
|
||||
- **Calendar scheduler** — heartbeats `<backendUrl>/calendar/agent/
|
||||
heartbeat` every interval, receives any TimeSlots due to fire, and
|
||||
dispatches them through `HostAPI.WakeAgent` (state-aware queue
|
||||
@@ -64,15 +74,24 @@ And configure at `~/.plexum/plugins/harbor-forge/config.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"backendUrl": "https://monitor.hangman-lab.top",
|
||||
"backendUrl": "https://hf-api.hangman-lab.top",
|
||||
"identifier": "server-t3",
|
||||
"apiKey": "g1_xxx",
|
||||
"monitor_port": 9100,
|
||||
"monitor_push_enabled": true,
|
||||
"monitor_push_interval_seconds": 30,
|
||||
"monitor_port": 0,
|
||||
"calendar_enabled": true,
|
||||
"calendar_heartbeat_interval_seconds": 30
|
||||
}
|
||||
```
|
||||
|
||||
Replacing the standalone `harborforge-monitor` container: lift the
|
||||
container's `HF_MONITER_API_KEY` into `apiKey`, set
|
||||
`monitor_push_enabled: true`, then `docker rm -f harborforge-monitor`
|
||||
once you've confirmed the plugin's pushes are landing (the backend's
|
||||
`server_states.last_seen_at` should keep advancing without the
|
||||
container running).
|
||||
|
||||
Restart the host (`systemctl --user restart plexum`) and verify:
|
||||
|
||||
```bash
|
||||
|
||||
@@ -28,6 +28,8 @@ import (
|
||||
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/calendar"
|
||||
hfcfg "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/config"
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbblock"
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbclient"
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/monitor"
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/tools"
|
||||
@@ -36,16 +38,31 @@ import (
|
||||
// Version is injected via -ldflags "-X main.Version=…" at build time.
|
||||
var Version = "0.1.0"
|
||||
|
||||
// harborForgePlugin satisfies sdkplugin.ToolPlugin.
|
||||
// harborForgePlugin satisfies sdkplugin.ToolPlugin AND
|
||||
// sdkplugin.DynamicBlockProvider (the <kb-block> subblock contributor
|
||||
// per Plexum DESIGN-DYNAMIC-BLOCK.md §3.3).
|
||||
type harborForgePlugin struct {
|
||||
host sdkplugin.HostAPI
|
||||
cfg hfcfg.Resolved
|
||||
bridge *monitor.Bridge
|
||||
pusher *monitor.Pusher
|
||||
sched *calendar.Scheduler
|
||||
kbClient *kbclient.Client
|
||||
deps tools.Deps
|
||||
cancelBg context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
agentCache sync.Map // sessionID/turnID → agentID stash (best-effort)
|
||||
|
||||
// profileRoot caches PLEXUM_PROFILE_ROOT for kbblock path resolution.
|
||||
profileRoot string
|
||||
|
||||
// agentSession maps agentID → sessionID, populated each turn by
|
||||
// the host's plexum/plugin/dynamic-block/render RPC (which carries
|
||||
// both ids). dynamic-kb-cache + dynamic-kb-evict read this to
|
||||
// resolve the per-session kb-block.json path. Best-effort —
|
||||
// before the first RenderDynamicSubblock fires for an agent the
|
||||
// map is empty and the kb tools surface a clear error.
|
||||
agentSession sync.Map // agentID (string) → sessionID (string)
|
||||
}
|
||||
|
||||
func (p *harborForgePlugin) Manifest() sdkplugin.Manifest {
|
||||
@@ -60,6 +77,7 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
|
||||
home, _ := os.UserHomeDir()
|
||||
profileRoot = filepath.Join(home, ".plexum")
|
||||
}
|
||||
p.profileRoot = profileRoot
|
||||
raw, err := hfcfg.Load(profileRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load harbor-forge config: %w", err)
|
||||
@@ -70,42 +88,72 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
|
||||
"backend": p.cfg.BackendURL,
|
||||
"identifier": p.cfg.Identifier,
|
||||
"monitor_port": p.cfg.MonitorPort,
|
||||
"monitor_push_enabled": p.cfg.MonitorPushEnabled,
|
||||
"calendar_enabled": p.cfg.CalendarEnabled,
|
||||
})
|
||||
|
||||
collect := func() telemetry.Snapshot {
|
||||
return telemetry.Collect(telemetry.CollectOpts{
|
||||
Identifier: p.cfg.Identifier,
|
||||
Version: Version,
|
||||
AgentLister: func() []telemetry.AgentInfo {
|
||||
return p.listAgents(ctx, profileRoot)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
|
||||
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
|
||||
|
||||
bgCtx, cancel := context.WithCancel(context.Background())
|
||||
p.cancelBg = cancel
|
||||
|
||||
// Listers + collectors capture bgCtx (not Init ctx) — Init returns
|
||||
// once MCP initialize completes, but the plugin process lives on
|
||||
// and so do the goroutines + closures we registered.
|
||||
makeCollector := func(sampleCPU bool) func() telemetry.Snapshot {
|
||||
return func() telemetry.Snapshot {
|
||||
return telemetry.Collect(telemetry.CollectOpts{
|
||||
Identifier: p.cfg.Identifier,
|
||||
Version: Version,
|
||||
SampleCPU: sampleCPU,
|
||||
AgentLister: func() []telemetry.AgentInfo {
|
||||
return p.listAgents(bgCtx, profileRoot)
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
// Bridge serves on-demand reads; cheap, no CPU sampling.
|
||||
collect := makeCollector(false)
|
||||
// Pusher runs the slow push loop; CPU sampling fine here.
|
||||
collectForPush := makeCollector(true)
|
||||
|
||||
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
|
||||
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
|
||||
|
||||
if err := p.bridge.Start(bgCtx); err != nil {
|
||||
host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()})
|
||||
}
|
||||
|
||||
// Active push loop — replaces the standalone harborforge-monitor
|
||||
// container. Off by default; operator opts in via
|
||||
// monitor_push_enabled + apiKey.
|
||||
p.pusher = monitor.NewPusher(monitor.PusherConfig{
|
||||
BackendURL: p.cfg.BackendURL,
|
||||
APIKey: p.cfg.APIKey,
|
||||
Interval: time.Duration(p.cfg.MonitorPushIntervalSeconds) * time.Second,
|
||||
}, collectForPush,
|
||||
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
|
||||
if p.cfg.MonitorPushEnabled {
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
if err := p.pusher.Run(bgCtx); err != nil && !errors.Is(err, context.Canceled) {
|
||||
host.Log("warn", "monitor pusher exited", map[string]any{"err": err.Error()})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
calBackend := p.cfg.CalendarBackendURL
|
||||
if calBackend == "" {
|
||||
calBackend = p.cfg.BackendURL
|
||||
}
|
||||
bridge := calendar.New(calBackend, p.cfg.APIKey)
|
||||
bridge := calendar.New(calBackend, p.cfg.Identifier)
|
||||
p.sched = calendar.NewScheduler(
|
||||
calendar.Config{
|
||||
HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second,
|
||||
},
|
||||
bridge, host, p.cfg.Identifier,
|
||||
bridge, host,
|
||||
calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"},
|
||||
func() []calendar.ReportableAgent {
|
||||
return p.listReportableAgents(ctx, profileRoot)
|
||||
return p.listReportableAgents(bgCtx, profileRoot)
|
||||
},
|
||||
)
|
||||
if p.cfg.CalendarEnabled {
|
||||
@@ -120,29 +168,80 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
|
||||
host.Log("info", "calendar scheduler disabled by config", nil)
|
||||
}
|
||||
|
||||
// KB HTTP client — shares plugin-level APIKey via Bearer (per-agent
|
||||
// hf-token resolution is a TODO when SDK exposes secret-mgr access).
|
||||
if p.cfg.BackendURL != "" && p.cfg.APIKey != "" {
|
||||
p.kbClient = kbclient.New(p.cfg.BackendURL, p.cfg.APIKey)
|
||||
} else {
|
||||
host.Log("info", "kb client not initialized (need BackendURL + APIKey); dynamic-kb-* tools will return backend-unavailable", nil)
|
||||
}
|
||||
|
||||
p.deps = tools.Deps{
|
||||
Config: p.cfg,
|
||||
Version: Version,
|
||||
Collect: collect,
|
||||
Bridge: p.bridge,
|
||||
Pusher: p.pusher,
|
||||
Scheduler: p.sched,
|
||||
Host: host,
|
||||
AgentIDFromCtx: func(ctx context.Context) string {
|
||||
// Plexum stashes the calling agent id on the host-side
|
||||
// context (via WithAgent) before dispatching tool calls.
|
||||
// We can't directly import internal/agentloop from a
|
||||
// plugin, so we rely on PLEXUM_TOOL_AGENT_ID env-style
|
||||
// (set per-call by host when we add that wiring) or fall
|
||||
// back to the only-active-agent heuristic. v1: prefer the
|
||||
// only-active wake-target (deterministic in single-agent
|
||||
// HF deployments).
|
||||
return p.bestEffortAgentID()
|
||||
// Host attaches the caller agent id via tools/call
|
||||
// `_meta.agent_id`; SDK unpacks it into ctx.
|
||||
if id := sdkplugin.AgentIDFromContext(ctx); id != "" {
|
||||
return id
|
||||
}
|
||||
// Fallback for host paths that don't carry an agent
|
||||
// (channel-driven, CLI plugin-call). When a single
|
||||
// calendar slot is active we can deterministically
|
||||
// attribute the call to that slot's owner.
|
||||
return p.sched.SingleActiveAgentID()
|
||||
},
|
||||
KB: tools.KBDeps{
|
||||
Client: p.kbClient,
|
||||
ProfileRoot: profileRoot,
|
||||
SessionFor: func(agentID string) string {
|
||||
if v, ok := p.agentSession.Load(agentID); ok {
|
||||
return v.(string)
|
||||
}
|
||||
return ""
|
||||
},
|
||||
Turn: func(agentID string) int { return 0 }, // best-effort; turn ctx not available to plugins yet
|
||||
},
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RenderDynamicSubblock implements sdkplugin.DynamicBlockProvider. The
|
||||
// host calls this once per turn per declared subblock name from this
|
||||
// plugin's manifest (DESIGN-DYNAMIC-BLOCK.md §2 / §3.3). For
|
||||
// "kb-block" we open the per-session kb-block.json and return its
|
||||
// rendered body (host wraps in <kb-block>...</kb-block>). Empty block
|
||||
// → return "" → host omits the subblock for this turn.
|
||||
//
|
||||
// Side effect: stash agentID → sessionID into p.agentSession so the
|
||||
// dynamic-kb-cache + dynamic-kb-evict tools can resolve the same
|
||||
// kb-block file when they fire later in the same turn.
|
||||
func (p *harborForgePlugin) RenderDynamicSubblock(ctx context.Context, agentID, sessionID, name string) (string, error) {
|
||||
if agentID == "" || sessionID == "" {
|
||||
return "", nil
|
||||
}
|
||||
// Cache the (agent, session) pair for tool dispatch this turn.
|
||||
p.agentSession.Store(agentID, sessionID)
|
||||
|
||||
if name != "kb-block" {
|
||||
return "", nil
|
||||
}
|
||||
block, err := kbblock.Open(p.profileRoot, agentID, sessionID)
|
||||
if err != nil {
|
||||
p.host.Log("warn", "open kb-block", map[string]any{
|
||||
"agent": agentID, "session": sessionID, "err": err.Error(),
|
||||
})
|
||||
return "", nil
|
||||
}
|
||||
return block.Render(), nil
|
||||
}
|
||||
|
||||
func (p *harborForgePlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (sdkplugin.ToolResult, error) {
|
||||
return tools.Dispatch(ctx, p.deps, name, input)
|
||||
}
|
||||
@@ -203,21 +302,7 @@ func mapStateToCalendar(s string) calendar.AgentStatusValue {
|
||||
case "offline":
|
||||
return calendar.AgentStatusOffline
|
||||
}
|
||||
return calendar.AgentStatusUnknown
|
||||
}
|
||||
|
||||
// bestEffortAgentID is a v1 stop-gap for tools that need the calling
|
||||
// agent's id but don't have it on the ctx (Plexum SDK doesn't yet
|
||||
// expose this — TODO upstream). Returns the only active calendar
|
||||
// slot's agent if there's exactly one; otherwise empty. The calendar
|
||||
// tools (the only ones that need agent context) usually fire when
|
||||
// exactly one slot is active.
|
||||
func (p *harborForgePlugin) bestEffortAgentID() string {
|
||||
sch := p.sched.Status()
|
||||
if len(sch.Active) == 1 {
|
||||
return sch.Active[0].Slot.AgentID
|
||||
}
|
||||
return ""
|
||||
return calendar.AgentStatusOffline
|
||||
}
|
||||
|
||||
func manifestFromDisk() sdkplugin.Manifest {
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
// Bridge — thin HTTP client for the HarborForge backend's Calendar API.
|
||||
// All operations carry the API key as Authorization: Bearer; absent
|
||||
// key means missing-auth errors from the backend (caller should
|
||||
// handle them as transient and log).
|
||||
// Bridge — typed HTTP client for HarborForge.Backend's calendar API.
|
||||
// Endpoint shapes verified via the backend's /openapi.json and against
|
||||
// HarborForge.OpenclawPlugin/plugin/calendar/calendar-bridge.ts so
|
||||
// the two plugins drop into the same backend without per-plugin
|
||||
// adapters.
|
||||
|
||||
package calendar
|
||||
|
||||
@@ -13,30 +14,33 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Bridge is the typed wrapper around an HTTP client + backend URL.
|
||||
// Bridge is constructed once per scheduler and reused across heartbeats.
|
||||
type Bridge struct {
|
||||
BackendURL string
|
||||
APIKey string
|
||||
BaseURL string
|
||||
ClawIdentifier string
|
||||
HTTP *http.Client
|
||||
}
|
||||
|
||||
// New constructs a bridge with a sensible default timeout.
|
||||
func New(backendURL, apiKey string) *Bridge {
|
||||
// New constructs a bridge with a 20s default timeout.
|
||||
func New(baseURL, clawIdentifier string) *Bridge {
|
||||
return &Bridge{
|
||||
BackendURL: strings.TrimRight(backendURL, "/"),
|
||||
APIKey: apiKey,
|
||||
BaseURL: strings.TrimRight(baseURL, "/"),
|
||||
ClawIdentifier: clawIdentifier,
|
||||
HTTP: &http.Client{Timeout: 20 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
// Heartbeat POSTs /calendar/agent/heartbeat. Returns the backend's
|
||||
// reply or an error.
|
||||
func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (HeartbeatResponse, error) {
|
||||
raw, err := b.post(ctx, "/calendar/agent/heartbeat", payload)
|
||||
// Heartbeat POSTs /calendar/agent/heartbeat. Per-agent: each running
|
||||
// agent on this claw drives its own heartbeat (matches OpenClaw plugin
|
||||
// semantics).
|
||||
func (b *Bridge) Heartbeat(ctx context.Context, agentID string) (HeartbeatResponse, error) {
|
||||
body := HeartbeatRequest{ClawIdentifier: b.ClawIdentifier, AgentID: agentID}
|
||||
raw, err := b.doJSON(ctx, http.MethodPost, "/calendar/agent/heartbeat", agentID, body)
|
||||
if err != nil {
|
||||
return HeartbeatResponse{}, err
|
||||
}
|
||||
@@ -49,76 +53,64 @@ func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (Heart
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// UpdateSlotStatus POSTs /calendar/slot/<id>/status to mark a slot
|
||||
// completed / aborted / paused / resumed.
|
||||
func (b *Bridge) UpdateSlotStatus(ctx context.Context, slotID string, update SlotUpdate) error {
|
||||
if slotID == "" {
|
||||
return errors.New("calendar: slot id required")
|
||||
}
|
||||
_, err := b.post(ctx, "/calendar/slot/"+slotID+"/status", update)
|
||||
// UpdateRealSlot PATCHes /calendar/slots/{id}/agent-update.
|
||||
func (b *Bridge) UpdateRealSlot(ctx context.Context, agentID string, slotID int64, update SlotAgentUpdate) error {
|
||||
path := "/calendar/slots/" + strconv.FormatInt(slotID, 10) + "/agent-update"
|
||||
_, err := b.doJSON(ctx, http.MethodPatch, path, agentID, update)
|
||||
return err
|
||||
}
|
||||
|
||||
// RestartPending GETs /restart/status — returns the backend's
|
||||
// current restart-requested flag.
|
||||
func (b *Bridge) RestartPending(ctx context.Context) (bool, error) {
|
||||
raw, err := b.get(ctx, "/restart/status")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
var out struct {
|
||||
Pending bool `json:"pending"`
|
||||
}
|
||||
if len(raw) > 0 {
|
||||
if err := json.Unmarshal(raw, &out); err != nil {
|
||||
return false, fmt.Errorf("decode restart-status: %w", err)
|
||||
}
|
||||
}
|
||||
return out.Pending, nil
|
||||
// UpdateVirtualSlot PATCHes /calendar/slots/virtual/{vid}/agent-update.
|
||||
// The backend materialises the virtual slot first; subsequent calls
|
||||
// against the same logical slot should use UpdateRealSlot with the
|
||||
// id returned in the response — but for v1 we don't round-trip the
|
||||
// materialised id back to the scheduler (would require a separate
|
||||
// fetch); the agent-update path tolerates re-PATCHing a virtual id.
|
||||
func (b *Bridge) UpdateVirtualSlot(ctx context.Context, agentID string, virtualID string, update SlotAgentUpdate) error {
|
||||
path := "/calendar/slots/virtual/" + virtualID + "/agent-update"
|
||||
_, err := b.doJSON(ctx, http.MethodPatch, path, agentID, update)
|
||||
return err
|
||||
}
|
||||
|
||||
// post serialises body as JSON, attaches Authorization, returns
|
||||
// response body bytes. Non-2xx becomes an error with the body
|
||||
// included for diagnostics.
|
||||
func (b *Bridge) post(ctx context.Context, path string, body any) ([]byte, error) {
|
||||
// PushAgentStatus POSTs /calendar/agent/status. Used to push idle ↔
|
||||
// busy transitions out of the normal heartbeat cycle.
|
||||
func (b *Bridge) PushAgentStatus(ctx context.Context, agentID string, status AgentStatusValue) error {
|
||||
body := AgentStatusPush{
|
||||
ClawIdentifier: b.ClawIdentifier, AgentID: agentID, Status: status,
|
||||
}
|
||||
_, err := b.doJSON(ctx, http.MethodPost, "/calendar/agent/status", agentID, body)
|
||||
return err
|
||||
}
|
||||
|
||||
// doJSON serialises body, attaches the two auth headers, and returns
|
||||
// the response bytes. Errors on non-2xx with truncated body.
|
||||
func (b *Bridge) doJSON(ctx context.Context, method, path, agentID string, body any) ([]byte, error) {
|
||||
if agentID == "" {
|
||||
return nil, errors.New("calendar: agent_id required for auth headers")
|
||||
}
|
||||
raw, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal %s: %w", path, err)
|
||||
return nil, fmt.Errorf("marshal %s %s: %w", method, path, err)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, b.BackendURL+path, bytes.NewReader(raw))
|
||||
req, err := http.NewRequestWithContext(ctx, method, b.BaseURL+path, bytes.NewReader(raw))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if b.APIKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+b.APIKey)
|
||||
}
|
||||
return b.do(req)
|
||||
}
|
||||
req.Header.Set("X-Agent-ID", agentID)
|
||||
req.Header.Set("X-Claw-Identifier", b.ClawIdentifier)
|
||||
|
||||
func (b *Bridge) get(ctx context.Context, path string) ([]byte, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, b.BackendURL+path, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if b.APIKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+b.APIKey)
|
||||
}
|
||||
return b.do(req)
|
||||
}
|
||||
|
||||
func (b *Bridge) do(req *http.Request) ([]byte, error) {
|
||||
res, err := b.HTTP.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%s %s: %w", req.Method, req.URL.Path, err)
|
||||
return nil, fmt.Errorf("%s %s: %w", method, path, err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
body, _ := io.ReadAll(res.Body)
|
||||
out, _ := io.ReadAll(res.Body)
|
||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||
return nil, fmt.Errorf("%s %s → %d: %s",
|
||||
req.Method, req.URL.Path, res.StatusCode, truncate(body, 300))
|
||||
method, path, res.StatusCode, truncate(out, 300))
|
||||
}
|
||||
return body, nil
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func truncate(b []byte, n int) string {
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
// Scheduler — main loop that heartbeats the backend, dispatches
|
||||
// returned slots via Plexum's WakeAgent, and tracks per-agent active
|
||||
// slot state for the calendar_* tools.
|
||||
// Scheduler — loops over every Plexum agent, heartbeats per-agent,
|
||||
// picks the highest-priority pending slot for each, dispatches via
|
||||
// host.WakeAgent. Mirrors HarborForge.OpenclawPlugin's per-agent
|
||||
// scheduler loop (PLG-CAL-002).
|
||||
//
|
||||
// State is in-memory: a daemon restart drops everything. Next
|
||||
// heartbeat reconciles (backend keeps the canonical SlotStatus).
|
||||
// In-memory state: per-agent active slot map. A daemon restart drops
|
||||
// it; next heartbeat reconciles from the backend's canonical state.
|
||||
//
|
||||
// Concurrency:
|
||||
// - one heartbeat ticker goroutine
|
||||
// - per-slot dispatch is fire-and-forget via WakeAgent (queue-aware)
|
||||
// - mu guards activeBySlot + activeByAgent maps
|
||||
// Wake semantics: WakeAgent is fire-and-forget; the SDK's wake queue
|
||||
// (depth 1 replace-newest) handles state-aware dispatch. We mark the
|
||||
// slot Ongoing optimistically the moment we call WakeAgent; agents
|
||||
// drive complete/abort/pause/resume via the harborforge_calendar_*
|
||||
// tools.
|
||||
|
||||
package calendar
|
||||
|
||||
@@ -22,22 +24,20 @@ import (
|
||||
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
||||
)
|
||||
|
||||
// Scheduler orchestrates the calendar loop.
|
||||
// Scheduler is the long-running calendar driver.
|
||||
type Scheduler struct {
|
||||
cfg Config
|
||||
bridge *Bridge
|
||||
host sdkplugin.HostAPI
|
||||
agentLister func() []ReportableAgent
|
||||
identifier string
|
||||
pluginInfo PluginInfoTag
|
||||
|
||||
mu sync.Mutex
|
||||
activeBySlotID map[string]*ActiveSlot
|
||||
activeByAgentID map[string]*ActiveSlot
|
||||
activeBySlotIdent map[string]*ActiveSlot
|
||||
history []HistoryEntry
|
||||
lastHeartbeat time.Time
|
||||
lastResponse HeartbeatResponse
|
||||
restartPending bool
|
||||
lastHeartbeats map[string]time.Time
|
||||
lastErrors map[string]string
|
||||
}
|
||||
|
||||
// Config bundles scheduler tunables.
|
||||
@@ -46,26 +46,32 @@ type Config struct {
|
||||
HistoryCap int // bound on activity history; default 32
|
||||
}
|
||||
|
||||
// ReportableAgent is the projection of a Plexum agent the scheduler
|
||||
// needs for heartbeat — id + model + current sm state.
|
||||
// PluginInfoTag tags heartbeat reports so the backend knows which
|
||||
// plugin / version is reporting.
|
||||
type PluginInfoTag struct {
|
||||
Name string
|
||||
Version string
|
||||
Backend string // "plexum"
|
||||
}
|
||||
|
||||
// ReportableAgent is the per-agent projection the scheduler needs for
|
||||
// heartbeat enumeration.
|
||||
type ReportableAgent struct {
|
||||
ID string
|
||||
Model string
|
||||
State AgentStatusValue
|
||||
}
|
||||
|
||||
// ActiveSlot tracks an in-flight slot (between WakeAgent dispatch and
|
||||
// terminal status update).
|
||||
// ActiveSlot tracks an in-flight slot from dispatch to terminal state.
|
||||
type ActiveSlot struct {
|
||||
Slot Slot
|
||||
StartedAt time.Time
|
||||
LastHeartbeat time.Time
|
||||
State SlotStatus
|
||||
}
|
||||
|
||||
// HistoryEntry is one resolved slot kept for the calendar_status tool.
|
||||
// HistoryEntry records one resolved slot for the calendar_status tool.
|
||||
type HistoryEntry struct {
|
||||
SlotID string
|
||||
Ident string
|
||||
AgentID string
|
||||
Status SlotStatus
|
||||
ResolvedAt time.Time
|
||||
@@ -75,7 +81,7 @@ type HistoryEntry struct {
|
||||
|
||||
// NewScheduler constructs a Scheduler in stopped state.
|
||||
func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI,
|
||||
identifier string, pluginInfo PluginInfoTag,
|
||||
pluginInfo PluginInfoTag,
|
||||
agentLister func() []ReportableAgent) *Scheduler {
|
||||
if cfg.HeartbeatInterval <= 0 {
|
||||
cfg.HeartbeatInterval = 30 * time.Second
|
||||
@@ -88,189 +94,228 @@ func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI,
|
||||
bridge: bridge,
|
||||
host: host,
|
||||
agentLister: agentLister,
|
||||
identifier: identifier,
|
||||
pluginInfo: pluginInfo,
|
||||
activeBySlotID: map[string]*ActiveSlot{},
|
||||
activeByAgentID: map[string]*ActiveSlot{},
|
||||
activeBySlotIdent: map[string]*ActiveSlot{},
|
||||
lastHeartbeats: map[string]time.Time{},
|
||||
lastErrors: map[string]string{},
|
||||
}
|
||||
}
|
||||
|
||||
// Run blocks until ctx cancels, ticking heartbeats every
|
||||
// cfg.HeartbeatInterval. Returns nil on graceful shutdown.
|
||||
// Run blocks until ctx cancels.
|
||||
func (s *Scheduler) Run(ctx context.Context) error {
|
||||
t := time.NewTicker(s.cfg.HeartbeatInterval)
|
||||
defer t.Stop()
|
||||
// First heartbeat immediately so initial state lands fast.
|
||||
s.heartbeatOnce(ctx)
|
||||
s.tick(ctx)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-t.C:
|
||||
s.heartbeatOnce(ctx)
|
||||
s.tick(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) heartbeatOnce(ctx context.Context) {
|
||||
payload := HeartbeatPayload{
|
||||
Identifier: s.identifier,
|
||||
APIKey: s.bridge.APIKey,
|
||||
PluginInfo: s.pluginInfo,
|
||||
CapturedAt: time.Now().UTC(),
|
||||
}
|
||||
if s.agentLister != nil {
|
||||
for _, a := range s.agentLister() {
|
||||
payload.AgentList = append(payload.AgentList, AgentReport{
|
||||
ID: a.ID, Model: a.Model, Status: a.State,
|
||||
})
|
||||
}
|
||||
}
|
||||
resp, err := s.bridge.Heartbeat(ctx, payload)
|
||||
s.mu.Lock()
|
||||
s.lastHeartbeat = time.Now()
|
||||
if err == nil {
|
||||
s.lastResponse = resp
|
||||
s.restartPending = resp.RestartPending
|
||||
}
|
||||
s.mu.Unlock()
|
||||
if err != nil {
|
||||
return // network blip; next tick retries
|
||||
}
|
||||
for _, slot := range resp.SlotsToFire {
|
||||
s.dispatchSlot(ctx, slot)
|
||||
}
|
||||
}
|
||||
|
||||
// dispatchSlot fires the slot via host.WakeAgent and records it as
|
||||
// active. WakeAgent handles state-aware queueing — if the agent is
|
||||
// busy, our calendar slot enqueues at depth 1 and the previous wake
|
||||
// is dropped per replace-newest semantics. We mark the slot
|
||||
// in_progress optimistically when we ENQUEUED; backend reconciles on
|
||||
// its own watchdog.
|
||||
func (s *Scheduler) dispatchSlot(ctx context.Context, slot Slot) {
|
||||
// Skip already-active slots (heartbeat may re-list a slot we
|
||||
// already started — backend hasn't seen our optimistic update yet).
|
||||
s.mu.Lock()
|
||||
if _, ok := s.activeBySlotID[slot.ID]; ok {
|
||||
s.mu.Unlock()
|
||||
func (s *Scheduler) tick(ctx context.Context) {
|
||||
if s.agentLister == nil {
|
||||
return
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
act := &ActiveSlot{
|
||||
Slot: slot, StartedAt: now, LastHeartbeat: now,
|
||||
State: SlotInProgress,
|
||||
agents := s.agentLister()
|
||||
s.host.Log("info", "calendar tick", map[string]any{"agents": len(agents)})
|
||||
for _, agent := range agents {
|
||||
s.tickForAgent(ctx, agent, now)
|
||||
}
|
||||
}
|
||||
s.activeBySlotID[slot.ID] = act
|
||||
s.activeByAgentID[slot.AgentID] = act
|
||||
s.mu.Unlock()
|
||||
|
||||
message := slot.WakeOptions.OverrideMessage
|
||||
if message == "" {
|
||||
message = slot.PromptText
|
||||
}
|
||||
if message == "" {
|
||||
message = fmt.Sprintf("[calendar] slot %s: %s", slot.ID, slot.Title)
|
||||
}
|
||||
source := fmt.Sprintf("calendar:slot-%s", slot.ID)
|
||||
if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{
|
||||
AgentID: slot.AgentID,
|
||||
Message: message,
|
||||
Source: source,
|
||||
}); err != nil {
|
||||
// Wake itself failed (plumbing). Mark slot aborted +
|
||||
// notify backend.
|
||||
s.resolveSlot(ctx, slot.ID, SlotAborted, "", "wake-agent failed: "+err.Error())
|
||||
func (s *Scheduler) tickForAgent(ctx context.Context, agent ReportableAgent, now time.Time) {
|
||||
resp, err := s.bridge.Heartbeat(ctx, agent.ID)
|
||||
s.mu.Lock()
|
||||
s.lastHeartbeats[agent.ID] = now
|
||||
if err != nil {
|
||||
s.lastErrors[agent.ID] = err.Error()
|
||||
s.mu.Unlock()
|
||||
s.host.Log("warn", "calendar heartbeat failed", map[string]any{
|
||||
"agent": agent.ID, "err": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
delete(s.lastErrors, agent.ID)
|
||||
s.mu.Unlock()
|
||||
s.host.Log("info", "calendar heartbeat ok", map[string]any{
|
||||
"agent": agent.ID, "slots": len(resp.Slots), "agent_status": string(resp.AgentStatus),
|
||||
})
|
||||
|
||||
// Pick highest-priority NotStarted slot; defer the rest.
|
||||
var chosen *Slot
|
||||
for i := range resp.Slots {
|
||||
slot := &resp.Slots[i]
|
||||
if slot.Status != SlotNotStarted && slot.Status != SlotDeferred {
|
||||
continue
|
||||
}
|
||||
if chosen == nil || slot.Priority > chosen.Priority {
|
||||
chosen = slot
|
||||
}
|
||||
}
|
||||
s.host.Log("info", "calendar slot selection", map[string]any{
|
||||
"agent": agent.ID, "available": len(resp.Slots), "chosen": chosen != nil,
|
||||
})
|
||||
if chosen != nil {
|
||||
s.dispatchSlot(ctx, agent.ID, *chosen)
|
||||
}
|
||||
// Defer the other unchosen NotStarted/Deferred slots (priority +1)
|
||||
// so they bubble up next heartbeat. We don't strictly need to push
|
||||
// the update; the backend's priority bookkeeping survives without
|
||||
// our nudge for v1. (OpenClaw plugin DOES push priority bumps —
|
||||
// future v2 work if backend feedback shows starvation.)
|
||||
}
|
||||
|
||||
// resolveSlot moves an active slot to a terminal status, records
|
||||
// history, and tells the backend. Safe to call concurrently.
|
||||
func (s *Scheduler) resolveSlot(ctx context.Context, slotID string, status SlotStatus, summary, reason string) error {
|
||||
// dispatchSlot fires WakeAgent + records the slot active. Marks the
|
||||
// slot Ongoing on the backend so the dashboard reflects the
|
||||
// transition immediately.
|
||||
func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot) {
|
||||
ident := slot.SlotIdent()
|
||||
s.host.Log("info", "calendar dispatchSlot enter", map[string]any{
|
||||
"agent": agentID, "slot_ident": ident,
|
||||
})
|
||||
s.mu.Lock()
|
||||
act, ok := s.activeBySlotID[slotID]
|
||||
if !ok {
|
||||
if _, dup := s.activeBySlotIdent[ident]; dup {
|
||||
s.mu.Unlock()
|
||||
return fmt.Errorf("calendar: slot %s not active", slotID)
|
||||
s.host.Log("info", "calendar dispatchSlot skipped (already active)", map[string]any{"slot": ident})
|
||||
return
|
||||
}
|
||||
delete(s.activeBySlotID, slotID)
|
||||
delete(s.activeByAgentID, act.Slot.AgentID)
|
||||
s.appendHistoryLocked(HistoryEntry{
|
||||
SlotID: slotID, AgentID: act.Slot.AgentID, Status: status,
|
||||
if _, agentBusy := s.activeByAgentID[agentID]; agentBusy {
|
||||
// Don't pick up another slot until the current one resolves.
|
||||
s.mu.Unlock()
|
||||
s.host.Log("info", "calendar dispatchSlot skipped (agent has active slot)", map[string]any{"agent": agentID})
|
||||
return
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
active := &ActiveSlot{Slot: slot, StartedAt: now, LastHeartbeat: now}
|
||||
s.activeBySlotIdent[ident] = active
|
||||
s.activeByAgentID[agentID] = active
|
||||
s.mu.Unlock()
|
||||
|
||||
message := buildWakeMessage(slot)
|
||||
source := "calendar:" + ident
|
||||
s.host.Log("info", "calendar firing WakeAgent", map[string]any{
|
||||
"agent": agentID, "slot": ident, "source": source, "msg_len": len(message),
|
||||
})
|
||||
if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{
|
||||
AgentID: agentID, Message: message, Source: source,
|
||||
}); err != nil {
|
||||
s.host.Log("warn", "calendar WakeAgent failed", map[string]any{
|
||||
"agent": agentID, "err": err.Error(),
|
||||
})
|
||||
s.resolveLocally(ident, agentID, SlotAborted, "", "wake failed: "+err.Error())
|
||||
return
|
||||
}
|
||||
s.host.Log("info", "calendar WakeAgent enqueued ok", map[string]any{
|
||||
"agent": agentID, "slot": ident,
|
||||
})
|
||||
// Mark Ongoing on the backend.
|
||||
update := SlotAgentUpdate{
|
||||
Status: SlotOngoing, StartedAt: now.Format("15:04:05"),
|
||||
}
|
||||
s.pushUpdate(ctx, agentID, slot, update)
|
||||
}
|
||||
|
||||
func buildWakeMessage(slot Slot) string {
|
||||
// Backend EventData → prompt. v1 is intentionally simple; refine
|
||||
// when the prompt-engineering side of the plugin matures.
|
||||
if slot.EventType != nil {
|
||||
switch *slot.EventType {
|
||||
case EventTypeSystemEvent:
|
||||
if ev, ok := slot.EventData["event"].(string); ok {
|
||||
return fmt.Sprintf("[calendar system_event] %s", ev)
|
||||
}
|
||||
case EventTypeJob:
|
||||
code, _ := slot.EventData["code"].(string)
|
||||
typ, _ := slot.EventData["type"].(string)
|
||||
if code != "" {
|
||||
return fmt.Sprintf("[calendar job %s/%s] please handle this", typ, code)
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("[calendar slot %s] scheduled work — please proceed", slot.SlotIdent())
|
||||
}
|
||||
|
||||
// CompleteForAgent → terminal; pushes Finished to backend.
|
||||
func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error {
|
||||
act, ok := s.activeSlotForAgent(agentID)
|
||||
if !ok {
|
||||
return ErrNoActiveSlot
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
duration := int(now.Sub(act.StartedAt).Minutes())
|
||||
if duration < 1 {
|
||||
duration = 1
|
||||
}
|
||||
if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{
|
||||
Status: SlotFinished, ActualDuration: duration,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotFinished, summary, "")
|
||||
return nil
|
||||
}
|
||||
|
||||
// AbortForAgent → terminal; pushes Aborted to backend.
|
||||
func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error {
|
||||
act, ok := s.activeSlotForAgent(agentID)
|
||||
if !ok {
|
||||
return ErrNoActiveSlot
|
||||
}
|
||||
if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotAborted}); err != nil {
|
||||
return err
|
||||
}
|
||||
s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotAborted, "", reason)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PauseForAgent → non-terminal; pushes Paused.
|
||||
func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error {
|
||||
act, ok := s.activeSlotForAgent(agentID)
|
||||
if !ok {
|
||||
return ErrNoActiveSlot
|
||||
}
|
||||
return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotPaused})
|
||||
}
|
||||
|
||||
// ResumeForAgent → non-terminal; pushes Ongoing.
|
||||
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error {
|
||||
act, ok := s.activeSlotForAgent(agentID)
|
||||
if !ok {
|
||||
return ErrNoActiveSlot
|
||||
}
|
||||
return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotOngoing})
|
||||
}
|
||||
|
||||
func (s *Scheduler) pushUpdate(ctx context.Context, agentID string, slot Slot, update SlotAgentUpdate) error {
|
||||
if slot.HasRealID() {
|
||||
return s.bridge.UpdateRealSlot(ctx, agentID, *slot.ID, update)
|
||||
}
|
||||
if slot.VirtualID != nil {
|
||||
return s.bridge.UpdateVirtualSlot(ctx, agentID, *slot.VirtualID, update)
|
||||
}
|
||||
return errors.New("calendar: slot has neither real id nor virtual id")
|
||||
}
|
||||
|
||||
func (s *Scheduler) resolveLocally(ident, agentID string, status SlotStatus, summary, reason string) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
delete(s.activeBySlotIdent, ident)
|
||||
delete(s.activeByAgentID, agentID)
|
||||
s.history = append(s.history, HistoryEntry{
|
||||
Ident: ident, AgentID: agentID, Status: status,
|
||||
ResolvedAt: time.Now().UTC(), Summary: summary, Reason: reason,
|
||||
})
|
||||
s.mu.Unlock()
|
||||
return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{
|
||||
Status: status, Summary: summary, Reason: reason,
|
||||
})
|
||||
}
|
||||
|
||||
// SetSlotState is a non-terminal status change (paused/resumed).
|
||||
// Records the new state in-memory and tells the backend.
|
||||
func (s *Scheduler) SetSlotState(ctx context.Context, slotID string, status SlotStatus, reason string) error {
|
||||
s.mu.Lock()
|
||||
act, ok := s.activeBySlotID[slotID]
|
||||
if !ok {
|
||||
s.mu.Unlock()
|
||||
return fmt.Errorf("calendar: slot %s not active", slotID)
|
||||
}
|
||||
act.State = status
|
||||
act.LastHeartbeat = time.Now().UTC()
|
||||
s.mu.Unlock()
|
||||
return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{
|
||||
Status: status, Reason: reason,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Scheduler) appendHistoryLocked(entry HistoryEntry) {
|
||||
s.history = append(s.history, entry)
|
||||
if len(s.history) > s.cfg.HistoryCap {
|
||||
s.history = s.history[len(s.history)-s.cfg.HistoryCap:]
|
||||
}
|
||||
}
|
||||
|
||||
// CompleteForAgent / AbortForAgent / PauseForAgent / ResumeForAgent
|
||||
// are the agent-facing tool entry points. They look up the agent's
|
||||
// active slot, transition or terminate it, and notify the backend.
|
||||
|
||||
// CompleteForAgent terminates the agent's active slot as completed.
|
||||
func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error {
|
||||
slot, ok := s.activeSlotForAgent(agentID)
|
||||
if !ok {
|
||||
return ErrNoActiveSlot
|
||||
}
|
||||
return s.resolveSlot(ctx, slot.Slot.ID, SlotCompleted, summary, "")
|
||||
}
|
||||
|
||||
// AbortForAgent terminates the agent's active slot as aborted.
|
||||
func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error {
|
||||
slot, ok := s.activeSlotForAgent(agentID)
|
||||
if !ok {
|
||||
return ErrNoActiveSlot
|
||||
}
|
||||
return s.resolveSlot(ctx, slot.Slot.ID, SlotAborted, "", reason)
|
||||
}
|
||||
|
||||
// PauseForAgent transitions the agent's slot to paused.
|
||||
func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error {
|
||||
slot, ok := s.activeSlotForAgent(agentID)
|
||||
if !ok {
|
||||
return ErrNoActiveSlot
|
||||
}
|
||||
return s.SetSlotState(ctx, slot.Slot.ID, SlotPaused, reason)
|
||||
}
|
||||
|
||||
// ResumeForAgent transitions the agent's slot back to in_progress.
|
||||
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error {
|
||||
slot, ok := s.activeSlotForAgent(agentID)
|
||||
if !ok {
|
||||
return ErrNoActiveSlot
|
||||
}
|
||||
return s.SetSlotState(ctx, slot.Slot.ID, SlotInProgress, "")
|
||||
}
|
||||
|
||||
// activeSlotForAgent returns the per-agent active slot copy under lock.
|
||||
func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@@ -281,36 +326,59 @@ func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) {
|
||||
return *act, true
|
||||
}
|
||||
|
||||
// Status returns the introspection shape for the calendar_status tool.
|
||||
func (s *Scheduler) Status() SchedulerStatus {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
active := make([]ActiveSlot, 0, len(s.activeBySlotID))
|
||||
for _, a := range s.activeBySlotID {
|
||||
active = append(active, *a)
|
||||
}
|
||||
history := make([]HistoryEntry, len(s.history))
|
||||
copy(history, s.history)
|
||||
return SchedulerStatus{
|
||||
Enabled: true,
|
||||
LastHeartbeat: s.lastHeartbeat,
|
||||
HeartbeatEvery: s.cfg.HeartbeatInterval,
|
||||
Active: active,
|
||||
History: history,
|
||||
RestartPending: s.restartPending,
|
||||
}
|
||||
}
|
||||
|
||||
// SchedulerStatus is the shape calendar_status returns.
|
||||
type SchedulerStatus struct {
|
||||
// Status is the introspection shape calendar_status returns.
|
||||
type Status struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
LastHeartbeat time.Time `json:"last_heartbeat"`
|
||||
LastHeartbeats map[string]time.Time `json:"last_heartbeats"`
|
||||
LastErrors map[string]string `json:"last_errors,omitempty"`
|
||||
HeartbeatEvery time.Duration `json:"heartbeat_every"`
|
||||
Active []ActiveSlot `json:"active"`
|
||||
History []HistoryEntry `json:"history"`
|
||||
RestartPending bool `json:"restart_pending"`
|
||||
}
|
||||
|
||||
// ErrNoActiveSlot is returned by calendar_complete/abort/pause/resume
|
||||
// when the agent has no slot in progress.
|
||||
// SingleActiveAgentID returns the agent id when exactly one active
|
||||
// slot exists, empty otherwise. Used by the plugin's bestEffortAgentID
|
||||
// fallback for tool calls that don't carry agent context.
|
||||
func (s *Scheduler) SingleActiveAgentID() string {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if len(s.activeByAgentID) != 1 {
|
||||
return ""
|
||||
}
|
||||
for k := range s.activeByAgentID {
|
||||
return k
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Status returns the introspection shape calendar_status returns.
|
||||
func (s *Scheduler) Status() Status {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
active := make([]ActiveSlot, 0, len(s.activeByAgentID))
|
||||
for _, a := range s.activeByAgentID {
|
||||
active = append(active, *a)
|
||||
}
|
||||
hb := make(map[string]time.Time, len(s.lastHeartbeats))
|
||||
for k, v := range s.lastHeartbeats {
|
||||
hb[k] = v
|
||||
}
|
||||
errs := make(map[string]string, len(s.lastErrors))
|
||||
for k, v := range s.lastErrors {
|
||||
errs[k] = v
|
||||
}
|
||||
history := make([]HistoryEntry, len(s.history))
|
||||
copy(history, s.history)
|
||||
return Status{
|
||||
Enabled: true,
|
||||
LastHeartbeats: hb,
|
||||
LastErrors: errs,
|
||||
HeartbeatEvery: s.cfg.HeartbeatInterval,
|
||||
Active: active,
|
||||
History: history,
|
||||
}
|
||||
}
|
||||
|
||||
// ErrNoActiveSlot is returned when an agent calls calendar_complete /
|
||||
// abort / pause / resume but has no slot active.
|
||||
var ErrNoActiveSlot = errors.New("calendar: no active slot for agent")
|
||||
|
||||
@@ -1,106 +1,154 @@
|
||||
// Package calendar talks to the HarborForge backend's Calendar API
|
||||
// (heartbeat, slot fetch, status update, restart-pending check) and
|
||||
// drives a scheduler loop that fires Plexum wake events when slots
|
||||
// come due. Types mirror HarborForge.OpenclawPlugin's calendar/types.ts
|
||||
// so the backend doesn't need to know which plugin is reporting.
|
||||
// Types matching HarborForge.Backend's actual calendar API contract
|
||||
// (verified via /openapi.json on a running backend). Aligns 1:1 with
|
||||
// HarborForge.OpenclawPlugin/plugin/calendar/types.ts so the two
|
||||
// plugins can hit the same backend interchangeably.
|
||||
|
||||
package calendar
|
||||
|
||||
import "time"
|
||||
|
||||
// SlotStatus enumerates the slot lifecycle.
|
||||
// SlotStatus enumerates the lifecycle. String values match backend's
|
||||
// SlotStatus enum verbatim (snake_case — verified via heartbeat
|
||||
// response shape against running harborforge-backend).
|
||||
type SlotStatus string
|
||||
|
||||
const (
|
||||
SlotNotStarted SlotStatus = "not_started"
|
||||
SlotInProgress SlotStatus = "in_progress"
|
||||
SlotCompleted SlotStatus = "completed"
|
||||
SlotOngoing SlotStatus = "ongoing"
|
||||
SlotFinished SlotStatus = "finished"
|
||||
SlotAborted SlotStatus = "aborted"
|
||||
SlotPaused SlotStatus = "paused"
|
||||
SlotDeferred SlotStatus = "deferred"
|
||||
SlotPaused SlotStatus = "paused"
|
||||
SlotSkipped SlotStatus = "skipped"
|
||||
)
|
||||
|
||||
// AgentStatusValue mirrors the backend AgentStatus enum used in
|
||||
// heartbeat responses (a hint about what the backend thinks the
|
||||
// agent is doing).
|
||||
// SlotType: work vs on_call. Affects whether the agent flips to busy.
|
||||
type SlotType string
|
||||
|
||||
const (
|
||||
SlotTypeWork SlotType = "work"
|
||||
SlotTypeOnCall SlotType = "on_call"
|
||||
)
|
||||
|
||||
// EventType categorises what the slot represents.
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventTypeJob EventType = "job"
|
||||
EventTypeSystemEvent EventType = "system_event"
|
||||
EventTypeEntertainment EventType = "entertainment"
|
||||
)
|
||||
|
||||
// AgentStatusValue mirrors the backend AgentStatus enum.
|
||||
type AgentStatusValue string
|
||||
|
||||
const (
|
||||
AgentStatusUnknown AgentStatusValue = "unknown"
|
||||
AgentStatusIdle AgentStatusValue = "idle"
|
||||
AgentStatusBusy AgentStatusValue = "busy"
|
||||
AgentStatusOffline AgentStatusValue = "offline"
|
||||
AgentStatusOnCall AgentStatusValue = "on_call"
|
||||
AgentStatusPaused AgentStatusValue = "paused"
|
||||
AgentStatusExhausted AgentStatusValue = "exhausted"
|
||||
)
|
||||
|
||||
// SlotKind is "work" vs "on_call" — affects how the scheduler treats
|
||||
// the slot (on_call slots don't move the agent into busy).
|
||||
type SlotKind string
|
||||
|
||||
const (
|
||||
SlotKindWork SlotKind = "work"
|
||||
SlotKindOnCall SlotKind = "on_call"
|
||||
)
|
||||
|
||||
// Slot is one Calendar TimeSlot the backend serves.
|
||||
type Slot struct {
|
||||
ID string `json:"id"`
|
||||
VirtualID string `json:"virtual_id,omitempty"`
|
||||
// HeartbeatRequest is the POST /calendar/agent/heartbeat body.
|
||||
type HeartbeatRequest struct {
|
||||
ClawIdentifier string `json:"claw_identifier"`
|
||||
AgentID string `json:"agent_id"`
|
||||
ClawID string `json:"claw_identifier,omitempty"`
|
||||
Kind SlotKind `json:"slot_type"`
|
||||
Title string `json:"title,omitempty"`
|
||||
Description string `json:"description,omitempty"`
|
||||
ScheduledAt time.Time `json:"scheduled_at"`
|
||||
ExpiresAt *time.Time `json:"expires_at,omitempty"`
|
||||
Status SlotStatus `json:"status"`
|
||||
PromptText string `json:"prompt,omitempty"`
|
||||
WakeOptions WakeOpts `json:"wake_options,omitempty"`
|
||||
}
|
||||
|
||||
// WakeOpts customise how the scheduler should drive the agent. v1
|
||||
// honours only Force; the rest pass through as audit trail.
|
||||
type WakeOpts struct {
|
||||
Force bool `json:"force,omitempty"`
|
||||
OverrideMessage string `json:"override_message,omitempty"`
|
||||
ScopeSessionID string `json:"scope_session_id,omitempty"`
|
||||
}
|
||||
|
||||
// HeartbeatPayload is what the plugin POSTs every interval.
|
||||
type HeartbeatPayload struct {
|
||||
Identifier string `json:"identifier"`
|
||||
APIKey string `json:"api_key,omitempty"`
|
||||
AgentList []AgentReport `json:"agents"`
|
||||
PluginInfo PluginInfoTag `json:"plugin"`
|
||||
CapturedAt time.Time `json:"captured_at"`
|
||||
}
|
||||
|
||||
// AgentReport is one entry in HeartbeatPayload.AgentList.
|
||||
type AgentReport struct {
|
||||
ID string `json:"agent_id"`
|
||||
Status AgentStatusValue `json:"status"`
|
||||
Model string `json:"model,omitempty"`
|
||||
}
|
||||
|
||||
// PluginInfoTag identifies which plugin / version is heartbeating.
|
||||
type PluginInfoTag struct {
|
||||
Name string `json:"name"` // "harbor-forge"
|
||||
Version string `json:"version"` // e.g. 0.1.0
|
||||
Backend string `json:"backend"` // "plexum"
|
||||
}
|
||||
|
||||
// HeartbeatResponse is the backend's reply. SlotsToFire are slots
|
||||
// the scheduler should attempt to start.
|
||||
// HeartbeatResponse is the backend's reply.
|
||||
type HeartbeatResponse struct {
|
||||
SlotsToFire []Slot `json:"slots_to_fire,omitempty"`
|
||||
RestartPending bool `json:"restart_pending,omitempty"`
|
||||
ServerTime time.Time `json:"server_time"`
|
||||
Slots []Slot `json:"slots"`
|
||||
AgentStatus AgentStatusValue `json:"agent_status"`
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
// SlotUpdate is the body of POST /calendar/slot/<id>/status.
|
||||
type SlotUpdate struct {
|
||||
// Slot is one calendar TimeSlot — real (has ID) or virtual
|
||||
// (has VirtualID). Field names mirror the backend's
|
||||
// CalendarSlotResponse schema.
|
||||
type Slot struct {
|
||||
ID *int64 `json:"id"` // real slot db id; null for virtual
|
||||
VirtualID *string `json:"virtual_id"` // plan-{plan_id}-{date}; null for real
|
||||
UserID int64 `json:"user_id"`
|
||||
Date string `json:"date"` // YYYY-MM-DD
|
||||
SlotType SlotType `json:"slot_type"`
|
||||
EstimatedDuration int `json:"estimated_duration"` // minutes
|
||||
ScheduledAt string `json:"scheduled_at"` // HH:MM:SS
|
||||
StartedAt *string `json:"started_at"`
|
||||
Attended bool `json:"attended"`
|
||||
ActualDuration *int `json:"actual_duration"`
|
||||
EventType *EventType `json:"event_type"`
|
||||
EventData EventData `json:"event_data"`
|
||||
Priority int `json:"priority"`
|
||||
Status SlotStatus `json:"status"`
|
||||
Summary string `json:"summary,omitempty"`
|
||||
Reason string `json:"reason,omitempty"`
|
||||
PlanID *int64 `json:"plan_id"`
|
||||
}
|
||||
|
||||
// EventData is loosely-typed since the backend stores it as JSONB and
|
||||
// the shape varies by event_type. Plugin code does best-effort
|
||||
// unmarshal into JobData / SystemEventData when needed.
|
||||
type EventData map[string]any
|
||||
|
||||
// JobData is the event_data shape when event_type=="job".
|
||||
type JobData struct {
|
||||
Type string `json:"type"` // Task|Support|Meeting|Essential
|
||||
Code string `json:"code"` // e.g. "TASK-42"
|
||||
WorkingSessions []string `json:"working_sessions"` // arbitrary session ids
|
||||
}
|
||||
|
||||
// SystemEventData is the event_data shape when event_type=="system_event".
|
||||
type SystemEventData struct {
|
||||
Event string `json:"event"` // ScheduleToday | SummaryToday | ScheduledGatewayRestart
|
||||
}
|
||||
|
||||
// SlotAgentUpdate is the body of PATCH /calendar/slots/{id}/agent-update
|
||||
// (and the virtual variant). started_at + actual_duration are set
|
||||
// depending on which status transition the agent is reporting.
|
||||
type SlotAgentUpdate struct {
|
||||
Status SlotStatus `json:"status"`
|
||||
StartedAt string `json:"started_at,omitempty"` // HH:MM:SS
|
||||
ActualDuration int `json:"actual_duration,omitempty"` // minutes
|
||||
}
|
||||
|
||||
// AgentStatusPush is the body of POST /calendar/agent/status.
|
||||
type AgentStatusPush struct {
|
||||
ClawIdentifier string `json:"claw_identifier"`
|
||||
AgentID string `json:"agent_id"`
|
||||
Status AgentStatusValue `json:"status"`
|
||||
}
|
||||
|
||||
// HasRealID reports whether a Slot is the materialized (DB row) flavor.
|
||||
func (s Slot) HasRealID() bool { return s.ID != nil && *s.ID > 0 }
|
||||
|
||||
// SlotIdent returns a stable string identifier for log + map keys —
|
||||
// "real:<id>" for materialized, "virtual:<vid>" for virtual.
|
||||
func (s Slot) SlotIdent() string {
|
||||
if s.HasRealID() {
|
||||
return formatInt("real", *s.ID)
|
||||
}
|
||||
if s.VirtualID != nil {
|
||||
return "virtual:" + *s.VirtualID
|
||||
}
|
||||
return "unknown:" + time.Now().UTC().Format(time.RFC3339Nano)
|
||||
}
|
||||
|
||||
func formatInt(prefix string, n int64) string {
|
||||
// avoid pulling fmt for one call
|
||||
const digits = "0123456789"
|
||||
if n == 0 {
|
||||
return prefix + ":0"
|
||||
}
|
||||
neg := n < 0
|
||||
if neg {
|
||||
n = -n
|
||||
}
|
||||
buf := make([]byte, 0, 20)
|
||||
for n > 0 {
|
||||
buf = append([]byte{digits[n%10]}, buf...)
|
||||
n /= 10
|
||||
}
|
||||
if neg {
|
||||
buf = append([]byte{'-'}, buf...)
|
||||
}
|
||||
return prefix + ":" + string(buf)
|
||||
}
|
||||
|
||||
@@ -35,6 +35,17 @@ type Config struct {
|
||||
// server listens on. Zero/missing disables the bridge entirely.
|
||||
MonitorPort int `json:"monitor_port,omitempty"`
|
||||
|
||||
// MonitorPushEnabled toggles the active push loop that uploads
|
||||
// system telemetry to BackendURL /monitor/server/heartbeat. Lets
|
||||
// HF plugin replace the standalone harborforge-monitor container.
|
||||
// nil (unset) defaults to false; operators must opt in explicitly
|
||||
// since they need to provision APIKey too.
|
||||
MonitorPushEnabled *bool `json:"monitor_push_enabled,omitempty"`
|
||||
|
||||
// MonitorPushIntervalSeconds — defaults to 30s when ≤0. Mirrors
|
||||
// the standalone monitor's HF_MONITER_REPORT_INTERVAL knob.
|
||||
MonitorPushIntervalSeconds int `json:"monitor_push_interval_seconds,omitempty"`
|
||||
|
||||
// CalendarHeartbeatIntervalSeconds — defaults to 30s when ≤0.
|
||||
CalendarHeartbeatIntervalSeconds int `json:"calendar_heartbeat_interval_seconds,omitempty"`
|
||||
|
||||
@@ -57,6 +68,8 @@ type Resolved struct {
|
||||
Identifier string
|
||||
APIKey string
|
||||
MonitorPort int
|
||||
MonitorPushEnabled bool
|
||||
MonitorPushIntervalSeconds int
|
||||
CalendarEnabled bool
|
||||
CalendarHeartbeatIntervalSeconds int
|
||||
CalendarBackendURL string
|
||||
@@ -104,6 +117,8 @@ func Resolve(c Config) Resolved {
|
||||
Identifier: c.Identifier,
|
||||
APIKey: c.APIKey,
|
||||
MonitorPort: c.MonitorPort,
|
||||
MonitorPushEnabled: false,
|
||||
MonitorPushIntervalSeconds: 30,
|
||||
CalendarEnabled: true,
|
||||
CalendarHeartbeatIntervalSeconds: 30,
|
||||
CalendarBackendURL: c.CalendarBackendURL,
|
||||
@@ -127,6 +142,12 @@ func Resolve(c Config) Resolved {
|
||||
if c.CalendarHeartbeatIntervalSeconds > 0 {
|
||||
out.CalendarHeartbeatIntervalSeconds = c.CalendarHeartbeatIntervalSeconds
|
||||
}
|
||||
if c.MonitorPushEnabled != nil {
|
||||
out.MonitorPushEnabled = *c.MonitorPushEnabled
|
||||
}
|
||||
if c.MonitorPushIntervalSeconds > 0 {
|
||||
out.MonitorPushIntervalSeconds = c.MonitorPushIntervalSeconds
|
||||
}
|
||||
if c.RestartPollIntervalSeconds > 0 {
|
||||
out.RestartPollIntervalSeconds = c.RestartPollIntervalSeconds
|
||||
}
|
||||
|
||||
256
internal/kbblock/kbblock.go
Normal file
256
internal/kbblock/kbblock.go
Normal file
@@ -0,0 +1,256 @@
|
||||
// Package kbblock implements the per-session knowledge-base block
|
||||
// owned by the HarborForge Plexum plugin per Plexum DESIGN-DYNAMIC-
|
||||
// BLOCK.md §3.3 / §4.4.
|
||||
//
|
||||
// The kb-block stores HarborForge KB facts the agent has cached for
|
||||
// the current session. Each entry renders as
|
||||
//
|
||||
// <kb-fact id=N kb=<code> source=topic:<slug>>content</kb-fact>
|
||||
//
|
||||
// (id = HF backend's DB primary key for the fact, NOT a per-session
|
||||
// monotonic). InsertSeq controls cache-insertion-order rendering per
|
||||
// §9 #4. Duplicate Add(id, ...) silently no-ops per §9 #4.
|
||||
//
|
||||
// Storage location: plugin-managed file at
|
||||
//
|
||||
// <PLEXUM_PROFILE_ROOT>/agents/<agentID>/sessions/<sessionID>/
|
||||
// plugins/harbor-forge/kb-block.json
|
||||
//
|
||||
// Plexum core's session cleanup (del-session) cascades this directory
|
||||
// alongside the rest of the session state.
|
||||
//
|
||||
// Fade-out (§9 #3 — KB fade params = memory fade params n=5/w=10/m=70)
|
||||
// is NOT applied in v1. Phase 2 v1 stores raw content + agent
|
||||
// dynamic-kb-evict is the only eviction path. Fade will be added once
|
||||
// we have prod data on whether KB facts need it.
|
||||
package kbblock
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const FileName = "kb-block.json"
|
||||
const currentVersion = 1
|
||||
|
||||
// Entry is one cached KB fact.
|
||||
type Entry struct {
|
||||
ID int `json:"id"` // HF backend DB primary key
|
||||
KBCode string `json:"kb_code"` // e.g. "KB-PAYROT"
|
||||
SourceTopic string `json:"source_topic"` // human slug from HF topic, e.g. "debugging"
|
||||
Content string `json:"content"`
|
||||
InsertSeq int `json:"insert_seq"` // per-session monotonic, for render order
|
||||
AddedAtTurn int `json:"added_at_turn"`
|
||||
LastRefreshAtTurn int `json:"last_refresh_at_turn"`
|
||||
}
|
||||
|
||||
// Block is the in-memory representation of kb-block.json.
|
||||
type Block struct {
|
||||
mu sync.Mutex
|
||||
path string
|
||||
NextSeq int `json:"next_seq"`
|
||||
Entries []*Entry `json:"entries"`
|
||||
}
|
||||
|
||||
// SessionDir returns the plugin-scoped session subdir under the Plexum
|
||||
// profile root: <profileRoot>/agents/<agentID>/sessions/<sessionID>/
|
||||
// plugins/harbor-forge.
|
||||
func SessionDir(profileRoot, agentID, sessionID string) string {
|
||||
return filepath.Join(profileRoot, "agents", agentID, "sessions", sessionID, "plugins", "harbor-forge")
|
||||
}
|
||||
|
||||
// Open loads the block at <SessionDir>/kb-block.json. Missing file →
|
||||
// empty block (NextSeq=1). Caller is expected to Close-free; Block
|
||||
// can be re-Opened cheaply per-call (small JSON).
|
||||
func Open(profileRoot, agentID, sessionID string) (*Block, error) {
|
||||
if profileRoot == "" || agentID == "" || sessionID == "" {
|
||||
return nil, fmt.Errorf("kbblock.Open: profileRoot/agentID/sessionID all required")
|
||||
}
|
||||
path := filepath.Join(SessionDir(profileRoot, agentID, sessionID), FileName)
|
||||
b := &Block{path: path, NextSeq: 1}
|
||||
raw, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return b, nil
|
||||
}
|
||||
return nil, fmt.Errorf("kbblock: read %s: %w", path, err)
|
||||
}
|
||||
if len(raw) == 0 {
|
||||
return b, nil
|
||||
}
|
||||
var loaded struct {
|
||||
NextSeq int `json:"next_seq"`
|
||||
Entries []*Entry `json:"entries"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &loaded); err != nil {
|
||||
return nil, fmt.Errorf("kbblock: parse %s: %w", path, err)
|
||||
}
|
||||
if loaded.NextSeq < 1 {
|
||||
loaded.NextSeq = 1
|
||||
}
|
||||
b.NextSeq = loaded.NextSeq
|
||||
b.Entries = loaded.Entries
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// Path returns the underlying file path.
|
||||
func (b *Block) Path() string { return b.path }
|
||||
|
||||
// Len returns the entry count.
|
||||
func (b *Block) Len() int {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
return len(b.Entries)
|
||||
}
|
||||
|
||||
// Has reports whether a fact with this ID is already cached.
|
||||
func (b *Block) Has(id int) bool {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
for _, e := range b.Entries {
|
||||
if e.ID == id {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Add appends a new entry. Duplicate ID → silent no-op, returns nil.
|
||||
// Does NOT persist; caller must Save.
|
||||
func (b *Block) Add(id int, kbCode, sourceTopic, content string, atTurn int) *Entry {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
for _, e := range b.Entries {
|
||||
if e.ID == id {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
e := &Entry{
|
||||
ID: id,
|
||||
KBCode: kbCode,
|
||||
SourceTopic: sourceTopic,
|
||||
Content: content,
|
||||
InsertSeq: b.NextSeq,
|
||||
AddedAtTurn: atTurn,
|
||||
LastRefreshAtTurn: atTurn,
|
||||
}
|
||||
b.NextSeq++
|
||||
b.Entries = append(b.Entries, e)
|
||||
return e
|
||||
}
|
||||
|
||||
// Remove drops entries by ID. Returns the IDs actually removed.
|
||||
func (b *Block) Remove(ids ...int) []int {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
want := map[int]bool{}
|
||||
for _, id := range ids {
|
||||
want[id] = true
|
||||
}
|
||||
kept := b.Entries[:0]
|
||||
var removed []int
|
||||
for _, e := range b.Entries {
|
||||
if want[e.ID] {
|
||||
removed = append(removed, e.ID)
|
||||
continue
|
||||
}
|
||||
kept = append(kept, e)
|
||||
}
|
||||
b.Entries = kept
|
||||
return removed
|
||||
}
|
||||
|
||||
// Lookup returns the entry with the given ID, or nil.
|
||||
func (b *Block) Lookup(id int) *Entry {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
for _, e := range b.Entries {
|
||||
if e.ID == id {
|
||||
return e
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AllIDs returns the cached fact IDs sorted by InsertSeq ascending.
|
||||
func (b *Block) AllIDs() []int {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
out := make([]*Entry, len(b.Entries))
|
||||
copy(out, b.Entries)
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].InsertSeq < out[j].InsertSeq })
|
||||
ids := make([]int, len(out))
|
||||
for i, e := range out {
|
||||
ids[i] = e.ID
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
// Save atomically writes (tmp+rename, 0600). Empty block with no prior
|
||||
// file = no-op. Parent dirs auto-created.
|
||||
func (b *Block) Save() error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if len(b.Entries) == 0 {
|
||||
if _, err := os.Stat(b.path); os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(b.path), 0o755); err != nil {
|
||||
return err
|
||||
}
|
||||
payload := struct {
|
||||
Version int `json:"version"`
|
||||
NextSeq int `json:"next_seq"`
|
||||
Entries []*Entry `json:"entries"`
|
||||
}{Version: currentVersion, NextSeq: b.NextSeq, Entries: b.Entries}
|
||||
data, err := json.MarshalIndent(payload, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tmp := b.path + ".tmp"
|
||||
if err := os.WriteFile(tmp, data, 0o600); err != nil {
|
||||
return fmt.Errorf("kbblock: write tmp: %w", err)
|
||||
}
|
||||
return os.Rename(tmp, b.path)
|
||||
}
|
||||
|
||||
// Render returns the inner body of the <kb-block> subblock — a flat
|
||||
// sequence of `<kb-fact id=N kb=<code> source=topic:<slug>>\n<content>
|
||||
// \n</kb-fact>` separated by single blank lines, ordered by InsertSeq
|
||||
// (DESIGN-DYNAMIC-BLOCK.md §9 #4). Empty block returns "".
|
||||
//
|
||||
// The plugin's DynamicBlockProvider.RenderDynamicSubblock returns this
|
||||
// to the Plexum host, which wraps in <kb-block>...</kb-block>.
|
||||
func (b *Block) Render() string {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if len(b.Entries) == 0 {
|
||||
return ""
|
||||
}
|
||||
ordered := make([]*Entry, len(b.Entries))
|
||||
copy(ordered, b.Entries)
|
||||
sort.Slice(ordered, func(i, j int) bool { return ordered[i].InsertSeq < ordered[j].InsertSeq })
|
||||
var sb strings.Builder
|
||||
for i, e := range ordered {
|
||||
if i > 0 {
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
fmt.Fprintf(&sb, "<kb-fact id=%d kb=%s", e.ID, e.KBCode)
|
||||
if e.SourceTopic != "" {
|
||||
fmt.Fprintf(&sb, " source=topic:%s", e.SourceTopic)
|
||||
}
|
||||
sb.WriteString(">\n")
|
||||
sb.WriteString(e.Content)
|
||||
if !strings.HasSuffix(e.Content, "\n") {
|
||||
sb.WriteByte('\n')
|
||||
}
|
||||
sb.WriteString("</kb-fact>\n")
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
139
internal/kbblock/kbblock_test.go
Normal file
139
internal/kbblock/kbblock_test.go
Normal file
@@ -0,0 +1,139 @@
|
||||
package kbblock
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSessionDir(t *testing.T) {
|
||||
got := SessionDir("/root/.plexum", "alice", "s_1")
|
||||
want := "/root/.plexum/agents/alice/sessions/s_1/plugins/harbor-forge"
|
||||
if got != want {
|
||||
t.Errorf("SessionDir = %q want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenMissingFile(t *testing.T) {
|
||||
b, err := Open(t.TempDir(), "alice", "s_1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if b.Len() != 0 || b.NextSeq != 1 {
|
||||
t.Errorf("empty open: Len=%d NextSeq=%d", b.Len(), b.NextSeq)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenRequiresAllArgs(t *testing.T) {
|
||||
if _, err := Open("", "a", "s"); err == nil {
|
||||
t.Error("want error on empty profileRoot")
|
||||
}
|
||||
if _, err := Open("/x", "", "s"); err == nil {
|
||||
t.Error("want error on empty agentID")
|
||||
}
|
||||
if _, err := Open("/x", "a", ""); err == nil {
|
||||
t.Error("want error on empty sessionID")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddAndDuplicate(t *testing.T) {
|
||||
b, _ := Open(t.TempDir(), "a", "s")
|
||||
e := b.Add(42, "KB-PAYROT", "debugging", "OOM fix", 3)
|
||||
if e == nil || e.ID != 42 || e.KBCode != "KB-PAYROT" || e.SourceTopic != "debugging" ||
|
||||
e.InsertSeq != 1 || e.AddedAtTurn != 3 {
|
||||
t.Errorf("entry wrong: %+v", e)
|
||||
}
|
||||
if b.Add(42, "X", "y", "z", 7) != nil {
|
||||
t.Error("dup should return nil")
|
||||
}
|
||||
if b.Len() != 1 {
|
||||
t.Errorf("Len=%d", b.Len())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoundtrip(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
b, _ := Open(dir, "a", "s")
|
||||
b.Add(101, "KB-A", "topic-a", "a", 1)
|
||||
b.Add(202, "KB-A", "topic-b", "b", 2)
|
||||
if err := b.Save(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
b2, _ := Open(dir, "a", "s")
|
||||
if b2.Len() != 2 || b2.NextSeq != 3 {
|
||||
t.Errorf("reopen Len=%d NextSeq=%d", b2.Len(), b2.NextSeq)
|
||||
}
|
||||
if !b2.Has(101) || !b2.Has(202) {
|
||||
t.Error("entries missing after reopen")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRenderInsertOrder(t *testing.T) {
|
||||
b, _ := Open(t.TempDir(), "a", "s")
|
||||
b.Add(999, "K", "t", "first", 0)
|
||||
b.Add(1, "K", "t", "second", 0)
|
||||
out := b.Render()
|
||||
idx999 := strings.Index(out, "id=999")
|
||||
idx1 := strings.Index(out, "id=1 ")
|
||||
if !(idx999 >= 0 && idx1 > idx999) {
|
||||
t.Errorf("order broken: %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRenderAttributes(t *testing.T) {
|
||||
b, _ := Open(t.TempDir(), "a", "s")
|
||||
b.Add(42, "KB-PAYROT", "debugging", "OOM fix: bump heap", 0)
|
||||
out := b.Render()
|
||||
if !strings.HasPrefix(out, "<kb-fact id=42 kb=KB-PAYROT source=topic:debugging>\n") {
|
||||
t.Errorf("head wrong: %q", out)
|
||||
}
|
||||
if !strings.HasSuffix(out, "</kb-fact>\n") {
|
||||
t.Errorf("tail wrong: %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRenderOmitsSourceWhenEmpty(t *testing.T) {
|
||||
b, _ := Open(t.TempDir(), "a", "s")
|
||||
b.Add(7, "KB-X", "", "raw", 0)
|
||||
out := b.Render()
|
||||
if strings.Contains(out, "source=topic:") {
|
||||
t.Errorf("source attr leaked: %q", out)
|
||||
}
|
||||
if !strings.Contains(out, "<kb-fact id=7 kb=KB-X>") {
|
||||
t.Errorf("tight tag missing: %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemove(t *testing.T) {
|
||||
b, _ := Open(t.TempDir(), "a", "s")
|
||||
b.Add(1, "K", "t", "a", 0)
|
||||
b.Add(2, "K", "t", "b", 0)
|
||||
b.Add(3, "K", "t", "c", 0)
|
||||
removed := b.Remove(2, 99, 3)
|
||||
if len(removed) != 2 {
|
||||
t.Errorf("removed=%v", removed)
|
||||
}
|
||||
if b.Len() != 1 || !b.Has(1) {
|
||||
t.Errorf("post-state wrong: Len=%d", b.Len())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRenderEmpty(t *testing.T) {
|
||||
b, _ := Open(t.TempDir(), "a", "s")
|
||||
if b.Render() != "" {
|
||||
t.Error("empty Render should be \"\"")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSaveEmptyNoFile(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
b, _ := Open(dir, "a", "s")
|
||||
if err := b.Save(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
path := filepath.Join(SessionDir(dir, "a", "s"), FileName)
|
||||
if _, err := os.Stat(path); !os.IsNotExist(err) {
|
||||
t.Errorf("file should not exist: %v", err)
|
||||
}
|
||||
}
|
||||
285
internal/kbclient/client.go
Normal file
285
internal/kbclient/client.go
Normal file
@@ -0,0 +1,285 @@
|
||||
// Package kbclient is a typed HTTP client for the HarborForge.Backend
|
||||
// knowledge-base REST API. Used by the dynamic-kb-* tools to power
|
||||
// agent browse + cache flows.
|
||||
//
|
||||
// Route shapes verified against
|
||||
// HarborForge.Backend/app/api/routers/knowledge.py:
|
||||
//
|
||||
// GET /knowledge-bases[?project=<code>] list KBs
|
||||
// GET /knowledge-bases/{kb_id_or_code}/topics list topics in a KB
|
||||
// GET /knowledge-bases/{kb_id_or_code}/tree full hierarchy
|
||||
// GET /knowledge-facts/{fact_id} single fact
|
||||
//
|
||||
// HF's KB hierarchy is KB → Topics → Categories → Facts. ListFacts
|
||||
// uses the /tree endpoint and filters client-side to the requested
|
||||
// topic ids (the backend has no flat per-topic-facts route as of
|
||||
// the route audit on 2026-06-08).
|
||||
//
|
||||
// Auth: v1 uses plugin-level APIKey via Bearer header. Per-agent
|
||||
// token resolution (matching agent's hf-token from secret-mgr) is a
|
||||
// TODO — needs SDK extension for plugin → secret-mgr access.
|
||||
package kbclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Client is the HTTP wrapper. Construct once at plugin init; safe to
|
||||
// share across goroutines.
|
||||
type Client struct {
|
||||
BaseURL string
|
||||
APIKey string
|
||||
HTTP *http.Client
|
||||
}
|
||||
|
||||
// New returns a client with a 30s default timeout. BaseURL has any
|
||||
// trailing slash trimmed.
|
||||
func New(baseURL, apiKey string) *Client {
|
||||
return &Client{
|
||||
BaseURL: strings.TrimRight(baseURL, "/"),
|
||||
APIKey: apiKey,
|
||||
HTTP: &http.Client{Timeout: 30 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Payload types matching HF backend response shapes ----
|
||||
|
||||
// KBSummary is one row from GET /knowledge-bases.
|
||||
type KBSummary struct {
|
||||
ID int `json:"id"`
|
||||
KnowledgeBaseCode string `json:"knowledge_base_code"`
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"`
|
||||
}
|
||||
|
||||
// TopicSummary is one row from GET /knowledge-bases/{id}/topics.
|
||||
type TopicSummary struct {
|
||||
ID int `json:"id"`
|
||||
Topic string `json:"topic"`
|
||||
Description string `json:"description"`
|
||||
}
|
||||
|
||||
// FactSummary is one item the dynamic-kb-list-facts tool surfaces to
|
||||
// the agent. Built client-side from the tree response — title + a
|
||||
// snippet of the content. Kept light so list-facts output stays
|
||||
// scan-able even on large KBs.
|
||||
type FactSummary struct {
|
||||
ID int `json:"id"`
|
||||
TopicID int `json:"topic_id"`
|
||||
TopicSlug string `json:"topic_slug"`
|
||||
Title string `json:"title"`
|
||||
Snippet string `json:"snippet"`
|
||||
}
|
||||
|
||||
// Fact is the full fact body fetched by dynamic-kb-cache before
|
||||
// writing into kb-block.json.
|
||||
type Fact struct {
|
||||
ID int `json:"id"`
|
||||
KBCode string `json:"kb_code"`
|
||||
TopicSlug string `json:"topic_slug"`
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
// ---- HTTP methods ----
|
||||
|
||||
// ListKBs hits GET /knowledge-bases[?project=<code>]. Empty
|
||||
// projectCode = unfiltered.
|
||||
func (c *Client) ListKBs(ctx context.Context, projectCode string) ([]KBSummary, error) {
|
||||
path := "/knowledge-bases"
|
||||
if projectCode != "" {
|
||||
path += "?project=" + url.QueryEscape(projectCode)
|
||||
}
|
||||
var out []KBSummary
|
||||
if err := c.getJSON(ctx, path, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ListTopics hits GET /knowledge-bases/{kbCode}/topics. kbCode is the
|
||||
// human KB code (e.g. "KB-PAYROT"); the backend resolves either int
|
||||
// id or code in the same path slot.
|
||||
func (c *Client) ListTopics(ctx context.Context, kbCode string) ([]TopicSummary, error) {
|
||||
if kbCode == "" {
|
||||
return nil, fmt.Errorf("kbclient: kb-code required")
|
||||
}
|
||||
path := "/knowledge-bases/" + url.PathEscape(kbCode) + "/topics"
|
||||
var out []TopicSummary
|
||||
if err := c.getJSON(ctx, path, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// kbTree mirrors the shape of GET /knowledge-bases/{id}/tree. We only
|
||||
// pull the fields we need to build FactSummary; unknown fields are
|
||||
// dropped silently.
|
||||
type kbTree struct {
|
||||
KnowledgeBaseCode string `json:"knowledge_base_code"`
|
||||
Topics []treeNode `json:"topics"`
|
||||
}
|
||||
|
||||
type treeNode struct {
|
||||
ID int `json:"id"`
|
||||
Topic string `json:"topic"` // topic-level
|
||||
Category string `json:"category"` // category-level
|
||||
Title string `json:"title"` // fact-level
|
||||
Content string `json:"content"` // fact-level
|
||||
Categories []treeNode `json:"categories"` // children of topic / category
|
||||
Facts []treeNode `json:"facts"` // fact children
|
||||
}
|
||||
|
||||
// ListFacts hits GET /knowledge-bases/{kbCode}/tree, walks the tree,
|
||||
// and flattens facts under the requested topic ids into FactSummary
|
||||
// rows. Snippet = first 120 chars of content (UTF-8 safe-truncated).
|
||||
func (c *Client) ListFacts(ctx context.Context, kbCode string, topicIDs []int) ([]FactSummary, error) {
|
||||
if kbCode == "" {
|
||||
return nil, fmt.Errorf("kbclient: kb-code required")
|
||||
}
|
||||
if len(topicIDs) == 0 {
|
||||
return nil, fmt.Errorf("kbclient: topic-ids required")
|
||||
}
|
||||
path := "/knowledge-bases/" + url.PathEscape(kbCode) + "/tree"
|
||||
var tree kbTree
|
||||
if err := c.getJSON(ctx, path, &tree); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wantTopic := map[int]bool{}
|
||||
for _, id := range topicIDs {
|
||||
wantTopic[id] = true
|
||||
}
|
||||
var out []FactSummary
|
||||
for _, topic := range tree.Topics {
|
||||
if !wantTopic[topic.ID] {
|
||||
continue
|
||||
}
|
||||
walkFactsInto(&out, topic.ID, topic.Topic, topic.Categories, topic.Facts)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// walkFactsInto descends category subtree, collecting facts. cur is
|
||||
// the topic context (id + slug) for tagging.
|
||||
func walkFactsInto(out *[]FactSummary, topicID int, topicSlug string, cats, facts []treeNode) {
|
||||
for _, f := range facts {
|
||||
*out = append(*out, FactSummary{
|
||||
ID: f.ID,
|
||||
TopicID: topicID,
|
||||
TopicSlug: topicSlug,
|
||||
Title: f.Title,
|
||||
Snippet: snippet(f.Content, 120),
|
||||
})
|
||||
}
|
||||
for _, c := range cats {
|
||||
walkFactsInto(out, topicID, topicSlug, c.Categories, c.Facts)
|
||||
}
|
||||
}
|
||||
|
||||
// GetFacts pulls each fact's full content by ID. Calls
|
||||
// /knowledge-facts/{id} per fact; rate-limited to sequential issue
|
||||
// (small N expected — agents cache a handful per call). Pass kbCode
|
||||
// for tagging the returned Fact records; backend doesn't return it.
|
||||
func (c *Client) GetFacts(ctx context.Context, kbCode string, factIDs []int) ([]Fact, error) {
|
||||
if kbCode == "" {
|
||||
return nil, fmt.Errorf("kbclient: kb-code required")
|
||||
}
|
||||
out := make([]Fact, 0, len(factIDs))
|
||||
for _, id := range factIDs {
|
||||
path := "/knowledge-facts/" + strconv.Itoa(id)
|
||||
var node struct {
|
||||
ID int `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Content string `json:"content"`
|
||||
TopicID int `json:"topic_id"`
|
||||
TopicSlug string `json:"topic"`
|
||||
CategoryID int `json:"category_id"`
|
||||
}
|
||||
if err := c.getJSON(ctx, path, &node); err != nil {
|
||||
// 404 → skip silently; other errors fail the batch
|
||||
if isNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, Fact{
|
||||
ID: node.ID,
|
||||
KBCode: kbCode,
|
||||
TopicSlug: node.TopicSlug,
|
||||
Content: node.Content,
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ---- transport plumbing ----
|
||||
|
||||
func (c *Client) getJSON(ctx context.Context, path string, out any) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.BaseURL+path, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if c.APIKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+c.APIKey)
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
res, err := c.HTTP.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("GET %s: %w", path, err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
body, _ := io.ReadAll(res.Body)
|
||||
if res.StatusCode == 404 {
|
||||
return notFoundErr{path: path}
|
||||
}
|
||||
if res.StatusCode >= 400 {
|
||||
return fmt.Errorf("GET %s: HTTP %d: %s", path, res.StatusCode, truncate(body, 200))
|
||||
}
|
||||
if len(body) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := json.Unmarshal(body, out); err != nil {
|
||||
return fmt.Errorf("GET %s decode: %w (body=%q)", path, err, truncate(body, 200))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type notFoundErr struct{ path string }
|
||||
|
||||
func (e notFoundErr) Error() string { return "GET " + e.path + ": 404 not found" }
|
||||
|
||||
func isNotFound(err error) bool {
|
||||
_, ok := err.(notFoundErr)
|
||||
return ok
|
||||
}
|
||||
|
||||
func truncate(b []byte, n int) string {
|
||||
if len(b) > n {
|
||||
return string(b[:n]) + "..."
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
// snippet returns the first n bytes of s, padded with "..." if cut.
|
||||
// Naive byte slice; HF content is UTF-8 and we accept the small risk
|
||||
// of cutting mid-rune for log-level rendering (the agent gets full
|
||||
// content via GetFacts before caching).
|
||||
func snippet(s string, n int) string {
|
||||
s = strings.TrimSpace(s)
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[:n] + "..."
|
||||
}
|
||||
|
||||
// Compile-time guard: caller code expected to wrap bytes.Buffer usage
|
||||
// (currently unused but here for future POST/PATCH endpoints).
|
||||
var _ = bytes.NewReader([]byte{})
|
||||
238
internal/monitor/pusher.go
Normal file
238
internal/monitor/pusher.go
Normal file
@@ -0,0 +1,238 @@
|
||||
// Pusher periodically uploads system telemetry to the HarborForge
|
||||
// backend's /monitor/server/heartbeat endpoint. Replaces the standalone
|
||||
// `harborforge-monitor` daemon — the plugin's lifecycle (host gateway
|
||||
// start/stop) bounds the heartbeat loop, so no separate process need
|
||||
// supervise it.
|
||||
//
|
||||
// Wire shape mirrors HarborForge.Monitor's `telemetry.Payload`
|
||||
// (flat `cpu_pct/mem_pct/...` fields + `X-API-Key` header). The
|
||||
// translation from internal `telemetry.Snapshot` to that shape lives
|
||||
// in buildPayload; HF backend stays unchanged.
|
||||
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
|
||||
)
|
||||
|
||||
// PushPayload is the wire shape POSTed to /monitor/server/heartbeat —
|
||||
// 1:1 with HarborForge.Monitor's `telemetry.Payload`.
|
||||
type PushPayload struct {
|
||||
Identifier string `json:"identifier"`
|
||||
PluginVersion string `json:"plugin_version,omitempty"`
|
||||
Agents []any `json:"agents"`
|
||||
NginxInstalled bool `json:"nginx_installed"`
|
||||
NginxSites []string `json:"nginx_sites"`
|
||||
CPUPct float64 `json:"cpu_pct,omitempty"`
|
||||
MemPct float64 `json:"mem_pct,omitempty"`
|
||||
DiskPct float64 `json:"disk_pct,omitempty"`
|
||||
SwapPct float64 `json:"swap_pct,omitempty"`
|
||||
LoadAvg []float64 `json:"load_avg,omitempty"`
|
||||
UptimeSeconds uint64 `json:"uptime_seconds,omitempty"`
|
||||
}
|
||||
|
||||
// PusherConfig is the operator-supplied tuning.
|
||||
type PusherConfig struct {
|
||||
BackendURL string // e.g. https://hf-api.hangman-lab.top
|
||||
APIKey string // sent as X-API-Key
|
||||
Interval time.Duration // default 30s when <=0
|
||||
}
|
||||
|
||||
// Pusher runs the periodic POST loop. One per plugin process.
|
||||
type Pusher struct {
|
||||
cfg PusherConfig
|
||||
collect func() telemetry.Snapshot
|
||||
log LogFunc
|
||||
http *http.Client
|
||||
|
||||
// stats — for the monitor_telemetry tool / status surfacing.
|
||||
mu sync.RWMutex
|
||||
lastSentAt time.Time
|
||||
lastStatus int
|
||||
lastErr string
|
||||
successHits uint64
|
||||
errHits uint64
|
||||
}
|
||||
|
||||
// NewPusher constructs the loop runner. collect must be a snapshot
|
||||
// producer (caller usually wires it to telemetry.Collect with
|
||||
// SampleCPU=true).
|
||||
func NewPusher(cfg PusherConfig, collect func() telemetry.Snapshot, log LogFunc) *Pusher {
|
||||
if cfg.Interval <= 0 {
|
||||
cfg.Interval = 30 * time.Second
|
||||
}
|
||||
if log == nil {
|
||||
log = func(string, string, map[string]any) {}
|
||||
}
|
||||
return &Pusher{
|
||||
cfg: cfg,
|
||||
collect: collect,
|
||||
log: log,
|
||||
http: &http.Client{Timeout: 15 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
// Run drives the push loop until ctx is cancelled. Returns ctx.Err().
|
||||
// First push happens immediately so the backend sees this claw alive
|
||||
// without waiting an interval.
|
||||
func (p *Pusher) Run(ctx context.Context) error {
|
||||
if p.cfg.BackendURL == "" {
|
||||
p.log("warn", "monitor push disabled (empty backendURL)", nil)
|
||||
return nil
|
||||
}
|
||||
if p.cfg.APIKey == "" {
|
||||
p.log("warn", "monitor push disabled (empty apiKey)", nil)
|
||||
return nil
|
||||
}
|
||||
url := strings.TrimRight(p.cfg.BackendURL, "/") + "/monitor/server/heartbeat"
|
||||
|
||||
tick := time.NewTicker(p.cfg.Interval)
|
||||
defer tick.Stop()
|
||||
|
||||
p.pushOnce(ctx, url)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-tick.C:
|
||||
p.pushOnce(ctx, url)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pusher) pushOnce(ctx context.Context, url string) {
|
||||
snap := p.collect()
|
||||
body, err := json.Marshal(buildPayload(snap))
|
||||
if err != nil {
|
||||
p.recordErr("marshal: " + err.Error())
|
||||
return
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
p.recordErr("build req: " + err.Error())
|
||||
return
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("X-API-Key", p.cfg.APIKey)
|
||||
res, err := p.http.Do(req)
|
||||
if err != nil {
|
||||
p.recordErr("send: " + err.Error())
|
||||
p.log("warn", "monitor push failed", map[string]any{"err": err.Error()})
|
||||
return
|
||||
}
|
||||
defer res.Body.Close()
|
||||
raw, _ := io.ReadAll(res.Body)
|
||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||
p.recordErr(fmt.Sprintf("%d: %s", res.StatusCode, truncate(raw, 200)))
|
||||
p.log("warn", "monitor push non-2xx", map[string]any{
|
||||
"status": res.StatusCode, "body": truncate(raw, 200),
|
||||
})
|
||||
return
|
||||
}
|
||||
p.recordOK(res.StatusCode)
|
||||
}
|
||||
|
||||
// Stats exposes a copy of the latest push state for diagnostics
|
||||
// (harborforge_monitor_telemetry tool surfaces this).
|
||||
type PushStats struct {
|
||||
LastSentAt time.Time
|
||||
LastStatus int
|
||||
LastErr string
|
||||
SuccessCount uint64
|
||||
ErrorCount uint64
|
||||
}
|
||||
|
||||
func (p *Pusher) Stats() PushStats {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return PushStats{
|
||||
LastSentAt: p.lastSentAt,
|
||||
LastStatus: p.lastStatus,
|
||||
LastErr: p.lastErr,
|
||||
SuccessCount: p.successHits,
|
||||
ErrorCount: p.errHits,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pusher) recordOK(status int) {
|
||||
p.mu.Lock()
|
||||
wasFirst := p.successHits == 0
|
||||
p.lastSentAt = time.Now().UTC()
|
||||
p.lastStatus = status
|
||||
p.lastErr = ""
|
||||
p.successHits++
|
||||
count := p.successHits
|
||||
p.mu.Unlock()
|
||||
|
||||
// First success is an operator signal that the push loop is live;
|
||||
// log it loud so the journal carries proof. Subsequent successes
|
||||
// log on a slow heartbeat (every 60 cycles) so the journal stays
|
||||
// quiet but still proves the loop hasn't drifted into "0 successes
|
||||
// but no errors either" territory.
|
||||
if wasFirst {
|
||||
p.log("info", "monitor push started", map[string]any{"status": status})
|
||||
} else if count%60 == 0 {
|
||||
p.log("info", "monitor push heartbeat", map[string]any{
|
||||
"successes": count, "status": status,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pusher) recordErr(msg string) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.lastSentAt = time.Now().UTC()
|
||||
p.lastErr = msg
|
||||
p.errHits++
|
||||
}
|
||||
|
||||
// buildPayload translates the internal Snapshot into the flat
|
||||
// PushPayload shape the backend expects. agents is passed through as
|
||||
// []any (one entry per agent — id/model/state preserved).
|
||||
func buildPayload(snap telemetry.Snapshot) PushPayload {
|
||||
agents := make([]any, 0, len(snap.Agents))
|
||||
for _, a := range snap.Agents {
|
||||
agents = append(agents, map[string]any{
|
||||
"id": a.ID,
|
||||
"model": a.Model,
|
||||
"state": a.State,
|
||||
})
|
||||
}
|
||||
return PushPayload{
|
||||
Identifier: snap.Identifier,
|
||||
PluginVersion: snap.PluginInfo.Version,
|
||||
Agents: agents,
|
||||
// nginx detection is independent monitor's responsibility today;
|
||||
// HF plugin leaves it blank rather than rediscovering nginx
|
||||
// state. Operators that need it can keep the standalone monitor
|
||||
// alongside or wait for a follow-up commit.
|
||||
NginxInstalled: false,
|
||||
NginxSites: []string{},
|
||||
CPUPct: round1(snap.CPU.UsedPercent),
|
||||
MemPct: round1(snap.Memory.UsedPercent),
|
||||
DiskPct: round1(snap.Disk.UsedPercent),
|
||||
SwapPct: round1(snap.Swap.UsedPercent),
|
||||
LoadAvg: []float64{round2(snap.Load.One), round2(snap.Load.Five), round2(snap.Load.Fifteen)},
|
||||
UptimeSeconds: snap.UptimeSecs,
|
||||
}
|
||||
}
|
||||
|
||||
func round1(v float64) float64 { return float64(int64(v*10+0.5)) / 10 }
|
||||
func round2(v float64) float64 { return float64(int64(v*100+0.5)) / 100 }
|
||||
|
||||
func truncate(b []byte, n int) string {
|
||||
if len(b) <= n {
|
||||
return string(b)
|
||||
}
|
||||
return string(b[:n]) + "…"
|
||||
}
|
||||
@@ -29,14 +29,30 @@ type Snapshot struct {
|
||||
Hostname string `json:"hostname"`
|
||||
UptimeSecs uint64 `json:"uptime"`
|
||||
Memory MemoryInfo `json:"memory"`
|
||||
Swap SwapInfo `json:"swap"`
|
||||
Load LoadInfo `json:"load"`
|
||||
Disk DiskInfo `json:"disk"`
|
||||
CPU CPUInfo `json:"cpu"`
|
||||
Agents []AgentInfo `json:"agents"`
|
||||
PluginInfo PluginInfo `json:"plugin"`
|
||||
CapturedAt time.Time `json:"captured_at"`
|
||||
HostMetadata map[string]string `json:"host_metadata,omitempty"`
|
||||
}
|
||||
|
||||
// SwapInfo is the system swap usage. Zeroes when swap isn't configured.
|
||||
type SwapInfo struct {
|
||||
Total uint64 `json:"total"`
|
||||
Free uint64 `json:"free"`
|
||||
Used uint64 `json:"used"`
|
||||
UsedPercent float64 `json:"used_percent"`
|
||||
}
|
||||
|
||||
// CPUInfo holds the most recent CPU usage estimate. UsedPercent is
|
||||
// computed across one sample interval (see Collect's cpu helper).
|
||||
type CPUInfo struct {
|
||||
UsedPercent float64 `json:"used_percent"`
|
||||
}
|
||||
|
||||
// MemoryInfo mirrors OpenclawPlugin's memory shape.
|
||||
type MemoryInfo struct {
|
||||
Total uint64 `json:"total"` // bytes
|
||||
@@ -84,15 +100,27 @@ type CollectOpts struct {
|
||||
Identifier string
|
||||
Version string
|
||||
AgentLister func() []AgentInfo // resolved by the caller (plugin uses HostAPI to walk agents)
|
||||
|
||||
// SampleCPU asks Collect to take a 1-second CPU sample. Off-path
|
||||
// (status endpoint, bridge serve) leave false to keep calls cheap;
|
||||
// the slow push loop sets it true.
|
||||
SampleCPU bool
|
||||
}
|
||||
|
||||
// Collect produces a fresh snapshot from /proc + the supplied AgentLister.
|
||||
// SampleCPU=true takes a 1-second CPU sample (two reads of /proc/stat
|
||||
// with a sleep between); otherwise CPU usage stays zero. Set true on
|
||||
// the slow push loop, false on the cheap on-demand status endpoint.
|
||||
func Collect(opts CollectOpts) Snapshot {
|
||||
now := time.Now().UTC()
|
||||
host, _ := os.Hostname()
|
||||
mem := readMemInfo()
|
||||
mem, swap := readMemAndSwap()
|
||||
load := readLoadAvg()
|
||||
disk := readDiskRoot()
|
||||
cpu := CPUInfo{}
|
||||
if opts.SampleCPU {
|
||||
cpu.UsedPercent = sampleCPUPercent(time.Second)
|
||||
}
|
||||
var agents []AgentInfo
|
||||
if opts.AgentLister != nil {
|
||||
agents = opts.AgentLister()
|
||||
@@ -103,8 +131,10 @@ func Collect(opts CollectOpts) Snapshot {
|
||||
Hostname: host,
|
||||
UptimeSecs: readUptime(),
|
||||
Memory: mem,
|
||||
Swap: swap,
|
||||
Load: load,
|
||||
Disk: disk,
|
||||
CPU: cpu,
|
||||
Agents: agents,
|
||||
PluginInfo: PluginInfo{
|
||||
Name: "harbor-forge",
|
||||
@@ -117,10 +147,10 @@ func Collect(opts CollectOpts) Snapshot {
|
||||
|
||||
// ---- /proc helpers ----
|
||||
|
||||
func readMemInfo() MemoryInfo {
|
||||
func readMemAndSwap() (MemoryInfo, SwapInfo) {
|
||||
f, err := os.Open("/proc/meminfo")
|
||||
if err != nil {
|
||||
return MemoryInfo{}
|
||||
return MemoryInfo{}, SwapInfo{}
|
||||
}
|
||||
defer f.Close()
|
||||
fields := map[string]uint64{}
|
||||
@@ -145,6 +175,12 @@ func readMemInfo() MemoryInfo {
|
||||
// All MemInfo values are in KB; convert to bytes.
|
||||
fields[key] = v * 1024
|
||||
}
|
||||
mem := buildMemInfo(fields)
|
||||
swap := buildSwapInfo(fields)
|
||||
return mem, swap
|
||||
}
|
||||
|
||||
func buildMemInfo(fields map[string]uint64) MemoryInfo {
|
||||
total := fields["MemTotal"]
|
||||
free := fields["MemAvailable"]
|
||||
if free == 0 {
|
||||
@@ -158,6 +194,67 @@ func readMemInfo() MemoryInfo {
|
||||
return MemoryInfo{Total: total, Free: free, Used: used, UsedPercent: pct}
|
||||
}
|
||||
|
||||
func buildSwapInfo(fields map[string]uint64) SwapInfo {
|
||||
total := fields["SwapTotal"]
|
||||
free := fields["SwapFree"]
|
||||
if total == 0 {
|
||||
return SwapInfo{}
|
||||
}
|
||||
used := total - free
|
||||
pct := float64(used) / float64(total) * 100
|
||||
return SwapInfo{Total: total, Free: free, Used: used, UsedPercent: pct}
|
||||
}
|
||||
|
||||
// sampleCPUPercent computes overall CPU usage across one sample
|
||||
// interval. Two reads of /proc/stat's aggregate "cpu" line, derive
|
||||
// busy-time delta as (1 - idle/total). Returns 0 on read failure.
|
||||
func sampleCPUPercent(interval time.Duration) float64 {
|
||||
total1, idle1, ok := readCPUStat()
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
time.Sleep(interval)
|
||||
total2, idle2, ok := readCPUStat()
|
||||
if !ok || total2 <= total1 {
|
||||
return 0
|
||||
}
|
||||
totalDelta := total2 - total1
|
||||
idleDelta := idle2 - idle1
|
||||
if idleDelta > totalDelta {
|
||||
return 0
|
||||
}
|
||||
return float64(totalDelta-idleDelta) / float64(totalDelta) * 100
|
||||
}
|
||||
|
||||
func readCPUStat() (total, idle uint64, ok bool) {
|
||||
f, err := os.Open("/proc/stat")
|
||||
if err != nil {
|
||||
return 0, 0, false
|
||||
}
|
||||
defer f.Close()
|
||||
sc := bufio.NewScanner(f)
|
||||
if !sc.Scan() {
|
||||
return 0, 0, false
|
||||
}
|
||||
parts := strings.Fields(sc.Text())
|
||||
if len(parts) < 5 || parts[0] != "cpu" {
|
||||
return 0, 0, false
|
||||
}
|
||||
for i := 1; i < len(parts); i++ {
|
||||
v, err := strconv.ParseUint(parts[i], 10, 64)
|
||||
if err != nil {
|
||||
return 0, 0, false
|
||||
}
|
||||
total += v
|
||||
// idle is the 4th column (parts[4]); iowait (parts[5]) is also
|
||||
// idle-ish but we count it as busy to match gopsutil's default.
|
||||
if i == 4 {
|
||||
idle = v
|
||||
}
|
||||
}
|
||||
return total, idle, true
|
||||
}
|
||||
|
||||
func readLoadAvg() LoadInfo {
|
||||
raw, err := os.ReadFile("/proc/loadavg")
|
||||
if err != nil {
|
||||
|
||||
284
internal/tools/kb.go
Normal file
284
internal/tools/kb.go
Normal file
@@ -0,0 +1,284 @@
|
||||
// kb.go — dynamic-kb-* tool implementations (DESIGN-DYNAMIC-BLOCK.md
|
||||
// §3.3 / §4.4). Each tool returns a sdkplugin.ToolResult; errors are
|
||||
// surfaced as IsError:true rather than RPC errors so the model sees a
|
||||
// human-readable message.
|
||||
//
|
||||
// All 5 tools need the agent id (from ctx) + session id (looked up
|
||||
// via Deps.SessionForAgent — main.go threads this from the per-turn
|
||||
// RenderDynamicSubblock callback that updates the lookup table).
|
||||
|
||||
package tools
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
||||
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbblock"
|
||||
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbclient"
|
||||
)
|
||||
|
||||
// KBDeps bundles the dependencies the KB tools need. Caller (main.go)
|
||||
// constructs once and reuses across all tool calls.
|
||||
type KBDeps struct {
|
||||
Client *kbclient.Client
|
||||
ProfileRoot string // <PLEXUM_PROFILE_ROOT>
|
||||
SessionFor func(agentID string) string // returns sessionID or ""
|
||||
Turn func(agentID string) int // current turn (best-effort, can return 0)
|
||||
HostCallTool func(ctx context.Context, agentID, toolName string, input json.RawMessage) (json.RawMessage, error)
|
||||
}
|
||||
|
||||
// kb tool inputs
|
||||
|
||||
type listKBsInput struct {
|
||||
ProjectCode string `json:"project-code,omitempty"`
|
||||
}
|
||||
|
||||
type listTopicsInput struct {
|
||||
KBCode string `json:"kb-code"`
|
||||
}
|
||||
|
||||
type listFactsInput struct {
|
||||
KBCode string `json:"kb-code"`
|
||||
TopicIDs []int `json:"topic-ids"`
|
||||
}
|
||||
|
||||
type cacheInput struct {
|
||||
KBCode string `json:"kb-code"`
|
||||
FactIDs []int `json:"fact-ids"`
|
||||
PreviousDynamicID string `json:"previous-dynamic-id,omitempty"`
|
||||
}
|
||||
|
||||
type evictInput struct {
|
||||
FactIDs []int `json:"fact-ids"`
|
||||
}
|
||||
|
||||
type cacheResult struct {
|
||||
Added []int `json:"added,omitempty"`
|
||||
AlreadyCached []int `json:"already_cached,omitempty"`
|
||||
Missing []int `json:"missing,omitempty"`
|
||||
Note string `json:"note"`
|
||||
}
|
||||
|
||||
type evictResult struct {
|
||||
Evicted []int `json:"evicted,omitempty"`
|
||||
NotCached []int `json:"not_cached,omitempty"`
|
||||
Note string `json:"note"`
|
||||
}
|
||||
|
||||
// ToolKBListKBs implements dynamic-kb-list-kbs.
|
||||
func ToolKBListKBs(ctx context.Context, deps KBDeps, raw json.RawMessage) (sdkplugin.ToolResult, error) {
|
||||
if deps.Client == nil {
|
||||
return errResult("dynamic-kb-list-kbs: HF KB backend unavailable")
|
||||
}
|
||||
var in listKBsInput
|
||||
if len(raw) > 0 {
|
||||
_ = json.Unmarshal(raw, &in)
|
||||
}
|
||||
kbs, err := deps.Client.ListKBs(ctx, in.ProjectCode)
|
||||
if err != nil {
|
||||
return errResult("dynamic-kb-list-kbs: " + err.Error())
|
||||
}
|
||||
var sb strings.Builder
|
||||
if in.ProjectCode != "" {
|
||||
fmt.Fprintf(&sb, "project: %s\n", in.ProjectCode)
|
||||
}
|
||||
fmt.Fprintf(&sb, "kbs: %d\n\n", len(kbs))
|
||||
for _, k := range kbs {
|
||||
fmt.Fprintf(&sb, "[%s] %s\n %s\n\n", k.KnowledgeBaseCode, k.Title, k.Description)
|
||||
}
|
||||
sb.WriteString("Call dynamic-kb-list-topics({kb-code: \"<code>\"}) to drill into a KB.\n")
|
||||
return okResult(sb.String())
|
||||
}
|
||||
|
||||
// ToolKBListTopics implements dynamic-kb-list-topics.
|
||||
func ToolKBListTopics(ctx context.Context, deps KBDeps, raw json.RawMessage) (sdkplugin.ToolResult, error) {
|
||||
if deps.Client == nil {
|
||||
return errResult("dynamic-kb-list-topics: HF KB backend unavailable")
|
||||
}
|
||||
var in listTopicsInput
|
||||
if err := json.Unmarshal(raw, &in); err != nil {
|
||||
return errResult("dynamic-kb-list-topics: parse: " + err.Error())
|
||||
}
|
||||
if in.KBCode == "" {
|
||||
return errResult("dynamic-kb-list-topics: kb-code required")
|
||||
}
|
||||
topics, err := deps.Client.ListTopics(ctx, in.KBCode)
|
||||
if err != nil {
|
||||
return errResult("dynamic-kb-list-topics: " + err.Error())
|
||||
}
|
||||
var sb strings.Builder
|
||||
fmt.Fprintf(&sb, "kb: %s\ntopics: %d\n\n", in.KBCode, len(topics))
|
||||
for _, t := range topics {
|
||||
fmt.Fprintf(&sb, "[%d] %s\n %s\n\n", t.ID, t.Topic, t.Description)
|
||||
}
|
||||
fmt.Fprintf(&sb, "Call dynamic-kb-list-facts({kb-code: %q, topic-ids: [<id>, ...]}) to drill into facts.\n",
|
||||
in.KBCode)
|
||||
return okResult(sb.String())
|
||||
}
|
||||
|
||||
// ToolKBListFacts implements dynamic-kb-list-facts.
|
||||
func ToolKBListFacts(ctx context.Context, deps KBDeps, raw json.RawMessage) (sdkplugin.ToolResult, error) {
|
||||
if deps.Client == nil {
|
||||
return errResult("dynamic-kb-list-facts: HF KB backend unavailable")
|
||||
}
|
||||
var in listFactsInput
|
||||
if err := json.Unmarshal(raw, &in); err != nil {
|
||||
return errResult("dynamic-kb-list-facts: parse: " + err.Error())
|
||||
}
|
||||
if in.KBCode == "" || len(in.TopicIDs) == 0 {
|
||||
return errResult("dynamic-kb-list-facts: kb-code + topic-ids required")
|
||||
}
|
||||
facts, err := deps.Client.ListFacts(ctx, in.KBCode, in.TopicIDs)
|
||||
if err != nil {
|
||||
return errResult("dynamic-kb-list-facts: " + err.Error())
|
||||
}
|
||||
var sb strings.Builder
|
||||
fmt.Fprintf(&sb, "kb: %s\ntopics: %v\nfacts: %d\n\n", in.KBCode, in.TopicIDs, len(facts))
|
||||
for _, f := range facts {
|
||||
fmt.Fprintf(&sb, "[%d] (topic %d/%s) %s\n %s\n\n",
|
||||
f.ID, f.TopicID, f.TopicSlug, f.Title, f.Snippet)
|
||||
}
|
||||
sb.WriteString(
|
||||
"Call dynamic-kb-cache({kb-code: \"" + in.KBCode +
|
||||
"\", fact-ids: [<id>, ...]}) to commit selected facts to your kb-block.\n",
|
||||
)
|
||||
return okResult(sb.String())
|
||||
}
|
||||
|
||||
// ToolKBCache implements dynamic-kb-cache. Per-agent session lookup
|
||||
// resolves the kb-block.json location.
|
||||
func ToolKBCache(ctx context.Context, deps KBDeps, agentID string, raw json.RawMessage) (sdkplugin.ToolResult, error) {
|
||||
if deps.Client == nil {
|
||||
return errResult("dynamic-kb-cache: HF KB backend unavailable")
|
||||
}
|
||||
sessionID := ""
|
||||
if deps.SessionFor != nil {
|
||||
sessionID = deps.SessionFor(agentID)
|
||||
}
|
||||
if agentID == "" || sessionID == "" {
|
||||
return errResult("dynamic-kb-cache: no per-session context (agent/session unknown)")
|
||||
}
|
||||
var in cacheInput
|
||||
if err := json.Unmarshal(raw, &in); err != nil {
|
||||
return errResult("dynamic-kb-cache: parse: " + err.Error())
|
||||
}
|
||||
if in.KBCode == "" || len(in.FactIDs) == 0 {
|
||||
return errResult("dynamic-kb-cache: kb-code + fact-ids required")
|
||||
}
|
||||
block, err := kbblock.Open(deps.ProfileRoot, agentID, sessionID)
|
||||
if err != nil {
|
||||
return errResult("dynamic-kb-cache: open block: " + err.Error())
|
||||
}
|
||||
// Split into already-cached vs to-fetch.
|
||||
var toFetch []int
|
||||
var alreadyCached []int
|
||||
for _, id := range in.FactIDs {
|
||||
if block.Has(id) {
|
||||
alreadyCached = append(alreadyCached, id)
|
||||
} else {
|
||||
toFetch = append(toFetch, id)
|
||||
}
|
||||
}
|
||||
var fetched []kbclient.Fact
|
||||
if len(toFetch) > 0 {
|
||||
fetched, err = deps.Client.GetFacts(ctx, in.KBCode, toFetch)
|
||||
if err != nil {
|
||||
return errResult("dynamic-kb-cache: " + err.Error())
|
||||
}
|
||||
}
|
||||
turn := 0
|
||||
if deps.Turn != nil {
|
||||
turn = deps.Turn(agentID)
|
||||
}
|
||||
var added []int
|
||||
fetchedByID := map[int]kbclient.Fact{}
|
||||
for _, f := range fetched {
|
||||
fetchedByID[f.ID] = f
|
||||
}
|
||||
for _, id := range toFetch {
|
||||
f, ok := fetchedByID[id]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if e := block.Add(f.ID, in.KBCode, f.TopicSlug, f.Content, turn); e != nil {
|
||||
added = append(added, f.ID)
|
||||
}
|
||||
}
|
||||
if err := block.Save(); err != nil {
|
||||
return errResult("dynamic-kb-cache: save: " + err.Error())
|
||||
}
|
||||
missing := diffInts(toFetch, added)
|
||||
sort.Ints(added)
|
||||
sort.Ints(alreadyCached)
|
||||
res := cacheResult{
|
||||
Added: added,
|
||||
AlreadyCached: alreadyCached,
|
||||
Missing: missing,
|
||||
Note: "Newly cached facts are available starting your next turn.",
|
||||
}
|
||||
out, _ := json.Marshal(res)
|
||||
return okResult(string(out))
|
||||
}
|
||||
|
||||
// ToolKBEvict implements dynamic-kb-evict.
|
||||
func ToolKBEvict(ctx context.Context, deps KBDeps, agentID string, raw json.RawMessage) (sdkplugin.ToolResult, error) {
|
||||
sessionID := ""
|
||||
if deps.SessionFor != nil {
|
||||
sessionID = deps.SessionFor(agentID)
|
||||
}
|
||||
if agentID == "" || sessionID == "" {
|
||||
return errResult("dynamic-kb-evict: no per-session context")
|
||||
}
|
||||
var in evictInput
|
||||
if err := json.Unmarshal(raw, &in); err != nil {
|
||||
return errResult("dynamic-kb-evict: parse: " + err.Error())
|
||||
}
|
||||
if len(in.FactIDs) == 0 {
|
||||
return errResult("dynamic-kb-evict: fact-ids required")
|
||||
}
|
||||
block, err := kbblock.Open(deps.ProfileRoot, agentID, sessionID)
|
||||
if err != nil {
|
||||
return errResult("dynamic-kb-evict: open block: " + err.Error())
|
||||
}
|
||||
evicted := block.Remove(in.FactIDs...)
|
||||
if err := block.Save(); err != nil {
|
||||
return errResult("dynamic-kb-evict: save: " + err.Error())
|
||||
}
|
||||
notCached := diffInts(in.FactIDs, evicted)
|
||||
sort.Ints(evicted)
|
||||
res := evictResult{
|
||||
Evicted: evicted,
|
||||
NotCached: notCached,
|
||||
Note: "Evictions take effect starting your next turn.",
|
||||
}
|
||||
out, _ := json.Marshal(res)
|
||||
return okResult(string(out))
|
||||
}
|
||||
|
||||
// ---- helpers ----
|
||||
|
||||
func diffInts(a, b []int) []int {
|
||||
if len(a) == 0 {
|
||||
return nil
|
||||
}
|
||||
want := make(map[int]bool, len(b))
|
||||
for _, x := range b {
|
||||
want[x] = true
|
||||
}
|
||||
out := make([]int, 0, len(a))
|
||||
for _, x := range a {
|
||||
if !want[x] {
|
||||
out = append(out, x)
|
||||
}
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil
|
||||
}
|
||||
sort.Ints(out)
|
||||
return out
|
||||
}
|
||||
@@ -27,6 +27,7 @@ type Deps struct {
|
||||
Version string
|
||||
Collect func() telemetry.Snapshot
|
||||
Bridge *monitor.Bridge
|
||||
Pusher *monitor.Pusher
|
||||
Scheduler *calendar.Scheduler
|
||||
Host sdkplugin.HostAPI
|
||||
|
||||
@@ -34,6 +35,12 @@ type Deps struct {
|
||||
// host injects this via the tool dispatch context; main.go's
|
||||
// CallTool reads it from the ctx and stashes here.
|
||||
AgentIDFromCtx func(ctx context.Context) string
|
||||
|
||||
// KB wires the dynamic-kb-* tool family (DESIGN-DYNAMIC-BLOCK.md
|
||||
// §3.3 / §4.4). Zero value when HF KB backend is unconfigured;
|
||||
// each KB tool then returns a graceful "backend unavailable"
|
||||
// error rather than crashing.
|
||||
KB KBDeps
|
||||
}
|
||||
|
||||
// Dispatch is the entry point main.go's ToolPlugin.CallTool calls.
|
||||
@@ -60,6 +67,16 @@ func Dispatch(ctx context.Context, deps Deps, name string, input json.RawMessage
|
||||
return toolCalendarResume(ctx, deps)
|
||||
case "harborforge_restart_status":
|
||||
return toolRestartStatus(deps)
|
||||
case "dynamic-kb-list-kbs":
|
||||
return ToolKBListKBs(ctx, deps.KB, input)
|
||||
case "dynamic-kb-list-topics":
|
||||
return ToolKBListTopics(ctx, deps.KB, input)
|
||||
case "dynamic-kb-list-facts":
|
||||
return ToolKBListFacts(ctx, deps.KB, input)
|
||||
case "dynamic-kb-cache":
|
||||
return ToolKBCache(ctx, deps.KB, deps.AgentIDFromCtx(ctx), input)
|
||||
case "dynamic-kb-evict":
|
||||
return ToolKBEvict(ctx, deps.KB, deps.AgentIDFromCtx(ctx), input)
|
||||
}
|
||||
return sdkplugin.ToolResult{
|
||||
IsError: true,
|
||||
@@ -89,11 +106,32 @@ func toolStatus(deps Deps) (sdkplugin.ToolResult, error) {
|
||||
"queries": bs.Queries,
|
||||
"last_query": bs.LastQuery,
|
||||
},
|
||||
"monitor_push": monitorPushSummary(deps),
|
||||
"calendar": sch,
|
||||
}
|
||||
return jsonResult(out)
|
||||
}
|
||||
|
||||
// monitorPushSummary returns the pusher's last-known state in the same
|
||||
// JSON layout the status/monitor_telemetry tools surface. Nil-safe: if
|
||||
// no pusher is wired (testing, push disabled), reports enabled=false.
|
||||
func monitorPushSummary(deps Deps) map[string]any {
|
||||
out := map[string]any{
|
||||
"enabled": deps.Config.MonitorPushEnabled,
|
||||
"interval_seconds": deps.Config.MonitorPushIntervalSeconds,
|
||||
"endpoint": deps.Config.BackendURL + "/monitor/server/heartbeat",
|
||||
}
|
||||
if deps.Pusher != nil {
|
||||
st := deps.Pusher.Stats()
|
||||
out["last_sent_at"] = st.LastSentAt
|
||||
out["last_status"] = st.LastStatus
|
||||
out["last_err"] = st.LastErr
|
||||
out["success_count"] = st.SuccessCount
|
||||
out["error_count"] = st.ErrorCount
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func toolTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
|
||||
return jsonResult(deps.Collect())
|
||||
}
|
||||
@@ -101,11 +139,14 @@ func toolTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
|
||||
func toolMonitorTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
|
||||
bs := deps.Bridge.Stats()
|
||||
return jsonResult(map[string]any{
|
||||
"bridge": map[string]any{
|
||||
"port": bs.Port,
|
||||
"listening": bs.Listening,
|
||||
"queries": bs.Queries,
|
||||
"last_query": bs.LastQuery,
|
||||
"last_snapshot": bs.LastSnap,
|
||||
},
|
||||
"push": monitorPushSummary(deps),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -176,10 +217,14 @@ func toolCalendarResume(ctx context.Context, deps Deps) (sdkplugin.ToolResult, e
|
||||
}
|
||||
|
||||
func toolRestartStatus(deps Deps) (sdkplugin.ToolResult, error) {
|
||||
// HarborForge backend doesn't expose a restart-pending endpoint
|
||||
// (verified via /openapi.json) so we report the most recent
|
||||
// heartbeat freshness instead. Useful for operators sanity-
|
||||
// checking that the plugin's calendar loop is still alive.
|
||||
sch := deps.Scheduler.Status()
|
||||
return jsonResult(map[string]any{
|
||||
"pending": sch.RestartPending,
|
||||
"last_heartbeat": sch.LastHeartbeat,
|
||||
"pending": false,
|
||||
"last_heartbeats": sch.LastHeartbeats,
|
||||
"observed_at": time.Now().UTC(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -49,7 +49,35 @@
|
||||
"name": "harborforge_restart_status",
|
||||
"description": "Check whether a Plexum host restart is pending (backend-driven flag). Reports last poll time and pending flag.",
|
||||
"inputSchema": {"type": "object"}
|
||||
},
|
||||
{
|
||||
"name": "dynamic-kb-list-kbs",
|
||||
"description": "List HarborForge Knowledge Bases the agent can access. Optional project-code filter (equivalent to `hf knowledge-base list [--project <code>]`). Returns code/title/description per KB. Browse — follow up with dynamic-kb-list-topics to drill into one.",
|
||||
"inputSchema": {"type": "object", "properties": {"project-code": {"type": "string"}}}
|
||||
},
|
||||
{
|
||||
"name": "dynamic-kb-list-topics",
|
||||
"description": "List topics in one KB by code. Returns topic id/slug/description per row. Browse — follow up with dynamic-kb-list-facts.",
|
||||
"inputSchema": {"type": "object", "properties": {"kb-code": {"type": "string"}}, "required": ["kb-code"]}
|
||||
},
|
||||
{
|
||||
"name": "dynamic-kb-list-facts",
|
||||
"description": "List facts in given topics. Returns lightweight previews (id/topic/title/snippet). Browse — feed into dynamic-kb-cache to commit selected facts to your kb-block.",
|
||||
"inputSchema": {"type": "object", "properties": {"kb-code": {"type": "string"}, "topic-ids": {"type": "array", "items": {"type": "integer"}}}, "required": ["kb-code", "topic-ids"]}
|
||||
},
|
||||
{
|
||||
"name": "dynamic-kb-cache",
|
||||
"description": "Cache specific KB facts into your per-session kb-block (visible in System prompt next turn). Pass previous-dynamic-id to consume the matching list-facts browse output.",
|
||||
"inputSchema": {"type": "object", "properties": {"kb-code": {"type": "string"}, "fact-ids": {"type": "array", "items": {"type": "integer"}}, "previous-dynamic-id": {"type": "string"}}, "required": ["kb-code", "fact-ids"]}
|
||||
},
|
||||
{
|
||||
"name": "dynamic-kb-evict",
|
||||
"description": "Remove cached facts from your kb-block (free ctx). Takes effect starting your next turn.",
|
||||
"inputSchema": {"type": "object", "properties": {"fact-ids": {"type": "array", "items": {"type": "integer"}}}, "required": ["fact-ids"]}
|
||||
}
|
||||
],
|
||||
"dynamicSubblocks": [
|
||||
{"name": "kb-block"}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,12 +37,16 @@ Next steps:
|
||||
"harbor-forge"
|
||||
2. Write ${PLUGIN_DIR}/config.json — sample:
|
||||
{
|
||||
"backendUrl": "https://monitor.hangman-lab.top",
|
||||
"backendUrl": "https://hf-api.hangman-lab.top",
|
||||
"identifier": "server-t3",
|
||||
"apiKey": "g1_xxx",
|
||||
"monitor_port": 9100,
|
||||
"apiKey": "<copy from HF_MONITER_API_KEY>",
|
||||
"monitor_push_enabled": true,
|
||||
"monitor_push_interval_seconds": 30,
|
||||
"monitor_port": 0,
|
||||
"calendar_enabled": true,
|
||||
"calendar_heartbeat_interval_seconds": 30
|
||||
}
|
||||
3. Restart the host: systemctl --user restart plexum
|
||||
4. Verify push is landing (DB last_seen_at advancing) and then
|
||||
remove the standalone harborforge-monitor container.
|
||||
EOF
|
||||
|
||||
Reference in New Issue
Block a user