Files
HarborForge.PlexumPlugin/cmd/plexum-harborforge-plugin/main.go
hzhang 9e43021ba5 feat(kb): dynamic-kb-* tool family + <kb-block> subblock provider
Implements the HF side of Plexum DESIGN-DYNAMIC-BLOCK.md Phase 2 — the
HarborForge knowledge-base block agents cache facts into per session.
Cross-runtime aligned with the ClawSkills workflow text (same tool
names + input schemas + return shapes will land on openclaw side later).

manifest.json:
  - 5 new dynamic-kb-* tool contracts (list-kbs / list-topics /
    list-facts / cache / evict)
  - dynamicSubblocks contract entry declaring this plugin owns
    the "kb-block" <dynamic-block> subblock

internal/kbblock/ (new):
  - Per-session storage at <PLEXUM_PROFILE_ROOT>/agents/<id>/sessions/
    <sid>/plugins/harbor-forge/kb-block.json
  - Entry carries ID (HF backend DB primary key) + KBCode + SourceTopic
    + Content + InsertSeq for §9 #4 cache-insertion-order rendering
  - Render emits <kb-fact id=N kb=<code> source=topic:<slug>>...
    </kb-fact> (no title/description per §9 #8; source attr omitted
    when empty)
  - Fade NOT applied in v1 — §9 #3 lock has fade params shared with
    memory but implementation deferred until prod data informs whether
    KB needs it; agent dynamic-kb-evict is the only eviction path
  - 11 unit tests

internal/kbclient/ (new):
  - Typed HTTP client for HarborForge.Backend KB routes verified
    against app/api/routers/knowledge.py
  - GET /knowledge-bases[?project=<code>] (list KBs)
  - GET /knowledge-bases/{kb_code}/topics (list topics)
  - GET /knowledge-bases/{kb_code}/tree (full hierarchy — ListFacts
    flattens this client-side filtered by topic ids; backend has no
    flat list-facts-in-topic route)
  - GET /knowledge-facts/{id} per fact (GetFacts batch loop)
  - Auth: plugin-level Bearer APIKey. Per-agent hf-token resolution
    is a TODO when SDK exposes secret-mgr access.

internal/tools/kb.go (new) + tools.go:
  - 5 tool functions hooked into Dispatch
  - KBDeps struct bundles Client + ProfileRoot + SessionFor + Turn
  - Cache/evict use SessionFor lookup populated by main.go's
    RenderDynamicSubblock (called per turn by host; carries sessionID)

cmd/plexum-harborforge-plugin/main.go:
  - kbClient field initialized when BackendURL + APIKey present
  - profileRoot cached for kbblock path resolution
  - agentSession sync.Map tracks agentID → sessionID; populated by
    RenderDynamicSubblock so subsequent tool calls in the same turn
    can resolve the per-session kb-block.json path
  - Implements sdkplugin.DynamicBlockProvider.RenderDynamicSubblock:
    opens kbblock for (agentID, sessionID) and returns its Render()
    body; host wraps in <kb-block>...</kb-block>

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-05 20:15:34 +01:00

344 lines
11 KiB
Go

// plexum-harborforge-plugin — Plexum-side HarborForge plugin.
//
// Mirrors HarborForge.OpenclawPlugin's responsibilities, recast on the
// Plexum SDK:
//
// - eager activation: plugin spawns at host start so the Monitor
// bridge listener and Calendar scheduler are running before any
// turn fires
// - 9 harborforge_* tools backed by tools.Dispatch
// - state-aware wake-agent via HostAPI.WakeAgent for Calendar slots
//
// Config layout: <profile>/plugins/harbor-forge/config.json — see
// internal/config/config.go for the schema.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/calendar"
hfcfg "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/config"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbblock"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbclient"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/monitor"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/tools"
)
// Version is injected via -ldflags "-X main.Version=…" at build time.
var Version = "0.1.0"
// harborForgePlugin satisfies sdkplugin.ToolPlugin AND
// sdkplugin.DynamicBlockProvider (the <kb-block> subblock contributor
// per Plexum DESIGN-DYNAMIC-BLOCK.md §3.3).
type harborForgePlugin struct {
host sdkplugin.HostAPI
cfg hfcfg.Resolved
bridge *monitor.Bridge
pusher *monitor.Pusher
sched *calendar.Scheduler
kbClient *kbclient.Client
deps tools.Deps
cancelBg context.CancelFunc
wg sync.WaitGroup
agentCache sync.Map // sessionID/turnID → agentID stash (best-effort)
// profileRoot caches PLEXUM_PROFILE_ROOT for kbblock path resolution.
profileRoot string
// agentSession maps agentID → sessionID, populated each turn by
// the host's plexum/plugin/dynamic-block/render RPC (which carries
// both ids). dynamic-kb-cache + dynamic-kb-evict read this to
// resolve the per-session kb-block.json path. Best-effort —
// before the first RenderDynamicSubblock fires for an agent the
// map is empty and the kb tools surface a clear error.
agentSession sync.Map // agentID (string) → sessionID (string)
}
func (p *harborForgePlugin) Manifest() sdkplugin.Manifest {
return manifestFromDisk()
}
func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) error {
p.host = host
profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT")
if profileRoot == "" {
home, _ := os.UserHomeDir()
profileRoot = filepath.Join(home, ".plexum")
}
p.profileRoot = profileRoot
raw, err := hfcfg.Load(profileRoot)
if err != nil {
return fmt.Errorf("load harbor-forge config: %w", err)
}
p.cfg = hfcfg.Resolve(raw)
host.Log("info", "harbor-forge plugin initialized", map[string]any{
"version": Version,
"backend": p.cfg.BackendURL,
"identifier": p.cfg.Identifier,
"monitor_port": p.cfg.MonitorPort,
"monitor_push_enabled": p.cfg.MonitorPushEnabled,
"calendar_enabled": p.cfg.CalendarEnabled,
})
bgCtx, cancel := context.WithCancel(context.Background())
p.cancelBg = cancel
// Listers + collectors capture bgCtx (not Init ctx) — Init returns
// once MCP initialize completes, but the plugin process lives on
// and so do the goroutines + closures we registered.
makeCollector := func(sampleCPU bool) func() telemetry.Snapshot {
return func() telemetry.Snapshot {
return telemetry.Collect(telemetry.CollectOpts{
Identifier: p.cfg.Identifier,
Version: Version,
SampleCPU: sampleCPU,
AgentLister: func() []telemetry.AgentInfo {
return p.listAgents(bgCtx, profileRoot)
},
})
}
}
// Bridge serves on-demand reads; cheap, no CPU sampling.
collect := makeCollector(false)
// Pusher runs the slow push loop; CPU sampling fine here.
collectForPush := makeCollector(true)
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
if err := p.bridge.Start(bgCtx); err != nil {
host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()})
}
// Active push loop — replaces the standalone harborforge-monitor
// container. Off by default; operator opts in via
// monitor_push_enabled + apiKey.
p.pusher = monitor.NewPusher(monitor.PusherConfig{
BackendURL: p.cfg.BackendURL,
APIKey: p.cfg.APIKey,
Interval: time.Duration(p.cfg.MonitorPushIntervalSeconds) * time.Second,
}, collectForPush,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
if p.cfg.MonitorPushEnabled {
p.wg.Add(1)
go func() {
defer p.wg.Done()
if err := p.pusher.Run(bgCtx); err != nil && !errors.Is(err, context.Canceled) {
host.Log("warn", "monitor pusher exited", map[string]any{"err": err.Error()})
}
}()
}
calBackend := p.cfg.CalendarBackendURL
if calBackend == "" {
calBackend = p.cfg.BackendURL
}
bridge := calendar.New(calBackend, p.cfg.Identifier)
p.sched = calendar.NewScheduler(
calendar.Config{
HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second,
},
bridge, host,
calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"},
func() []calendar.ReportableAgent {
return p.listReportableAgents(bgCtx, profileRoot)
},
)
if p.cfg.CalendarEnabled {
p.wg.Add(1)
go func() {
defer p.wg.Done()
if err := p.sched.Run(bgCtx); err != nil {
host.Log("warn", "calendar scheduler exited", map[string]any{"err": err.Error()})
}
}()
} else {
host.Log("info", "calendar scheduler disabled by config", nil)
}
// KB HTTP client — shares plugin-level APIKey via Bearer (per-agent
// hf-token resolution is a TODO when SDK exposes secret-mgr access).
if p.cfg.BackendURL != "" && p.cfg.APIKey != "" {
p.kbClient = kbclient.New(p.cfg.BackendURL, p.cfg.APIKey)
} else {
host.Log("info", "kb client not initialized (need BackendURL + APIKey); dynamic-kb-* tools will return backend-unavailable", nil)
}
p.deps = tools.Deps{
Config: p.cfg,
Version: Version,
Collect: collect,
Bridge: p.bridge,
Pusher: p.pusher,
Scheduler: p.sched,
Host: host,
AgentIDFromCtx: func(ctx context.Context) string {
// Host attaches the caller agent id via tools/call
// `_meta.agent_id`; SDK unpacks it into ctx.
if id := sdkplugin.AgentIDFromContext(ctx); id != "" {
return id
}
// Fallback for host paths that don't carry an agent
// (channel-driven, CLI plugin-call). When a single
// calendar slot is active we can deterministically
// attribute the call to that slot's owner.
return p.sched.SingleActiveAgentID()
},
KB: tools.KBDeps{
Client: p.kbClient,
ProfileRoot: profileRoot,
SessionFor: func(agentID string) string {
if v, ok := p.agentSession.Load(agentID); ok {
return v.(string)
}
return ""
},
Turn: func(agentID string) int { return 0 }, // best-effort; turn ctx not available to plugins yet
},
}
return nil
}
// RenderDynamicSubblock implements sdkplugin.DynamicBlockProvider. The
// host calls this once per turn per declared subblock name from this
// plugin's manifest (DESIGN-DYNAMIC-BLOCK.md §2 / §3.3). For
// "kb-block" we open the per-session kb-block.json and return its
// rendered body (host wraps in <kb-block>...</kb-block>). Empty block
// → return "" → host omits the subblock for this turn.
//
// Side effect: stash agentID → sessionID into p.agentSession so the
// dynamic-kb-cache + dynamic-kb-evict tools can resolve the same
// kb-block file when they fire later in the same turn.
func (p *harborForgePlugin) RenderDynamicSubblock(ctx context.Context, agentID, sessionID, name string) (string, error) {
if agentID == "" || sessionID == "" {
return "", nil
}
// Cache the (agent, session) pair for tool dispatch this turn.
p.agentSession.Store(agentID, sessionID)
if name != "kb-block" {
return "", nil
}
block, err := kbblock.Open(p.profileRoot, agentID, sessionID)
if err != nil {
p.host.Log("warn", "open kb-block", map[string]any{
"agent": agentID, "session": sessionID, "err": err.Error(),
})
return "", nil
}
return block.Render(), nil
}
func (p *harborForgePlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (sdkplugin.ToolResult, error) {
return tools.Dispatch(ctx, p.deps, name, input)
}
// ---- agent enumeration ----
// listAgents walks <profile>/agents/*/agent.json + state.json so the
// telemetry payload includes every Plexum agent visible on this host.
// Best-effort: read failures degrade to empty list.
func (p *harborForgePlugin) listAgents(ctx context.Context, profileRoot string) []telemetry.AgentInfo {
root := filepath.Join(profileRoot, "agents")
entries, err := os.ReadDir(root)
if err != nil {
return nil
}
out := make([]telemetry.AgentInfo, 0, len(entries))
for _, e := range entries {
if !e.IsDir() {
continue
}
agentID := e.Name()
var info telemetry.AgentInfo
info.ID = agentID
if raw, err := os.ReadFile(filepath.Join(root, agentID, "agent.json")); err == nil {
var meta struct{ Model string `json:"model"` }
_ = json.Unmarshal(raw, &meta)
info.Model = meta.Model
}
// State via HostAPI.ReadAgentState — host-side, ground truth.
if snap, err := p.host.ReadAgentState(ctx, agentID); err == nil {
info.State = snap.State
}
out = append(out, info)
}
return out
}
func (p *harborForgePlugin) listReportableAgents(ctx context.Context, profileRoot string) []calendar.ReportableAgent {
telem := p.listAgents(ctx, profileRoot)
out := make([]calendar.ReportableAgent, 0, len(telem))
for _, a := range telem {
out = append(out, calendar.ReportableAgent{
ID: a.ID, Model: a.Model,
State: mapStateToCalendar(a.State),
})
}
return out
}
func mapStateToCalendar(s string) calendar.AgentStatusValue {
switch strings.ToLower(s) {
case "idle":
return calendar.AgentStatusIdle
case "working":
return calendar.AgentStatusOnCall
case "busy":
return calendar.AgentStatusBusy
case "offline":
return calendar.AgentStatusOffline
}
return calendar.AgentStatusOffline
}
func manifestFromDisk() sdkplugin.Manifest {
// Bundled manifest.json is the authoritative shape; the binary
// version reads it next to itself to avoid hand-syncing two
// definitions. Falls back to a minimal in-code manifest if the
// file is missing (development / first build).
exe, err := os.Executable()
if err == nil {
raw, err := os.ReadFile(filepath.Join(filepath.Dir(exe), "manifest.json"))
if err == nil {
var m sdkplugin.Manifest
if err := json.Unmarshal(raw, &m); err == nil && m.Name != "" {
return m
}
}
}
return sdkplugin.Manifest{
Name: "harbor-forge",
Version: Version,
Activation: sdkplugin.ActivationEager,
Executable: "plexum-harborforge-plugin",
}
}
func main() {
p := &harborForgePlugin{}
defer func() {
if p.cancelBg != nil {
p.cancelBg()
}
p.wg.Wait()
}()
if err := sdkplugin.Serve(p); err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "plexum-harborforge-plugin: %v\n", err)
os.Exit(1)
}
}