Compare commits

6 Commits

Author SHA1 Message Date
9e43021ba5 feat(kb): dynamic-kb-* tool family + <kb-block> subblock provider
Implements the HF side of Plexum DESIGN-DYNAMIC-BLOCK.md Phase 2 — the
HarborForge knowledge-base block agents cache facts into per session.
Cross-runtime aligned with the ClawSkills workflow text (same tool
names + input schemas + return shapes will land on openclaw side later).

manifest.json:
  - 5 new dynamic-kb-* tool contracts (list-kbs / list-topics /
    list-facts / cache / evict)
  - dynamicSubblocks contract entry declaring this plugin owns
    the "kb-block" <dynamic-block> subblock

internal/kbblock/ (new):
  - Per-session storage at <PLEXUM_PROFILE_ROOT>/agents/<id>/sessions/
    <sid>/plugins/harbor-forge/kb-block.json
  - Entry carries ID (HF backend DB primary key) + KBCode + SourceTopic
    + Content + InsertSeq for §9 #4 cache-insertion-order rendering
  - Render emits <kb-fact id=N kb=<code> source=topic:<slug>>...
    </kb-fact> (no title/description per §9 #8; source attr omitted
    when empty)
  - Fade NOT applied in v1 — §9 #3 lock has fade params shared with
    memory but implementation deferred until prod data informs whether
    KB needs it; agent dynamic-kb-evict is the only eviction path
  - 11 unit tests

internal/kbclient/ (new):
  - Typed HTTP client for HarborForge.Backend KB routes verified
    against app/api/routers/knowledge.py
  - GET /knowledge-bases[?project=<code>] (list KBs)
  - GET /knowledge-bases/{kb_code}/topics (list topics)
  - GET /knowledge-bases/{kb_code}/tree (full hierarchy — ListFacts
    flattens this client-side filtered by topic ids; backend has no
    flat list-facts-in-topic route)
  - GET /knowledge-facts/{id} per fact (GetFacts batch loop)
  - Auth: plugin-level Bearer APIKey. Per-agent hf-token resolution
    is a TODO when SDK exposes secret-mgr access.

internal/tools/kb.go (new) + tools.go:
  - 5 tool functions hooked into Dispatch
  - KBDeps struct bundles Client + ProfileRoot + SessionFor + Turn
  - Cache/evict use SessionFor lookup populated by main.go's
    RenderDynamicSubblock (called per turn by host; carries sessionID)

cmd/plexum-harborforge-plugin/main.go:
  - kbClient field initialized when BackendURL + APIKey present
  - profileRoot cached for kbblock path resolution
  - agentSession sync.Map tracks agentID → sessionID; populated by
    RenderDynamicSubblock so subsequent tool calls in the same turn
    can resolve the per-session kb-block.json path
  - Implements sdkplugin.DynamicBlockProvider.RenderDynamicSubblock:
    opens kbblock for (agentID, sessionID) and returns its Render()
    body; host wraps in <kb-block>...</kb-block>

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-05 20:15:34 +01:00
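The render rule described in this commit (cache-insertion order, `source` attribute omitted when empty) can be sketched roughly as follows. This is a minimal illustration, not the code in internal/kbblock: the `kbEntry` type, its field types, and `renderKBFacts` are hypothetical names, and escaping of content is left out.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// kbEntry is a hypothetical stand-in for the kbblock entry; the field
// names follow the commit message, the field types are assumptions.
type kbEntry struct {
	ID          int64  // HF backend DB primary key
	KBCode      string // knowledge-base code
	SourceTopic string // topic slug; may be empty
	Content     string
	InsertSeq   int // position in cache-insertion order
}

// renderKBFacts emits one <kb-fact> element per entry, ordered by
// InsertSeq. The source attribute is written only when SourceTopic is
// non-empty.
func renderKBFacts(entries []kbEntry) string {
	sorted := append([]kbEntry(nil), entries...) // leave the caller's slice untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].InsertSeq < sorted[j].InsertSeq
	})
	var b strings.Builder
	for _, e := range sorted {
		fmt.Fprintf(&b, "<kb-fact id=%d kb=%s", e.ID, e.KBCode)
		if e.SourceTopic != "" {
			fmt.Fprintf(&b, " source=topic:%s", e.SourceTopic)
		}
		b.WriteString(">")
		b.WriteString(e.Content)
		b.WriteString("</kb-fact>\n")
	}
	return b.String()
}

func main() {
	fmt.Print(renderKBFacts([]kbEntry{
		{ID: 2, KBCode: "ops", Content: "second fact", InsertSeq: 2},
		{ID: 1, KBCode: "ops", SourceTopic: "deploy", Content: "first fact", InsertSeq: 1},
	}))
}
```

An empty block renders to the empty string, which matches the "host omits the subblock" behaviour described for RenderDynamicSubblock.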
046a7753e6 log monitor push start + slow heartbeat
First successful push emits an info-level "monitor push started" so
operators can confirm the loop wired up correctly. Subsequent
successes log every 60 cycles ("monitor push heartbeat") so the
journal stays quiet but still proves the loop isn't dead. Errors
already log at warn; this fills the success-side gap so a silent
journal can't hide a "no successes, no errors" pathology.
2026-06-03 13:13:33 +01:00
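A rough sketch of the success-side logging described above: one line on the first successful push, then one every 60 successes. The `pushLogger` type and its method names are hypothetical, and whether the real pusher counts the first success towards the 60 is an assumption.

```go
package main

import "fmt"

// pushLogger is a hypothetical helper that throttles success logs.
type pushLogger struct {
	every     int // emit a heartbeat log every N successes
	successes int
	log       func(level, msg string)
}

func newPushLogger(every int, log func(level, msg string)) *pushLogger {
	if every <= 0 {
		every = 60 // cadence named in the commit message
	}
	return &pushLogger{every: every, log: log}
}

// onSuccess is called after each successful push.
func (l *pushLogger) onSuccess() {
	l.successes++
	switch {
	case l.successes == 1:
		l.log("info", "monitor push started")
	case l.successes%l.every == 0:
		l.log("info", "monitor push heartbeat")
	}
}

func main() {
	l := newPushLogger(60, func(level, msg string) { fmt.Println(level, msg) })
	for i := 0; i < 121; i++ {
		l.onSuccess()
	}
}
```

With a 30-second push interval this cadence works out to roughly one heartbeat line every 30 minutes.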
6e3ad669f8 feat(monitor): active push loop replacing standalone monitor
Adds a periodic POST loop to <backend>/monitor/server/heartbeat so
HF plugin can take over the standalone harborforge-monitor daemon's
job — same X-API-Key header, same flat telemetry shape (cpu_pct /
mem_pct / disk_pct / swap_pct / load_avg / uptime_seconds /
plugin_version / agents[]). HF backend stays unchanged.

Config: monitor_push_enabled (default false; opt-in to avoid surprise
heartbeats from existing deployments), monitor_push_interval_seconds
(default 30), reuses apiKey for the X-API-Key header. Lift the
container's HF_MONITER_API_KEY into config.apiKey, flip
monitor_push_enabled true, then docker rm -f the container — DB
last_seen_at keeps advancing under the plugin's loop.

Collector grew swap + cpu sampling (two reads of /proc/stat over a
1-second window when SampleCPU=true). Bridge endpoint stays cheap
(SampleCPU=false on demand); push loop is the only caller paying the
sampling cost.

E2E in sim: monitor_push_enabled=true + apiKey from injected
MonitoredServer row → server_states.last_seen_at advances exactly
every interval_seconds (10s configured, 10s observed). cpu/mem/disk/
swap_pct all populate correctly.
2026-06-03 13:04:51 +01:00
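The CPU sampling mentioned above (two reads of /proc/stat over a window) boils down to a delta between two aggregate `cpu` lines. The sketch below shows only that arithmetic on fixed input strings; `cpuSample`, `parseCPULine`, and `cpuPct` are hypothetical names, not the collector's actual API. It treats idle plus iowait as idle time and sums only the first eight counters, on the assumption that guest time is already included in user/nice.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// cpuSample holds cumulative jiffies from one aggregate "cpu" line.
type cpuSample struct {
	idle, total uint64
}

// parseCPULine parses the first line of /proc/stat
// ("cpu user nice system idle iowait irq softirq steal ...").
func parseCPULine(line string) (cpuSample, error) {
	fields := strings.Fields(line)
	if len(fields) < 5 || fields[0] != "cpu" {
		return cpuSample{}, fmt.Errorf("not an aggregate cpu line: %q", line)
	}
	var s cpuSample
	for i, f := range fields[1:] {
		if i >= 8 { // skip guest, guest_nice
			break
		}
		v, err := strconv.ParseUint(f, 10, 64)
		if err != nil {
			return cpuSample{}, fmt.Errorf("parse field %d: %w", i+1, err)
		}
		s.total += v
		if i == 3 || i == 4 { // idle + iowait
			s.idle += v
		}
	}
	return s, nil
}

// cpuPct returns the busy percentage between two samples, or 0 when
// the counters did not advance in a usable way.
func cpuPct(prev, cur cpuSample) float64 {
	if cur.total <= prev.total || cur.idle < prev.idle {
		return 0
	}
	dt := cur.total - prev.total
	di := cur.idle - prev.idle
	if di > dt {
		return 0
	}
	return 100 * float64(dt-di) / float64(dt)
}

func main() {
	a, err := parseCPULine("cpu  100 0 100 800 0 0 0 0 0 0")
	if err != nil {
		panic(err)
	}
	b, err := parseCPULine("cpu  150 0 150 900 0 0 0 0 0 0")
	if err != nil {
		panic(err)
	}
	fmt.Printf("cpu_pct=%.1f\n", cpuPct(a, b))
}
```

In a real collector the two lines would come from reads spaced by the sampling window, which is why only the push loop (SampleCPU=true) pays that cost.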
472cecd771 feat: read agent_id from ctx (SDK now plumbs it)
Plexum-sdk-go now propagates the caller agent id via
`_meta.agent_id` on tools/call. AgentIDFromCtx prefers
plugin.AgentIDFromContext(ctx) and falls back to the
single-active-calendar-slot heuristic for host-driven dispatch
paths (channel manager, CLI plugin-call) whose ctx carries no
agent id.

Drops bestEffortAgentID — the inline closure does the same thing
without the dead-Slot-iterate noise.
2026-06-03 12:54:57 +01:00
bc1ab7b6ea fix: snake_case SlotStatus + scheduler debug logs
Two issues found while end-to-end testing against a running
harborforge-backend:

  - SlotStatus enum values: backend stores snake_case
    ("not_started" / "ongoing" / …), not the camelCase the
    OpenClaw plugin's TypeScript types.ts misled the initial
    drop into using. Heartbeat responses came back with
    Slot.Status="not_started" which the scheduler never matched
    against SlotStatus("NotStarted"), so dispatchSlot never
    fired. Aligned with backend's actual enum string values
    (verified via heartbeat response shape).

  - Added info-level logs at slot selection + dispatchSlot
    entry + WakeAgent fire/result so operators can see the
    plugin's decision chain in production without enabling
    debug. Cheap (~one tick per agent per heartbeat interval).

E2E in sim: backend returns slots=1 → selection chosen=true →
dispatch enter → WakeAgent enqueued ok → backend slot ongoing
→ next heartbeat returns slots=0.
2026-06-03 11:42:18 +01:00
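To make the mismatch concrete: a typed string enum compares by exact value, so a camelCase constant never equals the snake_case value the backend sends. The sketch below is illustrative only; it declares just the two values quoted in this commit message, and `decodeSlot` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SlotStatus uses the backend's snake_case strings. Only the two
// values named in the commit message are declared here.
type SlotStatus string

const (
	SlotNotStarted SlotStatus = "not_started"
	SlotOngoing    SlotStatus = "ongoing"
)

// Slot is trimmed to the one field relevant to the bug.
type Slot struct {
	Status SlotStatus `json:"status"`
}

// decodeSlot unmarshals a slot as it would arrive in a heartbeat response.
func decodeSlot(raw string) (Slot, error) {
	var s Slot
	err := json.Unmarshal([]byte(raw), &s)
	return s, err
}

func main() {
	s, err := decodeSlot(`{"status":"not_started"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(s.Status == SlotNotStarted)           // matches after the fix
	fmt.Println(s.Status == SlotStatus("NotStarted")) // the pre-fix comparison
}
```

Because the second comparison is false, a scheduler filtering on the camelCase constant would skip every slot, which is consistent with dispatchSlot never firing.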
78b1ec5181 fix: align calendar API with actual HarborForge.Backend contract
Initial drop guessed the heartbeat shape; sim e2e against a running
harborforge-backend revealed the real contract is per-agent with
header auth, not server-wide with bearer:

  POST /calendar/agent/heartbeat
    headers: X-Agent-ID, X-Claw-Identifier
    body:    {claw_identifier, agent_id}
    response: {slots: [Slot], agent_status, message?}

  PATCH /calendar/slots/{id}/agent-update
  PATCH /calendar/slots/virtual/{vid}/agent-update
    body: {status, started_at?, actual_duration?}

  POST /calendar/agent/status
    body: {claw_identifier, agent_id, status}

Refactors:

  - internal/calendar/types.go now mirrors OpenclawPlugin/calendar/
    types.ts 1:1 (SlotStatus camelCase, real vs virtual slot id
    discrimination, event_data shape)
  - internal/calendar/bridge.go: header-based auth, per-agent method
    signatures, separate UpdateRealSlot vs UpdateVirtualSlot
  - internal/calendar/scheduler.go: per-agent heartbeat loop
    (one HTTP call per agent per tick), highest-priority slot
    selection, agent-update PATCH for terminal/non-terminal states
  - SingleActiveAgentID helper for main.bestEffortAgentID

Also fix two bugs found in sim:

  - bgCtx capture: AgentLister closures were capturing Init's ctx
    which dies the moment MCP initialize returns; switched to
    bgCtx (lifetime = plugin process)
  - tools.toolRestartStatus referenced a non-existent
    sch.RestartPending — HF backend has no restart endpoint per
    /openapi.json, so the tool now reports last_heartbeats freshness

Scheduler logs each tick + each heartbeat outcome at info so
operators can see backend connectivity without enabling debug.

E2E against http://harborforge-backend:8000 in sim:
  daemon → heartbeat → 404 "Agent not found"
  (= correct endpoint, correct headers, correct body — agent just
   isn't registered yet, which is expected for an untenanted
   plugin)
2026-06-03 11:28:05 +01:00
15 changed files with 2036 additions and 427 deletions

View File

@@ -18,9 +18,19 @@ submodule of the HarborForge umbrella repo.
## What it does
- **Monitor bridge** — HTTP server on `127.0.0.1:<monitor_port>` that
responds to `/telemetry` with a Snapshot the HarborForge.Monitor
binary expects (system metrics + every Plexum agent's sm-state)
- **Monitor push loop** — when `monitor_push_enabled: true`, posts a
flat telemetry payload (cpu/mem/disk/swap/load + per-agent state)
to `<backendUrl>/monitor/server/heartbeat` every
`monitor_push_interval_seconds`. This replaces the standalone
`harborforge-monitor` daemon — the plugin's lifecycle (gateway
start/stop) bounds the loop, so a separate supervisor isn't needed.
Use the same `apiKey` value the standalone monitor's
`HF_MONITER_API_KEY` carried.
- **Monitor bridge** (optional) — HTTP server on
`127.0.0.1:<monitor_port>` that responds to `/telemetry` with a
Snapshot. Useful when the standalone monitor is still present and
you want it to enrich its push payload from the plugin's view of
agents. Disable by setting `monitor_port: 0`.
- **Calendar scheduler** — heartbeats `<backendUrl>/calendar/agent/
heartbeat` every interval, receives any TimeSlots due to fire, and
dispatches them through `HostAPI.WakeAgent` (state-aware queue
@@ -64,15 +74,24 @@ And configure at `~/.plexum/plugins/harbor-forge/config.json`:
```json
{
"backendUrl": "https://monitor.hangman-lab.top",
"backendUrl": "https://hf-api.hangman-lab.top",
"identifier": "server-t3",
"apiKey": "g1_xxx",
"monitor_port": 9100,
"monitor_push_enabled": true,
"monitor_push_interval_seconds": 30,
"monitor_port": 0,
"calendar_enabled": true,
"calendar_heartbeat_interval_seconds": 30
}
```
Replacing the standalone `harborforge-monitor` container: lift the
container's `HF_MONITER_API_KEY` into `apiKey`, set
`monitor_push_enabled: true`, then `docker rm -f harborforge-monitor`
once you've confirmed the plugin's pushes are landing (the backend's
`server_states.last_seen_at` should keep advancing without the
container running).
Restart the host (`systemctl --user restart plexum`) and verify:
```bash

View File

@@ -28,6 +28,8 @@ import (
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/calendar"
hfcfg "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/config"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbblock"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbclient"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/monitor"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/tools"
@@ -36,16 +38,31 @@ import (
// Version is injected via -ldflags "-X main.Version=…" at build time.
var Version = "0.1.0"
// harborForgePlugin satisfies sdkplugin.ToolPlugin.
// harborForgePlugin satisfies sdkplugin.ToolPlugin AND
// sdkplugin.DynamicBlockProvider (the <kb-block> subblock contributor
// per Plexum DESIGN-DYNAMIC-BLOCK.md §3.3).
type harborForgePlugin struct {
host sdkplugin.HostAPI
cfg hfcfg.Resolved
bridge *monitor.Bridge
pusher *monitor.Pusher
sched *calendar.Scheduler
kbClient *kbclient.Client
deps tools.Deps
cancelBg context.CancelFunc
wg sync.WaitGroup
agentCache sync.Map // sessionID/turnID → agentID stash (best-effort)
// profileRoot caches PLEXUM_PROFILE_ROOT for kbblock path resolution.
profileRoot string
// agentSession maps agentID → sessionID, populated each turn by
// the host's plexum/plugin/dynamic-block/render RPC (which carries
// both ids). dynamic-kb-cache + dynamic-kb-evict read this to
// resolve the per-session kb-block.json path. Best-effort —
// before the first RenderDynamicSubblock fires for an agent the
// map is empty and the kb tools surface a clear error.
agentSession sync.Map // agentID (string) → sessionID (string)
}
func (p *harborForgePlugin) Manifest() sdkplugin.Manifest {
@@ -60,6 +77,7 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
home, _ := os.UserHomeDir()
profileRoot = filepath.Join(home, ".plexum")
}
p.profileRoot = profileRoot
raw, err := hfcfg.Load(profileRoot)
if err != nil {
return fmt.Errorf("load harbor-forge config: %w", err)
@@ -70,42 +88,72 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
"backend": p.cfg.BackendURL,
"identifier": p.cfg.Identifier,
"monitor_port": p.cfg.MonitorPort,
"monitor_push_enabled": p.cfg.MonitorPushEnabled,
"calendar_enabled": p.cfg.CalendarEnabled,
})
collect := func() telemetry.Snapshot {
return telemetry.Collect(telemetry.CollectOpts{
Identifier: p.cfg.Identifier,
Version: Version,
AgentLister: func() []telemetry.AgentInfo {
return p.listAgents(ctx, profileRoot)
},
})
}
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
bgCtx, cancel := context.WithCancel(context.Background())
p.cancelBg = cancel
// Listers + collectors capture bgCtx (not Init ctx) — Init returns
// once MCP initialize completes, but the plugin process lives on
// and so do the goroutines + closures we registered.
makeCollector := func(sampleCPU bool) func() telemetry.Snapshot {
return func() telemetry.Snapshot {
return telemetry.Collect(telemetry.CollectOpts{
Identifier: p.cfg.Identifier,
Version: Version,
SampleCPU: sampleCPU,
AgentLister: func() []telemetry.AgentInfo {
return p.listAgents(bgCtx, profileRoot)
},
})
}
}
// Bridge serves on-demand reads; cheap, no CPU sampling.
collect := makeCollector(false)
// Pusher runs the slow push loop; CPU sampling fine here.
collectForPush := makeCollector(true)
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
if err := p.bridge.Start(bgCtx); err != nil {
host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()})
}
// Active push loop — replaces the standalone harborforge-monitor
// container. Off by default; operator opts in via
// monitor_push_enabled + apiKey.
p.pusher = monitor.NewPusher(monitor.PusherConfig{
BackendURL: p.cfg.BackendURL,
APIKey: p.cfg.APIKey,
Interval: time.Duration(p.cfg.MonitorPushIntervalSeconds) * time.Second,
}, collectForPush,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
if p.cfg.MonitorPushEnabled {
p.wg.Add(1)
go func() {
defer p.wg.Done()
if err := p.pusher.Run(bgCtx); err != nil && !errors.Is(err, context.Canceled) {
host.Log("warn", "monitor pusher exited", map[string]any{"err": err.Error()})
}
}()
}
calBackend := p.cfg.CalendarBackendURL
if calBackend == "" {
calBackend = p.cfg.BackendURL
}
bridge := calendar.New(calBackend, p.cfg.APIKey)
bridge := calendar.New(calBackend, p.cfg.Identifier)
p.sched = calendar.NewScheduler(
calendar.Config{
HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second,
},
bridge, host, p.cfg.Identifier,
bridge, host,
calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"},
func() []calendar.ReportableAgent {
return p.listReportableAgents(ctx, profileRoot)
return p.listReportableAgents(bgCtx, profileRoot)
},
)
if p.cfg.CalendarEnabled {
@@ -120,29 +168,80 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
host.Log("info", "calendar scheduler disabled by config", nil)
}
// KB HTTP client — shares plugin-level APIKey via Bearer (per-agent
// hf-token resolution is a TODO when SDK exposes secret-mgr access).
if p.cfg.BackendURL != "" && p.cfg.APIKey != "" {
p.kbClient = kbclient.New(p.cfg.BackendURL, p.cfg.APIKey)
} else {
host.Log("info", "kb client not initialized (need BackendURL + APIKey); dynamic-kb-* tools will return backend-unavailable", nil)
}
p.deps = tools.Deps{
Config: p.cfg,
Version: Version,
Collect: collect,
Bridge: p.bridge,
Pusher: p.pusher,
Scheduler: p.sched,
Host: host,
AgentIDFromCtx: func(ctx context.Context) string {
// Plexum stashes the calling agent id on the host-side
// context (via WithAgent) before dispatching tool calls.
// We can't directly import internal/agentloop from a
// plugin, so we rely on PLEXUM_TOOL_AGENT_ID env-style
// (set per-call by host when we add that wiring) or fall
// back to the only-active-agent heuristic. v1: prefer the
// only-active wake-target (deterministic in single-agent
// HF deployments).
return p.bestEffortAgentID()
// Host attaches the caller agent id via tools/call
// `_meta.agent_id`; SDK unpacks it into ctx.
if id := sdkplugin.AgentIDFromContext(ctx); id != "" {
return id
}
// Fallback for host paths that don't carry an agent
// (channel-driven, CLI plugin-call). When a single
// calendar slot is active we can deterministically
// attribute the call to that slot's owner.
return p.sched.SingleActiveAgentID()
},
KB: tools.KBDeps{
Client: p.kbClient,
ProfileRoot: profileRoot,
SessionFor: func(agentID string) string {
if v, ok := p.agentSession.Load(agentID); ok {
return v.(string)
}
return ""
},
Turn: func(agentID string) int { return 0 }, // best-effort; turn ctx not available to plugins yet
},
}
return nil
}
// RenderDynamicSubblock implements sdkplugin.DynamicBlockProvider. The
// host calls this once per turn per declared subblock name from this
// plugin's manifest (DESIGN-DYNAMIC-BLOCK.md §2 / §3.3). For
// "kb-block" we open the per-session kb-block.json and return its
// rendered body (host wraps in <kb-block>...</kb-block>). Empty block
// → return "" → host omits the subblock for this turn.
//
// Side effect: stash agentID → sessionID into p.agentSession so the
// dynamic-kb-cache + dynamic-kb-evict tools can resolve the same
// kb-block file when they fire later in the same turn.
func (p *harborForgePlugin) RenderDynamicSubblock(ctx context.Context, agentID, sessionID, name string) (string, error) {
if agentID == "" || sessionID == "" {
return "", nil
}
// Cache the (agent, session) pair for tool dispatch this turn.
p.agentSession.Store(agentID, sessionID)
if name != "kb-block" {
return "", nil
}
block, err := kbblock.Open(p.profileRoot, agentID, sessionID)
if err != nil {
p.host.Log("warn", "open kb-block", map[string]any{
"agent": agentID, "session": sessionID, "err": err.Error(),
})
return "", nil
}
return block.Render(), nil
}
func (p *harborForgePlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (sdkplugin.ToolResult, error) {
return tools.Dispatch(ctx, p.deps, name, input)
}
@@ -203,21 +302,7 @@ func mapStateToCalendar(s string) calendar.AgentStatusValue {
case "offline":
return calendar.AgentStatusOffline
}
return calendar.AgentStatusUnknown
}
// bestEffortAgentID is a v1 stop-gap for tools that need the calling
// agent's id but don't have it on the ctx (Plexum SDK doesn't yet
// expose this — TODO upstream). Returns the only active calendar
// slot's agent if there's exactly one; otherwise empty. The calendar
// tools (the only ones that need agent context) usually fire when
// exactly one slot is active.
func (p *harborForgePlugin) bestEffortAgentID() string {
sch := p.sched.Status()
if len(sch.Active) == 1 {
return sch.Active[0].Slot.AgentID
}
return ""
return calendar.AgentStatusOffline
}
func manifestFromDisk() sdkplugin.Manifest {

View File

@@ -1,7 +1,8 @@
// Bridge — thin HTTP client for the HarborForge backend's Calendar API.
// All operations carry the API key as Authorization: Bearer; absent
// key means missing-auth errors from the backend (caller should
// handle them as transient and log).
// Bridge — typed HTTP client for HarborForge.Backend's calendar API.
// Endpoint shapes verified via the backend's /openapi.json and against
// HarborForge.OpenclawPlugin/plugin/calendar/calendar-bridge.ts so
// the two plugins drop into the same backend without per-plugin
// adapters.
package calendar
@@ -13,30 +14,33 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
)
// Bridge is the typed wrapper around an HTTP client + backend URL.
// Bridge is constructed once per scheduler and reused across heartbeats.
type Bridge struct {
BackendURL string
APIKey string
BaseURL string
ClawIdentifier string
HTTP *http.Client
}
// New constructs a bridge with a sensible default timeout.
func New(backendURL, apiKey string) *Bridge {
// New constructs a bridge with a 20s default timeout.
func New(baseURL, clawIdentifier string) *Bridge {
return &Bridge{
BackendURL: strings.TrimRight(backendURL, "/"),
APIKey: apiKey,
BaseURL: strings.TrimRight(baseURL, "/"),
ClawIdentifier: clawIdentifier,
HTTP: &http.Client{Timeout: 20 * time.Second},
}
}
// Heartbeat POSTs /calendar/agent/heartbeat. Returns the backend's
// reply or an error.
func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (HeartbeatResponse, error) {
raw, err := b.post(ctx, "/calendar/agent/heartbeat", payload)
// Heartbeat POSTs /calendar/agent/heartbeat. Per-agent: each running
// agent on this claw drives its own heartbeat (matches OpenClaw plugin
// semantics).
func (b *Bridge) Heartbeat(ctx context.Context, agentID string) (HeartbeatResponse, error) {
body := HeartbeatRequest{ClawIdentifier: b.ClawIdentifier, AgentID: agentID}
raw, err := b.doJSON(ctx, http.MethodPost, "/calendar/agent/heartbeat", agentID, body)
if err != nil {
return HeartbeatResponse{}, err
}
@@ -49,76 +53,64 @@ func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (Heart
return out, nil
}
// UpdateSlotStatus POSTs /calendar/slot/<id>/status to mark a slot
// completed / aborted / paused / resumed.
func (b *Bridge) UpdateSlotStatus(ctx context.Context, slotID string, update SlotUpdate) error {
if slotID == "" {
return errors.New("calendar: slot id required")
}
_, err := b.post(ctx, "/calendar/slot/"+slotID+"/status", update)
// UpdateRealSlot PATCHes /calendar/slots/{id}/agent-update.
func (b *Bridge) UpdateRealSlot(ctx context.Context, agentID string, slotID int64, update SlotAgentUpdate) error {
path := "/calendar/slots/" + strconv.FormatInt(slotID, 10) + "/agent-update"
_, err := b.doJSON(ctx, http.MethodPatch, path, agentID, update)
return err
}
// RestartPending GETs /restart/status — returns the backend's
// current restart-requested flag.
func (b *Bridge) RestartPending(ctx context.Context) (bool, error) {
raw, err := b.get(ctx, "/restart/status")
if err != nil {
return false, err
}
var out struct {
Pending bool `json:"pending"`
}
if len(raw) > 0 {
if err := json.Unmarshal(raw, &out); err != nil {
return false, fmt.Errorf("decode restart-status: %w", err)
}
}
return out.Pending, nil
// UpdateVirtualSlot PATCHes /calendar/slots/virtual/{vid}/agent-update.
// The backend first materialises the virtual slot. Later calls against
// the same logical slot should ideally use UpdateRealSlot with the id
// returned in the response, but v1 does not round-trip the materialised
// id back to the scheduler (that would require a separate fetch); the
// agent-update path tolerates re-PATCHing a virtual id.
func (b *Bridge) UpdateVirtualSlot(ctx context.Context, agentID string, virtualID string, update SlotAgentUpdate) error {
path := "/calendar/slots/virtual/" + virtualID + "/agent-update"
_, err := b.doJSON(ctx, http.MethodPatch, path, agentID, update)
return err
}
// post serialises body as JSON, attaches Authorization, returns
// response body bytes. Non-2xx becomes an error with the body
// included for diagnostics.
func (b *Bridge) post(ctx context.Context, path string, body any) ([]byte, error) {
// PushAgentStatus POSTs /calendar/agent/status. Used to push idle ↔
// busy transitions out of the normal heartbeat cycle.
func (b *Bridge) PushAgentStatus(ctx context.Context, agentID string, status AgentStatusValue) error {
body := AgentStatusPush{
ClawIdentifier: b.ClawIdentifier, AgentID: agentID, Status: status,
}
_, err := b.doJSON(ctx, http.MethodPost, "/calendar/agent/status", agentID, body)
return err
}
// doJSON serialises body, attaches the two auth headers, and returns
// the response bytes. Errors on non-2xx with truncated body.
func (b *Bridge) doJSON(ctx context.Context, method, path, agentID string, body any) ([]byte, error) {
if agentID == "" {
return nil, errors.New("calendar: agent_id required for auth headers")
}
raw, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("marshal %s: %w", path, err)
return nil, fmt.Errorf("marshal %s %s: %w", method, path, err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, b.BackendURL+path, bytes.NewReader(raw))
req, err := http.NewRequestWithContext(ctx, method, b.BaseURL+path, bytes.NewReader(raw))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
if b.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+b.APIKey)
}
return b.do(req)
}
req.Header.Set("X-Agent-ID", agentID)
req.Header.Set("X-Claw-Identifier", b.ClawIdentifier)
func (b *Bridge) get(ctx context.Context, path string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, b.BackendURL+path, nil)
if err != nil {
return nil, err
}
if b.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+b.APIKey)
}
return b.do(req)
}
func (b *Bridge) do(req *http.Request) ([]byte, error) {
res, err := b.HTTP.Do(req)
if err != nil {
return nil, fmt.Errorf("%s %s: %w", req.Method, req.URL.Path, err)
return nil, fmt.Errorf("%s %s: %w", method, path, err)
}
defer res.Body.Close()
body, _ := io.ReadAll(res.Body)
out, _ := io.ReadAll(res.Body)
if res.StatusCode < 200 || res.StatusCode >= 300 {
return nil, fmt.Errorf("%s %s → %d: %s",
req.Method, req.URL.Path, res.StatusCode, truncate(body, 300))
method, path, res.StatusCode, truncate(out, 300))
}
return body, nil
return out, nil
}
func truncate(b []byte, n int) string {

View File

@@ -1,14 +1,16 @@
// Scheduler — main loop that heartbeats the backend, dispatches
// returned slots via Plexum's WakeAgent, and tracks per-agent active
// slot state for the calendar_* tools.
// Scheduler — loops over every Plexum agent, heartbeats per-agent,
// picks the highest-priority pending slot for each, dispatches via
// host.WakeAgent. Mirrors HarborForge.OpenclawPlugin's per-agent
// scheduler loop (PLG-CAL-002).
//
// State is in-memory: a daemon restart drops everything. Next
// heartbeat reconciles (backend keeps the canonical SlotStatus).
// In-memory state: per-agent active slot map. A daemon restart drops
// it; next heartbeat reconciles from the backend's canonical state.
//
// Concurrency:
// - one heartbeat ticker goroutine
// - per-slot dispatch is fire-and-forget via WakeAgent (queue-aware)
// - mu guards activeBySlot + activeByAgent maps
// Wake semantics: WakeAgent is fire-and-forget; the SDK's wake queue
// (depth 1 replace-newest) handles state-aware dispatch. We mark the
// slot Ongoing optimistically the moment we call WakeAgent; agents
// drive complete/abort/pause/resume via the harborforge_calendar_*
// tools.
package calendar
@@ -22,22 +24,20 @@ import (
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
)
// Scheduler orchestrates the calendar loop.
// Scheduler is the long-running calendar driver.
type Scheduler struct {
cfg Config
bridge *Bridge
host sdkplugin.HostAPI
agentLister func() []ReportableAgent
identifier string
pluginInfo PluginInfoTag
mu sync.Mutex
activeBySlotID map[string]*ActiveSlot
activeByAgentID map[string]*ActiveSlot
activeBySlotIdent map[string]*ActiveSlot
history []HistoryEntry
lastHeartbeat time.Time
lastResponse HeartbeatResponse
restartPending bool
lastHeartbeats map[string]time.Time
lastErrors map[string]string
}
// Config bundles scheduler tunables.
@@ -46,26 +46,32 @@ type Config struct {
HistoryCap int // bound on activity history; default 32
}
// ReportableAgent is the projection of a Plexum agent the scheduler
// needs for heartbeat — id + model + current sm state.
// PluginInfoTag tags heartbeat reports so the backend knows which
// plugin / version is reporting.
type PluginInfoTag struct {
Name string
Version string
Backend string // "plexum"
}
// ReportableAgent is the per-agent projection the scheduler needs for
// heartbeat enumeration.
type ReportableAgent struct {
ID string
Model string
State AgentStatusValue
}
// ActiveSlot tracks an in-flight slot (between WakeAgent dispatch and
// terminal status update).
// ActiveSlot tracks an in-flight slot from dispatch to terminal state.
type ActiveSlot struct {
Slot Slot
StartedAt time.Time
LastHeartbeat time.Time
State SlotStatus
}
// HistoryEntry is one resolved slot kept for the calendar_status tool.
// HistoryEntry records one resolved slot for the calendar_status tool.
type HistoryEntry struct {
SlotID string
Ident string
AgentID string
Status SlotStatus
ResolvedAt time.Time
@@ -75,7 +81,7 @@ type HistoryEntry struct {
// NewScheduler constructs a Scheduler in stopped state.
func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI,
identifier string, pluginInfo PluginInfoTag,
pluginInfo PluginInfoTag,
agentLister func() []ReportableAgent) *Scheduler {
if cfg.HeartbeatInterval <= 0 {
cfg.HeartbeatInterval = 30 * time.Second
@@ -88,189 +94,228 @@ func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI,
bridge: bridge,
host: host,
agentLister: agentLister,
identifier: identifier,
pluginInfo: pluginInfo,
activeBySlotID: map[string]*ActiveSlot{},
activeByAgentID: map[string]*ActiveSlot{},
activeBySlotIdent: map[string]*ActiveSlot{},
lastHeartbeats: map[string]time.Time{},
lastErrors: map[string]string{},
}
}
// Run blocks until ctx cancels, ticking heartbeats every
// cfg.HeartbeatInterval. Returns nil on graceful shutdown.
// Run blocks until ctx cancels.
func (s *Scheduler) Run(ctx context.Context) error {
t := time.NewTicker(s.cfg.HeartbeatInterval)
defer t.Stop()
// First heartbeat immediately so initial state lands fast.
s.heartbeatOnce(ctx)
s.tick(ctx)
for {
select {
case <-ctx.Done():
return nil
case <-t.C:
s.heartbeatOnce(ctx)
s.tick(ctx)
}
}
}
func (s *Scheduler) heartbeatOnce(ctx context.Context) {
payload := HeartbeatPayload{
Identifier: s.identifier,
APIKey: s.bridge.APIKey,
PluginInfo: s.pluginInfo,
CapturedAt: time.Now().UTC(),
}
if s.agentLister != nil {
for _, a := range s.agentLister() {
payload.AgentList = append(payload.AgentList, AgentReport{
ID: a.ID, Model: a.Model, Status: a.State,
})
}
}
resp, err := s.bridge.Heartbeat(ctx, payload)
s.mu.Lock()
s.lastHeartbeat = time.Now()
if err == nil {
s.lastResponse = resp
s.restartPending = resp.RestartPending
}
s.mu.Unlock()
if err != nil {
return // network blip; next tick retries
}
for _, slot := range resp.SlotsToFire {
s.dispatchSlot(ctx, slot)
}
}
// dispatchSlot fires the slot via host.WakeAgent and records it as
// active. WakeAgent handles state-aware queueing — if the agent is
// busy, our calendar slot enqueues at depth 1 and the previous wake
// is dropped per replace-newest semantics. We mark the slot
// in_progress optimistically when we ENQUEUED; backend reconciles on
// its own watchdog.
func (s *Scheduler) dispatchSlot(ctx context.Context, slot Slot) {
// Skip already-active slots (heartbeat may re-list a slot we
// already started — backend hasn't seen our optimistic update yet).
s.mu.Lock()
if _, ok := s.activeBySlotID[slot.ID]; ok {
s.mu.Unlock()
func (s *Scheduler) tick(ctx context.Context) {
if s.agentLister == nil {
return
}
now := time.Now().UTC()
act := &ActiveSlot{
Slot: slot, StartedAt: now, LastHeartbeat: now,
State: SlotInProgress,
agents := s.agentLister()
s.host.Log("info", "calendar tick", map[string]any{"agents": len(agents)})
for _, agent := range agents {
s.tickForAgent(ctx, agent, now)
}
}
s.activeBySlotID[slot.ID] = act
s.activeByAgentID[slot.AgentID] = act
s.mu.Unlock()
message := slot.WakeOptions.OverrideMessage
if message == "" {
message = slot.PromptText
}
if message == "" {
message = fmt.Sprintf("[calendar] slot %s: %s", slot.ID, slot.Title)
}
source := fmt.Sprintf("calendar:slot-%s", slot.ID)
if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{
AgentID: slot.AgentID,
Message: message,
Source: source,
}); err != nil {
// Wake itself failed (plumbing). Mark slot aborted +
// notify backend.
s.resolveSlot(ctx, slot.ID, SlotAborted, "", "wake-agent failed: "+err.Error())
func (s *Scheduler) tickForAgent(ctx context.Context, agent ReportableAgent, now time.Time) {
resp, err := s.bridge.Heartbeat(ctx, agent.ID)
s.mu.Lock()
s.lastHeartbeats[agent.ID] = now
if err != nil {
s.lastErrors[agent.ID] = err.Error()
s.mu.Unlock()
s.host.Log("warn", "calendar heartbeat failed", map[string]any{
"agent": agent.ID, "err": err.Error(),
})
return
}
delete(s.lastErrors, agent.ID)
s.mu.Unlock()
s.host.Log("info", "calendar heartbeat ok", map[string]any{
"agent": agent.ID, "slots": len(resp.Slots), "agent_status": string(resp.AgentStatus),
})
// Pick the highest-priority NotStarted or Deferred slot. On a
// priority tie the first slot listed wins.
var chosen *Slot
for i := range resp.Slots {
slot := &resp.Slots[i]
if slot.Status != SlotNotStarted && slot.Status != SlotDeferred {
continue
}
if chosen == nil || slot.Priority > chosen.Priority {
chosen = slot
}
}
s.host.Log("info", "calendar slot selection", map[string]any{
"agent": agent.ID, "available": len(resp.Slots), "chosen": chosen != nil,
})
if chosen != nil {
s.dispatchSlot(ctx, agent.ID, *chosen)
}
// The unchosen NotStarted/Deferred slots are intentionally left
// untouched: v1 does not push a priority bump (+1) for them and
// relies on the backend's own priority bookkeeping to surface them
// on a later heartbeat. (The OpenClaw plugin DOES push priority
// bumps; that is future v2 work if backend feedback shows starvation.)
}
// resolveSlot moves an active slot to a terminal status, records
// history, and tells the backend. Safe to call concurrently.
func (s *Scheduler) resolveSlot(ctx context.Context, slotID string, status SlotStatus, summary, reason string) error {
// dispatchSlot fires WakeAgent + records the slot active. Marks the
// slot Ongoing on the backend so the dashboard reflects the
// transition immediately.
func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot) {
ident := slot.SlotIdent()
s.host.Log("info", "calendar dispatchSlot enter", map[string]any{
"agent": agentID, "slot_ident": ident,
})
s.mu.Lock()
act, ok := s.activeBySlotID[slotID]
if !ok {
if _, dup := s.activeBySlotIdent[ident]; dup {
s.mu.Unlock()
return fmt.Errorf("calendar: slot %s not active", slotID)
s.host.Log("info", "calendar dispatchSlot skipped (already active)", map[string]any{"slot": ident})
return
}
delete(s.activeBySlotID, slotID)
delete(s.activeByAgentID, act.Slot.AgentID)
s.appendHistoryLocked(HistoryEntry{
SlotID: slotID, AgentID: act.Slot.AgentID, Status: status,
if _, agentBusy := s.activeByAgentID[agentID]; agentBusy {
// Don't pick up another slot until the current one resolves.
s.mu.Unlock()
s.host.Log("info", "calendar dispatchSlot skipped (agent has active slot)", map[string]any{"agent": agentID})
return
}
now := time.Now().UTC()
active := &ActiveSlot{Slot: slot, StartedAt: now, LastHeartbeat: now}
s.activeBySlotIdent[ident] = active
s.activeByAgentID[agentID] = active
s.mu.Unlock()
message := buildWakeMessage(slot)
source := "calendar:" + ident
s.host.Log("info", "calendar firing WakeAgent", map[string]any{
"agent": agentID, "slot": ident, "source": source, "msg_len": len(message),
})
if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{
AgentID: agentID, Message: message, Source: source,
}); err != nil {
s.host.Log("warn", "calendar WakeAgent failed", map[string]any{
"agent": agentID, "err": err.Error(),
})
s.resolveLocally(ident, agentID, SlotAborted, "", "wake failed: "+err.Error())
return
}
s.host.Log("info", "calendar WakeAgent enqueued ok", map[string]any{
"agent": agentID, "slot": ident,
})
// Mark Ongoing on the backend.
update := SlotAgentUpdate{
Status: SlotOngoing, StartedAt: now.Format("15:04:05"),
}
s.pushUpdate(ctx, agentID, slot, update)
}
func buildWakeMessage(slot Slot) string {
// Backend EventData → prompt. v1 is intentionally simple; refine
// when the prompt-engineering side of the plugin matures.
if slot.EventType != nil {
switch *slot.EventType {
case EventTypeSystemEvent:
if ev, ok := slot.EventData["event"].(string); ok {
return fmt.Sprintf("[calendar system_event] %s", ev)
}
case EventTypeJob:
code, _ := slot.EventData["code"].(string)
typ, _ := slot.EventData["type"].(string)
if code != "" {
return fmt.Sprintf("[calendar job %s/%s] please handle this", typ, code)
}
}
}
return fmt.Sprintf("[calendar slot %s] scheduled work — please proceed", slot.SlotIdent())
}
// CompleteForAgent → terminal; pushes Finished to backend.
func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error {
act, ok := s.activeSlotForAgent(agentID)
if !ok {
return ErrNoActiveSlot
}
now := time.Now().UTC()
duration := int(now.Sub(act.StartedAt).Minutes())
if duration < 1 {
duration = 1
}
if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{
Status: SlotFinished, ActualDuration: duration,
}); err != nil {
return err
}
s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotFinished, summary, "")
return nil
}
// AbortForAgent → terminal; pushes Aborted to backend.
func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error {
act, ok := s.activeSlotForAgent(agentID)
if !ok {
return ErrNoActiveSlot
}
if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotAborted}); err != nil {
return err
}
s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotAborted, "", reason)
return nil
}
// PauseForAgent → non-terminal; pushes Paused.
func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error {
act, ok := s.activeSlotForAgent(agentID)
if !ok {
return ErrNoActiveSlot
}
return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotPaused})
}
// ResumeForAgent → non-terminal; pushes Ongoing.
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error {
act, ok := s.activeSlotForAgent(agentID)
if !ok {
return ErrNoActiveSlot
}
return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotOngoing})
}
func (s *Scheduler) pushUpdate(ctx context.Context, agentID string, slot Slot, update SlotAgentUpdate) error {
if slot.HasRealID() {
return s.bridge.UpdateRealSlot(ctx, agentID, *slot.ID, update)
}
if slot.VirtualID != nil {
return s.bridge.UpdateVirtualSlot(ctx, agentID, *slot.VirtualID, update)
}
return errors.New("calendar: slot has neither real id nor virtual id")
}
func (s *Scheduler) resolveLocally(ident, agentID string, status SlotStatus, summary, reason string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.activeBySlotIdent, ident)
delete(s.activeByAgentID, agentID)
s.history = append(s.history, HistoryEntry{
Ident: ident, AgentID: agentID, Status: status,
ResolvedAt: time.Now().UTC(), Summary: summary, Reason: reason,
})
s.mu.Unlock()
return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{
Status: status, Summary: summary, Reason: reason,
})
}
// SetSlotState is a non-terminal status change (paused/resumed).
// Records the new state in-memory and tells the backend.
func (s *Scheduler) SetSlotState(ctx context.Context, slotID string, status SlotStatus, reason string) error {
s.mu.Lock()
act, ok := s.activeBySlotID[slotID]
if !ok {
s.mu.Unlock()
return fmt.Errorf("calendar: slot %s not active", slotID)
}
act.State = status
act.LastHeartbeat = time.Now().UTC()
s.mu.Unlock()
return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{
Status: status, Reason: reason,
})
}
func (s *Scheduler) appendHistoryLocked(entry HistoryEntry) {
s.history = append(s.history, entry)
if len(s.history) > s.cfg.HistoryCap {
s.history = s.history[len(s.history)-s.cfg.HistoryCap:]
}
}
// CompleteForAgent / AbortForAgent / PauseForAgent / ResumeForAgent
// are the agent-facing tool entry points. They look up the agent's
// active slot, transition or terminate it, and notify the backend.
// CompleteForAgent terminates the agent's active slot as completed.
func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error {
slot, ok := s.activeSlotForAgent(agentID)
if !ok {
return ErrNoActiveSlot
}
return s.resolveSlot(ctx, slot.Slot.ID, SlotCompleted, summary, "")
}
// AbortForAgent terminates the agent's active slot as aborted.
func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error {
slot, ok := s.activeSlotForAgent(agentID)
if !ok {
return ErrNoActiveSlot
}
return s.resolveSlot(ctx, slot.Slot.ID, SlotAborted, "", reason)
}
// PauseForAgent transitions the agent's slot to paused.
func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error {
slot, ok := s.activeSlotForAgent(agentID)
if !ok {
return ErrNoActiveSlot
}
return s.SetSlotState(ctx, slot.Slot.ID, SlotPaused, reason)
}
// ResumeForAgent transitions the agent's slot back to in_progress.
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error {
slot, ok := s.activeSlotForAgent(agentID)
if !ok {
return ErrNoActiveSlot
}
return s.SetSlotState(ctx, slot.Slot.ID, SlotInProgress, "")
}
// activeSlotForAgent returns the per-agent active slot copy under lock.
func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -281,36 +326,59 @@ func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) {
return *act, true
}
// Status returns the introspection shape for the calendar_status tool.
func (s *Scheduler) Status() SchedulerStatus {
s.mu.Lock()
defer s.mu.Unlock()
active := make([]ActiveSlot, 0, len(s.activeBySlotID))
for _, a := range s.activeBySlotID {
active = append(active, *a)
}
history := make([]HistoryEntry, len(s.history))
copy(history, s.history)
return SchedulerStatus{
Enabled: true,
LastHeartbeat: s.lastHeartbeat,
HeartbeatEvery: s.cfg.HeartbeatInterval,
Active: active,
History: history,
RestartPending: s.restartPending,
}
}
// SchedulerStatus is the shape calendar_status returns.
type SchedulerStatus struct {
// Status is the introspection shape calendar_status returns.
type Status struct {
Enabled bool `json:"enabled"`
LastHeartbeat time.Time `json:"last_heartbeat"`
LastHeartbeats map[string]time.Time `json:"last_heartbeats"`
LastErrors map[string]string `json:"last_errors,omitempty"`
HeartbeatEvery time.Duration `json:"heartbeat_every"`
Active []ActiveSlot `json:"active"`
History []HistoryEntry `json:"history"`
RestartPending bool `json:"restart_pending"`
}
// ErrNoActiveSlot is returned by calendar_complete/abort/pause/resume
// when the agent has no slot in progress.
// SingleActiveAgentID returns the agent id when exactly one active
// slot exists, empty otherwise. Used by the plugin's bestEffortAgentID
// fallback for tool calls that don't carry agent context.
func (s *Scheduler) SingleActiveAgentID() string {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.activeByAgentID) != 1 {
return ""
}
for k := range s.activeByAgentID {
return k
}
return ""
}
// Status builds the introspection snapshot that the calendar_status
// tool returns. Maps and slices are copied so callers never share
// state with the scheduler.
func (s *Scheduler) Status() Status {
s.mu.Lock()
defer s.mu.Unlock()
active := make([]ActiveSlot, 0, len(s.activeByAgentID))
for _, a := range s.activeByAgentID {
active = append(active, *a)
}
hb := make(map[string]time.Time, len(s.lastHeartbeats))
for k, v := range s.lastHeartbeats {
hb[k] = v
}
errs := make(map[string]string, len(s.lastErrors))
for k, v := range s.lastErrors {
errs[k] = v
}
history := make([]HistoryEntry, len(s.history))
copy(history, s.history)
return Status{
Enabled: true,
LastHeartbeats: hb,
LastErrors: errs,
HeartbeatEvery: s.cfg.HeartbeatInterval,
Active: active,
History: history,
}
}
// ErrNoActiveSlot is returned when an agent calls calendar_complete /
// abort / pause / resume but has no slot active.
var ErrNoActiveSlot = errors.New("calendar: no active slot for agent")
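
Review note: the selection rule in tickForAgent can be sketched in isolation as below. This is only an illustration under stated assumptions: `Slot` is a trimmed-down stand-in for the type in the diff, and `Label` and `pickSlot` are hypothetical names that do not exist in the plugin.

```go
package main

import "fmt"

// SlotStatus and Slot are reduced stand-ins for the diff's types.
type SlotStatus string

const (
	SlotNotStarted SlotStatus = "not_started"
	SlotDeferred   SlotStatus = "deferred"
	SlotOngoing    SlotStatus = "ongoing"
)

type Slot struct {
	Label    string // hypothetical field, only for readable output
	Priority int
	Status   SlotStatus
}

// pickSlot returns the highest-priority NotStarted or Deferred slot,
// or nil when none is eligible. The strict ">" comparison means the
// first slot listed wins a priority tie.
func pickSlot(slots []Slot) *Slot {
	var chosen *Slot
	for i := range slots {
		s := &slots[i]
		if s.Status != SlotNotStarted && s.Status != SlotDeferred {
			continue
		}
		if chosen == nil || s.Priority > chosen.Priority {
			chosen = s
		}
	}
	return chosen
}

func main() {
	slots := []Slot{
		{Label: "a", Priority: 5, Status: SlotOngoing},    // skipped: already running
		{Label: "b", Priority: 3, Status: SlotNotStarted}, // eligible
		{Label: "c", Priority: 4, Status: SlotDeferred},   // eligible, higher priority
		{Label: "d", Priority: 4, Status: SlotNotStarted}, // ties with c, listed later
	}
	if s := pickSlot(slots); s != nil {
		fmt.Println(s.Label) // prints "c"
	}
}
```

Because ties resolve by list position, the backend's ordering of the `slots` array in the heartbeat response decides between equal-priority slots.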


@@ -1,106 +1,154 @@
// Package calendar talks to the HarborForge backend's Calendar API
// (heartbeat, slot fetch, status update, restart-pending check) and
// drives a scheduler loop that fires Plexum wake events when slots
// come due. Types mirror HarborForge.OpenclawPlugin's calendar/types.ts
// so the backend doesn't need to know which plugin is reporting.
// Types matching HarborForge.Backend's actual calendar API contract
// (verified via /openapi.json on a running backend). Aligns 1:1 with
// HarborForge.OpenclawPlugin/plugin/calendar/types.ts so the two
// plugins can hit the same backend interchangeably.
package calendar
import "time"
// SlotStatus enumerates the slot lifecycle.
// SlotStatus enumerates the lifecycle. String values match backend's
// SlotStatus enum verbatim (snake_case — verified via heartbeat
// response shape against running harborforge-backend).
type SlotStatus string
const (
SlotNotStarted SlotStatus = "not_started"
SlotInProgress SlotStatus = "in_progress"
SlotCompleted SlotStatus = "completed"
SlotOngoing SlotStatus = "ongoing"
SlotFinished SlotStatus = "finished"
SlotAborted SlotStatus = "aborted"
SlotPaused SlotStatus = "paused"
SlotDeferred SlotStatus = "deferred"
SlotPaused SlotStatus = "paused"
SlotSkipped SlotStatus = "skipped"
)
// AgentStatusValue mirrors the backend AgentStatus enum used in
// heartbeat responses (a hint about what the backend thinks the
// agent is doing).
// SlotType: work vs on_call. Affects whether the agent flips to busy.
type SlotType string
const (
SlotTypeWork SlotType = "work"
SlotTypeOnCall SlotType = "on_call"
)
// EventType categorises what the slot represents.
type EventType string
const (
EventTypeJob EventType = "job"
EventTypeSystemEvent EventType = "system_event"
EventTypeEntertainment EventType = "entertainment"
)
// AgentStatusValue mirrors the backend AgentStatus enum.
type AgentStatusValue string
const (
AgentStatusUnknown AgentStatusValue = "unknown"
AgentStatusIdle AgentStatusValue = "idle"
AgentStatusBusy AgentStatusValue = "busy"
AgentStatusOffline AgentStatusValue = "offline"
AgentStatusOnCall AgentStatusValue = "on_call"
AgentStatusPaused AgentStatusValue = "paused"
AgentStatusExhausted AgentStatusValue = "exhausted"
)
// SlotKind is "work" vs "on_call" — affects how the scheduler treats
// the slot (on_call slots don't move the agent into busy).
type SlotKind string
const (
SlotKindWork SlotKind = "work"
SlotKindOnCall SlotKind = "on_call"
)
// Slot is one Calendar TimeSlot the backend serves.
type Slot struct {
ID string `json:"id"`
VirtualID string `json:"virtual_id,omitempty"`
// HeartbeatRequest is the POST /calendar/agent/heartbeat body.
type HeartbeatRequest struct {
ClawIdentifier string `json:"claw_identifier"`
AgentID string `json:"agent_id"`
ClawID string `json:"claw_identifier,omitempty"`
Kind SlotKind `json:"slot_type"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
ScheduledAt time.Time `json:"scheduled_at"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
Status SlotStatus `json:"status"`
PromptText string `json:"prompt,omitempty"`
WakeOptions WakeOpts `json:"wake_options,omitempty"`
}
// WakeOpts customise how the scheduler should drive the agent. v1
// honours only Force; the rest pass through as audit trail.
type WakeOpts struct {
Force bool `json:"force,omitempty"`
OverrideMessage string `json:"override_message,omitempty"`
ScopeSessionID string `json:"scope_session_id,omitempty"`
}
// HeartbeatPayload is what the plugin POSTs every interval.
type HeartbeatPayload struct {
Identifier string `json:"identifier"`
APIKey string `json:"api_key,omitempty"`
AgentList []AgentReport `json:"agents"`
PluginInfo PluginInfoTag `json:"plugin"`
CapturedAt time.Time `json:"captured_at"`
}
// AgentReport is one entry in HeartbeatPayload.AgentList.
type AgentReport struct {
ID string `json:"agent_id"`
Status AgentStatusValue `json:"status"`
Model string `json:"model,omitempty"`
}
// PluginInfoTag identifies which plugin / version is heartbeating.
type PluginInfoTag struct {
Name string `json:"name"` // "harbor-forge"
Version string `json:"version"` // e.g. 0.1.0
Backend string `json:"backend"` // "plexum"
}
// HeartbeatResponse is the backend's reply. SlotsToFire are slots
// the scheduler should attempt to start.
// HeartbeatResponse is the backend's reply.
type HeartbeatResponse struct {
SlotsToFire []Slot `json:"slots_to_fire,omitempty"`
RestartPending bool `json:"restart_pending,omitempty"`
ServerTime time.Time `json:"server_time"`
Slots []Slot `json:"slots"`
AgentStatus AgentStatusValue `json:"agent_status"`
Message string `json:"message,omitempty"`
}
// SlotUpdate is the body of POST /calendar/slot/<id>/status.
type SlotUpdate struct {
// Slot is one calendar TimeSlot — real (has ID) or virtual
// (has VirtualID). Field names mirror the backend's
// CalendarSlotResponse schema.
type Slot struct {
ID *int64 `json:"id"` // real slot db id; null for virtual
VirtualID *string `json:"virtual_id"` // plan-{plan_id}-{date}; null for real
UserID int64 `json:"user_id"`
Date string `json:"date"` // YYYY-MM-DD
SlotType SlotType `json:"slot_type"`
EstimatedDuration int `json:"estimated_duration"` // minutes
ScheduledAt string `json:"scheduled_at"` // HH:MM:SS
StartedAt *string `json:"started_at"`
Attended bool `json:"attended"`
ActualDuration *int `json:"actual_duration"`
EventType *EventType `json:"event_type"`
EventData EventData `json:"event_data"`
Priority int `json:"priority"`
Status SlotStatus `json:"status"`
Summary string `json:"summary,omitempty"`
Reason string `json:"reason,omitempty"`
PlanID *int64 `json:"plan_id"`
}
// EventData is loosely-typed since the backend stores it as JSONB and
// the shape varies by event_type. Plugin code does best-effort
// unmarshal into JobData / SystemEventData when needed.
type EventData map[string]any
// JobData is the event_data shape when event_type=="job".
type JobData struct {
Type string `json:"type"` // Task|Support|Meeting|Essential
Code string `json:"code"` // e.g. "TASK-42"
WorkingSessions []string `json:"working_sessions"` // arbitrary session ids
}
// SystemEventData is the event_data shape when event_type=="system_event".
type SystemEventData struct {
Event string `json:"event"` // ScheduleToday | SummaryToday | ScheduledGatewayRestart
}
// SlotAgentUpdate is the body of PATCH /calendar/slots/{id}/agent-update
// (and the virtual variant). started_at + actual_duration are set
// depending on which status transition the agent is reporting.
type SlotAgentUpdate struct {
Status SlotStatus `json:"status"`
StartedAt string `json:"started_at,omitempty"` // HH:MM:SS
ActualDuration int `json:"actual_duration,omitempty"` // minutes
}
// AgentStatusPush is the body of POST /calendar/agent/status.
type AgentStatusPush struct {
ClawIdentifier string `json:"claw_identifier"`
AgentID string `json:"agent_id"`
Status AgentStatusValue `json:"status"`
}
// HasRealID reports whether a Slot is the materialized (DB row) flavor.
func (s Slot) HasRealID() bool { return s.ID != nil && *s.ID > 0 }
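// SlotIdent returns a string identifier for logs and map keys:
// "real:<id>" for materialized slots, "virtual:<vid>" for virtual
// ones. A slot with neither id falls back to a timestamp-based
// "unknown:" ident, which differs on every call.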
func (s Slot) SlotIdent() string {
if s.HasRealID() {
return formatInt("real", *s.ID)
}
if s.VirtualID != nil {
return "virtual:" + *s.VirtualID
}
return "unknown:" + time.Now().UTC().Format(time.RFC3339Nano)
}
func formatInt(prefix string, n int64) string {
// avoid pulling fmt for one call
const digits = "0123456789"
if n == 0 {
return prefix + ":0"
}
neg := n < 0
if neg {
n = -n
}
buf := make([]byte, 0, 20)
for n > 0 {
buf = append([]byte{digits[n%10]}, buf...)
n /= 10
}
if neg {
buf = append([]byte{'-'}, buf...)
}
return prefix + ":" + string(buf)
}
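
Review note: the real/virtual identity rule can be expressed with `strconv.FormatInt`, which avoids `fmt` just as the hand-rolled `formatInt` does. The sketch below is an alternative shape, not the code in this commit; `ident` and its boolean result are assumptions made for illustration, and `Slot` is reduced to the two id fields.

```go
package main

import (
	"fmt"
	"strconv"
)

// Slot is a reduced stand-in carrying only the two id fields.
type Slot struct {
	ID        *int64  // real slot DB id; nil for virtual
	VirtualID *string // plan-{plan_id}-{date}; nil for real
}

// HasRealID mirrors the diff: a real slot has a positive DB id.
func (s Slot) HasRealID() bool { return s.ID != nil && *s.ID > 0 }

// ident is a hypothetical variant of SlotIdent. Instead of a
// timestamp-based "unknown:" fallback it reports ok=false, so a
// caller can refuse to use an id-less slot as a map key.
func ident(s Slot) (key string, ok bool) {
	if s.HasRealID() {
		return "real:" + strconv.FormatInt(*s.ID, 10), true
	}
	if s.VirtualID != nil {
		return "virtual:" + *s.VirtualID, true
	}
	return "", false
}

func main() {
	id := int64(42)
	vid := "plan-7-2026-06-08" // example value following the plan-{plan_id}-{date} shape

	k1, ok1 := ident(Slot{ID: &id})
	k2, ok2 := ident(Slot{VirtualID: &vid})
	k3, ok3 := ident(Slot{})

	fmt.Println(k1, ok1) // real:42 true
	fmt.Println(k2, ok2) // virtual:plan-7-2026-06-08 true
	fmt.Println(k3 == "", ok3) // true false
}
```

Whether an explicit "no ident" result is preferable to the timestamp fallback depends on how callers use the key; with the fallback, the same id-less slot never matches an earlier map entry.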


@@ -35,6 +35,17 @@ type Config struct {
// server listens on. Zero/missing disables the bridge entirely.
MonitorPort int `json:"monitor_port,omitempty"`
// MonitorPushEnabled toggles the active push loop that uploads
// system telemetry to BackendURL /monitor/server/heartbeat. Lets
// HF plugin replace the standalone harborforge-monitor container.
// nil (unset) defaults to false; operators must opt in explicitly
// since they need to provision APIKey too.
MonitorPushEnabled *bool `json:"monitor_push_enabled,omitempty"`
// MonitorPushIntervalSeconds — defaults to 30s when ≤0. Mirrors
// the standalone monitor's HF_MONITER_REPORT_INTERVAL knob.
MonitorPushIntervalSeconds int `json:"monitor_push_interval_seconds,omitempty"`
// CalendarHeartbeatIntervalSeconds — defaults to 30s when ≤0.
CalendarHeartbeatIntervalSeconds int `json:"calendar_heartbeat_interval_seconds,omitempty"`
@@ -57,6 +68,8 @@ type Resolved struct {
Identifier string
APIKey string
MonitorPort int
MonitorPushEnabled bool
MonitorPushIntervalSeconds int
CalendarEnabled bool
CalendarHeartbeatIntervalSeconds int
CalendarBackendURL string
@@ -104,6 +117,8 @@ func Resolve(c Config) Resolved {
Identifier: c.Identifier,
APIKey: c.APIKey,
MonitorPort: c.MonitorPort,
MonitorPushEnabled: false,
MonitorPushIntervalSeconds: 30,
CalendarEnabled: true,
CalendarHeartbeatIntervalSeconds: 30,
CalendarBackendURL: c.CalendarBackendURL,
@@ -127,6 +142,12 @@ func Resolve(c Config) Resolved {
if c.CalendarHeartbeatIntervalSeconds > 0 {
out.CalendarHeartbeatIntervalSeconds = c.CalendarHeartbeatIntervalSeconds
}
if c.MonitorPushEnabled != nil {
out.MonitorPushEnabled = *c.MonitorPushEnabled
}
if c.MonitorPushIntervalSeconds > 0 {
out.MonitorPushIntervalSeconds = c.MonitorPushIntervalSeconds
}
if c.RestartPollIntervalSeconds > 0 {
out.RestartPollIntervalSeconds = c.RestartPollIntervalSeconds
}

internal/kbblock/kbblock.go (new file)

@@ -0,0 +1,256 @@
// Package kbblock implements the per-session knowledge-base block
// owned by the HarborForge Plexum plugin per Plexum DESIGN-DYNAMIC-
// BLOCK.md §3.3 / §4.4.
//
// The kb-block stores HarborForge KB facts the agent has cached for
// the current session. Each entry renders as
//
// <kb-fact id=N kb=<code> source=topic:<slug>>content</kb-fact>
//
// (id is the HF backend's DB primary key for the fact, NOT a
// per-session monotonic counter). InsertSeq controls
// cache-insertion-order rendering per §9 #4. A duplicate Add(id, ...)
// is a silent no-op per §9 #4.
//
// Storage location: plugin-managed file at
//
// <PLEXUM_PROFILE_ROOT>/agents/<agentID>/sessions/<sessionID>/
// plugins/harbor-forge/kb-block.json
//
// Plexum core's session cleanup (del-session) cascades this directory
// alongside the rest of the session state.
//
// Fade-out (§9 #3: KB fade params equal the memory fade params,
// n=5/w=10/m=70) is NOT applied in v1. Phase 2 v1 stores raw content,
// and the agent-invoked dynamic-kb-evict tool is the only eviction
// path. Fade will be added once we have prod data on whether KB facts
// need it.
package kbblock
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
)
const FileName = "kb-block.json"
const currentVersion = 1
// Entry is one cached KB fact.
type Entry struct {
ID int `json:"id"` // HF backend DB primary key
KBCode string `json:"kb_code"` // e.g. "KB-PAYROT"
SourceTopic string `json:"source_topic"` // human slug from HF topic, e.g. "debugging"
Content string `json:"content"`
InsertSeq int `json:"insert_seq"` // per-session monotonic, for render order
AddedAtTurn int `json:"added_at_turn"`
LastRefreshAtTurn int `json:"last_refresh_at_turn"`
}
// Block is the in-memory representation of kb-block.json.
type Block struct {
mu sync.Mutex
path string
NextSeq int `json:"next_seq"`
Entries []*Entry `json:"entries"`
}
// SessionDir returns the plugin-scoped session subdir under the Plexum
// profile root: <profileRoot>/agents/<agentID>/sessions/<sessionID>/
// plugins/harbor-forge.
func SessionDir(profileRoot, agentID, sessionID string) string {
return filepath.Join(profileRoot, "agents", agentID, "sessions", sessionID, "plugins", "harbor-forge")
}
// Open loads the block at <SessionDir>/kb-block.json. A missing or
// empty file yields an empty block (NextSeq=1). There is nothing to
// Close; the JSON is small, so callers can re-Open the block on every
// call.
func Open(profileRoot, agentID, sessionID string) (*Block, error) {
if profileRoot == "" || agentID == "" || sessionID == "" {
return nil, fmt.Errorf("kbblock.Open: profileRoot/agentID/sessionID all required")
}
path := filepath.Join(SessionDir(profileRoot, agentID, sessionID), FileName)
b := &Block{path: path, NextSeq: 1}
raw, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return b, nil
}
return nil, fmt.Errorf("kbblock: read %s: %w", path, err)
}
if len(raw) == 0 {
return b, nil
}
var loaded struct {
NextSeq int `json:"next_seq"`
Entries []*Entry `json:"entries"`
}
if err := json.Unmarshal(raw, &loaded); err != nil {
return nil, fmt.Errorf("kbblock: parse %s: %w", path, err)
}
if loaded.NextSeq < 1 {
loaded.NextSeq = 1
}
b.NextSeq = loaded.NextSeq
b.Entries = loaded.Entries
return b, nil
}
// Path returns the underlying file path.
func (b *Block) Path() string { return b.path }
// Len returns the entry count.
func (b *Block) Len() int {
b.mu.Lock()
defer b.mu.Unlock()
return len(b.Entries)
}
// Has reports whether a fact with this ID is already cached.
func (b *Block) Has(id int) bool {
b.mu.Lock()
defer b.mu.Unlock()
for _, e := range b.Entries {
if e.ID == id {
return true
}
}
return false
}
// Add appends a new entry. Duplicate ID → silent no-op, returns nil.
// Does NOT persist; caller must Save.
func (b *Block) Add(id int, kbCode, sourceTopic, content string, atTurn int) *Entry {
b.mu.Lock()
defer b.mu.Unlock()
for _, e := range b.Entries {
if e.ID == id {
return nil
}
}
e := &Entry{
ID: id,
KBCode: kbCode,
SourceTopic: sourceTopic,
Content: content,
InsertSeq: b.NextSeq,
AddedAtTurn: atTurn,
LastRefreshAtTurn: atTurn,
}
b.NextSeq++
b.Entries = append(b.Entries, e)
return e
}
// Remove drops entries by ID. Returns the IDs actually removed.
func (b *Block) Remove(ids ...int) []int {
b.mu.Lock()
defer b.mu.Unlock()
want := map[int]bool{}
for _, id := range ids {
want[id] = true
}
kept := b.Entries[:0]
var removed []int
for _, e := range b.Entries {
if want[e.ID] {
removed = append(removed, e.ID)
continue
}
kept = append(kept, e)
}
b.Entries = kept
return removed
}
// Lookup returns the entry with the given ID, or nil.
func (b *Block) Lookup(id int) *Entry {
b.mu.Lock()
defer b.mu.Unlock()
for _, e := range b.Entries {
if e.ID == id {
return e
}
}
return nil
}
// AllIDs returns the cached fact IDs sorted by InsertSeq ascending.
func (b *Block) AllIDs() []int {
b.mu.Lock()
defer b.mu.Unlock()
out := make([]*Entry, len(b.Entries))
copy(out, b.Entries)
sort.Slice(out, func(i, j int) bool { return out[i].InsertSeq < out[j].InsertSeq })
ids := make([]int, len(out))
for i, e := range out {
ids[i] = e.ID
}
return ids
}
// Save writes the block atomically (temp file + rename, mode 0600),
// creating parent directories as needed. Saving an empty block when
// no file exists yet is a no-op.
func (b *Block) Save() error {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.Entries) == 0 {
if _, err := os.Stat(b.path); os.IsNotExist(err) {
return nil
}
}
if err := os.MkdirAll(filepath.Dir(b.path), 0o755); err != nil {
return err
}
payload := struct {
Version int `json:"version"`
NextSeq int `json:"next_seq"`
Entries []*Entry `json:"entries"`
}{Version: currentVersion, NextSeq: b.NextSeq, Entries: b.Entries}
data, err := json.MarshalIndent(payload, "", " ")
if err != nil {
return err
}
tmp := b.path + ".tmp"
if err := os.WriteFile(tmp, data, 0o600); err != nil {
return fmt.Errorf("kbblock: write tmp: %w", err)
}
return os.Rename(tmp, b.path)
}
// Render returns the inner body of the <kb-block> subblock — a flat
// sequence of `<kb-fact id=N kb=<code> source=topic:<slug>>\n<content>
// \n</kb-fact>` separated by single blank lines, ordered by InsertSeq
// (DESIGN-DYNAMIC-BLOCK.md §9 #4). Empty block returns "".
//
// The plugin's DynamicBlockProvider.RenderDynamicSubblock returns this
// to the Plexum host, which wraps in <kb-block>...</kb-block>.
func (b *Block) Render() string {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.Entries) == 0 {
return ""
}
ordered := make([]*Entry, len(b.Entries))
copy(ordered, b.Entries)
sort.Slice(ordered, func(i, j int) bool { return ordered[i].InsertSeq < ordered[j].InsertSeq })
var sb strings.Builder
for i, e := range ordered {
if i > 0 {
sb.WriteByte('\n')
}
fmt.Fprintf(&sb, "<kb-fact id=%d kb=%s", e.ID, e.KBCode)
if e.SourceTopic != "" {
fmt.Fprintf(&sb, " source=topic:%s", e.SourceTopic)
}
sb.WriteString(">\n")
sb.WriteString(e.Content)
if !strings.HasSuffix(e.Content, "\n") {
sb.WriteByte('\n')
}
sb.WriteString("</kb-fact>\n")
}
return sb.String()
}
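
Review note: to make the rendered shape concrete, here is a standalone sketch of the same rendering rules (order by InsertSeq, optional source attribute, blank line between facts). `Entry` is reduced to the fields Render reads, and `render` is a free function only for the sake of the example; the sample facts reuse values from the unit tests.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Entry is a reduced stand-in with only the fields Render reads.
type Entry struct {
	ID          int
	KBCode      string
	SourceTopic string
	Content     string
	InsertSeq   int
}

// render follows the rules of Block.Render in the diff: facts are
// ordered by InsertSeq, the source attribute is omitted when empty,
// and consecutive facts are separated by one blank line.
func render(entries []Entry) string {
	ordered := make([]Entry, len(entries))
	copy(ordered, entries)
	sort.Slice(ordered, func(i, j int) bool { return ordered[i].InsertSeq < ordered[j].InsertSeq })

	var sb strings.Builder
	for i, e := range ordered {
		if i > 0 {
			sb.WriteByte('\n')
		}
		fmt.Fprintf(&sb, "<kb-fact id=%d kb=%s", e.ID, e.KBCode)
		if e.SourceTopic != "" {
			fmt.Fprintf(&sb, " source=topic:%s", e.SourceTopic)
		}
		sb.WriteString(">\n")
		sb.WriteString(e.Content)
		if !strings.HasSuffix(e.Content, "\n") {
			sb.WriteByte('\n')
		}
		sb.WriteString("</kb-fact>\n")
	}
	return sb.String()
}

func main() {
	// Slice order is deliberately reversed; InsertSeq decides the output order.
	fmt.Print(render([]Entry{
		{ID: 7, KBCode: "KB-X", Content: "raw", InsertSeq: 2},
		{ID: 42, KBCode: "KB-PAYROT", SourceTopic: "debugging", Content: "OOM fix: bump heap", InsertSeq: 1},
	}))
}
```

Attribute values and content are written verbatim, so the format assumes KB codes and topic slugs contain no spaces or `>` and that fact content never contains a literal `</kb-fact>`; whether the backend guarantees that is not visible in this diff.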


@@ -0,0 +1,139 @@
package kbblock
import (
"os"
"path/filepath"
"strings"
"testing"
)
func TestSessionDir(t *testing.T) {
got := SessionDir("/root/.plexum", "alice", "s_1")
want := "/root/.plexum/agents/alice/sessions/s_1/plugins/harbor-forge"
if got != want {
t.Errorf("SessionDir = %q want %q", got, want)
}
}
func TestOpenMissingFile(t *testing.T) {
b, err := Open(t.TempDir(), "alice", "s_1")
if err != nil {
t.Fatal(err)
}
if b.Len() != 0 || b.NextSeq != 1 {
t.Errorf("empty open: Len=%d NextSeq=%d", b.Len(), b.NextSeq)
}
}
func TestOpenRequiresAllArgs(t *testing.T) {
if _, err := Open("", "a", "s"); err == nil {
t.Error("want error on empty profileRoot")
}
if _, err := Open("/x", "", "s"); err == nil {
t.Error("want error on empty agentID")
}
if _, err := Open("/x", "a", ""); err == nil {
t.Error("want error on empty sessionID")
}
}
func TestAddAndDuplicate(t *testing.T) {
b, _ := Open(t.TempDir(), "a", "s")
e := b.Add(42, "KB-PAYROT", "debugging", "OOM fix", 3)
if e == nil || e.ID != 42 || e.KBCode != "KB-PAYROT" || e.SourceTopic != "debugging" ||
e.InsertSeq != 1 || e.AddedAtTurn != 3 {
t.Errorf("entry wrong: %+v", e)
}
if b.Add(42, "X", "y", "z", 7) != nil {
t.Error("dup should return nil")
}
if b.Len() != 1 {
t.Errorf("Len=%d", b.Len())
}
}
func TestRoundtrip(t *testing.T) {
dir := t.TempDir()
b, _ := Open(dir, "a", "s")
b.Add(101, "KB-A", "topic-a", "a", 1)
b.Add(202, "KB-A", "topic-b", "b", 2)
if err := b.Save(); err != nil {
t.Fatal(err)
}
b2, _ := Open(dir, "a", "s")
if b2.Len() != 2 || b2.NextSeq != 3 {
t.Errorf("reopen Len=%d NextSeq=%d", b2.Len(), b2.NextSeq)
}
if !b2.Has(101) || !b2.Has(202) {
t.Error("entries missing after reopen")
}
}
func TestRenderInsertOrder(t *testing.T) {
b, _ := Open(t.TempDir(), "a", "s")
b.Add(999, "K", "t", "first", 0)
b.Add(1, "K", "t", "second", 0)
out := b.Render()
idx999 := strings.Index(out, "id=999")
idx1 := strings.Index(out, "id=1 ")
if !(idx999 >= 0 && idx1 > idx999) {
t.Errorf("order broken: %q", out)
}
}
func TestRenderAttributes(t *testing.T) {
b, _ := Open(t.TempDir(), "a", "s")
b.Add(42, "KB-PAYROT", "debugging", "OOM fix: bump heap", 0)
out := b.Render()
if !strings.HasPrefix(out, "<kb-fact id=42 kb=KB-PAYROT source=topic:debugging>\n") {
t.Errorf("head wrong: %q", out)
}
if !strings.HasSuffix(out, "</kb-fact>\n") {
t.Errorf("tail wrong: %q", out)
}
}
func TestRenderOmitsSourceWhenEmpty(t *testing.T) {
b, _ := Open(t.TempDir(), "a", "s")
b.Add(7, "KB-X", "", "raw", 0)
out := b.Render()
if strings.Contains(out, "source=topic:") {
t.Errorf("source attr leaked: %q", out)
}
if !strings.Contains(out, "<kb-fact id=7 kb=KB-X>") {
t.Errorf("tight tag missing: %q", out)
}
}
func TestRemove(t *testing.T) {
b, _ := Open(t.TempDir(), "a", "s")
b.Add(1, "K", "t", "a", 0)
b.Add(2, "K", "t", "b", 0)
b.Add(3, "K", "t", "c", 0)
removed := b.Remove(2, 99, 3)
if len(removed) != 2 {
t.Errorf("removed=%v", removed)
}
if b.Len() != 1 || !b.Has(1) {
t.Errorf("post-state wrong: Len=%d", b.Len())
}
}
func TestRenderEmpty(t *testing.T) {
b, _ := Open(t.TempDir(), "a", "s")
if b.Render() != "" {
t.Error("empty Render should be \"\"")
}
}
func TestSaveEmptyNoFile(t *testing.T) {
dir := t.TempDir()
b, _ := Open(dir, "a", "s")
if err := b.Save(); err != nil {
t.Fatal(err)
}
path := filepath.Join(SessionDir(dir, "a", "s"), FileName)
if _, err := os.Stat(path); !os.IsNotExist(err) {
t.Errorf("file should not exist: %v", err)
}
}

internal/kbclient/client.go (new file)

@@ -0,0 +1,285 @@
// Package kbclient is a typed HTTP client for the HarborForge.Backend
// knowledge-base REST API. Used by the dynamic-kb-* tools to power
// agent browse + cache flows.
//
// Route shapes verified against
// HarborForge.Backend/app/api/routers/knowledge.py:
//
// GET /knowledge-bases[?project=<code>] list KBs
// GET /knowledge-bases/{kb_id_or_code}/topics list topics in a KB
// GET /knowledge-bases/{kb_id_or_code}/tree full hierarchy
// GET /knowledge-facts/{fact_id} single fact
//
// HF's KB hierarchy is KB → Topics → Categories → Facts. ListFacts
// uses the /tree endpoint and filters client-side to the requested
// topic ids (the backend has no flat per-topic-facts route as of
// the route audit on 2026-06-08).
//
// Auth: v1 uses plugin-level APIKey via Bearer header. Per-agent
// token resolution (matching agent's hf-token from secret-mgr) is a
// TODO — needs SDK extension for plugin → secret-mgr access.
package kbclient
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
// Client is the HTTP wrapper. Construct once at plugin init; safe to
// share across goroutines.
type Client struct {
BaseURL string
APIKey string
HTTP *http.Client
}
// New returns a client with a 30s default timeout. BaseURL has any
// trailing slash trimmed.
func New(baseURL, apiKey string) *Client {
return &Client{
BaseURL: strings.TrimRight(baseURL, "/"),
APIKey: apiKey,
HTTP: &http.Client{Timeout: 30 * time.Second},
}
}
// ---- Payload types matching HF backend response shapes ----
// KBSummary is one row from GET /knowledge-bases.
type KBSummary struct {
ID int `json:"id"`
KnowledgeBaseCode string `json:"knowledge_base_code"`
Title string `json:"title"`
Description string `json:"description"`
}
// TopicSummary is one row from GET /knowledge-bases/{id}/topics.
type TopicSummary struct {
ID int `json:"id"`
Topic string `json:"topic"`
Description string `json:"description"`
}
// FactSummary is one item the dynamic-kb-list-facts tool surfaces to
// the agent. Built client-side from the tree response — title + a
// snippet of the content. Kept light so list-facts output stays
// scan-able even on large KBs.
type FactSummary struct {
ID int `json:"id"`
TopicID int `json:"topic_id"`
TopicSlug string `json:"topic_slug"`
Title string `json:"title"`
Snippet string `json:"snippet"`
}
// Fact is the full fact body fetched by dynamic-kb-cache before
// writing into kb-block.json.
type Fact struct {
ID int `json:"id"`
KBCode string `json:"kb_code"`
TopicSlug string `json:"topic_slug"`
Content string `json:"content"`
}
// ---- HTTP methods ----
// ListKBs hits GET /knowledge-bases[?project=<code>]. Empty
// projectCode = unfiltered.
func (c *Client) ListKBs(ctx context.Context, projectCode string) ([]KBSummary, error) {
path := "/knowledge-bases"
if projectCode != "" {
path += "?project=" + url.QueryEscape(projectCode)
}
var out []KBSummary
if err := c.getJSON(ctx, path, &out); err != nil {
return nil, err
}
return out, nil
}
// ListTopics hits GET /knowledge-bases/{kbCode}/topics. kbCode is the
// human KB code (e.g. "KB-PAYROT"); the backend resolves either int
// id or code in the same path slot.
func (c *Client) ListTopics(ctx context.Context, kbCode string) ([]TopicSummary, error) {
if kbCode == "" {
return nil, fmt.Errorf("kbclient: kb-code required")
}
path := "/knowledge-bases/" + url.PathEscape(kbCode) + "/topics"
var out []TopicSummary
if err := c.getJSON(ctx, path, &out); err != nil {
return nil, err
}
return out, nil
}
// kbTree mirrors the shape of GET /knowledge-bases/{id}/tree. We only
// pull the fields we need to build FactSummary; unknown fields are
// dropped silently.
type kbTree struct {
KnowledgeBaseCode string `json:"knowledge_base_code"`
Topics []treeNode `json:"topics"`
}
type treeNode struct {
ID int `json:"id"`
Topic string `json:"topic"` // topic-level
Category string `json:"category"` // category-level
Title string `json:"title"` // fact-level
Content string `json:"content"` // fact-level
Categories []treeNode `json:"categories"` // children of topic / category
Facts []treeNode `json:"facts"` // fact children
}
// ListFacts hits GET /knowledge-bases/{kbCode}/tree, walks the tree,
// and flattens facts under the requested topic ids into FactSummary
// rows. Snippet = first 120 bytes of content (naive byte slice, not
// rune-aware; see snippet).
func (c *Client) ListFacts(ctx context.Context, kbCode string, topicIDs []int) ([]FactSummary, error) {
if kbCode == "" {
return nil, fmt.Errorf("kbclient: kb-code required")
}
if len(topicIDs) == 0 {
return nil, fmt.Errorf("kbclient: topic-ids required")
}
path := "/knowledge-bases/" + url.PathEscape(kbCode) + "/tree"
var tree kbTree
if err := c.getJSON(ctx, path, &tree); err != nil {
return nil, err
}
wantTopic := map[int]bool{}
for _, id := range topicIDs {
wantTopic[id] = true
}
var out []FactSummary
for _, topic := range tree.Topics {
if !wantTopic[topic.ID] {
continue
}
walkFactsInto(&out, topic.ID, topic.Topic, topic.Categories, topic.Facts)
}
return out, nil
}
// walkFactsInto descends the category subtree, collecting facts.
// topicID and topicSlug carry the topic context used to tag each row.
func walkFactsInto(out *[]FactSummary, topicID int, topicSlug string, cats, facts []treeNode) {
for _, f := range facts {
*out = append(*out, FactSummary{
ID: f.ID,
TopicID: topicID,
TopicSlug: topicSlug,
Title: f.Title,
Snippet: snippet(f.Content, 120),
})
}
for _, c := range cats {
walkFactsInto(out, topicID, topicSlug, c.Categories, c.Facts)
}
}
// GetFacts pulls each fact's full content by ID. It calls
// /knowledge-facts/{id} once per fact, sequentially (small N
// expected; agents cache a handful per call). Facts that return 404
// are skipped silently. kbCode is used to tag the returned Fact
// records because the backend doesn't return it.
func (c *Client) GetFacts(ctx context.Context, kbCode string, factIDs []int) ([]Fact, error) {
if kbCode == "" {
return nil, fmt.Errorf("kbclient: kb-code required")
}
out := make([]Fact, 0, len(factIDs))
for _, id := range factIDs {
path := "/knowledge-facts/" + strconv.Itoa(id)
var node struct {
ID int `json:"id"`
Title string `json:"title"`
Content string `json:"content"`
TopicID int `json:"topic_id"`
TopicSlug string `json:"topic"`
CategoryID int `json:"category_id"`
}
if err := c.getJSON(ctx, path, &node); err != nil {
// 404 → skip silently; other errors fail the batch
if isNotFound(err) {
continue
}
return nil, err
}
out = append(out, Fact{
ID: node.ID,
KBCode: kbCode,
TopicSlug: node.TopicSlug,
Content: node.Content,
})
}
return out, nil
}
// ---- transport plumbing ----
func (c *Client) getJSON(ctx context.Context, path string, out any) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.BaseURL+path, nil)
if err != nil {
return err
}
if c.APIKey != "" {
req.Header.Set("Authorization", "Bearer "+c.APIKey)
}
req.Header.Set("Accept", "application/json")
res, err := c.HTTP.Do(req)
if err != nil {
return fmt.Errorf("GET %s: %w", path, err)
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("GET %s: read body: %w", path, err)
}
if res.StatusCode == 404 {
return notFoundErr{path: path}
}
if res.StatusCode >= 400 {
return fmt.Errorf("GET %s: HTTP %d: %s", path, res.StatusCode, truncate(body, 200))
}
if len(body) == 0 {
return nil
}
if err := json.Unmarshal(body, out); err != nil {
return fmt.Errorf("GET %s decode: %w (body=%q)", path, err, truncate(body, 200))
}
return nil
}
type notFoundErr struct{ path string }
func (e notFoundErr) Error() string { return "GET " + e.path + ": 404 not found" }
func isNotFound(err error) bool {
_, ok := err.(notFoundErr)
return ok
}
func truncate(b []byte, n int) string {
if len(b) > n {
return string(b[:n]) + "..."
}
return string(b)
}
// snippet returns the first n bytes of s, followed by "..." if cut.
// Naive byte slice; HF content is UTF-8 and we accept the small risk
// of cutting mid-rune for preview rendering (the agent gets full
// content via GetFacts before caching).
func snippet(s string, n int) string {
s = strings.TrimSpace(s)
if len(s) <= n {
return s
}
return s[:n] + "..."
}
// Keeps the bytes import referenced so the file compiles; the package
// is currently unused but will be needed by future POST/PATCH
// endpoints.
var _ = bytes.NewReader([]byte{})
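The client-side filtering that ListFacts performs on the /tree response can be tried in isolation. The sketch below assumes a trimmed-down payload shape: `node`, `tree`, `factRef`, `collect`, `flatten` and `samplePayload` are illustrative names and made-up data, not the real backend response or the kbclient types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// node is a hypothetical, reduced mirror of the treeNode idea: one
// struct reused for topic, category and fact levels.
type node struct {
	ID         int    `json:"id"`
	Topic      string `json:"topic"`
	Title      string `json:"title"`
	Categories []node `json:"categories"`
	Facts      []node `json:"facts"`
}

type tree struct {
	Topics []node `json:"topics"`
}

// factRef is a flattened row: a fact tagged with its owning topic.
type factRef struct {
	FactID    int
	TopicID   int
	TopicSlug string
	Title     string
}

// collect appends the facts at this level, then recurses into nested
// categories while keeping the same topic tag.
func collect(out []factRef, topicID int, slug string, cats, facts []node) []factRef {
	for _, f := range facts {
		out = append(out, factRef{FactID: f.ID, TopicID: topicID, TopicSlug: slug, Title: f.Title})
	}
	for _, c := range cats {
		out = collect(out, topicID, slug, c.Categories, c.Facts)
	}
	return out
}

// flatten decodes a tree payload and keeps only facts that live under
// one of the wanted topic ids.
func flatten(payload []byte, topicIDs []int) ([]factRef, error) {
	var t tree
	if err := json.Unmarshal(payload, &t); err != nil {
		return nil, err
	}
	want := make(map[int]bool, len(topicIDs))
	for _, id := range topicIDs {
		want[id] = true
	}
	var out []factRef
	for _, topic := range t.Topics {
		if want[topic.ID] {
			out = collect(out, topic.ID, topic.Topic, topic.Categories, topic.Facts)
		}
	}
	return out, nil
}

// samplePayload is invented data in the assumed shape.
const samplePayload = `{"topics":[
 {"id":1,"topic":"debugging","facts":[{"id":10,"title":"OOM fix"}],
  "categories":[{"id":5,"facts":[{"id":11,"title":"heap dump"}]}]},
 {"id":2,"topic":"deploy","facts":[{"id":20,"title":"rollback"}]}]}`

func main() {
	refs, err := flatten([]byte(samplePayload), []int{1})
	if err != nil {
		panic(err)
	}
	for _, r := range refs {
		fmt.Printf("%d %s/%s\n", r.FactID, r.TopicSlug, r.Title)
	}
}
```

Returning the slice from `collect` rather than mutating through a pointer is only a stylistic choice for the sketch; the pointer form used in client.go behaves the same way.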

internal/monitor/pusher.go

@@ -0,0 +1,238 @@
// Pusher periodically uploads system telemetry to the HarborForge
// backend's /monitor/server/heartbeat endpoint. Replaces the standalone
// `harborforge-monitor` daemon — the plugin's lifecycle (host gateway
// start/stop) bounds the heartbeat loop, so no separate process need
// supervise it.
//
// Wire shape mirrors HarborForge.Monitor's `telemetry.Payload`
// (flat `cpu_pct/mem_pct/...` fields + `X-API-Key` header). The
// translation from internal `telemetry.Snapshot` to that shape lives
// in buildPayload; HF backend stays unchanged.
package monitor
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
)
// PushPayload is the wire shape POSTed to /monitor/server/heartbeat —
// 1:1 with HarborForge.Monitor's `telemetry.Payload`.
type PushPayload struct {
Identifier string `json:"identifier"`
PluginVersion string `json:"plugin_version,omitempty"`
Agents []any `json:"agents"`
NginxInstalled bool `json:"nginx_installed"`
NginxSites []string `json:"nginx_sites"`
CPUPct float64 `json:"cpu_pct,omitempty"`
MemPct float64 `json:"mem_pct,omitempty"`
DiskPct float64 `json:"disk_pct,omitempty"`
SwapPct float64 `json:"swap_pct,omitempty"`
LoadAvg []float64 `json:"load_avg,omitempty"`
UptimeSeconds uint64 `json:"uptime_seconds,omitempty"`
}
// PusherConfig is the operator-supplied tuning.
type PusherConfig struct {
BackendURL string // e.g. https://hf-api.hangman-lab.top
APIKey string // sent as X-API-Key
Interval time.Duration // default 30s when <=0
}
// Pusher runs the periodic POST loop. One per plugin process.
type Pusher struct {
cfg PusherConfig
collect func() telemetry.Snapshot
log LogFunc
http *http.Client
// stats — for the monitor_telemetry tool / status surfacing.
mu sync.RWMutex
lastSentAt time.Time
lastStatus int
lastErr string
successHits uint64
errHits uint64
}
// NewPusher constructs the loop runner. collect must be a snapshot
// producer (caller usually wires it to telemetry.Collect with
// SampleCPU=true).
func NewPusher(cfg PusherConfig, collect func() telemetry.Snapshot, log LogFunc) *Pusher {
if cfg.Interval <= 0 {
cfg.Interval = 30 * time.Second
}
if log == nil {
log = func(string, string, map[string]any) {}
}
return &Pusher{
cfg: cfg,
collect: collect,
log: log,
http: &http.Client{Timeout: 15 * time.Second},
}
}
// Run drives the push loop until ctx is cancelled, then returns
// ctx.Err(). It returns nil immediately when BackendURL or APIKey is
// empty (push disabled). The first push happens immediately so the
// backend sees this claw alive without waiting an interval.
func (p *Pusher) Run(ctx context.Context) error {
if p.cfg.BackendURL == "" {
p.log("warn", "monitor push disabled (empty backendURL)", nil)
return nil
}
if p.cfg.APIKey == "" {
p.log("warn", "monitor push disabled (empty apiKey)", nil)
return nil
}
url := strings.TrimRight(p.cfg.BackendURL, "/") + "/monitor/server/heartbeat"
tick := time.NewTicker(p.cfg.Interval)
defer tick.Stop()
p.pushOnce(ctx, url)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
p.pushOnce(ctx, url)
}
}
}
func (p *Pusher) pushOnce(ctx context.Context, url string) {
snap := p.collect()
body, err := json.Marshal(buildPayload(snap))
if err != nil {
p.recordErr("marshal: " + err.Error())
return
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
p.recordErr("build req: " + err.Error())
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-API-Key", p.cfg.APIKey)
res, err := p.http.Do(req)
if err != nil {
p.recordErr("send: " + err.Error())
p.log("warn", "monitor push failed", map[string]any{"err": err.Error()})
return
}
defer res.Body.Close()
raw, _ := io.ReadAll(res.Body)
if res.StatusCode < 200 || res.StatusCode >= 300 {
p.recordErr(fmt.Sprintf("%d: %s", res.StatusCode, truncate(raw, 200)))
p.log("warn", "monitor push non-2xx", map[string]any{
"status": res.StatusCode, "body": truncate(raw, 200),
})
return
}
p.recordOK(res.StatusCode)
}
// PushStats is a copy of the latest push state for diagnostics
// (the harborforge_monitor_telemetry tool surfaces this).
type PushStats struct {
LastSentAt time.Time
LastStatus int
LastErr string
SuccessCount uint64
ErrorCount uint64
}
// Stats returns a snapshot of the push counters; safe for concurrent use.
func (p *Pusher) Stats() PushStats {
p.mu.RLock()
defer p.mu.RUnlock()
return PushStats{
LastSentAt: p.lastSentAt,
LastStatus: p.lastStatus,
LastErr: p.lastErr,
SuccessCount: p.successHits,
ErrorCount: p.errHits,
}
}
func (p *Pusher) recordOK(status int) {
p.mu.Lock()
wasFirst := p.successHits == 0
p.lastSentAt = time.Now().UTC()
p.lastStatus = status
p.lastErr = ""
p.successHits++
count := p.successHits
p.mu.Unlock()
// First success is an operator signal that the push loop is live;
// log it loud so the journal carries proof. Subsequent successes
// log on a slow heartbeat (every 60 cycles) so the journal stays
// quiet but still proves the loop hasn't drifted into "0 successes
// but no errors either" territory.
if wasFirst {
p.log("info", "monitor push started", map[string]any{"status": status})
} else if count%60 == 0 {
p.log("info", "monitor push heartbeat", map[string]any{
"successes": count, "status": status,
})
}
}
func (p *Pusher) recordErr(msg string) {
p.mu.Lock()
defer p.mu.Unlock()
p.lastSentAt = time.Now().UTC()
p.lastErr = msg
p.errHits++
}
// buildPayload translates the internal Snapshot into the flat
// PushPayload shape the backend expects. agents is passed through as
// []any (one entry per agent — id/model/state preserved).
func buildPayload(snap telemetry.Snapshot) PushPayload {
agents := make([]any, 0, len(snap.Agents))
for _, a := range snap.Agents {
agents = append(agents, map[string]any{
"id": a.ID,
"model": a.Model,
"state": a.State,
})
}
return PushPayload{
Identifier: snap.Identifier,
PluginVersion: snap.PluginInfo.Version,
Agents: agents,
// nginx detection is the standalone monitor's responsibility today;
// the HF plugin leaves it blank rather than rediscovering nginx
// state. Operators that need it can keep the standalone monitor
// alongside or wait for a follow-up commit.
NginxInstalled: false,
NginxSites: []string{},
CPUPct: round1(snap.CPU.UsedPercent),
MemPct: round1(snap.Memory.UsedPercent),
DiskPct: round1(snap.Disk.UsedPercent),
SwapPct: round1(snap.Swap.UsedPercent),
LoadAvg: []float64{round2(snap.Load.One), round2(snap.Load.Five), round2(snap.Load.Fifteen)},
UptimeSeconds: snap.UptimeSecs,
}
}
func round1(v float64) float64 { return float64(int64(v*10+0.5)) / 10 }
func round2(v float64) float64 { return float64(int64(v*100+0.5)) / 100 }
func truncate(b []byte, n int) string {
if len(b) <= n {
return string(b)
}
return string(b[:n]) + "…"
}
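The shape of Pusher.Run (push once immediately, then on every tick, stop on context cancellation) can be exercised without the telemetry or HTTP dependencies. The sketch below is a reduction under that assumption; `runLoop` and `demo` are hypothetical names and the intervals are arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop calls push once up front, then once per tick, and returns
// ctx.Err() as soon as the context is done.
func runLoop(ctx context.Context, interval time.Duration, push func()) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	push() // immediate first push, before any tick fires
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
			push()
		}
	}
}

// demo runs the loop against a short deadline and reports how many
// pushes happened plus the loop's return value.
func demo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()
	pushes := 0
	err := runLoop(ctx, 50*time.Millisecond, func() { pushes++ })
	return pushes, err
}

func main() {
	pushes, err := demo()
	fmt.Println("pushes:", pushes, "err:", err)
}
```

Creating the ticker before the first push means the first tick is measured from loop start rather than from the end of the first push, which matches the ordering in Run above.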


@@ -29,14 +29,30 @@ type Snapshot struct {
Hostname string `json:"hostname"`
UptimeSecs uint64 `json:"uptime"`
Memory MemoryInfo `json:"memory"`
Swap SwapInfo `json:"swap"`
Load LoadInfo `json:"load"`
Disk DiskInfo `json:"disk"`
CPU CPUInfo `json:"cpu"`
Agents []AgentInfo `json:"agents"`
PluginInfo PluginInfo `json:"plugin"`
CapturedAt time.Time `json:"captured_at"`
HostMetadata map[string]string `json:"host_metadata,omitempty"`
}
// SwapInfo is the system swap usage. Zeroes when swap isn't configured.
type SwapInfo struct {
Total uint64 `json:"total"`
Free uint64 `json:"free"`
Used uint64 `json:"used"`
UsedPercent float64 `json:"used_percent"`
}
// CPUInfo holds the most recent CPU usage estimate. UsedPercent is
// computed across one sample interval (see Collect's cpu helper).
type CPUInfo struct {
UsedPercent float64 `json:"used_percent"`
}
// MemoryInfo mirrors OpenclawPlugin's memory shape.
type MemoryInfo struct {
Total uint64 `json:"total"` // bytes
@@ -84,15 +100,27 @@ type CollectOpts struct {
Identifier string
Version string
AgentLister func() []AgentInfo // resolved by the caller (plugin uses HostAPI to walk agents)
// SampleCPU asks Collect to take a 1-second CPU sample. Cheap
// on-demand paths (status endpoint, bridge serve) leave it false;
// the slow push loop sets it true.
SampleCPU bool
}
// Collect produces a fresh snapshot from /proc + the supplied AgentLister.
// SampleCPU=true takes a 1-second CPU sample (two reads of /proc/stat
// with a sleep between); otherwise CPU usage stays zero. Set true on
// the slow push loop, false on the cheap on-demand status endpoint.
func Collect(opts CollectOpts) Snapshot {
now := time.Now().UTC()
host, _ := os.Hostname()
-mem := readMemInfo()
+mem, swap := readMemAndSwap()
load := readLoadAvg()
disk := readDiskRoot()
cpu := CPUInfo{}
if opts.SampleCPU {
cpu.UsedPercent = sampleCPUPercent(time.Second)
}
var agents []AgentInfo
if opts.AgentLister != nil {
agents = opts.AgentLister()
@@ -103,8 +131,10 @@ func Collect(opts CollectOpts) Snapshot {
Hostname: host,
UptimeSecs: readUptime(),
Memory: mem,
Swap: swap,
Load: load,
Disk: disk,
CPU: cpu,
Agents: agents,
PluginInfo: PluginInfo{
Name: "harbor-forge",
@@ -117,10 +147,10 @@ func Collect(opts CollectOpts) Snapshot {
// ---- /proc helpers ----
-func readMemInfo() MemoryInfo {
+func readMemAndSwap() (MemoryInfo, SwapInfo) {
f, err := os.Open("/proc/meminfo")
if err != nil {
-return MemoryInfo{}
+return MemoryInfo{}, SwapInfo{}
}
defer f.Close()
fields := map[string]uint64{}
@@ -145,6 +175,12 @@ func readMemInfo() MemoryInfo {
// All MemInfo values are in KB; convert to bytes.
fields[key] = v * 1024
}
mem := buildMemInfo(fields)
swap := buildSwapInfo(fields)
return mem, swap
}
func buildMemInfo(fields map[string]uint64) MemoryInfo {
total := fields["MemTotal"]
free := fields["MemAvailable"]
if free == 0 {
@@ -158,6 +194,67 @@ func readMemInfo() MemoryInfo {
return MemoryInfo{Total: total, Free: free, Used: used, UsedPercent: pct}
}
func buildSwapInfo(fields map[string]uint64) SwapInfo {
total := fields["SwapTotal"]
free := fields["SwapFree"]
if total == 0 {
return SwapInfo{}
}
used := total - free
pct := float64(used) / float64(total) * 100
return SwapInfo{Total: total, Free: free, Used: used, UsedPercent: pct}
}
// sampleCPUPercent computes overall CPU usage across one sample
// interval. Two reads of /proc/stat's aggregate "cpu" line, derive
// busy-time delta as (1 - idle/total). Returns 0 on read failure.
func sampleCPUPercent(interval time.Duration) float64 {
total1, idle1, ok := readCPUStat()
if !ok {
return 0
}
time.Sleep(interval)
total2, idle2, ok := readCPUStat()
if !ok || total2 <= total1 {
return 0
}
totalDelta := total2 - total1
idleDelta := idle2 - idle1
if idleDelta > totalDelta {
return 0
}
return float64(totalDelta-idleDelta) / float64(totalDelta) * 100
}
func readCPUStat() (total, idle uint64, ok bool) {
f, err := os.Open("/proc/stat")
if err != nil {
return 0, 0, false
}
defer f.Close()
sc := bufio.NewScanner(f)
if !sc.Scan() {
return 0, 0, false
}
parts := strings.Fields(sc.Text())
if len(parts) < 5 || parts[0] != "cpu" {
return 0, 0, false
}
for i := 1; i < len(parts); i++ {
v, err := strconv.ParseUint(parts[i], 10, 64)
if err != nil {
return 0, 0, false
}
total += v
// idle is the 4th value column (parts[4]); iowait (parts[5]) is also
// idle-ish but is counted as busy here.
if i == 4 {
idle = v
}
}
return total, idle, true
}
func readLoadAvg() LoadInfo {
raw, err := os.ReadFile("/proc/loadavg")
if err != nil {
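The busy-percentage arithmetic in sampleCPUPercent is easier to check when the two /proc/stat reads are replaced by fixed strings. The sketch below assumes the same column convention as readCPUStat (idle is the 4th value); `parseCPULine` and `busyPercent` are hypothetical names and the sample lines are invented.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCPULine extracts total and idle jiffies from an aggregate
// "cpu ..." line. Every numeric column counts toward total; the 4th
// value is idle.
func parseCPULine(line string) (total, idle uint64, ok bool) {
	parts := strings.Fields(line)
	if len(parts) < 5 || parts[0] != "cpu" {
		return 0, 0, false
	}
	for i := 1; i < len(parts); i++ {
		v, err := strconv.ParseUint(parts[i], 10, 64)
		if err != nil {
			return 0, 0, false
		}
		total += v
		if i == 4 {
			idle = v
		}
	}
	return total, idle, true
}

// busyPercent derives usage between two samples as
// (totalDelta - idleDelta) / totalDelta * 100, returning 0 on any
// malformed or non-advancing input.
func busyPercent(before, after string) float64 {
	t1, i1, ok1 := parseCPULine(before)
	t2, i2, ok2 := parseCPULine(after)
	if !ok1 || !ok2 || t2 <= t1 || i2 < i1 {
		return 0
	}
	totalDelta, idleDelta := t2-t1, i2-i1
	if idleDelta > totalDelta {
		return 0
	}
	return float64(totalDelta-idleDelta) / float64(totalDelta) * 100
}

func main() {
	before := "cpu 100 0 100 800 0 0 0 0 0 0" // total 1000, idle 800
	after := "cpu 150 0 150 900 0 0 0 0 0 0"  // total 1200, idle 900
	fmt.Printf("%.1f\n", busyPercent(before, after))
}
```

With these inputs the total advances by 200 jiffies and idle by 100, so half the interval was busy. The explicit `i2 < i1` guard avoids relying on unsigned wrap-around when the idle counter appears to go backwards.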

internal/tools/kb.go

@@ -0,0 +1,284 @@
// kb.go — dynamic-kb-* tool implementations (DESIGN-DYNAMIC-BLOCK.md
// §3.3 / §4.4). Each tool returns a sdkplugin.ToolResult; errors are
// surfaced as IsError:true rather than RPC errors so the model sees a
// human-readable message.
//
// The cache and evict tools need the agent id (from ctx) + session id
// (looked up via KBDeps.SessionFor; main.go threads this from the
// per-turn RenderDynamicSubblock callback that updates the lookup
// table). The three list tools only need the backend client.
package tools
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbblock"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbclient"
)
// KBDeps bundles the dependencies the KB tools need. Caller (main.go)
// constructs once and reuses across all tool calls.
type KBDeps struct {
Client *kbclient.Client
ProfileRoot string // <PLEXUM_PROFILE_ROOT>
SessionFor func(agentID string) string // returns sessionID or ""
Turn func(agentID string) int // current turn (best-effort, can return 0)
HostCallTool func(ctx context.Context, agentID, toolName string, input json.RawMessage) (json.RawMessage, error)
}
// kb tool inputs
type listKBsInput struct {
ProjectCode string `json:"project-code,omitempty"`
}
type listTopicsInput struct {
KBCode string `json:"kb-code"`
}
type listFactsInput struct {
KBCode string `json:"kb-code"`
TopicIDs []int `json:"topic-ids"`
}
type cacheInput struct {
KBCode string `json:"kb-code"`
FactIDs []int `json:"fact-ids"`
PreviousDynamicID string `json:"previous-dynamic-id,omitempty"`
}
type evictInput struct {
FactIDs []int `json:"fact-ids"`
}
type cacheResult struct {
Added []int `json:"added,omitempty"`
AlreadyCached []int `json:"already_cached,omitempty"`
Missing []int `json:"missing,omitempty"`
Note string `json:"note"`
}
type evictResult struct {
Evicted []int `json:"evicted,omitempty"`
NotCached []int `json:"not_cached,omitempty"`
Note string `json:"note"`
}
// ToolKBListKBs implements dynamic-kb-list-kbs.
func ToolKBListKBs(ctx context.Context, deps KBDeps, raw json.RawMessage) (sdkplugin.ToolResult, error) {
if deps.Client == nil {
return errResult("dynamic-kb-list-kbs: HF KB backend unavailable")
}
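var in listKBsInput
if len(raw) > 0 {
// project-code is optional, but malformed JSON is still reported
// so the model sees the same parse error as the other KB tools.
if err := json.Unmarshal(raw, &in); err != nil {
return errResult("dynamic-kb-list-kbs: parse: " + err.Error())
}
}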
kbs, err := deps.Client.ListKBs(ctx, in.ProjectCode)
if err != nil {
return errResult("dynamic-kb-list-kbs: " + err.Error())
}
var sb strings.Builder
if in.ProjectCode != "" {
fmt.Fprintf(&sb, "project: %s\n", in.ProjectCode)
}
fmt.Fprintf(&sb, "kbs: %d\n\n", len(kbs))
for _, k := range kbs {
fmt.Fprintf(&sb, "[%s] %s\n %s\n\n", k.KnowledgeBaseCode, k.Title, k.Description)
}
sb.WriteString("Call dynamic-kb-list-topics({kb-code: \"<code>\"}) to drill into a KB.\n")
return okResult(sb.String())
}
// ToolKBListTopics implements dynamic-kb-list-topics.
func ToolKBListTopics(ctx context.Context, deps KBDeps, raw json.RawMessage) (sdkplugin.ToolResult, error) {
if deps.Client == nil {
return errResult("dynamic-kb-list-topics: HF KB backend unavailable")
}
var in listTopicsInput
if err := json.Unmarshal(raw, &in); err != nil {
return errResult("dynamic-kb-list-topics: parse: " + err.Error())
}
if in.KBCode == "" {
return errResult("dynamic-kb-list-topics: kb-code required")
}
topics, err := deps.Client.ListTopics(ctx, in.KBCode)
if err != nil {
return errResult("dynamic-kb-list-topics: " + err.Error())
}
var sb strings.Builder
fmt.Fprintf(&sb, "kb: %s\ntopics: %d\n\n", in.KBCode, len(topics))
for _, t := range topics {
fmt.Fprintf(&sb, "[%d] %s\n %s\n\n", t.ID, t.Topic, t.Description)
}
fmt.Fprintf(&sb, "Call dynamic-kb-list-facts({kb-code: %q, topic-ids: [<id>, ...]}) to drill into facts.\n",
in.KBCode)
return okResult(sb.String())
}
// ToolKBListFacts implements dynamic-kb-list-facts.
func ToolKBListFacts(ctx context.Context, deps KBDeps, raw json.RawMessage) (sdkplugin.ToolResult, error) {
if deps.Client == nil {
return errResult("dynamic-kb-list-facts: HF KB backend unavailable")
}
var in listFactsInput
if err := json.Unmarshal(raw, &in); err != nil {
return errResult("dynamic-kb-list-facts: parse: " + err.Error())
}
if in.KBCode == "" || len(in.TopicIDs) == 0 {
return errResult("dynamic-kb-list-facts: kb-code + topic-ids required")
}
facts, err := deps.Client.ListFacts(ctx, in.KBCode, in.TopicIDs)
if err != nil {
return errResult("dynamic-kb-list-facts: " + err.Error())
}
var sb strings.Builder
fmt.Fprintf(&sb, "kb: %s\ntopics: %v\nfacts: %d\n\n", in.KBCode, in.TopicIDs, len(facts))
for _, f := range facts {
fmt.Fprintf(&sb, "[%d] (topic %d/%s) %s\n %s\n\n",
f.ID, f.TopicID, f.TopicSlug, f.Title, f.Snippet)
}
sb.WriteString(
"Call dynamic-kb-cache({kb-code: \"" + in.KBCode +
"\", fact-ids: [<id>, ...]}) to commit selected facts to your kb-block.\n",
)
return okResult(sb.String())
}
// ToolKBCache implements dynamic-kb-cache. Per-agent session lookup
// resolves the kb-block.json location.
func ToolKBCache(ctx context.Context, deps KBDeps, agentID string, raw json.RawMessage) (sdkplugin.ToolResult, error) {
if deps.Client == nil {
return errResult("dynamic-kb-cache: HF KB backend unavailable")
}
sessionID := ""
if deps.SessionFor != nil {
sessionID = deps.SessionFor(agentID)
}
if agentID == "" || sessionID == "" {
return errResult("dynamic-kb-cache: no per-session context (agent/session unknown)")
}
var in cacheInput
if err := json.Unmarshal(raw, &in); err != nil {
return errResult("dynamic-kb-cache: parse: " + err.Error())
}
if in.KBCode == "" || len(in.FactIDs) == 0 {
return errResult("dynamic-kb-cache: kb-code + fact-ids required")
}
block, err := kbblock.Open(deps.ProfileRoot, agentID, sessionID)
if err != nil {
return errResult("dynamic-kb-cache: open block: " + err.Error())
}
// Split into already-cached vs to-fetch.
var toFetch []int
var alreadyCached []int
for _, id := range in.FactIDs {
if block.Has(id) {
alreadyCached = append(alreadyCached, id)
} else {
toFetch = append(toFetch, id)
}
}
var fetched []kbclient.Fact
if len(toFetch) > 0 {
fetched, err = deps.Client.GetFacts(ctx, in.KBCode, toFetch)
if err != nil {
return errResult("dynamic-kb-cache: " + err.Error())
}
}
turn := 0
if deps.Turn != nil {
turn = deps.Turn(agentID)
}
var added []int
fetchedByID := map[int]kbclient.Fact{}
for _, f := range fetched {
fetchedByID[f.ID] = f
}
for _, id := range toFetch {
f, ok := fetchedByID[id]
if !ok {
continue
}
if e := block.Add(f.ID, in.KBCode, f.TopicSlug, f.Content, turn); e != nil {
added = append(added, f.ID)
}
}
if err := block.Save(); err != nil {
return errResult("dynamic-kb-cache: save: " + err.Error())
}
missing := diffInts(toFetch, added)
sort.Ints(added)
sort.Ints(alreadyCached)
res := cacheResult{
Added: added,
AlreadyCached: alreadyCached,
Missing: missing,
Note: "Newly cached facts are available starting your next turn.",
}
out, _ := json.Marshal(res)
return okResult(string(out))
}
// ToolKBEvict implements dynamic-kb-evict.
func ToolKBEvict(ctx context.Context, deps KBDeps, agentID string, raw json.RawMessage) (sdkplugin.ToolResult, error) {
sessionID := ""
if deps.SessionFor != nil {
sessionID = deps.SessionFor(agentID)
}
if agentID == "" || sessionID == "" {
return errResult("dynamic-kb-evict: no per-session context")
}
var in evictInput
if err := json.Unmarshal(raw, &in); err != nil {
return errResult("dynamic-kb-evict: parse: " + err.Error())
}
if len(in.FactIDs) == 0 {
return errResult("dynamic-kb-evict: fact-ids required")
}
block, err := kbblock.Open(deps.ProfileRoot, agentID, sessionID)
if err != nil {
return errResult("dynamic-kb-evict: open block: " + err.Error())
}
evicted := block.Remove(in.FactIDs...)
if err := block.Save(); err != nil {
return errResult("dynamic-kb-evict: save: " + err.Error())
}
notCached := diffInts(in.FactIDs, evicted)
sort.Ints(evicted)
res := evictResult{
Evicted: evicted,
NotCached: notCached,
Note: "Evictions take effect starting your next turn.",
}
out, _ := json.Marshal(res)
return okResult(string(out))
}
// ---- helpers ----
func diffInts(a, b []int) []int {
if len(a) == 0 {
return nil
}
want := make(map[int]bool, len(b))
for _, x := range b {
want[x] = true
}
out := make([]int, 0, len(a))
for _, x := range a {
if !want[x] {
out = append(out, x)
}
}
if len(out) == 0 {
return nil
}
sort.Ints(out)
return out
}
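The bookkeeping in ToolKBCache (already cached, newly added, missing) comes down to classifying each requested id against two sets. The sketch below shows that classification on its own, together with decoding the hyphenated input keys. `partition` is a hypothetical helper, not part of kb.go, and the two maps stand in for the kb-block contents and the backend response.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// cacheInput mirrors the required fields of the dynamic-kb-cache
// input schema (hyphenated JSON keys).
type cacheInput struct {
	KBCode  string `json:"kb-code"`
	FactIDs []int  `json:"fact-ids"`
}

// partition classifies each requested id: already in the block,
// returned by the backend (so it gets added), or neither (missing).
func partition(requested []int, cached, found map[int]bool) (already, added, missing []int) {
	for _, id := range requested {
		switch {
		case cached[id]:
			already = append(already, id)
		case found[id]:
			added = append(added, id)
		default:
			missing = append(missing, id)
		}
	}
	sort.Ints(already)
	sort.Ints(added)
	sort.Ints(missing)
	return already, added, missing
}

func main() {
	raw := []byte(`{"kb-code":"KB-PAYROT","fact-ids":[42,7,99]}`)
	var in cacheInput
	if err := json.Unmarshal(raw, &in); err != nil {
		panic(err)
	}
	cached := map[int]bool{7: true} // pretend fact 7 is already in the block
	found := map[int]bool{42: true} // pretend the backend only returned fact 42
	already, added, missing := partition(in.FactIDs, cached, found)
	fmt.Println(in.KBCode, "already:", already, "added:", added, "missing:", missing)
}
```

A single pass with a switch gives the same three lists that kb.go derives from Has checks plus diffInts, and leaves an empty category as a nil slice so `omitempty` drops it from the JSON result.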


@@ -27,6 +27,7 @@ type Deps struct {
Version string
Collect func() telemetry.Snapshot
Bridge *monitor.Bridge
Pusher *monitor.Pusher
Scheduler *calendar.Scheduler
Host sdkplugin.HostAPI
@@ -34,6 +35,12 @@ type Deps struct {
// host injects this via the tool dispatch context; main.go's
// CallTool reads it from the ctx and stashes here.
AgentIDFromCtx func(ctx context.Context) string
// KB wires the dynamic-kb-* tool family (DESIGN-DYNAMIC-BLOCK.md
// §3.3 / §4.4). Zero value when HF KB backend is unconfigured;
// each KB tool then returns a graceful "backend unavailable"
// error rather than crashing.
KB KBDeps
}
// Dispatch is the entry point main.go's ToolPlugin.CallTool calls.
@@ -60,6 +67,16 @@ func Dispatch(ctx context.Context, deps Deps, name string, input json.RawMessage
return toolCalendarResume(ctx, deps)
case "harborforge_restart_status":
return toolRestartStatus(deps)
case "dynamic-kb-list-kbs":
return ToolKBListKBs(ctx, deps.KB, input)
case "dynamic-kb-list-topics":
return ToolKBListTopics(ctx, deps.KB, input)
case "dynamic-kb-list-facts":
return ToolKBListFacts(ctx, deps.KB, input)
case "dynamic-kb-cache":
return ToolKBCache(ctx, deps.KB, deps.AgentIDFromCtx(ctx), input)
case "dynamic-kb-evict":
return ToolKBEvict(ctx, deps.KB, deps.AgentIDFromCtx(ctx), input)
}
return sdkplugin.ToolResult{
IsError: true,
@@ -89,11 +106,32 @@ func toolStatus(deps Deps) (sdkplugin.ToolResult, error) {
"queries": bs.Queries,
"last_query": bs.LastQuery,
},
"monitor_push": monitorPushSummary(deps),
"calendar": sch,
}
return jsonResult(out)
}
// monitorPushSummary returns the pusher's last-known state in the same
// JSON layout the status/monitor_telemetry tools surface. Nil-safe: if
// no pusher is wired (testing, push disabled), only the config-derived
// fields are reported and the last_*/count fields are omitted.
func monitorPushSummary(deps Deps) map[string]any {
out := map[string]any{
"enabled": deps.Config.MonitorPushEnabled,
"interval_seconds": deps.Config.MonitorPushIntervalSeconds,
"endpoint": deps.Config.BackendURL + "/monitor/server/heartbeat",
}
if deps.Pusher != nil {
st := deps.Pusher.Stats()
out["last_sent_at"] = st.LastSentAt
out["last_status"] = st.LastStatus
out["last_err"] = st.LastErr
out["success_count"] = st.SuccessCount
out["error_count"] = st.ErrorCount
}
return out
}
func toolTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
return jsonResult(deps.Collect())
}
@@ -101,11 +139,14 @@ func toolTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
func toolMonitorTelemetry(deps Deps) (sdkplugin.ToolResult, error) {
bs := deps.Bridge.Stats()
return jsonResult(map[string]any{
"bridge": map[string]any{
"port": bs.Port,
"listening": bs.Listening,
"queries": bs.Queries,
"last_query": bs.LastQuery,
"last_snapshot": bs.LastSnap,
},
"push": monitorPushSummary(deps),
})
}
@@ -176,10 +217,14 @@ func toolCalendarResume(ctx context.Context, deps Deps) (sdkplugin.ToolResult, e
}
func toolRestartStatus(deps Deps) (sdkplugin.ToolResult, error) {
// HarborForge backend doesn't expose a restart-pending endpoint
// (verified via /openapi.json) so we report the most recent
// heartbeat freshness instead. Useful for operators sanity-
// checking that the plugin's calendar loop is still alive.
sch := deps.Scheduler.Status()
return jsonResult(map[string]any{
-"pending": sch.RestartPending,
-"last_heartbeat": sch.LastHeartbeat,
+"pending": false,
+"last_heartbeats": sch.LastHeartbeats,
"observed_at": time.Now().UTC(),
})
}


@@ -49,7 +49,35 @@
"name": "harborforge_restart_status",
"description": "Check whether a Plexum host restart is pending (backend-driven flag). Reports last poll time and pending flag.",
"inputSchema": {"type": "object"}
},
{
"name": "dynamic-kb-list-kbs",
"description": "List HarborForge Knowledge Bases the agent can access. Optional project-code filter (equivalent to `hf knowledge-base list [--project <code>]`). Returns code/title/description per KB. Browse — follow up with dynamic-kb-list-topics to drill into one.",
"inputSchema": {"type": "object", "properties": {"project-code": {"type": "string"}}}
},
{
"name": "dynamic-kb-list-topics",
"description": "List topics in one KB by code. Returns topic id/slug/description per row. Browse — follow up with dynamic-kb-list-facts.",
"inputSchema": {"type": "object", "properties": {"kb-code": {"type": "string"}}, "required": ["kb-code"]}
},
{
"name": "dynamic-kb-list-facts",
"description": "List facts in given topics. Returns lightweight previews (id/topic/title/snippet). Browse — feed into dynamic-kb-cache to commit selected facts to your kb-block.",
"inputSchema": {"type": "object", "properties": {"kb-code": {"type": "string"}, "topic-ids": {"type": "array", "items": {"type": "integer"}}}, "required": ["kb-code", "topic-ids"]}
},
{
"name": "dynamic-kb-cache",
"description": "Cache specific KB facts into your per-session kb-block (visible in System prompt next turn). Pass previous-dynamic-id to consume the matching list-facts browse output.",
"inputSchema": {"type": "object", "properties": {"kb-code": {"type": "string"}, "fact-ids": {"type": "array", "items": {"type": "integer"}}, "previous-dynamic-id": {"type": "string"}}, "required": ["kb-code", "fact-ids"]}
},
{
"name": "dynamic-kb-evict",
"description": "Remove cached facts from your kb-block (free ctx). Takes effect starting your next turn.",
"inputSchema": {"type": "object", "properties": {"fact-ids": {"type": "array", "items": {"type": "integer"}}}, "required": ["fact-ids"]}
}
],
"dynamicSubblocks": [
{"name": "kb-block"}
]
}
}


@@ -37,12 +37,16 @@ Next steps:
"harbor-forge"
2. Write ${PLUGIN_DIR}/config.json — sample:
{
-"backendUrl": "https://monitor.hangman-lab.top",
+"backendUrl": "https://hf-api.hangman-lab.top",
"identifier": "server-t3",
-"apiKey": "g1_xxx",
-"monitor_port": 9100,
+"apiKey": "<copy from HF_MONITER_API_KEY>",
+"monitor_push_enabled": true,
+"monitor_push_interval_seconds": 30,
+"monitor_port": 0,
"calendar_enabled": true,
"calendar_heartbeat_interval_seconds": 30
}
3. Restart the host: systemctl --user restart plexum
4. Verify push is landing (DB last_seen_at advancing) and then
remove the standalone harborforge-monitor container.
EOF