// Scheduler — main loop that heartbeats the backend, dispatches
// returned slots via Plexum's WakeAgent, and tracks per-agent active
// slot state for the calendar_* tools.
//
// State is in-memory: a daemon restart drops everything. Next
// heartbeat reconciles (backend keeps the canonical SlotStatus).
//
// Concurrency:
//   - one heartbeat ticker goroutine (Run blocks; the caller decides
//     which goroutine it runs on)
//   - per-slot dispatch is fire-and-forget via WakeAgent (queue-aware)
//   - mu guards the activeBySlotID + activeByAgentID maps, the history
//     slice, and the last-heartbeat bookkeeping fields
package calendar

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
)

// Scheduler orchestrates the calendar loop: it sends heartbeats through
// bridge, wakes agents through host, and keeps the in-memory record of
// in-flight and recently resolved slots.
type Scheduler struct {
	cfg         Config                   // tunables; NewScheduler fills in defaults
	bridge      *Bridge                  // backend client (Heartbeat, UpdateSlotStatus)
	host        sdkplugin.HostAPI        // used to wake agents (WakeAgent)
	agentLister func() []ReportableAgent // optional; nil means heartbeats carry no agent list
	identifier  string                   // sent as Identifier in every heartbeat payload
	pluginInfo  PluginInfoTag            // sent as PluginInfo in every heartbeat payload

	mu sync.Mutex // guards every field below
	// activeBySlotID indexes in-flight slots by slot ID.
	activeBySlotID map[string]*ActiveSlot
	// activeByAgentID indexes in-flight slots by agent ID. Dispatching a
	// slot overwrites the agent's entry, so it holds the most recently
	// dispatched slot for that agent.
	activeByAgentID map[string]*ActiveSlot
	history         []HistoryEntry    // resolved slots, oldest first, trimmed to cfg.HistoryCap
	lastHeartbeat   time.Time         // when the last heartbeat attempt returned (success or failure)
	lastResponse    HeartbeatResponse // most recent successful heartbeat response
	restartPending  bool              // RestartPending from the most recent successful response
}

// Config bundles scheduler tunables. NewScheduler replaces non-positive
// values with defaults.
type Config struct {
	HeartbeatInterval time.Duration // time between heartbeats; default 30s
	HistoryCap        int           // bound on activity history; default 32
}

// ReportableAgent is the projection of a Plexum agent the scheduler
// needs for heartbeat — id + model + current sm state.
type ReportableAgent struct {
	ID    string
	Model string
	State AgentStatusValue
}

// ActiveSlot tracks an in-flight slot (between WakeAgent dispatch and
// terminal status update).
type ActiveSlot struct {
	Slot          Slot       // the slot as received from the backend
	StartedAt     time.Time  // UTC time the slot was registered for dispatch
	LastHeartbeat time.Time  // UTC time of dispatch or of the latest SetSlotState call
	State         SlotStatus // SlotInProgress at dispatch; changed by SetSlotState
}

// HistoryEntry is one resolved slot kept for the calendar_status tool.
type HistoryEntry struct {
	SlotID     string
	AgentID    string
	Status     SlotStatus // terminal status the slot was resolved with
	ResolvedAt time.Time  // UTC time of resolution
	Reason     string
	Summary    string
}

// NewScheduler constructs a Scheduler in stopped state: the active-slot
// maps are empty and no heartbeat is sent until Run is called.
// Non-positive cfg.HeartbeatInterval and cfg.HistoryCap fall back to
// 30s and 32 respectively.
func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI, identifier string, pluginInfo PluginInfoTag, agentLister func() []ReportableAgent) *Scheduler { if cfg.HeartbeatInterval <= 0 { cfg.HeartbeatInterval = 30 * time.Second } if cfg.HistoryCap <= 0 { cfg.HistoryCap = 32 } return &Scheduler{ cfg: cfg, bridge: bridge, host: host, agentLister: agentLister, identifier: identifier, pluginInfo: pluginInfo, activeBySlotID: map[string]*ActiveSlot{}, activeByAgentID: map[string]*ActiveSlot{}, } } // Run blocks until ctx cancels, ticking heartbeats every // cfg.HeartbeatInterval. Returns nil on graceful shutdown. func (s *Scheduler) Run(ctx context.Context) error { t := time.NewTicker(s.cfg.HeartbeatInterval) defer t.Stop() // First heartbeat immediately so initial state lands fast. s.heartbeatOnce(ctx) for { select { case <-ctx.Done(): return nil case <-t.C: s.heartbeatOnce(ctx) } } } func (s *Scheduler) heartbeatOnce(ctx context.Context) { payload := HeartbeatPayload{ Identifier: s.identifier, APIKey: s.bridge.APIKey, PluginInfo: s.pluginInfo, CapturedAt: time.Now().UTC(), } if s.agentLister != nil { for _, a := range s.agentLister() { payload.AgentList = append(payload.AgentList, AgentReport{ ID: a.ID, Model: a.Model, Status: a.State, }) } } resp, err := s.bridge.Heartbeat(ctx, payload) s.mu.Lock() s.lastHeartbeat = time.Now() if err == nil { s.lastResponse = resp s.restartPending = resp.RestartPending } s.mu.Unlock() if err != nil { return // network blip; next tick retries } for _, slot := range resp.SlotsToFire { s.dispatchSlot(ctx, slot) } } // dispatchSlot fires the slot via host.WakeAgent and records it as // active. WakeAgent handles state-aware queueing — if the agent is // busy, our calendar slot enqueues at depth 1 and the previous wake // is dropped per replace-newest semantics. We mark the slot // in_progress optimistically when we ENQUEUED; backend reconciles on // its own watchdog. 
func (s *Scheduler) dispatchSlot(ctx context.Context, slot Slot) { // Skip already-active slots (heartbeat may re-list a slot we // already started — backend hasn't seen our optimistic update yet). s.mu.Lock() if _, ok := s.activeBySlotID[slot.ID]; ok { s.mu.Unlock() return } now := time.Now().UTC() act := &ActiveSlot{ Slot: slot, StartedAt: now, LastHeartbeat: now, State: SlotInProgress, } s.activeBySlotID[slot.ID] = act s.activeByAgentID[slot.AgentID] = act s.mu.Unlock() message := slot.WakeOptions.OverrideMessage if message == "" { message = slot.PromptText } if message == "" { message = fmt.Sprintf("[calendar] slot %s: %s", slot.ID, slot.Title) } source := fmt.Sprintf("calendar:slot-%s", slot.ID) if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{ AgentID: slot.AgentID, Message: message, Source: source, }); err != nil { // Wake itself failed (plumbing). Mark slot aborted + // notify backend. s.resolveSlot(ctx, slot.ID, SlotAborted, "", "wake-agent failed: "+err.Error()) return } } // resolveSlot moves an active slot to a terminal status, records // history, and tells the backend. Safe to call concurrently. func (s *Scheduler) resolveSlot(ctx context.Context, slotID string, status SlotStatus, summary, reason string) error { s.mu.Lock() act, ok := s.activeBySlotID[slotID] if !ok { s.mu.Unlock() return fmt.Errorf("calendar: slot %s not active", slotID) } delete(s.activeBySlotID, slotID) delete(s.activeByAgentID, act.Slot.AgentID) s.appendHistoryLocked(HistoryEntry{ SlotID: slotID, AgentID: act.Slot.AgentID, Status: status, ResolvedAt: time.Now().UTC(), Summary: summary, Reason: reason, }) s.mu.Unlock() return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{ Status: status, Summary: summary, Reason: reason, }) } // SetSlotState is a non-terminal status change (paused/resumed). // Records the new state in-memory and tells the backend. 
func (s *Scheduler) SetSlotState(ctx context.Context, slotID string, status SlotStatus, reason string) error { s.mu.Lock() act, ok := s.activeBySlotID[slotID] if !ok { s.mu.Unlock() return fmt.Errorf("calendar: slot %s not active", slotID) } act.State = status act.LastHeartbeat = time.Now().UTC() s.mu.Unlock() return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{ Status: status, Reason: reason, }) } func (s *Scheduler) appendHistoryLocked(entry HistoryEntry) { s.history = append(s.history, entry) if len(s.history) > s.cfg.HistoryCap { s.history = s.history[len(s.history)-s.cfg.HistoryCap:] } } // CompleteForAgent / AbortForAgent / PauseForAgent / ResumeForAgent // are the agent-facing tool entry points. They look up the agent's // active slot, transition or terminate it, and notify the backend. // CompleteForAgent terminates the agent's active slot as completed. func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error { slot, ok := s.activeSlotForAgent(agentID) if !ok { return ErrNoActiveSlot } return s.resolveSlot(ctx, slot.Slot.ID, SlotCompleted, summary, "") } // AbortForAgent terminates the agent's active slot as aborted. func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error { slot, ok := s.activeSlotForAgent(agentID) if !ok { return ErrNoActiveSlot } return s.resolveSlot(ctx, slot.Slot.ID, SlotAborted, "", reason) } // PauseForAgent transitions the agent's slot to paused. func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error { slot, ok := s.activeSlotForAgent(agentID) if !ok { return ErrNoActiveSlot } return s.SetSlotState(ctx, slot.Slot.ID, SlotPaused, reason) } // ResumeForAgent transitions the agent's slot back to in_progress. 
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error { slot, ok := s.activeSlotForAgent(agentID) if !ok { return ErrNoActiveSlot } return s.SetSlotState(ctx, slot.Slot.ID, SlotInProgress, "") } // activeSlotForAgent returns the per-agent active slot copy under lock. func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) { s.mu.Lock() defer s.mu.Unlock() act, ok := s.activeByAgentID[agentID] if !ok || act == nil { return ActiveSlot{}, false } return *act, true } // Status returns the introspection shape for the calendar_status tool. func (s *Scheduler) Status() SchedulerStatus { s.mu.Lock() defer s.mu.Unlock() active := make([]ActiveSlot, 0, len(s.activeBySlotID)) for _, a := range s.activeBySlotID { active = append(active, *a) } history := make([]HistoryEntry, len(s.history)) copy(history, s.history) return SchedulerStatus{ Enabled: true, LastHeartbeat: s.lastHeartbeat, HeartbeatEvery: s.cfg.HeartbeatInterval, Active: active, History: history, RestartPending: s.restartPending, } } // SchedulerStatus is the shape calendar_status returns. type SchedulerStatus struct { Enabled bool `json:"enabled"` LastHeartbeat time.Time `json:"last_heartbeat"` HeartbeatEvery time.Duration `json:"heartbeat_every"` Active []ActiveSlot `json:"active"` History []HistoryEntry `json:"history"` RestartPending bool `json:"restart_pending"` } // ErrNoActiveSlot is returned by calendar_complete/abort/pause/resume // when the agent has no slot in progress. var ErrNoActiveSlot = errors.New("calendar: no active slot for agent")