// Scheduler — loops over every Plexum agent, heartbeats per-agent,
// picks the highest-priority pending slot for each, dispatches via
// host.WakeAgent. Mirrors HarborForge.OpenclawPlugin's per-agent
// scheduler loop (PLG-CAL-002).
//
// In-memory state: per-agent active slot map. A daemon restart drops
// it; next heartbeat reconciles from the backend's canonical state.
// NOTE(review): no reconciliation of the active-slot maps against a
// heartbeat response is visible in this file — confirm where it happens.
//
// Wake semantics: WakeAgent is fire-and-forget; the SDK's wake queue
// (depth 1 replace-newest) handles state-aware dispatch. We mark the
// slot Ongoing optimistically as soon as WakeAgent returns without
// error; agents drive complete/abort/pause/resume via the
// harborforge_calendar_* tools.
package calendar

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
)

// Scheduler is the long-running calendar driver. Build it with
// NewScheduler and start it with Run.
type Scheduler struct {
	cfg         Config                   // tunables; NewScheduler fills in defaults
	bridge      *Bridge                  // backend client: heartbeats and slot updates
	host        sdkplugin.HostAPI        // WakeAgent dispatch and logging
	agentLister func() []ReportableAgent // enumerates agents each tick; nil disables ticking
	pluginInfo  PluginInfoTag            // stored at construction; not read elsewhere in this file

	mu                sync.Mutex             // guards every field below
	activeByAgentID   map[string]*ActiveSlot // in-flight slot per agent ID
	activeBySlotIdent map[string]*ActiveSlot // same entries keyed by Slot.SlotIdent()
	history           []HistoryEntry         // resolved slots, trimmed to cfg.HistoryCap
	lastHeartbeats    map[string]time.Time   // time of the latest heartbeat attempt per agent
	lastErrors        map[string]string      // latest heartbeat error per agent; cleared on success
}

// Config bundles scheduler tunables. Non-positive values are replaced
// with the defaults by NewScheduler.
type Config struct {
	HeartbeatInterval time.Duration // period between ticks; default 30s
	HistoryCap        int           // bound on activity history; default 32
}

// PluginInfoTag tags heartbeat reports so the backend knows which
// plugin / version is reporting.
type PluginInfoTag struct {
	Name    string
	Version string
	Backend string // "plexum"
}

// ReportableAgent is the per-agent projection the scheduler needs for
// heartbeat enumeration.
type ReportableAgent struct {
	ID    string           // agent identifier; keys heartbeats and active-slot state
	Model string           // not read in this file — presumably the agent's model name; confirm with caller
	State AgentStatusValue // not read in this file
}

// ActiveSlot tracks an in-flight slot from dispatch to terminal state.
type ActiveSlot struct {
	Slot          Slot      // copy of the slot as dispatched
	StartedAt     time.Time // UTC dispatch time; used to compute actual duration on completion
	LastHeartbeat time.Time // set to the dispatch time; not refreshed elsewhere in this file
}

// HistoryEntry records one resolved slot for the calendar_status tool.
type HistoryEntry struct { Ident string AgentID string Status SlotStatus ResolvedAt time.Time Reason string Summary string } // NewScheduler constructs a Scheduler in stopped state. func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI, pluginInfo PluginInfoTag, agentLister func() []ReportableAgent) *Scheduler { if cfg.HeartbeatInterval <= 0 { cfg.HeartbeatInterval = 30 * time.Second } if cfg.HistoryCap <= 0 { cfg.HistoryCap = 32 } return &Scheduler{ cfg: cfg, bridge: bridge, host: host, agentLister: agentLister, pluginInfo: pluginInfo, activeByAgentID: map[string]*ActiveSlot{}, activeBySlotIdent: map[string]*ActiveSlot{}, lastHeartbeats: map[string]time.Time{}, lastErrors: map[string]string{}, } } // Run blocks until ctx cancels. func (s *Scheduler) Run(ctx context.Context) error { t := time.NewTicker(s.cfg.HeartbeatInterval) defer t.Stop() s.tick(ctx) for { select { case <-ctx.Done(): return nil case <-t.C: s.tick(ctx) } } } func (s *Scheduler) tick(ctx context.Context) { if s.agentLister == nil { return } now := time.Now().UTC() agents := s.agentLister() s.host.Log("info", "calendar tick", map[string]any{"agents": len(agents)}) for _, agent := range agents { s.tickForAgent(ctx, agent, now) } } func (s *Scheduler) tickForAgent(ctx context.Context, agent ReportableAgent, now time.Time) { resp, err := s.bridge.Heartbeat(ctx, agent.ID) s.mu.Lock() s.lastHeartbeats[agent.ID] = now if err != nil { s.lastErrors[agent.ID] = err.Error() s.mu.Unlock() s.host.Log("warn", "calendar heartbeat failed", map[string]any{ "agent": agent.ID, "err": err.Error(), }) return } delete(s.lastErrors, agent.ID) s.mu.Unlock() s.host.Log("info", "calendar heartbeat ok", map[string]any{ "agent": agent.ID, "slots": len(resp.Slots), "agent_status": string(resp.AgentStatus), }) // Pick highest-priority NotStarted slot; defer the rest. 
var chosen *Slot for i := range resp.Slots { slot := &resp.Slots[i] if slot.Status != SlotNotStarted && slot.Status != SlotDeferred { continue } if chosen == nil || slot.Priority > chosen.Priority { chosen = slot } } if chosen != nil { s.dispatchSlot(ctx, agent.ID, *chosen) } // Defer the other unchosen NotStarted/Deferred slots (priority +1) // so they bubble up next heartbeat. We don't strictly need to push // the update; the backend's priority bookkeeping survives without // our nudge for v1. (OpenClaw plugin DOES push priority bumps — // future v2 work if backend feedback shows starvation.) } // dispatchSlot fires WakeAgent + records the slot active. Marks the // slot Ongoing on the backend so the dashboard reflects the // transition immediately. func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot) { ident := slot.SlotIdent() s.mu.Lock() if _, dup := s.activeBySlotIdent[ident]; dup { s.mu.Unlock() return } if _, agentBusy := s.activeByAgentID[agentID]; agentBusy { // Don't pick up another slot until the current one resolves. s.mu.Unlock() return } now := time.Now().UTC() active := &ActiveSlot{Slot: slot, StartedAt: now, LastHeartbeat: now} s.activeBySlotIdent[ident] = active s.activeByAgentID[agentID] = active s.mu.Unlock() message := buildWakeMessage(slot) source := "calendar:" + ident if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{ AgentID: agentID, Message: message, Source: source, }); err != nil { s.resolveLocally(ident, agentID, SlotAborted, "", "wake failed: "+err.Error()) return } // Mark Ongoing on the backend. update := SlotAgentUpdate{ Status: SlotOngoing, StartedAt: now.Format("15:04:05"), } s.pushUpdate(ctx, agentID, slot, update) } func buildWakeMessage(slot Slot) string { // Backend EventData → prompt. v1 is intentionally simple; refine // when the prompt-engineering side of the plugin matures. 
if slot.EventType != nil { switch *slot.EventType { case EventTypeSystemEvent: if ev, ok := slot.EventData["event"].(string); ok { return fmt.Sprintf("[calendar system_event] %s", ev) } case EventTypeJob: code, _ := slot.EventData["code"].(string) typ, _ := slot.EventData["type"].(string) if code != "" { return fmt.Sprintf("[calendar job %s/%s] please handle this", typ, code) } } } return fmt.Sprintf("[calendar slot %s] scheduled work — please proceed", slot.SlotIdent()) } // CompleteForAgent → terminal; pushes Finished to backend. func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error { act, ok := s.activeSlotForAgent(agentID) if !ok { return ErrNoActiveSlot } now := time.Now().UTC() duration := int(now.Sub(act.StartedAt).Minutes()) if duration < 1 { duration = 1 } if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{ Status: SlotFinished, ActualDuration: duration, }); err != nil { return err } s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotFinished, summary, "") return nil } // AbortForAgent → terminal; pushes Aborted to backend. func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error { act, ok := s.activeSlotForAgent(agentID) if !ok { return ErrNoActiveSlot } if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotAborted}); err != nil { return err } s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotAborted, "", reason) return nil } // PauseForAgent → non-terminal; pushes Paused. func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error { act, ok := s.activeSlotForAgent(agentID) if !ok { return ErrNoActiveSlot } return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotPaused}) } // ResumeForAgent → non-terminal; pushes Ongoing. 
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error { act, ok := s.activeSlotForAgent(agentID) if !ok { return ErrNoActiveSlot } return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotOngoing}) } func (s *Scheduler) pushUpdate(ctx context.Context, agentID string, slot Slot, update SlotAgentUpdate) error { if slot.HasRealID() { return s.bridge.UpdateRealSlot(ctx, agentID, *slot.ID, update) } if slot.VirtualID != nil { return s.bridge.UpdateVirtualSlot(ctx, agentID, *slot.VirtualID, update) } return errors.New("calendar: slot has neither real id nor virtual id") } func (s *Scheduler) resolveLocally(ident, agentID string, status SlotStatus, summary, reason string) { s.mu.Lock() defer s.mu.Unlock() delete(s.activeBySlotIdent, ident) delete(s.activeByAgentID, agentID) s.history = append(s.history, HistoryEntry{ Ident: ident, AgentID: agentID, Status: status, ResolvedAt: time.Now().UTC(), Summary: summary, Reason: reason, }) if len(s.history) > s.cfg.HistoryCap { s.history = s.history[len(s.history)-s.cfg.HistoryCap:] } } func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) { s.mu.Lock() defer s.mu.Unlock() act, ok := s.activeByAgentID[agentID] if !ok || act == nil { return ActiveSlot{}, false } return *act, true } // Status is the introspection shape calendar_status returns. type Status struct { Enabled bool `json:"enabled"` LastHeartbeats map[string]time.Time `json:"last_heartbeats"` LastErrors map[string]string `json:"last_errors,omitempty"` HeartbeatEvery time.Duration `json:"heartbeat_every"` Active []ActiveSlot `json:"active"` History []HistoryEntry `json:"history"` } // SingleActiveAgentID returns the agent id when exactly one active // slot exists, empty otherwise. Used by the plugin's bestEffortAgentID // fallback for tool calls that don't carry agent context. 
func (s *Scheduler) SingleActiveAgentID() string { s.mu.Lock() defer s.mu.Unlock() if len(s.activeByAgentID) != 1 { return "" } for k := range s.activeByAgentID { return k } return "" } // Status returns the introspection shape calendar_status returns. func (s *Scheduler) Status() Status { s.mu.Lock() defer s.mu.Unlock() active := make([]ActiveSlot, 0, len(s.activeByAgentID)) for _, a := range s.activeByAgentID { active = append(active, *a) } hb := make(map[string]time.Time, len(s.lastHeartbeats)) for k, v := range s.lastHeartbeats { hb[k] = v } errs := make(map[string]string, len(s.lastErrors)) for k, v := range s.lastErrors { errs[k] = v } history := make([]HistoryEntry, len(s.history)) copy(history, s.history) return Status{ Enabled: true, LastHeartbeats: hb, LastErrors: errs, HeartbeatEvery: s.cfg.HeartbeatInterval, Active: active, History: history, } } // ErrNoActiveSlot is returned when an agent calls calendar_complete / // abort / pause / resume but has no slot active. var ErrNoActiveSlot = errors.New("calendar: no active slot for agent")