/**
 * HarborForge Calendar Scheduler
 *
 * PLG-CAL-002: Plugin-side handling for pending slot execution.
 * PLG-CAL-004: ScheduledGatewayRestart event handling with state persistence.
 *
 * Responsibilities:
 * - Run calendar heartbeat every minute
 * - Detect when agent is Idle and slots are pending
 * - Wake agent with task context
 * - Handle slot status transitions (attended, ongoing, deferred)
 * - Manage agent status transitions (idle → busy/on_call)
 * - Persist state on ScheduledGatewayRestart and restore on startup
 * - Send final heartbeat before graceful shutdown
 *
 * Design reference: NEXT_WAVE_DEV_DIRECTION.md §6 (Agent wakeup mechanism)
 */

import { writeFileSync, readFileSync, existsSync, mkdirSync, unlinkSync } from 'fs';
import { join, dirname } from 'path';
import { CalendarBridgeClient } from './calendar-bridge';
import {
  CalendarSlotResponse,
  SlotStatus,
  AgentStatusValue,
  SlotAgentUpdate,
  CalendarEventDataJob,
  CalendarEventDataSystemEvent,
} from './types';

/**
 * Dependencies and tunables handed to {@link CalendarScheduler}.
 */
export interface CalendarSchedulerConfig {
  /** Calendar bridge client for backend communication */
  bridge: CalendarBridgeClient;
  /**
   * Function to get current agent status from backend.
   * NOTE(review): the scheduler itself never calls this; the resolved type is
   * assumed to be an agent status (or null when unavailable) — confirm against callers.
   */
  getAgentStatus: () => Promise<AgentStatusValue | null>;
  /** Function to wake/spawn agent with task context; resolves to false when the wake failed */
  wakeAgent: (context: AgentWakeContext) => Promise<boolean>;
  /** Logger instance */
  logger: {
    info: (...args: any[]) => void;
    error: (...args: any[]) => void;
    debug: (...args: any[]) => void;
    warn: (...args: any[]) => void;
  };
  /** Heartbeat interval in milliseconds (default: 60000) */
  heartbeatIntervalMs?: number;
  /** Enable verbose debug logging */
  debug?: boolean;
  /** Directory for state persistence (default: first writable candidate, see getDefaultStateDir) */
  stateDir?: string;
}

/**
 * Context passed to agent when waking for slot execution.
 * This is the payload the agent receives to understand what to do.
 */
export interface AgentWakeContext {
  /** The slot to execute */
  slot: CalendarSlotResponse;
  /** Human-readable task description */
  taskDescription: string;
  /** Prompt/instructions for the agent */
  prompt: string;
  /** Whether this is a virtual slot (needs materialization) */
  isVirtual: boolean;
}

/**
 * Persisted state structure for recovery after restart.
 */
interface PersistedState {
  /** Version for migration compatibility */
  version: number;
  /** When the state was persisted */
  persistedAt: string;
  /** Reason for persistence (e.g., 'ScheduledGatewayRestart') */
  reason: string;
  /** The slot that was being executed when persisted */
  currentSlot: CalendarSlotResponse | null;
  /** Deferred slot IDs at persistence time */
  deferredSlotIds: string[];
  /** Whether a slot was in progress */
  isProcessing: boolean;
  /** Agent status at persistence time (currently always written as null) */
  agentStatus: AgentStatusValue | null;
}

/**
 * What may be found in the state file: either a (possibly partial) persisted
 * state, or the `{ restored: true }` marker written once a state was consumed.
 */
type StateFileContents = Partial<PersistedState> & { restored?: boolean };

/**
 * Current execution state tracked by the scheduler.
 */
interface SchedulerState {
  /** Whether scheduler is currently running */
  isRunning: boolean;
  /** Currently executing slot (null if idle) */
  currentSlot: CalendarSlotResponse | null;
  /** Last heartbeat timestamp */
  lastHeartbeatAt: Date | null;
  /** Interval handle for cleanup */
  intervalHandle: ReturnType<typeof setInterval> | null;
  /** Set of slot IDs that have been deferred in current session */
  deferredSlotIds: Set<string>;
  /** Whether agent is currently processing a slot */
  isProcessing: boolean;
  /** Whether a gateway restart is scheduled/pending */
  isRestartPending: boolean;
}

/** State file name */
const STATE_FILENAME = 'calendar-scheduler-state.json';

/** State file version for migration compatibility */
const STATE_VERSION = 1;

/**
 * CalendarScheduler manages the periodic heartbeat and slot execution lifecycle.
 */
export class CalendarScheduler {
  private config: Required<CalendarSchedulerConfig>;
  private state: SchedulerState;
  private stateFilePath: string;

  /**
   * Build a scheduler, resolve its state directory and restore any state left
   * behind by a previous run. The scheduler does not start until `start()` is called.
   *
   * @param config Dependencies and optional tunables; omitted or `undefined`
   *   optional fields fall back to their defaults.
   */
  constructor(config: CalendarSchedulerConfig) {
    // Defaults are applied with `??` so that an explicitly-undefined option
    // cannot overwrite a default, and so the directory probing in
    // getDefaultStateDir() only runs when no stateDir was supplied.
    this.config = {
      ...config,
      heartbeatIntervalMs: config.heartbeatIntervalMs ?? 60000, // 1 minute default
      debug: config.debug ?? false,
      stateDir: config.stateDir ?? this.getDefaultStateDir(),
    };

    this.stateFilePath = join(this.config.stateDir, STATE_FILENAME);

    this.state = {
      isRunning: false,
      currentSlot: null,
      lastHeartbeatAt: null,
      intervalHandle: null,
      deferredSlotIds: new Set<string>(),
      isProcessing: false,
      isRestartPending: false,
    };

    // Attempt to restore state from previous persistence
    this.restoreState();
  }

  /**
   * Get default state directory: the first candidate that exists (or can be
   * created) and accepts a write, falling back to the current working directory.
   */
  private getDefaultStateDir(): string {
    const candidates = [
      process.env.OPENCLAW_PLUGIN_DATA_DIR,
      process.env.HARBORFORGE_PLUGIN_DIR,
      join(process.cwd(), '.harborforge'),
      join(process.cwd(), 'data'),
      '/tmp/harborforge',
    ];

    for (const dir of candidates) {
      if (!dir) {
        continue;
      }
      try {
        if (!existsSync(dir)) {
          mkdirSync(dir, { recursive: true });
        }
        // Test write access with a throwaway probe file
        const probeFile = join(dir, '.write-test');
        writeFileSync(probeFile, '', { flag: 'w' });
        try {
          unlinkSync(probeFile);
        } catch {
          // Best effort: a leftover probe file does not make the directory unusable.
        }
        return dir;
      } catch {
        continue;
      }
    }

    // Fallback to current working directory
    return process.cwd();
  }

  /**
   * Persist current state to disk for recovery after restart.
   * Failures are logged and swallowed.
   *
   * @param reason Free-form reason recorded in the state file and the log.
   */
  private persistState(reason: string): void {
    try {
      const persistedState: PersistedState = {
        version: STATE_VERSION,
        persistedAt: new Date().toISOString(),
        reason,
        currentSlot: this.state.currentSlot,
        deferredSlotIds: Array.from(this.state.deferredSlotIds),
        isProcessing: this.state.isProcessing,
        agentStatus: null, // Will be determined at restore time
      };

      writeFileSync(this.stateFilePath, JSON.stringify(persistedState, null, 2));
      this.config.logger.info(`[PLG-CAL-004] State persisted to ${this.stateFilePath} (reason: ${reason})`);
    } catch (err) {
      this.config.logger.error('[PLG-CAL-004] Failed to persist state:', err);
    }
  }

  /**
   * Restore state from disk if available.
   *
   * Only the deferred slot IDs are loaded back into memory; an in-progress
   * slot is merely logged. A state file that was already consumed (it then
   * holds the `restored` marker) is ignored silently. Failures are logged and
   * swallowed.
   */
  private restoreState(): void {
    try {
      if (!existsSync(this.stateFilePath)) {
        return;
      }

      const data = readFileSync(this.stateFilePath, 'utf-8');
      const persisted: StateFileContents = JSON.parse(data);

      // clearPersistedState() leaves this marker behind; it is not a stale
      // state version, so do not warn about it on every later startup.
      if (persisted.restored === true) {
        return;
      }

      // Validate version
      if (persisted.version !== STATE_VERSION) {
        this.config.logger.warn(`[PLG-CAL-004] State version mismatch: ${persisted.version} vs ${STATE_VERSION}`);
        this.clearPersistedState();
        return;
      }

      // Restore deferred slot IDs
      if (persisted.deferredSlotIds && persisted.deferredSlotIds.length > 0) {
        this.state.deferredSlotIds = new Set(persisted.deferredSlotIds);
        this.config.logger.info(`[PLG-CAL-004] Restored ${persisted.deferredSlotIds.length} deferred slot(s)`);
      }

      // If there was a slot in progress, only report it here
      if (persisted.isProcessing && persisted.currentSlot) {
        this.config.logger.warn(
          `[PLG-CAL-004] Previous session had in-progress slot: ${this.getSlotId(persisted.currentSlot)}`
        );
        // The slot will be picked up by the next heartbeat and can be resumed or deferred
      }

      this.config.logger.info(`[PLG-CAL-004] State restored from ${persisted.persistedAt} (reason: ${persisted.reason})`);

      // Clear the persisted state after successful restore
      this.clearPersistedState();
    } catch (err) {
      this.config.logger.error('[PLG-CAL-004] Failed to restore state:', err);
    }
  }

  /**
   * Mark the persisted state file as consumed by overwriting it with a
   * `{ restored: true, at }` marker. Failures are logged and swallowed.
   */
  private clearPersistedState(): void {
    try {
      if (existsSync(this.stateFilePath)) {
        // In a real implementation, we might want to archive instead of delete
        // For now, we'll just clear the content to mark as processed
        writeFileSync(this.stateFilePath, JSON.stringify({ restored: true, at: new Date().toISOString() }));
      }
    } catch (err) {
      this.config.logger.error('[PLG-CAL-004] Failed to clear persisted state:', err);
    }
  }

  /**
   * Send a final heartbeat to the backend before shutdown by reporting the
   * agent as offline. Failures are logged and swallowed.
   *
   * @param reason Reason included in the log line.
   */
  private async sendFinalHeartbeat(reason: string): Promise<void> {
    try {
      this.config.logger.info(`[PLG-CAL-004] Sending final heartbeat (reason: ${reason})`);

      // Send agent status update indicating we're going offline
      await this.config.bridge.reportAgentStatus({ status: 'offline' });

      this.config.logger.info('[PLG-CAL-004] Final heartbeat sent successfully');
    } catch (err) {
      this.config.logger.error('[PLG-CAL-004] Failed to send final heartbeat:', err);
    }
  }

  /**
   * Handle ScheduledGatewayRestart event.
   * PLG-CAL-004: Persist state, send final heartbeat, pause scheduled tasks.
   *
   * @param slot The system-event slot that announced the restart.
   */
  private async handleScheduledGatewayRestart(slot: CalendarSlotResponse): Promise<void> {
    this.config.logger.info('[PLG-CAL-004] Handling ScheduledGatewayRestart event');

    // 1. Mark restart as pending to prevent new slot processing
    this.state.isRestartPending = true;

    // 2. Persist current state
    this.persistState('ScheduledGatewayRestart');

    // 3. If there's a current slot, pause it gracefully
    if (this.state.isProcessing && this.state.currentSlot) {
      this.config.logger.info('[PLG-CAL-004] Pausing current slot before restart');
      await this.pauseCurrentSlot();
    }

    // 4. Send final heartbeat
    await this.sendFinalHeartbeat('ScheduledGatewayRestart');

    // 5. Stop the scheduler (pause scheduled tasks)
    this.config.logger.info('[PLG-CAL-004] Stopping scheduler due to gateway restart');
    this.stop();

    // 6. Mark the slot as finished (since we've handled the restart)
    const update: SlotAgentUpdate = {
      status: SlotStatus.FINISHED,
      actual_duration: 0, // Restart preparation doesn't take time
    };

    try {
      await this.applySlotUpdate(slot, update);
    } catch (err) {
      this.config.logger.error('[PLG-CAL-004] Failed to mark restart slot as finished:', err);
    }
  }

  /**
   * Start the calendar scheduler.
   * Runs one heartbeat immediately, then one every `heartbeatIntervalMs`.
   * Calling it while already running only logs a warning.
   */
  start(): void {
    if (this.state.isRunning) {
      this.config.logger.warn('Calendar scheduler already running');
      return;
    }

    this.state.isRunning = true;
    this.state.isRestartPending = false;
    this.config.logger.info('Calendar scheduler started');

    // Run initial heartbeat immediately. Fire-and-forget is intentional:
    // runHeartbeat() catches and logs its own errors.
    void this.runHeartbeat();

    // Schedule periodic heartbeats
    this.state.intervalHandle = setInterval(() => {
      void this.runHeartbeat();
    }, this.config.heartbeatIntervalMs);
  }

  /**
   * Stop the calendar scheduler.
   * Clears the heartbeat interval and the running flag; the current slot,
   * processing flag and deferred-slot markers are left untouched.
   */
  stop(): void {
    this.state.isRunning = false;

    if (this.state.intervalHandle) {
      clearInterval(this.state.intervalHandle);
      this.state.intervalHandle = null;
    }

    this.config.logger.info('Calendar scheduler stopped');
  }

  /**
   * Execute a single heartbeat cycle.
   * Fetches pending slots and handles execution logic. Does nothing when the
   * scheduler is stopped or a gateway restart is pending. Errors are logged
   * and swallowed.
   */
  async runHeartbeat(): Promise<void> {
    if (!this.state.isRunning) {
      return;
    }

    // Skip heartbeat if restart is pending
    if (this.state.isRestartPending) {
      this.logDebug('Heartbeat skipped: gateway restart pending');
      return;
    }

    this.state.lastHeartbeatAt = new Date();

    try {
      // Fetch pending slots from backend
      const response = await this.config.bridge.heartbeat();

      if (!response) {
        this.logDebug('Heartbeat: backend unreachable');
        return;
      }

      this.logDebug(
        `Heartbeat: ${response.slots.length} slots pending, agent_status=${response.agent_status}`
      );

      // If agent is not idle, defer all pending slots
      if (response.agent_status !== 'idle') {
        await this.handleNonIdleAgent(response.slots, response.agent_status);
        return;
      }

      // Agent is idle again - previously deferred slots should become eligible
      // for selection in the next planning pass.
      if (this.state.deferredSlotIds.size > 0) {
        this.logDebug(
          `Agent returned to idle; clearing ${this.state.deferredSlotIds.size} deferred slot marker(s) for replanning`
        );
        this.state.deferredSlotIds.clear();
      }

      // Agent is idle - handle pending slots
      await this.handleIdleAgent(response.slots);
    } catch (err) {
      this.config.logger.error('Heartbeat error:', err);
    }
  }

  /**
   * Handle slots when agent is not idle.
   * Defers every pending slot that has not already been deferred this session.
   *
   * @param slots Pending slots returned by the heartbeat.
   * @param agentStatus Non-idle status reported by the backend (used for logging).
   */
  private async handleNonIdleAgent(
    slots: CalendarSlotResponse[],
    agentStatus: AgentStatusValue
  ): Promise<void> {
    if (slots.length === 0) {
      return;
    }

    this.config.logger.info(
      `Agent not idle (status=${agentStatus}), deferring ${slots.length} slot(s)`
    );

    for (const slot of slots) {
      const slotId = this.getSlotId(slot);

      // Skip if already deferred this session
      if (this.state.deferredSlotIds.has(slotId)) {
        continue;
      }

      // Mark slot as deferred (the priority boost is presumably applied by the backend)
      await this.deferSlot(slot);
      this.state.deferredSlotIds.add(slotId);
    }
  }

  /**
   * Handle slots when agent is idle.
   * Takes the first eligible slot, defers the rest, then either runs the
   * gateway-restart procedure or wakes the agent for the selected slot.
   *
   * @param slots Pending slots returned by the heartbeat.
   */
  private async handleIdleAgent(slots: CalendarSlotResponse[]): Promise<void> {
    if (slots.length === 0) {
      return;
    }

    // Filter out already deferred slots in this session
    const eligibleSlots = slots.filter(
      (s) => !this.state.deferredSlotIds.has(this.getSlotId(s))
    );

    if (eligibleSlots.length === 0) {
      this.logDebug('All pending slots have been deferred this session');
      return;
    }

    // Select highest priority slot (backend already sorts by priority DESC)
    const [selectedSlot, ...remainingSlots] = eligibleSlots;

    this.config.logger.info(
      `Selected slot for execution: id=${this.getSlotId(selectedSlot)}, ` +
        `type=${selectedSlot.slot_type}, priority=${selectedSlot.priority}`
    );

    // Mark remaining slots as deferred
    for (const slot of remainingSlots) {
      await this.deferSlot(slot);
      this.state.deferredSlotIds.add(this.getSlotId(slot));
    }

    // Check if this is a ScheduledGatewayRestart event
    if (this.isScheduledGatewayRestart(selectedSlot)) {
      await this.handleScheduledGatewayRestart(selectedSlot);
      return;
    }

    // Wake agent to execute selected slot
    await this.executeSlot(selectedSlot);
  }

  /**
   * Check if a slot is a ScheduledGatewayRestart system event.
   */
  private isScheduledGatewayRestart(slot: CalendarSlotResponse): boolean {
    if (slot.event_type !== 'system_event' || !slot.event_data) {
      return false;
    }
    const sysData = slot.event_data as CalendarEventDataSystemEvent;
    return sysData.event === 'ScheduledGatewayRestart';
  }

  /**
   * Execute a slot by waking the agent.
   *
   * Marks the slot ONGOING on the backend, reports the agent as busy/on_call
   * and calls `wakeAgent`. On success `isProcessing` stays true until
   * `completeCurrentSlot()` or `abortCurrentSlot()` is called. If the wake
   * fails or an error occurs after the slot was marked ONGOING, the slot is
   * reverted and the agent is reported idle again.
   *
   * @param slot Slot selected for execution; deferred instead if another slot
   *   is already being processed.
   */
  private async executeSlot(slot: CalendarSlotResponse): Promise<void> {
    if (this.state.isProcessing) {
      this.config.logger.warn('Already processing a slot, deferring new slot');
      await this.deferSlot(slot);
      return;
    }

    this.state.isProcessing = true;
    this.state.currentSlot = slot;

    // The slot as the backend knows it. For a virtual slot this becomes the
    // materialized slot returned by the backend, so that the wake context and
    // any rollback address the real slot rather than the stale virtual one.
    let activeSlot = slot;
    // True while the backend holds an ONGOING status that we may need to undo.
    let needsRollback = false;

    try {
      // Mark slot as ongoing before waking agent
      const update: SlotAgentUpdate = {
        status: SlotStatus.ONGOING,
        started_at: this.formatTime(new Date()),
      };

      let updateSuccess: boolean;
      if (slot.id) {
        updateSuccess = await this.config.bridge.updateSlot(slot.id, update);
      } else if (slot.virtual_id) {
        const updated = await this.config.bridge.updateVirtualSlot(slot.virtual_id, update);
        updateSuccess = updated !== null;
        // Update slot reference if materialized
        if (updated) {
          activeSlot = updated;
          this.state.currentSlot = updated;
        }
      } else {
        updateSuccess = false;
      }

      if (!updateSuccess) {
        this.config.logger.error('Failed to update slot status before execution');
        this.state.isProcessing = false;
        this.state.currentSlot = null;
        return;
      }
      needsRollback = true;

      // Report agent status change to backend
      const newAgentStatus = activeSlot.slot_type === 'on_call' ? 'on_call' : 'busy';
      await this.config.bridge.reportAgentStatus({ status: newAgentStatus });

      // Build wake context for agent
      const wakeContext = this.buildWakeContext(activeSlot);

      // Wake the agent
      const wakeSuccess = await this.config.wakeAgent(wakeContext);

      if (!wakeSuccess) {
        this.config.logger.error('Failed to wake agent for slot execution');
        // Revert slot to not_started status
        await this.revertSlot(activeSlot);
        needsRollback = false;
        await this.config.bridge.reportAgentStatus({ status: 'idle' });
        this.state.isProcessing = false;
        this.state.currentSlot = null;
        await this.triggerReplan('wake failure');
        return;
      }

      // Note: isProcessing remains true until agent signals completion
      // This is handled by external completion callback
    } catch (err) {
      this.config.logger.error('Error executing slot:', err);
      this.state.isProcessing = false;
      this.state.currentSlot = null;

      // Do not leave the backend believing the slot is running and the agent
      // is busy when nothing is actually executing.
      if (needsRollback) {
        await this.revertSlot(activeSlot);
      }
      try {
        await this.config.bridge.reportAgentStatus({ status: 'idle' });
      } catch (statusErr) {
        this.config.logger.error('Failed to reset agent status after execution error:', statusErr);
      }
    }
  }

  /**
   * Build the wake context for an agent based on slot details.
   *
   * @param slot Slot the agent is being woken for.
   * @returns Slot, short task description, prompt text and virtual flag.
   */
  private buildWakeContext(slot: CalendarSlotResponse): AgentWakeContext {
    // `!= null` so that a missing (undefined) virtual_id is not treated as virtual.
    const isVirtual = slot.virtual_id != null;

    // Build task description based on event type
    let taskDescription: string;
    let prompt: string;

    if (slot.event_type === 'job' && slot.event_data) {
      const jobData = slot.event_data as CalendarEventDataJob;
      taskDescription = `${jobData.type} ${jobData.code}`;
      prompt = this.buildJobPrompt(slot, jobData);
    } else if (slot.event_type === 'system_event' && slot.event_data) {
      const sysData = slot.event_data as CalendarEventDataSystemEvent;
      taskDescription = `System Event: ${sysData.event}`;
      prompt = this.buildSystemPrompt(slot, sysData);
    } else if (slot.event_type === 'entertainment') {
      taskDescription = 'Entertainment slot';
      prompt = this.buildEntertainmentPrompt(slot);
    } else {
      taskDescription = `Generic ${slot.slot_type} slot`;
      prompt = this.buildGenericPrompt(slot);
    }

    return {
      slot,
      taskDescription,
      prompt,
      isVirtual,
    };
  }

  // NOTE(review): the prompt builders below join their segments with single
  // spaces, which reproduces the prompt text exactly as it appears in this
  // file. If the prompts were meant to be multi-line, change the separator.

  /**
   * Build prompt for job-type slots.
   */
  private buildJobPrompt(
    slot: CalendarSlotResponse,
    jobData: CalendarEventDataJob
  ): string {
    const duration = slot.estimated_duration;
    const type = jobData.type;
    const code = jobData.code;
    const sessions = jobData.working_sessions?.join(', ') || 'none recorded';

    return [
      `You have a scheduled ${type} job to work on.`,
      `Task Code: ${code}`,
      `Estimated Duration: ${duration} minutes`,
      `Slot Type: ${slot.slot_type}`,
      `Priority: ${slot.priority}`,
      'Please focus on this task for the allocated time.',
      'When you finish or need to pause, report your progress back to the calendar system.',
      `Working sessions: ${sessions}`,
      `Start working on ${code} now.`,
    ].join(' ');
  }

  /**
   * Build prompt for system event slots.
   */
  private buildSystemPrompt(
    slot: CalendarSlotResponse,
    sysData: CalendarEventDataSystemEvent
  ): string {
    switch (sysData.event) {
      case 'ScheduleToday':
        return [
          'System Event: Schedule Today',
          "Please review today's calendar and schedule any pending tasks or planning activities.",
          `Estimated time: ${slot.estimated_duration} minutes.`,
          "Check your calendar and plan the day's work.",
        ].join(' ');

      case 'SummaryToday':
        return [
          'System Event: Daily Summary',
          "Please provide a summary of today's activities and progress.",
          `Estimated time: ${slot.estimated_duration} minutes.`,
          'Review what was accomplished and prepare end-of-day notes.',
        ].join(' ');

      case 'ScheduledGatewayRestart':
        return [
          'System Event: Scheduled Gateway Restart',
          'The OpenClaw gateway is scheduled to restart soon.',
          'Please:',
          '1. Persist any important state',
          '2. Complete or gracefully pause current tasks',
          '3. Prepare for restart',
          `Time remaining: ${slot.estimated_duration} minutes.`,
        ].join(' ');

      default:
        return [
          `System Event: ${sysData.event}`,
          'A system event has been scheduled.',
          'Please handle accordingly.',
          `Estimated time: ${slot.estimated_duration} minutes.`,
        ].join(' ');
    }
  }

  /**
   * Build prompt for entertainment slots.
   */
  private buildEntertainmentPrompt(slot: CalendarSlotResponse): string {
    return [
      'Scheduled Entertainment Break',
      `Duration: ${slot.estimated_duration} minutes`,
      'Take a break and enjoy some leisure time.',
      'This slot is reserved for non-work activities to help maintain work-life balance.',
    ].join(' ');
  }

  /**
   * Build generic prompt for slots without specific event data.
   */
  private buildGenericPrompt(slot: CalendarSlotResponse): string {
    return [
      'Scheduled Calendar Slot',
      `Type: ${slot.slot_type}`,
      `Duration: ${slot.estimated_duration} minutes`,
      `Priority: ${slot.priority}`,
      'Please use this time for the scheduled activity.',
    ].join(' ');
  }

  /**
   * Send a status update for a slot through the bridge, addressing it by its
   * real id when it has one and by its virtual id otherwise. A slot with
   * neither identifier is skipped. Bridge errors propagate to the caller.
   */
  private async applySlotUpdate(
    slot: CalendarSlotResponse,
    update: SlotAgentUpdate
  ): Promise<void> {
    if (slot.id) {
      await this.config.bridge.updateSlot(slot.id, update);
    } else if (slot.virtual_id) {
      await this.config.bridge.updateVirtualSlot(slot.virtual_id, update);
    }
  }

  /**
   * Mark a slot as deferred. Failures are logged and swallowed.
   */
  private async deferSlot(slot: CalendarSlotResponse): Promise<void> {
    const update: SlotAgentUpdate = {
      status: SlotStatus.DEFERRED,
    };

    try {
      await this.applySlotUpdate(slot, update);
      this.logDebug(`Deferred slot: ${this.getSlotId(slot)}`);
    } catch (err) {
      this.config.logger.error('Failed to defer slot:', err);
    }
  }

  /**
   * Revert a slot to not_started status after failed execution attempt.
   * Failures are logged and swallowed.
   */
  private async revertSlot(slot: CalendarSlotResponse): Promise<void> {
    const update: SlotAgentUpdate = {
      status: SlotStatus.NOT_STARTED,
      started_at: undefined,
    };

    try {
      await this.applySlotUpdate(slot, update);
    } catch (err) {
      this.config.logger.error('Failed to revert slot:', err);
    }
  }

  /**
   * Complete the current slot execution.
   * Call this when the agent finishes the task. Marks the slot FINISHED,
   * reports the agent idle, then always clears the local processing state and
   * triggers a replanning heartbeat, even if the backend calls failed.
   *
   * @param actualDurationMinutes Time actually spent on the slot, in minutes.
   */
  async completeCurrentSlot(actualDurationMinutes: number): Promise<void> {
    if (!this.state.currentSlot) {
      this.config.logger.warn('No current slot to complete');
      return;
    }

    const slot = this.state.currentSlot;
    const update: SlotAgentUpdate = {
      status: SlotStatus.FINISHED,
      actual_duration: actualDurationMinutes,
    };

    try {
      await this.applySlotUpdate(slot, update);

      // Report agent back to idle
      await this.config.bridge.reportAgentStatus({ status: 'idle' });

      this.config.logger.info(
        `Completed slot ${this.getSlotId(slot)}, actual_duration=${actualDurationMinutes}min`
      );
    } catch (err) {
      this.config.logger.error('Failed to complete slot:', err);
    } finally {
      this.state.isProcessing = false;
      this.state.currentSlot = null;
      await this.triggerReplan('slot completion');
    }
  }

  /**
   * Abort the current slot execution.
   * Call this when the agent cannot complete the task. Marks the slot ABORTED,
   * reports the agent idle, then always clears the local processing state and
   * triggers a replanning heartbeat, even if the backend calls failed.
   *
   * @param reason Optional explanation appended to the log line.
   */
  async abortCurrentSlot(reason?: string): Promise<void> {
    if (!this.state.currentSlot) {
      this.config.logger.warn('No current slot to abort');
      return;
    }

    const slot = this.state.currentSlot;
    const update: SlotAgentUpdate = {
      status: SlotStatus.ABORTED,
    };

    try {
      await this.applySlotUpdate(slot, update);

      // Report agent back to idle
      await this.config.bridge.reportAgentStatus({ status: 'idle' });

      this.config.logger.info(
        `Aborted slot ${this.getSlotId(slot)}${reason ? `: ${reason}` : ''}`
      );
    } catch (err) {
      this.config.logger.error('Failed to abort slot:', err);
    } finally {
      this.state.isProcessing = false;
      this.state.currentSlot = null;
      await this.triggerReplan('slot abort');
    }
  }

  /**
   * Pause the current slot execution.
   * Call this when the agent needs to temporarily pause. Only the backend slot
   * status changes; the local processing state is kept. Failures are logged
   * and swallowed.
   */
  async pauseCurrentSlot(): Promise<void> {
    if (!this.state.currentSlot) {
      this.config.logger.warn('No current slot to pause');
      return;
    }

    const slot = this.state.currentSlot;
    const update: SlotAgentUpdate = {
      status: SlotStatus.PAUSED,
    };

    try {
      await this.applySlotUpdate(slot, update);
      this.config.logger.info(`Paused slot ${this.getSlotId(slot)}`);
    } catch (err) {
      this.config.logger.error('Failed to pause slot:', err);
    }
  }

  /**
   * Resume a paused slot by setting it back to ONGOING on the backend.
   * Failures are logged and swallowed.
   */
  async resumeCurrentSlot(): Promise<void> {
    if (!this.state.currentSlot) {
      this.config.logger.warn('No current slot to resume');
      return;
    }

    const slot = this.state.currentSlot;
    const update: SlotAgentUpdate = {
      status: SlotStatus.ONGOING,
    };

    try {
      await this.applySlotUpdate(slot, update);
      this.config.logger.info(`Resumed slot ${this.getSlotId(slot)}`);
    } catch (err) {
      this.config.logger.error('Failed to resume slot:', err);
    }
  }

  /**
   * Trigger an immediate replanning pass after the current slot lifecycle ends.
   * This lets previously deferred/not-started slots compete again as soon as
   * the agent becomes idle. No-op when the scheduler is stopped.
   *
   * @param reason Short description used in log messages.
   */
  private async triggerReplan(reason: string): Promise<void> {
    if (!this.state.isRunning) {
      return;
    }

    this.logDebug(`Triggering immediate replanning after ${reason}`);
    try {
      await this.runHeartbeat();
    } catch (err) {
      this.config.logger.error(`Failed to trigger replanning after ${reason}:`, err);
    }
  }

  /**
   * Get a stable ID for a slot: its real id as a string, else its virtual id,
   * else the literal 'unknown'.
   */
  private getSlotId(slot: CalendarSlotResponse): string {
    return slot.id?.toString() || slot.virtual_id || 'unknown';
  }

  /**
   * Format a Date as a local wall-clock time string (HH:MM:SS), i.e. the
   * first space-separated token of `Date.prototype.toTimeString()`.
   */
  private formatTime(date: Date): string {
    return date.toTimeString().split(' ')[0];
  }

  /**
   * Debug logging helper; emits only when `config.debug` is enabled.
   */
  private logDebug(message: string): void {
    if (this.config.debug) {
      this.config.logger.debug(`[CalendarScheduler] ${message}`);
    }
  }

  /**
   * Get current scheduler state (for introspection).
   * Returns a shallow copy: the `deferredSlotIds` set and `currentSlot`
   * object are shared with the scheduler.
   */
  getState(): Readonly<SchedulerState> {
    return { ...this.state };
  }

  /**
   * Check if scheduler is running.
   */
  isRunning(): boolean {
    return this.state.isRunning;
  }

  /**
   * Check if currently processing a slot.
   */
  isProcessing(): boolean {
    return this.state.isProcessing;
  }

  /**
   * Get the current slot being executed (if any).
   */
  getCurrentSlot(): CalendarSlotResponse | null {
    return this.state.currentSlot;
  }

  /**
   * Check if a gateway restart is pending.
   */
  isRestartPending(): boolean {
    return this.state.isRestartPending;
  }

  /**
   * Get the path to the state file.
   */
  getStateFilePath(): string {
    return this.stateFilePath;
  }
}

/**
 * Factory function to create a CalendarScheduler from plugin context.
 *
 * @param config Scheduler dependencies and tunables.
 * @returns A new, not-yet-started scheduler.
 */
export function createCalendarScheduler(
  config: CalendarSchedulerConfig
): CalendarScheduler {
  return new CalendarScheduler(config);
}