Phase 4b. Wires fade-out for HarborForge kb-block entries using the
new Plexum-sdk-go/fade primitives + RenderDynamicSubblockRequest's
CurrentTurn field.
internal/kbblock/kbblock.go:
- Entry gains Seed int64 (json:"seed") for per-entry RNG seed
- Add() generates a fresh Seed at insert time
- renderLocked() refactored — Render() behaviour is unchanged;
RenderFaded(currentTurn, params) added — applies sdkfade.Fade
per entry based on the turns elapsed (currentTurn - LastRefreshAtTurn)
- Tick(currentTurn, params) drops entries whose underscore ratio
crossed the m% threshold, returns dropped IDs
- Refresh(ids, currentTurn) resets fade state (regenerates Seed
+ bumps LastRefreshAtTurn) — exposed for a future
dynamic-kb-refresh tool (not in this commit)
cmd/plexum-harborforge-plugin/main.go:
- RenderDynamicSubblock now takes an
sdkplugin.RenderDynamicSubblockRequest (per SDK d6fdb9f)
- Each turn: Tick → drop crossed-threshold entries (logging the
dropped IDs) → Save (only when something was dropped) → return the
RenderFaded output. Uses sdkfade.DefaultFadeParams() (5/10/70 per §9 #3).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
363 lines
12 KiB
Go
363 lines
12 KiB
Go
// plexum-harborforge-plugin — Plexum-side HarborForge plugin.
|
|
//
|
|
// Mirrors HarborForge.OpenclawPlugin's responsibilities, recast on the
|
|
// Plexum SDK:
|
|
//
|
|
// - eager activation: plugin spawns at host start so the Monitor
|
|
// bridge listener and Calendar scheduler are running before any
|
|
// turn fires
|
|
// - 9 harborforge_* tools backed by tools.Dispatch
|
|
// - state-aware wake-agent via HostAPI.WakeAgent for Calendar slots
|
|
//
|
|
// Config layout: <profile>/plugins/harbor-forge/config.json — see
|
|
// internal/config/config.go for the schema.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
sdkfade "git.hangman-lab.top/hzhang/Plexum-sdk-go/fade"
|
|
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
|
|
|
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/calendar"
|
|
hfcfg "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/config"
|
|
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbblock"
|
|
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbclient"
|
|
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/monitor"
|
|
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
|
|
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/tools"
|
|
)
|
|
|
|
// Version is the plugin version string. It is reported in the startup
// log, passed to telemetry and the calendar plugin-info tag, and used in
// the fallback manifest. The default below is overridden at build time
// via -ldflags "-X main.Version=…".
|
|
var Version = "0.1.0"
|
|
|
|
// harborForgePlugin satisfies sdkplugin.ToolPlugin AND
|
|
// sdkplugin.DynamicBlockProvider (the <kb-block> subblock contributor
|
|
// per Plexum DESIGN-DYNAMIC-BLOCK.md §3.3).
|
|
type harborForgePlugin struct {
|
|
host sdkplugin.HostAPI
|
|
cfg hfcfg.Resolved
|
|
bridge *monitor.Bridge
|
|
pusher *monitor.Pusher
|
|
sched *calendar.Scheduler
|
|
kbClient *kbclient.Client
|
|
deps tools.Deps
|
|
cancelBg context.CancelFunc
|
|
wg sync.WaitGroup
|
|
agentCache sync.Map // sessionID/turnID → agentID stash (best-effort)
|
|
|
|
// profileRoot caches PLEXUM_PROFILE_ROOT for kbblock path resolution.
|
|
profileRoot string
|
|
|
|
// agentSession maps agentID → sessionID, populated each turn by
|
|
// the host's plexum/plugin/dynamic-block/render RPC (which carries
|
|
// both ids). dynamic-kb-cache + dynamic-kb-evict read this to
|
|
// resolve the per-session kb-block.json path. Best-effort —
|
|
// before the first RenderDynamicSubblock fires for an agent the
|
|
// map is empty and the kb tools surface a clear error.
|
|
agentSession sync.Map // agentID (string) → sessionID (string)
|
|
}
|
|
|
|
func (p *harborForgePlugin) Manifest() sdkplugin.Manifest {
|
|
return manifestFromDisk()
|
|
}
|
|
|
|
func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) error {
|
|
p.host = host
|
|
|
|
profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT")
|
|
if profileRoot == "" {
|
|
home, _ := os.UserHomeDir()
|
|
profileRoot = filepath.Join(home, ".plexum")
|
|
}
|
|
p.profileRoot = profileRoot
|
|
raw, err := hfcfg.Load(profileRoot)
|
|
if err != nil {
|
|
return fmt.Errorf("load harbor-forge config: %w", err)
|
|
}
|
|
p.cfg = hfcfg.Resolve(raw)
|
|
host.Log("info", "harbor-forge plugin initialized", map[string]any{
|
|
"version": Version,
|
|
"backend": p.cfg.BackendURL,
|
|
"identifier": p.cfg.Identifier,
|
|
"monitor_port": p.cfg.MonitorPort,
|
|
"monitor_push_enabled": p.cfg.MonitorPushEnabled,
|
|
"calendar_enabled": p.cfg.CalendarEnabled,
|
|
})
|
|
|
|
bgCtx, cancel := context.WithCancel(context.Background())
|
|
p.cancelBg = cancel
|
|
|
|
// Listers + collectors capture bgCtx (not Init ctx) — Init returns
|
|
// once MCP initialize completes, but the plugin process lives on
|
|
// and so do the goroutines + closures we registered.
|
|
makeCollector := func(sampleCPU bool) func() telemetry.Snapshot {
|
|
return func() telemetry.Snapshot {
|
|
return telemetry.Collect(telemetry.CollectOpts{
|
|
Identifier: p.cfg.Identifier,
|
|
Version: Version,
|
|
SampleCPU: sampleCPU,
|
|
AgentLister: func() []telemetry.AgentInfo {
|
|
return p.listAgents(bgCtx, profileRoot)
|
|
},
|
|
})
|
|
}
|
|
}
|
|
// Bridge serves on-demand reads; cheap, no CPU sampling.
|
|
collect := makeCollector(false)
|
|
// Pusher runs the slow push loop; CPU sampling fine here.
|
|
collectForPush := makeCollector(true)
|
|
|
|
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
|
|
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
|
|
|
|
if err := p.bridge.Start(bgCtx); err != nil {
|
|
host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()})
|
|
}
|
|
|
|
// Active push loop — replaces the standalone harborforge-monitor
|
|
// container. Off by default; operator opts in via
|
|
// monitor_push_enabled + apiKey.
|
|
p.pusher = monitor.NewPusher(monitor.PusherConfig{
|
|
BackendURL: p.cfg.BackendURL,
|
|
APIKey: p.cfg.APIKey,
|
|
Interval: time.Duration(p.cfg.MonitorPushIntervalSeconds) * time.Second,
|
|
}, collectForPush,
|
|
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
|
|
if p.cfg.MonitorPushEnabled {
|
|
p.wg.Add(1)
|
|
go func() {
|
|
defer p.wg.Done()
|
|
if err := p.pusher.Run(bgCtx); err != nil && !errors.Is(err, context.Canceled) {
|
|
host.Log("warn", "monitor pusher exited", map[string]any{"err": err.Error()})
|
|
}
|
|
}()
|
|
}
|
|
|
|
calBackend := p.cfg.CalendarBackendURL
|
|
if calBackend == "" {
|
|
calBackend = p.cfg.BackendURL
|
|
}
|
|
bridge := calendar.New(calBackend, p.cfg.Identifier)
|
|
p.sched = calendar.NewScheduler(
|
|
calendar.Config{
|
|
HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second,
|
|
},
|
|
bridge, host,
|
|
calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"},
|
|
func() []calendar.ReportableAgent {
|
|
return p.listReportableAgents(bgCtx, profileRoot)
|
|
},
|
|
)
|
|
if p.cfg.CalendarEnabled {
|
|
p.wg.Add(1)
|
|
go func() {
|
|
defer p.wg.Done()
|
|
if err := p.sched.Run(bgCtx); err != nil {
|
|
host.Log("warn", "calendar scheduler exited", map[string]any{"err": err.Error()})
|
|
}
|
|
}()
|
|
} else {
|
|
host.Log("info", "calendar scheduler disabled by config", nil)
|
|
}
|
|
|
|
// KB HTTP client — shares plugin-level APIKey via Bearer (per-agent
|
|
// hf-token resolution is a TODO when SDK exposes secret-mgr access).
|
|
if p.cfg.BackendURL != "" && p.cfg.APIKey != "" {
|
|
p.kbClient = kbclient.New(p.cfg.BackendURL, p.cfg.APIKey)
|
|
} else {
|
|
host.Log("info", "kb client not initialized (need BackendURL + APIKey); dynamic-kb-* tools will return backend-unavailable", nil)
|
|
}
|
|
|
|
p.deps = tools.Deps{
|
|
Config: p.cfg,
|
|
Version: Version,
|
|
Collect: collect,
|
|
Bridge: p.bridge,
|
|
Pusher: p.pusher,
|
|
Scheduler: p.sched,
|
|
Host: host,
|
|
AgentIDFromCtx: func(ctx context.Context) string {
|
|
// Host attaches the caller agent id via tools/call
|
|
// `_meta.agent_id`; SDK unpacks it into ctx.
|
|
if id := sdkplugin.AgentIDFromContext(ctx); id != "" {
|
|
return id
|
|
}
|
|
// Fallback for host paths that don't carry an agent
|
|
// (channel-driven, CLI plugin-call). When a single
|
|
// calendar slot is active we can deterministically
|
|
// attribute the call to that slot's owner.
|
|
return p.sched.SingleActiveAgentID()
|
|
},
|
|
KB: tools.KBDeps{
|
|
Client: p.kbClient,
|
|
ProfileRoot: profileRoot,
|
|
SessionFor: func(agentID string) string {
|
|
if v, ok := p.agentSession.Load(agentID); ok {
|
|
return v.(string)
|
|
}
|
|
return ""
|
|
},
|
|
Turn: func(agentID string) int { return 0 }, // best-effort; turn ctx not available to plugins yet
|
|
},
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RenderDynamicSubblock implements sdkplugin.DynamicBlockProvider. The
|
|
// host calls this once per turn per declared subblock name from this
|
|
// plugin's manifest (DESIGN-DYNAMIC-BLOCK.md §2 / §3.3). For
|
|
// "kb-block" we open the per-session kb-block.json, Tick fade-out
|
|
// drops on entries that crossed the m% threshold (§9 #3), Save the
|
|
// (possibly mutated) block, and return RenderFaded body for the host
|
|
// to wrap in <kb-block>...</kb-block>. Empty block → return "" → host
|
|
// omits the subblock for this turn.
|
|
//
|
|
// Side effect: stash agentID → sessionID into p.agentSession so the
|
|
// dynamic-kb-cache + dynamic-kb-evict tools can resolve the same
|
|
// kb-block file when they fire later in the same turn.
|
|
func (p *harborForgePlugin) RenderDynamicSubblock(ctx context.Context, req sdkplugin.RenderDynamicSubblockRequest) (string, error) {
|
|
if req.AgentID == "" || req.SessionID == "" {
|
|
return "", nil
|
|
}
|
|
// Cache the (agent, session) pair for tool dispatch this turn.
|
|
p.agentSession.Store(req.AgentID, req.SessionID)
|
|
|
|
if req.Name != "kb-block" {
|
|
p.host.Log("warn", "dynamic-block render: unknown subblock name",
|
|
map[string]any{"agent": req.AgentID, "session": req.SessionID, "name": req.Name})
|
|
return "", nil
|
|
}
|
|
block, err := kbblock.Open(p.profileRoot, req.AgentID, req.SessionID)
|
|
if err != nil {
|
|
p.host.Log("warn", "open kb-block", map[string]any{
|
|
"agent": req.AgentID, "session": req.SessionID, "err": err.Error(),
|
|
})
|
|
return "", nil
|
|
}
|
|
// Apply fade tick (drop entries that crossed the m% threshold).
|
|
// Uses DESIGN-DYNAMIC-BLOCK.md §9 #3 spec defaults (5/10/70).
|
|
fadeParams := sdkfade.DefaultFadeParams()
|
|
if tick := block.Tick(req.CurrentTurn, fadeParams); len(tick.FadedOut) > 0 {
|
|
p.host.Log("info", "kb-block fade tick", map[string]any{
|
|
"agent": req.AgentID,
|
|
"session": req.SessionID,
|
|
"dropped": tick.FadedOut,
|
|
})
|
|
if err := block.Save(); err != nil {
|
|
p.host.Log("warn", "save kb-block after tick",
|
|
map[string]any{"err": err.Error()})
|
|
}
|
|
}
|
|
return block.RenderFaded(req.CurrentTurn, fadeParams), nil
|
|
}
|
|
|
|
func (p *harborForgePlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (sdkplugin.ToolResult, error) {
|
|
return tools.Dispatch(ctx, p.deps, name, input)
|
|
}
|
|
|
|
// ---- agent enumeration ----
|
|
|
|
// listAgents walks <profile>/agents/*/agent.json + state.json so the
|
|
// telemetry payload includes every Plexum agent visible on this host.
|
|
// Best-effort: read failures degrade to empty list.
|
|
func (p *harborForgePlugin) listAgents(ctx context.Context, profileRoot string) []telemetry.AgentInfo {
|
|
root := filepath.Join(profileRoot, "agents")
|
|
entries, err := os.ReadDir(root)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
out := make([]telemetry.AgentInfo, 0, len(entries))
|
|
for _, e := range entries {
|
|
if !e.IsDir() {
|
|
continue
|
|
}
|
|
agentID := e.Name()
|
|
var info telemetry.AgentInfo
|
|
info.ID = agentID
|
|
if raw, err := os.ReadFile(filepath.Join(root, agentID, "agent.json")); err == nil {
|
|
var meta struct{ Model string `json:"model"` }
|
|
_ = json.Unmarshal(raw, &meta)
|
|
info.Model = meta.Model
|
|
}
|
|
// State via HostAPI.ReadAgentState — host-side, ground truth.
|
|
if snap, err := p.host.ReadAgentState(ctx, agentID); err == nil {
|
|
info.State = snap.State
|
|
}
|
|
out = append(out, info)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (p *harborForgePlugin) listReportableAgents(ctx context.Context, profileRoot string) []calendar.ReportableAgent {
|
|
telem := p.listAgents(ctx, profileRoot)
|
|
out := make([]calendar.ReportableAgent, 0, len(telem))
|
|
for _, a := range telem {
|
|
out = append(out, calendar.ReportableAgent{
|
|
ID: a.ID, Model: a.Model,
|
|
State: mapStateToCalendar(a.State),
|
|
})
|
|
}
|
|
return out
|
|
}
|
|
|
|
func mapStateToCalendar(s string) calendar.AgentStatusValue {
|
|
switch strings.ToLower(s) {
|
|
case "idle":
|
|
return calendar.AgentStatusIdle
|
|
case "working":
|
|
return calendar.AgentStatusOnCall
|
|
case "busy":
|
|
return calendar.AgentStatusBusy
|
|
case "offline":
|
|
return calendar.AgentStatusOffline
|
|
}
|
|
return calendar.AgentStatusOffline
|
|
}
|
|
|
|
func manifestFromDisk() sdkplugin.Manifest {
|
|
// Bundled manifest.json is the authoritative shape; the binary
|
|
// version reads it next to itself to avoid hand-syncing two
|
|
// definitions. Falls back to a minimal in-code manifest if the
|
|
// file is missing (development / first build).
|
|
exe, err := os.Executable()
|
|
if err == nil {
|
|
raw, err := os.ReadFile(filepath.Join(filepath.Dir(exe), "manifest.json"))
|
|
if err == nil {
|
|
var m sdkplugin.Manifest
|
|
if err := json.Unmarshal(raw, &m); err == nil && m.Name != "" {
|
|
return m
|
|
}
|
|
}
|
|
}
|
|
return sdkplugin.Manifest{
|
|
Name: "harbor-forge",
|
|
Version: Version,
|
|
Activation: sdkplugin.ActivationEager,
|
|
Executable: "plexum-harborforge-plugin",
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
p := &harborForgePlugin{}
|
|
defer func() {
|
|
if p.cancelBg != nil {
|
|
p.cancelBg()
|
|
}
|
|
p.wg.Wait()
|
|
}()
|
|
if err := sdkplugin.Serve(p); err != nil && !errors.Is(err, context.Canceled) {
|
|
fmt.Fprintf(os.Stderr, "plexum-harborforge-plugin: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|