Files
HarborForge.PlexumPlugin/cmd/plexum-harborforge-plugin/main.go
hzhang e801b719a1 chore(kb): downgrade unknown-subblock log to warn; remove per-turn info log
The per-turn 'dynamic-block render: kb-block' INFO line was added for
2e sim verification; with confirmation that the host RPC fires every
turn for every agent, the line becomes noise. Trimmed to keep the
plugin's logs focused on actionable events. The 'unknown name' branch
is now a warn (defensive — shouldn't fire if manifest matches).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
EOF
2026-06-05 23:23:07 +01:00

348 lines
11 KiB
Go

// plexum-harborforge-plugin — Plexum-side HarborForge plugin.
//
// Mirrors HarborForge.OpenclawPlugin's responsibilities, recast on the
// Plexum SDK:
//
// - eager activation: plugin spawns at host start so the Monitor
// bridge listener and Calendar scheduler are running before any
// turn fires
// - 9 harborforge_* tools backed by tools.Dispatch
// - state-aware wake-agent via HostAPI.WakeAgent for Calendar slots
//
// Config layout: <profile>/plugins/harbor-forge/config.json — see
// internal/config/config.go for the schema.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/calendar"
hfcfg "git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/config"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbblock"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/kbclient"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/monitor"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/telemetry"
"git.hangman-lab.top/zhi/HarborForge.PlexumPlugin/internal/tools"
)
// Version is injected via -ldflags "-X main.Version=…" at build time.
var Version = "0.1.0"
// harborForgePlugin satisfies sdkplugin.ToolPlugin AND
// sdkplugin.DynamicBlockProvider (the <kb-block> subblock contributor
// per Plexum DESIGN-DYNAMIC-BLOCK.md §3.3).
type harborForgePlugin struct {
host sdkplugin.HostAPI
cfg hfcfg.Resolved
bridge *monitor.Bridge
pusher *monitor.Pusher
sched *calendar.Scheduler
kbClient *kbclient.Client
deps tools.Deps
cancelBg context.CancelFunc
wg sync.WaitGroup
agentCache sync.Map // sessionID/turnID → agentID stash (best-effort)
// profileRoot caches PLEXUM_PROFILE_ROOT for kbblock path resolution.
profileRoot string
// agentSession maps agentID → sessionID, populated each turn by
// the host's plexum/plugin/dynamic-block/render RPC (which carries
// both ids). dynamic-kb-cache + dynamic-kb-evict read this to
// resolve the per-session kb-block.json path. Best-effort —
// before the first RenderDynamicSubblock fires for an agent the
// map is empty and the kb tools surface a clear error.
agentSession sync.Map // agentID (string) → sessionID (string)
}
func (p *harborForgePlugin) Manifest() sdkplugin.Manifest {
return manifestFromDisk()
}
func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) error {
p.host = host
profileRoot := os.Getenv("PLEXUM_PROFILE_ROOT")
if profileRoot == "" {
home, _ := os.UserHomeDir()
profileRoot = filepath.Join(home, ".plexum")
}
p.profileRoot = profileRoot
raw, err := hfcfg.Load(profileRoot)
if err != nil {
return fmt.Errorf("load harbor-forge config: %w", err)
}
p.cfg = hfcfg.Resolve(raw)
host.Log("info", "harbor-forge plugin initialized", map[string]any{
"version": Version,
"backend": p.cfg.BackendURL,
"identifier": p.cfg.Identifier,
"monitor_port": p.cfg.MonitorPort,
"monitor_push_enabled": p.cfg.MonitorPushEnabled,
"calendar_enabled": p.cfg.CalendarEnabled,
})
bgCtx, cancel := context.WithCancel(context.Background())
p.cancelBg = cancel
// Listers + collectors capture bgCtx (not Init ctx) — Init returns
// once MCP initialize completes, but the plugin process lives on
// and so do the goroutines + closures we registered.
makeCollector := func(sampleCPU bool) func() telemetry.Snapshot {
return func() telemetry.Snapshot {
return telemetry.Collect(telemetry.CollectOpts{
Identifier: p.cfg.Identifier,
Version: Version,
SampleCPU: sampleCPU,
AgentLister: func() []telemetry.AgentInfo {
return p.listAgents(bgCtx, profileRoot)
},
})
}
}
// Bridge serves on-demand reads; cheap, no CPU sampling.
collect := makeCollector(false)
// Pusher runs the slow push loop; CPU sampling fine here.
collectForPush := makeCollector(true)
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
if err := p.bridge.Start(bgCtx); err != nil {
host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()})
}
// Active push loop — replaces the standalone harborforge-monitor
// container. Off by default; operator opts in via
// monitor_push_enabled + apiKey.
p.pusher = monitor.NewPusher(monitor.PusherConfig{
BackendURL: p.cfg.BackendURL,
APIKey: p.cfg.APIKey,
Interval: time.Duration(p.cfg.MonitorPushIntervalSeconds) * time.Second,
}, collectForPush,
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
if p.cfg.MonitorPushEnabled {
p.wg.Add(1)
go func() {
defer p.wg.Done()
if err := p.pusher.Run(bgCtx); err != nil && !errors.Is(err, context.Canceled) {
host.Log("warn", "monitor pusher exited", map[string]any{"err": err.Error()})
}
}()
}
calBackend := p.cfg.CalendarBackendURL
if calBackend == "" {
calBackend = p.cfg.BackendURL
}
bridge := calendar.New(calBackend, p.cfg.Identifier)
p.sched = calendar.NewScheduler(
calendar.Config{
HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second,
},
bridge, host,
calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"},
func() []calendar.ReportableAgent {
return p.listReportableAgents(bgCtx, profileRoot)
},
)
if p.cfg.CalendarEnabled {
p.wg.Add(1)
go func() {
defer p.wg.Done()
if err := p.sched.Run(bgCtx); err != nil {
host.Log("warn", "calendar scheduler exited", map[string]any{"err": err.Error()})
}
}()
} else {
host.Log("info", "calendar scheduler disabled by config", nil)
}
// KB HTTP client — shares plugin-level APIKey via Bearer (per-agent
// hf-token resolution is a TODO when SDK exposes secret-mgr access).
if p.cfg.BackendURL != "" && p.cfg.APIKey != "" {
p.kbClient = kbclient.New(p.cfg.BackendURL, p.cfg.APIKey)
} else {
host.Log("info", "kb client not initialized (need BackendURL + APIKey); dynamic-kb-* tools will return backend-unavailable", nil)
}
p.deps = tools.Deps{
Config: p.cfg,
Version: Version,
Collect: collect,
Bridge: p.bridge,
Pusher: p.pusher,
Scheduler: p.sched,
Host: host,
AgentIDFromCtx: func(ctx context.Context) string {
// Host attaches the caller agent id via tools/call
// `_meta.agent_id`; SDK unpacks it into ctx.
if id := sdkplugin.AgentIDFromContext(ctx); id != "" {
return id
}
// Fallback for host paths that don't carry an agent
// (channel-driven, CLI plugin-call). When a single
// calendar slot is active we can deterministically
// attribute the call to that slot's owner.
return p.sched.SingleActiveAgentID()
},
KB: tools.KBDeps{
Client: p.kbClient,
ProfileRoot: profileRoot,
SessionFor: func(agentID string) string {
if v, ok := p.agentSession.Load(agentID); ok {
return v.(string)
}
return ""
},
Turn: func(agentID string) int { return 0 }, // best-effort; turn ctx not available to plugins yet
},
}
return nil
}
// RenderDynamicSubblock implements sdkplugin.DynamicBlockProvider. The
// host calls this once per turn per declared subblock name from this
// plugin's manifest (DESIGN-DYNAMIC-BLOCK.md §2 / §3.3). For
// "kb-block" we open the per-session kb-block.json and return its
// rendered body (host wraps in <kb-block>...</kb-block>). Empty block
// → return "" → host omits the subblock for this turn.
//
// Side effect: stash agentID → sessionID into p.agentSession so the
// dynamic-kb-cache + dynamic-kb-evict tools can resolve the same
// kb-block file when they fire later in the same turn.
func (p *harborForgePlugin) RenderDynamicSubblock(ctx context.Context, agentID, sessionID, name string) (string, error) {
if agentID == "" || sessionID == "" {
return "", nil
}
// Cache the (agent, session) pair for tool dispatch this turn.
p.agentSession.Store(agentID, sessionID)
if name != "kb-block" {
// Host shouldn't request unknown subblock names per manifest
// contract, but log defensively so operator notices wiring drift.
p.host.Log("warn", "dynamic-block render: unknown subblock name",
map[string]any{"agent": agentID, "session": sessionID, "name": name})
return "", nil
}
block, err := kbblock.Open(p.profileRoot, agentID, sessionID)
if err != nil {
p.host.Log("warn", "open kb-block", map[string]any{
"agent": agentID, "session": sessionID, "err": err.Error(),
})
return "", nil
}
return block.Render(), nil
}
func (p *harborForgePlugin) CallTool(ctx context.Context, name string, input json.RawMessage) (sdkplugin.ToolResult, error) {
return tools.Dispatch(ctx, p.deps, name, input)
}
// ---- agent enumeration ----
// listAgents walks <profile>/agents/*/agent.json + state.json so the
// telemetry payload includes every Plexum agent visible on this host.
// Best-effort: read failures degrade to empty list.
func (p *harborForgePlugin) listAgents(ctx context.Context, profileRoot string) []telemetry.AgentInfo {
root := filepath.Join(profileRoot, "agents")
entries, err := os.ReadDir(root)
if err != nil {
return nil
}
out := make([]telemetry.AgentInfo, 0, len(entries))
for _, e := range entries {
if !e.IsDir() {
continue
}
agentID := e.Name()
var info telemetry.AgentInfo
info.ID = agentID
if raw, err := os.ReadFile(filepath.Join(root, agentID, "agent.json")); err == nil {
var meta struct{ Model string `json:"model"` }
_ = json.Unmarshal(raw, &meta)
info.Model = meta.Model
}
// State via HostAPI.ReadAgentState — host-side, ground truth.
if snap, err := p.host.ReadAgentState(ctx, agentID); err == nil {
info.State = snap.State
}
out = append(out, info)
}
return out
}
func (p *harborForgePlugin) listReportableAgents(ctx context.Context, profileRoot string) []calendar.ReportableAgent {
telem := p.listAgents(ctx, profileRoot)
out := make([]calendar.ReportableAgent, 0, len(telem))
for _, a := range telem {
out = append(out, calendar.ReportableAgent{
ID: a.ID, Model: a.Model,
State: mapStateToCalendar(a.State),
})
}
return out
}
func mapStateToCalendar(s string) calendar.AgentStatusValue {
switch strings.ToLower(s) {
case "idle":
return calendar.AgentStatusIdle
case "working":
return calendar.AgentStatusOnCall
case "busy":
return calendar.AgentStatusBusy
case "offline":
return calendar.AgentStatusOffline
}
return calendar.AgentStatusOffline
}
func manifestFromDisk() sdkplugin.Manifest {
// Bundled manifest.json is the authoritative shape; the binary
// version reads it next to itself to avoid hand-syncing two
// definitions. Falls back to a minimal in-code manifest if the
// file is missing (development / first build).
exe, err := os.Executable()
if err == nil {
raw, err := os.ReadFile(filepath.Join(filepath.Dir(exe), "manifest.json"))
if err == nil {
var m sdkplugin.Manifest
if err := json.Unmarshal(raw, &m); err == nil && m.Name != "" {
return m
}
}
}
return sdkplugin.Manifest{
Name: "harbor-forge",
Version: Version,
Activation: sdkplugin.ActivationEager,
Executable: "plexum-harborforge-plugin",
}
}
func main() {
p := &harborForgePlugin{}
defer func() {
if p.cancelBg != nil {
p.cancelBg()
}
p.wg.Wait()
}()
if err := sdkplugin.Serve(p); err != nil && !errors.Is(err, context.Canceled) {
fmt.Fprintf(os.Stderr, "plexum-harborforge-plugin: %v\n", err)
os.Exit(1)
}
}