Files
hzhang bc1ab7b6ea fix: snake_case SlotStatus + scheduler debug logs
Two issues found while end-to-end testing against a running
harborforge-backend:

  - SlotStatus enum values: backend stores snake_case
    ("not_started" / "ongoing" / …), not the camelCase the
    OpenClaw plugin's TypeScript types.ts misled the initial
    drop into using. Heartbeat responses came back with
    Slot.Status="not_started" which the scheduler never matched
    against SlotStatus("NotStarted"), so dispatchSlot never
    fired. Aligned with backend's actual enum string values
    (verified via heartbeat response shape).

  - Added info-level logs at slot selection + dispatchSlot
    entry + WakeAgent fire/result so operators can see the
    plugin's decision chain in production without enabling
    debug. Cheap (~one tick per agent per heartbeat interval).

E2E in sim: backend returns slots=1 → selection chosen=true →
dispatch enter → WakeAgent enqueued ok → backend slot ongoing
→ next heartbeat returns slots=0.
2026-06-03 11:42:18 +01:00

385 lines
12 KiB
Go

// Scheduler — loops over every Plexum agent, heartbeats per-agent,
// picks the highest-priority pending slot for each, dispatches via
// host.WakeAgent. Mirrors HarborForge.OpenclawPlugin's per-agent
// scheduler loop (PLG-CAL-002).
//
// In-memory state: per-agent active slot map. A daemon restart drops
// it; next heartbeat reconciles from the backend's canonical state.
//
// Wake semantics: WakeAgent is fire-and-forget; the SDK's wake queue
// (depth 1, replace-newest) handles state-aware dispatch. We record
// the slot as active locally just before calling WakeAgent and push
// Ongoing to the backend optimistically as soon as the wake has been
// enqueued; agents drive complete/abort/pause/resume via the
// harborforge_calendar_* tools.
package calendar
import (
"context"
"errors"
"fmt"
"sync"
"time"
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
)
// Scheduler is the long-running calendar driver. It heartbeats every
// agent returned by agentLister, dispatches at most one slot per agent
// at a time, and keeps a bounded in-memory history of resolved slots.
type Scheduler struct {
// cfg holds the tunables (heartbeat interval, history cap).
cfg Config
// bridge performs the backend calls (Heartbeat, UpdateRealSlot,
// UpdateVirtualSlot).
bridge *Bridge
// host is used for logging and for WakeAgent.
host sdkplugin.HostAPI
// agentLister enumerates the agents to heartbeat on each tick; when
// nil, tick does nothing.
agentLister func() []ReportableAgent
// pluginInfo is stored by NewScheduler; it is not read in this part
// of the file.
pluginInfo PluginInfoTag
// mu guards every field below.
mu sync.Mutex
// activeByAgentID maps an agent id to its in-flight slot.
activeByAgentID map[string]*ActiveSlot
// activeBySlotIdent maps a slot ident to the same *ActiveSlot that
// activeByAgentID holds for the owning agent.
activeBySlotIdent map[string]*ActiveSlot
// history lists resolved slots, oldest first, trimmed to
// cfg.HistoryCap entries.
history []HistoryEntry
// lastHeartbeats records the tick time of the latest heartbeat
// attempt per agent, successful or not.
lastHeartbeats map[string]time.Time
// lastErrors holds the latest heartbeat error text per agent; the
// entry is removed on the next successful heartbeat.
lastErrors map[string]string
}
// Config bundles scheduler tunables. NewScheduler replaces non-positive
// values with the defaults noted on each field.
type Config struct {
// HeartbeatInterval is the period between scheduling ticks; default
// 30 seconds.
HeartbeatInterval time.Duration
HistoryCap int // bound on activity history; default 32
}
// PluginInfoTag tags heartbeat reports so the backend knows which
// plugin / version is reporting.
// NOTE(review): the scheduler code in this file only stores the tag;
// where it is attached to a report is not visible here.
type PluginInfoTag struct {
// Name is the plugin name.
Name string
// Version is the plugin version string.
Version string
Backend string // "plexum"
}
// ReportableAgent is the per-agent projection the scheduler needs for
// heartbeat enumeration.
type ReportableAgent struct {
// ID is the agent id passed to the heartbeat call and to WakeAgent.
ID string
// Model is not read by the scheduler code in this file.
Model string
// State is not read by the scheduler code in this file.
State AgentStatusValue
}
// ActiveSlot tracks an in-flight slot from dispatch to terminal state.
type ActiveSlot struct {
// Slot is the backend slot as it was returned by the heartbeat that
// triggered the dispatch.
Slot Slot
// StartedAt is the UTC time at which the slot was dispatched.
StartedAt time.Time
// LastHeartbeat is initialised to the dispatch time; nothing in this
// file updates it afterwards.
LastHeartbeat time.Time
}
// HistoryEntry records one resolved slot for the calendar_status tool.
type HistoryEntry struct {
// Ident is the slot ident (Slot.SlotIdent()).
Ident string
// AgentID is the agent the slot was dispatched to.
AgentID string
// Status is the status the slot was resolved with (Finished or
// Aborted in this file).
Status SlotStatus
// ResolvedAt is the UTC time at which the entry was recorded.
ResolvedAt time.Time
// Reason is the abort reason or wake-failure text; empty for
// completed slots.
Reason string
// Summary is the completion summary; empty for aborted slots.
Summary string
}
// NewScheduler builds a Scheduler around the given bridge, host and
// agent lister. Nothing is started; call Run to begin ticking.
//
// A non-positive cfg.HeartbeatInterval is replaced with 30 seconds and
// a non-positive cfg.HistoryCap with 32.
func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI,
	pluginInfo PluginInfoTag,
	agentLister func() []ReportableAgent) *Scheduler {
	const (
		fallbackInterval   = 30 * time.Second
		fallbackHistoryCap = 32
	)

	sched := &Scheduler{
		bridge:            bridge,
		host:              host,
		agentLister:       agentLister,
		pluginInfo:        pluginInfo,
		activeByAgentID:   make(map[string]*ActiveSlot),
		activeBySlotIdent: make(map[string]*ActiveSlot),
		lastHeartbeats:    make(map[string]time.Time),
		lastErrors:        make(map[string]string),
	}

	// Apply defaults on the scheduler's own copy of the config.
	sched.cfg = cfg
	if sched.cfg.HeartbeatInterval <= 0 {
		sched.cfg.HeartbeatInterval = fallbackInterval
	}
	if sched.cfg.HistoryCap <= 0 {
		sched.cfg.HistoryCap = fallbackHistoryCap
	}
	return sched
}
// Run drives the scheduler until ctx is cancelled, then returns nil.
// One tick runs immediately on entry; after that a tick runs every
// cfg.HeartbeatInterval.
func (s *Scheduler) Run(ctx context.Context) error {
	ticker := time.NewTicker(s.cfg.HeartbeatInterval)
	defer ticker.Stop()

	// Do not make operators wait a full interval for the first pass.
	s.tick(ctx)

	done := ctx.Done()
	for {
		select {
		case <-done:
			return nil
		case <-ticker.C:
		}
		s.tick(ctx)
	}
}
// tick runs one scheduling pass: it lists the agents via agentLister
// and runs tickForAgent for each of them in order, all sharing a single
// UTC timestamp. It is a no-op when no agentLister was configured.
//
// The loop stops as soon as ctx is cancelled, so that a shutdown does
// not issue a heartbeat (and record/log a failure) for every remaining
// agent.
func (s *Scheduler) tick(ctx context.Context) {
	if s.agentLister == nil {
		return
	}
	now := time.Now().UTC()
	agents := s.agentLister()
	s.host.Log("info", "calendar tick", map[string]any{"agents": len(agents)})
	for _, agent := range agents {
		if ctx.Err() != nil {
			// Cancelled mid-pass: the remaining heartbeats could only fail.
			return
		}
		s.tickForAgent(ctx, agent, now)
	}
}
// tickForAgent performs one heartbeat for a single agent and, when the
// backend returned a dispatchable slot, hands the best one to
// dispatchSlot.
//
// The attempt time (now) is stored in lastHeartbeats whether or not the
// heartbeat succeeded. A failure is stored in lastErrors, logged at
// warn level, and ends the pass for this agent; a success clears any
// previously stored error.
func (s *Scheduler) tickForAgent(ctx context.Context, agent ReportableAgent, now time.Time) {
	agentID := agent.ID
	resp, hbErr := s.bridge.Heartbeat(ctx, agentID)

	s.mu.Lock()
	s.lastHeartbeats[agentID] = now
	if hbErr == nil {
		delete(s.lastErrors, agentID)
	} else {
		s.lastErrors[agentID] = hbErr.Error()
	}
	s.mu.Unlock()

	if hbErr != nil {
		s.host.Log("warn", "calendar heartbeat failed", map[string]any{
			"agent": agentID, "err": hbErr.Error(),
		})
		return
	}

	slotCount := len(resp.Slots)
	s.host.Log("info", "calendar heartbeat ok", map[string]any{
		"agent": agentID, "slots": slotCount, "agent_status": string(resp.AgentStatus),
	})

	// Among the NotStarted and Deferred slots, keep the one with the
	// highest priority; on a tie the earliest one in the response wins.
	var best *Slot
	for idx := range resp.Slots {
		candidate := &resp.Slots[idx]
		switch candidate.Status {
		case SlotNotStarted, SlotDeferred:
			if best == nil || candidate.Priority > best.Priority {
				best = candidate
			}
		}
	}

	s.host.Log("info", "calendar slot selection", map[string]any{
		"agent": agentID, "available": slotCount, "chosen": best != nil,
	})
	if best == nil {
		return
	}
	s.dispatchSlot(ctx, agentID, *best)

	// The slots that were not chosen are left untouched: v1 does not
	// push a priority bump (+1) for them, relying on the backend's own
	// priority bookkeeping. (The OpenClaw plugin DOES push priority
	// bumps — future v2 work if backend feedback shows starvation.)
}
// dispatchSlot fires WakeAgent for the given slot and records the slot
// as active. On a successful wake it marks the slot Ongoing on the
// backend so the dashboard reflects the transition immediately.
//
// The call is skipped (with an info log) when the slot is already
// active or when the agent already has another active slot. If
// WakeAgent fails, the slot is resolved locally as Aborted with the
// wake error as reason; the backend is not updated in that case. If the
// Ongoing update fails after a successful wake, the failure is logged
// at warn level and the slot stays active locally, because the agent
// has already been woken.
func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot) {
	ident := slot.SlotIdent()
	s.host.Log("info", "calendar dispatchSlot enter", map[string]any{
		"agent": agentID, "slot_ident": ident,
	})

	s.mu.Lock()
	if _, dup := s.activeBySlotIdent[ident]; dup {
		s.mu.Unlock()
		s.host.Log("info", "calendar dispatchSlot skipped (already active)", map[string]any{"slot": ident})
		return
	}
	if _, agentBusy := s.activeByAgentID[agentID]; agentBusy {
		// Don't pick up another slot until the current one resolves.
		s.mu.Unlock()
		s.host.Log("info", "calendar dispatchSlot skipped (agent has active slot)", map[string]any{"agent": agentID})
		return
	}
	// Reserve the slot before waking so a concurrent dispatch of the same
	// slot or agent is rejected by the checks above.
	now := time.Now().UTC()
	active := &ActiveSlot{Slot: slot, StartedAt: now, LastHeartbeat: now}
	s.activeBySlotIdent[ident] = active
	s.activeByAgentID[agentID] = active
	s.mu.Unlock()

	message := buildWakeMessage(slot)
	source := "calendar:" + ident
	s.host.Log("info", "calendar firing WakeAgent", map[string]any{
		"agent": agentID, "slot": ident, "source": source, "msg_len": len(message),
	})
	if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{
		AgentID: agentID, Message: message, Source: source,
	}); err != nil {
		s.host.Log("warn", "calendar WakeAgent failed", map[string]any{
			"agent": agentID, "err": err.Error(),
		})
		s.resolveLocally(ident, agentID, SlotAborted, "", "wake failed: "+err.Error())
		return
	}
	s.host.Log("info", "calendar WakeAgent enqueued ok", map[string]any{
		"agent": agentID, "slot": ident,
	})

	// Mark Ongoing on the backend. StartedAt is sent as a UTC
	// time-of-day string.
	update := SlotAgentUpdate{
		Status: SlotOngoing, StartedAt: now.Format("15:04:05"),
	}
	if err := s.pushUpdate(ctx, agentID, slot, update); err != nil {
		// Surface the failure instead of dropping it: without this log a
		// backend that still shows the slot as not started is unexplained.
		s.host.Log("warn", "calendar mark ongoing failed", map[string]any{
			"agent": agentID, "slot": ident, "err": err.Error(),
		})
	}
}
// buildWakeMessage turns a slot's backend event data into the prompt
// text sent with WakeAgent. v1 is intentionally simple; refine when the
// prompt-engineering side of the plugin matures.
//
// A system event whose EventData["event"] is a string, and a job whose
// EventData["code"] is a non-empty string, get dedicated messages.
// Everything else falls back to a generic message naming the slot
// ident.
func buildWakeMessage(slot Slot) string {
	if kind := slot.EventType; kind != nil {
		if *kind == EventTypeSystemEvent {
			if event, isString := slot.EventData["event"].(string); isString {
				return fmt.Sprintf("[calendar system_event] %s", event)
			}
		} else if *kind == EventTypeJob {
			// A missing or non-string value simply yields "".
			jobCode, _ := slot.EventData["code"].(string)
			jobType, _ := slot.EventData["type"].(string)
			if jobCode != "" {
				return fmt.Sprintf("[calendar job %s/%s] please handle this", jobType, jobCode)
			}
		}
	}
	return fmt.Sprintf("[calendar slot %s] scheduled work — please proceed", slot.SlotIdent())
}
// CompleteForAgent finishes the agent's active slot (terminal). It
// pushes Finished to the backend together with the elapsed time in
// whole minutes (at least 1), then moves the slot into the local
// history with the given summary.
//
// It returns ErrNoActiveSlot when the agent has no active slot, or the
// backend error when the push fails; in both cases local state is left
// unchanged.
func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error {
	current, found := s.activeSlotForAgent(agentID)
	if !found {
		return ErrNoActiveSlot
	}

	elapsed := time.Now().UTC().Sub(current.StartedAt)
	minutes := int(elapsed.Minutes())
	if minutes < 1 {
		minutes = 1
	}

	finished := SlotAgentUpdate{Status: SlotFinished, ActualDuration: minutes}
	pushErr := s.pushUpdate(ctx, agentID, current.Slot, finished)
	if pushErr != nil {
		return pushErr
	}
	s.resolveLocally(current.Slot.SlotIdent(), agentID, SlotFinished, summary, "")
	return nil
}
// AbortForAgent aborts the agent's active slot (terminal). It pushes
// Aborted to the backend and then moves the slot into the local history
// with the given reason.
//
// It returns ErrNoActiveSlot when the agent has no active slot, or the
// backend error when the push fails; in both cases local state is left
// unchanged.
func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error {
	current, found := s.activeSlotForAgent(agentID)
	if !found {
		return ErrNoActiveSlot
	}

	aborted := SlotAgentUpdate{Status: SlotAborted}
	pushErr := s.pushUpdate(ctx, agentID, current.Slot, aborted)
	if pushErr != nil {
		return pushErr
	}
	s.resolveLocally(current.Slot.SlotIdent(), agentID, SlotAborted, "", reason)
	return nil
}
// PauseForAgent pauses the agent's active slot (non-terminal) by
// pushing Paused to the backend; the slot stays active locally. The
// reason argument is accepted but currently not forwarded or stored.
//
// It returns ErrNoActiveSlot when the agent has no active slot,
// otherwise the result of the backend update.
func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error {
	current, found := s.activeSlotForAgent(agentID)
	if found {
		paused := SlotAgentUpdate{Status: SlotPaused}
		return s.pushUpdate(ctx, agentID, current.Slot, paused)
	}
	return ErrNoActiveSlot
}
// ResumeForAgent resumes the agent's active slot (non-terminal) by
// pushing Ongoing to the backend.
//
// It returns ErrNoActiveSlot when the agent has no active slot,
// otherwise the result of the backend update.
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error {
	current, found := s.activeSlotForAgent(agentID)
	if found {
		resumed := SlotAgentUpdate{Status: SlotOngoing}
		return s.pushUpdate(ctx, agentID, current.Slot, resumed)
	}
	return ErrNoActiveSlot
}
// pushUpdate sends a slot update to the backend through the bridge,
// addressing the slot by its real id when it has one and by its virtual
// id otherwise. It returns an error without calling the backend when
// the slot carries neither.
func (s *Scheduler) pushUpdate(ctx context.Context, agentID string, slot Slot, update SlotAgentUpdate) error {
	switch {
	case slot.HasRealID():
		return s.bridge.UpdateRealSlot(ctx, agentID, *slot.ID, update)
	case slot.VirtualID != nil:
		return s.bridge.UpdateVirtualSlot(ctx, agentID, *slot.VirtualID, update)
	default:
		return errors.New("calendar: slot has neither real id nor virtual id")
	}
}
// resolveLocally drops the slot from both active maps and appends a
// history entry stamped with the current UTC time. The history is then
// trimmed from the front so that it never holds more than
// cfg.HistoryCap entries. No backend call is made here.
func (s *Scheduler) resolveLocally(ident, agentID string, status SlotStatus, summary, reason string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.activeBySlotIdent, ident)
	delete(s.activeByAgentID, agentID)

	entry := HistoryEntry{
		Ident:      ident,
		AgentID:    agentID,
		Status:     status,
		ResolvedAt: time.Now().UTC(),
		Summary:    summary,
		Reason:     reason,
	}
	s.history = append(s.history, entry)

	// Keep only the newest HistoryCap entries.
	if excess := len(s.history) - s.cfg.HistoryCap; excess > 0 {
		s.history = s.history[excess:]
	}
}
// activeSlotForAgent returns a copy of the agent's active slot and
// true, or a zero ActiveSlot and false when the agent has none.
func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// A missing key yields a nil pointer, so one nil check covers both
	// "no entry" and "nil entry".
	if current := s.activeByAgentID[agentID]; current != nil {
		return *current, true
	}
	return ActiveSlot{}, false
}
// Status is the introspection shape calendar_status returns. It is a
// snapshot: Scheduler.Status fills it with copies of the scheduler's
// internal state.
type Status struct {
// Enabled is always true in snapshots built by Scheduler.Status.
Enabled bool `json:"enabled"`
// LastHeartbeats maps agent id to the time of its latest heartbeat
// attempt.
LastHeartbeats map[string]time.Time `json:"last_heartbeats"`
// LastErrors maps agent id to its latest heartbeat error text;
// agents whose last heartbeat succeeded have no entry.
LastErrors map[string]string `json:"last_errors,omitempty"`
// HeartbeatEvery is the configured heartbeat interval.
HeartbeatEvery time.Duration `json:"heartbeat_every"`
// Active lists the in-flight slots, in no particular order.
Active []ActiveSlot `json:"active"`
// History lists resolved slots, oldest first.
History []HistoryEntry `json:"history"`
}
// SingleActiveAgentID reports the id of the only agent that currently
// has an active slot. It returns "" when zero or several agents have
// one. Used by the plugin's bestEffortAgentID fallback for tool calls
// that don't carry agent context.
func (s *Scheduler) SingleActiveAgentID() string {
	s.mu.Lock()
	defer s.mu.Unlock()

	var only string
	if len(s.activeByAgentID) == 1 {
		// Exactly one entry: the loop body runs once.
		for agentID := range s.activeByAgentID {
			only = agentID
		}
	}
	return only
}
// Status returns a snapshot of the scheduler state for the
// calendar_status tool. Every map and slice in the result is a fresh
// copy, so callers may read or modify it without holding the
// scheduler's lock. The order of Active is unspecified.
func (s *Scheduler) Status() Status {
	s.mu.Lock()
	defer s.mu.Unlock()

	snapshot := Status{
		Enabled:        true,
		HeartbeatEvery: s.cfg.HeartbeatInterval,
		LastHeartbeats: make(map[string]time.Time, len(s.lastHeartbeats)),
		LastErrors:     make(map[string]string, len(s.lastErrors)),
		Active:         make([]ActiveSlot, 0, len(s.activeByAgentID)),
		History:        make([]HistoryEntry, len(s.history)),
	}

	for agentID, at := range s.lastHeartbeats {
		snapshot.LastHeartbeats[agentID] = at
	}
	for agentID, msg := range s.lastErrors {
		snapshot.LastErrors[agentID] = msg
	}
	for _, slot := range s.activeByAgentID {
		snapshot.Active = append(snapshot.Active, *slot)
	}
	copy(snapshot.History, s.history)

	return snapshot
}
// ErrNoActiveSlot is returned when an agent calls calendar_complete /
// abort / pause / resume but has no slot active. In this file it is
// returned by CompleteForAgent, AbortForAgent, PauseForAgent and
// ResumeForAgent.
var ErrNoActiveSlot = errors.New("calendar: no active slot for agent")