"use strict";
/**
 * HarborForge Calendar Scheduler
 *
 * PLG-CAL-002: Plugin-side handling for pending slot execution.
 * PLG-CAL-004: ScheduledGatewayRestart event handling with state persistence.
 *
 * Responsibilities:
 * - Run calendar heartbeat every minute
 * - Detect when agent is Idle and slots are pending
 * - Wake agent with task context
 * - Handle slot status transitions (attended, ongoing, deferred)
 * - Manage agent status transitions (idle → busy/on_call)
 * - Persist state on ScheduledGatewayRestart and restore on startup
 * - Send final heartbeat before graceful shutdown
 *
 * Design reference: NEXT_WAVE_DEV_DIRECTION.md §6 (Agent wakeup mechanism)
 */
Object.defineProperty(exports, "__esModule", { value: true });
exports.CalendarScheduler = void 0;
exports.createCalendarScheduler = createCalendarScheduler;
const fs_1 = require("fs");
const path_1 = require("path");
const types_1 = require("./types");
/** State file name */
const STATE_FILENAME = 'calendar-scheduler-state.json';
/** State file version for migration compatibility */
const STATE_VERSION = 1;
/** Heartbeat period used when the caller does not configure one (1 minute) */
const DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
/**
 * CalendarScheduler manages the periodic heartbeat and slot execution lifecycle.
 *
 * Collaborators read from `config`:
 * - `logger`  : object with `info`, `warn`, `error` and `debug` methods
 * - `bridge`  : backend client exposing `heartbeat`, `updateSlot`,
 *               `updateVirtualSlot` and `reportAgentStatus`
 * - `wakeAgent(context)` : async callback; a falsy result is treated as a
 *               failed wake-up
 * - `heartbeatIntervalMs`, `debug`, `stateDir` : optional tuning knobs
 */
class CalendarScheduler {
    config;
    state;
    stateFilePath;
    /**
     * @param config Scheduler configuration (see class description). Values
     *   left `undefined`/`null` for `heartbeatIntervalMs`, `debug` or
     *   `stateDir` fall back to their defaults.
     */
    constructor(config) {
        const overrides = config ?? {};
        this.config = {
            ...overrides,
            heartbeatIntervalMs: overrides.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS,
            debug: overrides.debug ?? false,
            // Probing for a default directory touches the filesystem, so it is
            // only done when the caller did not choose a directory.
            stateDir: overrides.stateDir ?? this.getDefaultStateDir(),
        };
        this.stateFilePath = path_1.join(this.config.stateDir, STATE_FILENAME);
        this.state = {
            isRunning: false,
            currentSlot: null,
            lastHeartbeatAt: null,
            intervalHandle: null,
            deferredSlotIds: new Set(),
            isProcessing: false,
            isRestartPending: false,
        };
        // Attempt to restore state from previous persistence
        this.restoreState();
    }
    /**
     * Get default state directory (plugin data directory or temp fallback).
     *
     * Candidates are tried in order; the first one that exists (or can be
     * created) and accepts a write is returned.
     *
     * @returns {string} A writable directory, or `process.cwd()` when no
     *   candidate is usable.
     */
    getDefaultStateDir() {
        // Try to use the plugin's directory or a standard data location
        const candidates = [
            process.env.OPENCLAW_PLUGIN_DATA_DIR,
            process.env.HARBORFORGE_PLUGIN_DIR,
            path_1.join(process.cwd(), '.harborforge'),
            path_1.join(process.cwd(), 'data'),
            '/tmp/harborforge',
        ];
        for (const dir of candidates) {
            if (!dir) {
                continue;
            }
            try {
                if (!fs_1.existsSync(dir)) {
                    fs_1.mkdirSync(dir, { recursive: true });
                }
                // Test write access
                const testFile = path_1.join(dir, '.write-test');
                fs_1.writeFileSync(testFile, '', { flag: 'w' });
                try {
                    // Do not leave the probe file behind in the chosen directory.
                    fs_1.unlinkSync(testFile);
                }
                catch {
                    // Best effort: the directory is writable, which is all we need.
                }
                return dir;
            }
            catch {
                // Not usable; try the next candidate.
                continue;
            }
        }
        // Fallback to current working directory
        return process.cwd();
    }
    /**
     * Persist current state to disk for recovery after restart.
     * Failures are logged and never thrown.
     *
     * @param {string} reason Free-form label stored in the file and echoed in logs.
     */
    persistState(reason) {
        try {
            const persistedState = {
                version: STATE_VERSION,
                persistedAt: new Date().toISOString(),
                reason,
                currentSlot: this.state.currentSlot,
                // Sets are not JSON-serializable; store the IDs as an array.
                deferredSlotIds: Array.from(this.state.deferredSlotIds),
                isProcessing: this.state.isProcessing,
                agentStatus: null, // Will be determined at restore time
            };
            fs_1.writeFileSync(this.stateFilePath, JSON.stringify(persistedState, null, 2));
            this.config.logger.info(`[PLG-CAL-004] State persisted to ${this.stateFilePath} (reason: ${reason})`);
        }
        catch (err) {
            this.config.logger.error('[PLG-CAL-004] Failed to persist state:', err);
        }
    }
    /**
     * Restore state from disk if available.
     *
     * Only the deferred slot IDs are loaded back into memory; a previously
     * in-progress slot is logged but not re-attached. Failures are logged and
     * never thrown.
     */
    restoreState() {
        try {
            if (!fs_1.existsSync(this.stateFilePath)) {
                return;
            }
            const data = fs_1.readFileSync(this.stateFilePath, 'utf-8');
            const persisted = JSON.parse(data);
            // clearPersistedState() leaves a `{ restored: true }` marker behind.
            // That file has already been consumed, so it is neither a version
            // mismatch nor something to restore.
            if (persisted && persisted.restored === true) {
                return;
            }
            // Validate version
            if (persisted.version !== STATE_VERSION) {
                this.config.logger.warn(`[PLG-CAL-004] State version mismatch: ${persisted.version} vs ${STATE_VERSION}`);
                this.clearPersistedState();
                return;
            }
            // Restore deferred slot IDs
            if (Array.isArray(persisted.deferredSlotIds) && persisted.deferredSlotIds.length > 0) {
                this.state.deferredSlotIds = new Set(persisted.deferredSlotIds);
                this.config.logger.info(`[PLG-CAL-004] Restored ${persisted.deferredSlotIds.length} deferred slot(s)`);
            }
            // If there was a slot in progress, mark it for replanning
            if (persisted.isProcessing && persisted.currentSlot) {
                this.config.logger.warn(`[PLG-CAL-004] Previous session had in-progress slot: ${this.getSlotId(persisted.currentSlot)}`);
                // The slot will be picked up by the next heartbeat and can be resumed or deferred
            }
            this.config.logger.info(`[PLG-CAL-004] State restored from ${persisted.persistedAt} (reason: ${persisted.reason})`);
            // Clear the persisted state after successful restore
            this.clearPersistedState();
        }
        catch (err) {
            this.config.logger.error('[PLG-CAL-004] Failed to restore state:', err);
        }
    }
    /**
     * Clear persisted state file after successful restore.
     *
     * The file is overwritten with a `{ restored, at }` marker rather than
     * deleted. Failures are logged and never thrown.
     */
    clearPersistedState() {
        try {
            if (fs_1.existsSync(this.stateFilePath)) {
                // In a real implementation, we might want to archive instead of delete
                // For now, we'll just clear the content to mark as processed
                fs_1.writeFileSync(this.stateFilePath, JSON.stringify({ restored: true, at: new Date().toISOString() }));
            }
        }
        catch (err) {
            this.config.logger.error('[PLG-CAL-004] Failed to clear persisted state:', err);
        }
    }
    /**
     * Send a final heartbeat to the backend before shutdown.
     * Implemented as an `offline` agent status report; failures are logged
     * and never thrown.
     *
     * @param {string} reason Label used in the log line only.
     */
    async sendFinalHeartbeat(reason) {
        try {
            this.config.logger.info(`[PLG-CAL-004] Sending final heartbeat (reason: ${reason})`);
            // Send agent status update indicating we're going offline
            await this.config.bridge.reportAgentStatus({ status: 'offline' });
            this.config.logger.info('[PLG-CAL-004] Final heartbeat sent successfully');
        }
        catch (err) {
            this.config.logger.error('[PLG-CAL-004] Failed to send final heartbeat:', err);
        }
    }
    /**
     * Handle ScheduledGatewayRestart event.
     * PLG-CAL-004: Persist state, send final heartbeat, pause scheduled tasks.
     *
     * @param slot The system-event slot that announced the restart; it is
     *   marked finished once the scheduler has been stopped.
     */
    async handleScheduledGatewayRestart(slot) {
        this.config.logger.info('[PLG-CAL-004] Handling ScheduledGatewayRestart event');
        // 1. Mark restart as pending to prevent new slot processing
        this.state.isRestartPending = true;
        // 2. Persist current state
        this.persistState('ScheduledGatewayRestart');
        // 3. If there's a current slot, pause it gracefully
        if (this.state.isProcessing && this.state.currentSlot) {
            this.config.logger.info('[PLG-CAL-004] Pausing current slot before restart');
            await this.pauseCurrentSlot();
        }
        // 4. Send final heartbeat
        await this.sendFinalHeartbeat('ScheduledGatewayRestart');
        // 5. Stop the scheduler (pause scheduled tasks)
        this.config.logger.info('[PLG-CAL-004] Stopping scheduler due to gateway restart');
        this.stop();
        // 6. Mark the slot as finished (since we've handled the restart)
        const update = {
            status: types_1.SlotStatus.FINISHED,
            actual_duration: 0, // Restart preparation doesn't take time
        };
        try {
            await this.sendSlotUpdate(slot, update);
        }
        catch (err) {
            this.config.logger.error('[PLG-CAL-004] Failed to mark restart slot as finished:', err);
        }
    }
    /**
     * Start the calendar scheduler.
     * Begins periodic heartbeat to check for pending slots.
     * Calling it while already running only logs a warning.
     */
    start() {
        if (this.state.isRunning) {
            this.config.logger.warn('Calendar scheduler already running');
            return;
        }
        this.state.isRunning = true;
        this.state.isRestartPending = false;
        this.config.logger.info('Calendar scheduler started');
        // Run initial heartbeat immediately. Deliberately not awaited: start()
        // is synchronous and runHeartbeat() logs its own failures.
        void this.runHeartbeat();
        // Schedule periodic heartbeats
        this.state.intervalHandle = setInterval(() => {
            void this.runHeartbeat();
        }, this.config.heartbeatIntervalMs);
    }
    /**
     * Stop the calendar scheduler.
     * Clears the heartbeat interval and the running flag. The current slot,
     * processing flag and deferred markers are left untouched.
     */
    stop() {
        this.state.isRunning = false;
        if (this.state.intervalHandle) {
            clearInterval(this.state.intervalHandle);
            this.state.intervalHandle = null;
        }
        this.config.logger.info('Calendar scheduler stopped');
    }
    /**
     * Execute a single heartbeat cycle.
     * Fetches pending slots and handles execution logic.
     * Does nothing when the scheduler is stopped or a restart is pending.
     */
    async runHeartbeat() {
        if (!this.state.isRunning) {
            return;
        }
        // Skip heartbeat if restart is pending
        if (this.state.isRestartPending) {
            this.logDebug('Heartbeat skipped: gateway restart pending');
            return;
        }
        this.state.lastHeartbeatAt = new Date();
        try {
            // Fetch pending slots from backend
            const response = await this.config.bridge.heartbeat();
            if (!response) {
                this.logDebug('Heartbeat: backend unreachable');
                return;
            }
            this.logDebug(`Heartbeat: ${response.slots.length} slots pending, agent_status=${response.agent_status}`);
            // If agent is not idle, defer all pending slots
            if (response.agent_status !== 'idle') {
                await this.handleNonIdleAgent(response.slots, response.agent_status);
                return;
            }
            // Agent is idle again - previously deferred slots should become eligible
            // for selection in the next planning pass.
            if (this.state.deferredSlotIds.size > 0) {
                this.logDebug(`Agent returned to idle; clearing ${this.state.deferredSlotIds.size} deferred slot marker(s) for replanning`);
                this.state.deferredSlotIds.clear();
            }
            // Agent is idle - handle pending slots
            await this.handleIdleAgent(response.slots);
        }
        catch (err) {
            this.config.logger.error('Heartbeat error:', err);
        }
    }
    /**
     * Handle slots when agent is not idle.
     * Every pending slot not yet deferred in this session is marked deferred
     * and remembered in `deferredSlotIds`.
     *
     * @param slots Pending slots reported by the heartbeat.
     * @param agentStatus Agent status reported by the heartbeat (logging only).
     */
    async handleNonIdleAgent(slots, agentStatus) {
        if (slots.length === 0) {
            return;
        }
        this.config.logger.info(`Agent not idle (status=${agentStatus}), deferring ${slots.length} slot(s)`);
        for (const slot of slots) {
            const slotId = this.getSlotId(slot);
            // Skip if already deferred this session
            if (this.state.deferredSlotIds.has(slotId)) {
                continue;
            }
            // Mark slot as deferred with priority boost (+1)
            // NOTE(review): only the DEFERRED status is sent from here; the
            // boost is presumably applied by the backend - confirm.
            await this.deferSlot(slot);
            this.state.deferredSlotIds.add(slotId);
        }
    }
    /**
     * Handle slots when agent is idle.
     * Select highest priority slot and wake agent; every other eligible slot
     * is deferred first.
     *
     * @param slots Pending slots reported by the heartbeat.
     */
    async handleIdleAgent(slots) {
        if (slots.length === 0) {
            return;
        }
        // Filter out already deferred slots in this session
        const eligibleSlots = slots.filter((s) => !this.state.deferredSlotIds.has(this.getSlotId(s)));
        if (eligibleSlots.length === 0) {
            this.logDebug('All pending slots have been deferred this session');
            return;
        }
        // Select highest priority slot (backend already sorts by priority DESC)
        const [selectedSlot, ...remainingSlots] = eligibleSlots;
        this.config.logger.info(`Selected slot for execution: id=${this.getSlotId(selectedSlot)}, ` +
            `type=${selectedSlot.slot_type}, priority=${selectedSlot.priority}`);
        // Mark remaining slots as deferred
        for (const slot of remainingSlots) {
            await this.deferSlot(slot);
            this.state.deferredSlotIds.add(this.getSlotId(slot));
        }
        // Check if this is a ScheduledGatewayRestart event
        if (this.isScheduledGatewayRestart(selectedSlot)) {
            await this.handleScheduledGatewayRestart(selectedSlot);
            return;
        }
        // Wake agent to execute selected slot
        await this.executeSlot(selectedSlot);
    }
    /**
     * Check if a slot is a ScheduledGatewayRestart system event.
     *
     * @returns {boolean} True only for `system_event` slots whose event data
     *   names `ScheduledGatewayRestart`.
     */
    isScheduledGatewayRestart(slot) {
        if (slot.event_type !== 'system_event' || !slot.event_data) {
            return false;
        }
        const sysData = slot.event_data;
        return sysData.event === 'ScheduledGatewayRestart';
    }
    /**
     * Execute a slot by waking the agent.
     *
     * Sequence: mark the slot ongoing on the backend, report the agent as
     * `busy`/`on_call`, then call `config.wakeAgent`. If the wake-up fails or
     * anything throws after the slot was marked ongoing, the slot is reverted
     * and the agent is reported idle again so later heartbeats are not blocked.
     *
     * On success `isProcessing` stays true until `completeCurrentSlot` or
     * `abortCurrentSlot` is called.
     *
     * @param slot Slot to run (real, with `id`, or virtual, with `virtual_id`).
     */
    async executeSlot(slot) {
        if (this.state.isProcessing) {
            this.config.logger.warn('Already processing a slot, deferring new slot');
            await this.deferSlot(slot);
            return;
        }
        this.state.isProcessing = true;
        this.state.currentSlot = slot;
        // The slot the backend currently knows about; replaced by the
        // materialized slot when a virtual slot update returns one.
        let activeSlot = slot;
        // True once the backend has accepted the ONGOING update, i.e. once a
        // failure needs a rollback rather than just a local reset.
        let slotStarted = false;
        try {
            // Mark slot as attended and ongoing before waking agent
            const update = {
                status: types_1.SlotStatus.ONGOING,
                started_at: this.formatTime(new Date()),
            };
            let updateSuccess;
            if (slot.id) {
                updateSuccess = await this.config.bridge.updateSlot(slot.id, update);
            }
            else if (slot.virtual_id) {
                const updated = await this.config.bridge.updateVirtualSlot(slot.virtual_id, update);
                updateSuccess = updated !== null;
                // Update slot reference if materialized
                if (updated) {
                    activeSlot = updated;
                    this.state.currentSlot = updated;
                }
            }
            else {
                updateSuccess = false;
            }
            if (!updateSuccess) {
                this.config.logger.error('Failed to update slot status before execution');
                this.state.isProcessing = false;
                this.state.currentSlot = null;
                return;
            }
            slotStarted = true;
            // Report agent status change to backend
            const newAgentStatus = slot.slot_type === 'on_call' ? 'on_call' : 'busy';
            await this.config.bridge.reportAgentStatus({ status: newAgentStatus });
            // Build wake context for agent
            // NOTE(review): built from the slot as originally selected, not from
            // a materialized replacement - confirm that is what the agent expects.
            const wakeContext = this.buildWakeContext(slot);
            // Wake the agent
            const wakeSuccess = await this.config.wakeAgent(wakeContext);
            if (!wakeSuccess) {
                this.config.logger.error('Failed to wake agent for slot execution');
                // Revert slot to not_started status and free the agent again
                await this.rollbackStartedSlot(activeSlot);
                this.state.isProcessing = false;
                this.state.currentSlot = null;
                await this.triggerReplan('wake failure');
                return;
            }
            // Note: isProcessing remains true until agent signals completion
            // This is handled by external completion callback
        }
        catch (err) {
            this.config.logger.error('Error executing slot:', err);
            this.state.isProcessing = false;
            this.state.currentSlot = null;
            if (slotStarted) {
                // Without this the backend keeps the slot ONGOING and the agent
                // non-idle, and every later heartbeat would defer all slots.
                await this.rollbackStartedSlot(activeSlot);
            }
        }
    }
    /**
     * Best-effort undo of a started slot: revert the slot to not-started and
     * report the agent as idle. Never throws; failures are logged.
     *
     * @param slot The slot as currently known to the backend.
     */
    async rollbackStartedSlot(slot) {
        await this.revertSlot(slot);
        try {
            await this.config.bridge.reportAgentStatus({ status: 'idle' });
        }
        catch (err) {
            this.config.logger.error('Failed to reset agent status to idle:', err);
        }
    }
    /**
     * Build the wake context for an agent based on slot details.
     *
     * @returns {{slot: object, taskDescription: string, prompt: string, isVirtual: boolean}}
     *   `isVirtual` is true when the slot carries a `virtual_id`.
     */
    buildWakeContext(slot) {
        // `!= null` so that a missing (undefined) virtual_id is not reported
        // as a virtual slot.
        const isVirtual = slot.virtual_id != null;
        // Build task description based on event type
        let taskDescription;
        let prompt;
        if (slot.event_type === 'job' && slot.event_data) {
            const jobData = slot.event_data;
            taskDescription = `${jobData.type} ${jobData.code}`;
            prompt = this.buildJobPrompt(slot, jobData);
        }
        else if (slot.event_type === 'system_event' && slot.event_data) {
            const sysData = slot.event_data;
            taskDescription = `System Event: ${sysData.event}`;
            prompt = this.buildSystemPrompt(slot, sysData);
        }
        else if (slot.event_type === 'entertainment') {
            taskDescription = 'Entertainment slot';
            prompt = this.buildEntertainmentPrompt(slot);
        }
        else {
            taskDescription = `Generic ${slot.slot_type} slot`;
            prompt = this.buildGenericPrompt(slot);
        }
        return {
            slot,
            taskDescription,
            prompt,
            isVirtual,
        };
    }
    /**
     * Build prompt for job-type slots.
     *
     * @param slot Slot providing duration, slot type and priority.
     * @param jobData Job event data providing `type`, `code` and optional
     *   `working_sessions`.
     * @returns {string} Prompt text for the agent.
     */
    buildJobPrompt(slot, jobData) {
        const duration = slot.estimated_duration;
        const type = jobData.type;
        const code = jobData.code;
        return `You have a scheduled ${type} job to work on. Task Code: ${code} Estimated Duration: ${duration} minutes Slot Type: ${slot.slot_type} Priority: ${slot.priority} Please focus on this task for the allocated time. When you finish or need to pause, report your progress back to the calendar system. Working sessions: ${jobData.working_sessions?.join(', ') || 'none recorded'} Start working on ${code} now.`;
    }
    /**
     * Build prompt for system event slots.
     * Unknown event names get a generic prompt that echoes the event name.
     *
     * @param slot Slot providing the estimated duration.
     * @param sysData System event data; `event` selects the prompt.
     * @returns {string} Prompt text for the agent.
     */
    buildSystemPrompt(slot, sysData) {
        switch (sysData.event) {
            case 'ScheduleToday':
                return `System Event: Schedule Today Please review today's calendar and schedule any pending tasks or planning activities. Estimated time: ${slot.estimated_duration} minutes. Check your calendar and plan the day's work.`;
            case 'SummaryToday':
                return `System Event: Daily Summary Please provide a summary of today's activities and progress. Estimated time: ${slot.estimated_duration} minutes. \nReview what was accomplished and prepare end-of-day notes.`;
            case 'ScheduledGatewayRestart':
                return `System Event: Scheduled Gateway Restart The OpenClaw gateway is scheduled to restart soon. Please: 1. Persist any important state 2. Complete or gracefully pause current tasks 3. Prepare for restart Time remaining: ${slot.estimated_duration} minutes.`;
            default:
                return `System Event: ${sysData.event} A system event has been scheduled. Please handle accordingly. Estimated time: ${slot.estimated_duration} minutes.`;
        }
    }
    /**
     * Build prompt for entertainment slots.
     *
     * @returns {string} Prompt text for the agent.
     */
    buildEntertainmentPrompt(slot) {
        return `Scheduled Entertainment Break Duration: ${slot.estimated_duration} minutes Take a break and enjoy some leisure time. This slot is reserved for non-work activities to help maintain work-life balance.`;
    }
    /**
     * Build generic prompt for slots without specific event data.
     *
     * @returns {string} Prompt text for the agent.
     */
    buildGenericPrompt(slot) {
        return `Scheduled Calendar Slot Type: ${slot.slot_type} Duration: ${slot.estimated_duration} minutes Priority: ${slot.priority} Please use this time for the scheduled activity.`;
    }
    /**
     * Send a slot update through the bridge call that matches the slot kind:
     * `updateSlot` for slots with an `id`, `updateVirtualSlot` for slots with
     * only a `virtual_id`.
     *
     * @param slot Slot to update.
     * @param update Partial slot fields to send.
     * @returns The bridge call's result, or `undefined` when the slot has
     *   neither identifier (nothing is sent in that case).
     */
    async sendSlotUpdate(slot, update) {
        if (slot.id) {
            return await this.config.bridge.updateSlot(slot.id, update);
        }
        if (slot.virtual_id) {
            return await this.config.bridge.updateVirtualSlot(slot.virtual_id, update);
        }
        return undefined;
    }
    /**
     * Mark a slot as deferred.
     * Failures are logged and never thrown.
     */
    async deferSlot(slot) {
        const update = {
            status: types_1.SlotStatus.DEFERRED,
        };
        try {
            await this.sendSlotUpdate(slot, update);
            this.logDebug(`Deferred slot: ${this.getSlotId(slot)}`);
        }
        catch (err) {
            this.config.logger.error('Failed to defer slot:', err);
        }
    }
    /**
     * Revert a slot to not_started status after failed execution attempt.
     * Failures are logged and never thrown.
     */
    async revertSlot(slot) {
        const update = {
            status: types_1.SlotStatus.NOT_STARTED,
            // NOTE(review): an `undefined` value is dropped by JSON.stringify,
            // so if the bridge serializes this as JSON the backend never sees a
            // request to clear started_at - confirm against the bridge.
            started_at: undefined,
        };
        try {
            await this.sendSlotUpdate(slot, update);
        }
        catch (err) {
            this.config.logger.error('Failed to revert slot:', err);
        }
    }
    /**
     * Complete the current slot execution.
     * Call this when the agent finishes the task.
     *
     * Local processing state is always cleared and a replanning heartbeat is
     * triggered, even if the backend calls fail.
     *
     * @param {number} actualDurationMinutes Time spent, sent as `actual_duration`.
     */
    async completeCurrentSlot(actualDurationMinutes) {
        if (!this.state.currentSlot) {
            this.config.logger.warn('No current slot to complete');
            return;
        }
        const slot = this.state.currentSlot;
        const update = {
            status: types_1.SlotStatus.FINISHED,
            actual_duration: actualDurationMinutes,
        };
        try {
            await this.sendSlotUpdate(slot, update);
            // Report agent back to idle
            await this.config.bridge.reportAgentStatus({ status: 'idle' });
            this.config.logger.info(`Completed slot ${this.getSlotId(slot)}, actual_duration=${actualDurationMinutes}min`);
        }
        catch (err) {
            this.config.logger.error('Failed to complete slot:', err);
        }
        finally {
            this.state.isProcessing = false;
            this.state.currentSlot = null;
            await this.triggerReplan('slot completion');
        }
    }
    /**
     * Abort the current slot execution.
     * Call this when the agent cannot complete the task.
     *
     * Local processing state is always cleared and a replanning heartbeat is
     * triggered, even if the backend calls fail.
     *
     * @param {string} [reason] Optional explanation, used in the log line only.
     */
    async abortCurrentSlot(reason) {
        if (!this.state.currentSlot) {
            this.config.logger.warn('No current slot to abort');
            return;
        }
        const slot = this.state.currentSlot;
        const update = {
            status: types_1.SlotStatus.ABORTED,
        };
        try {
            await this.sendSlotUpdate(slot, update);
            // Report agent back to idle
            await this.config.bridge.reportAgentStatus({ status: 'idle' });
            this.config.logger.info(`Aborted slot ${this.getSlotId(slot)}${reason ? `: ${reason}` : ''}`);
        }
        catch (err) {
            this.config.logger.error('Failed to abort slot:', err);
        }
        finally {
            this.state.isProcessing = false;
            this.state.currentSlot = null;
            await this.triggerReplan('slot abort');
        }
    }
    /**
     * Pause the current slot execution.
     * Call this when the agent needs to temporarily pause.
     * Only the slot status is sent; local state and agent status are unchanged.
     */
    async pauseCurrentSlot() {
        if (!this.state.currentSlot) {
            this.config.logger.warn('No current slot to pause');
            return;
        }
        const slot = this.state.currentSlot;
        const update = {
            status: types_1.SlotStatus.PAUSED,
        };
        try {
            await this.sendSlotUpdate(slot, update);
            this.config.logger.info(`Paused slot ${this.getSlotId(slot)}`);
        }
        catch (err) {
            this.config.logger.error('Failed to pause slot:', err);
        }
    }
    /**
     * Resume a paused slot.
     * Sends the ONGOING status for the current slot; local state is unchanged.
     */
    async resumeCurrentSlot() {
        if (!this.state.currentSlot) {
            this.config.logger.warn('No current slot to resume');
            return;
        }
        const slot = this.state.currentSlot;
        const update = {
            status: types_1.SlotStatus.ONGOING,
        };
        try {
            await this.sendSlotUpdate(slot, update);
            this.config.logger.info(`Resumed slot ${this.getSlotId(slot)}`);
        }
        catch (err) {
            this.config.logger.error('Failed to resume slot:', err);
        }
    }
    /**
     * Trigger an immediate replanning pass after the current slot lifecycle ends.
     * This lets previously deferred/not-started slots compete again as soon as
     * the agent becomes idle. No-op when the scheduler is not running.
     *
     * @param {string} reason Label used in log lines only.
     */
    async triggerReplan(reason) {
        if (!this.state.isRunning) {
            return;
        }
        this.logDebug(`Triggering immediate replanning after ${reason}`);
        try {
            await this.runHeartbeat();
        }
        catch (err) {
            this.config.logger.error(`Failed to trigger replanning after ${reason}:`, err);
        }
    }
    /**
     * Get a stable ID for a slot (real or virtual).
     *
     * @returns {string} The stringified `id`, else the `virtual_id`, else
     *   the literal `'unknown'`.
     */
    getSlotId(slot) {
        return slot.id?.toString() || slot.virtual_id || 'unknown';
    }
    /**
     * Format a Date as a local wall-clock time string (HH:MM:SS).
     * The value is the first token of `Date.prototype.toTimeString()`, so it
     * is in the process's local time zone and carries no zone suffix.
     */
    formatTime(date) {
        return date.toTimeString().split(' ')[0];
    }
    /**
     * Debug logging helper; emits only when `config.debug` is truthy.
     */
    logDebug(message) {
        if (this.config.debug) {
            this.config.logger.debug(`[CalendarScheduler] ${message}`);
        }
    }
    /**
     * Get current scheduler state (for introspection).
     * The copy is shallow: `deferredSlotIds` and `currentSlot` are the live
     * objects, not clones.
     */
    getState() {
        return { ...this.state };
    }
    /**
     * Check if scheduler is running.
     */
    isRunning() {
        return this.state.isRunning;
    }
    /**
     * Check if currently processing a slot.
     */
    isProcessing() {
        return this.state.isProcessing;
    }
    /**
     * Get the current slot being executed (if any).
     */
    getCurrentSlot() {
        return this.state.currentSlot;
    }
    /**
     * Check if a gateway restart is pending.
     */
    isRestartPending() {
        return this.state.isRestartPending;
    }
    /**
     * Get the path to the state file.
     */
    getStateFilePath() {
        return this.stateFilePath;
    }
}
exports.CalendarScheduler = CalendarScheduler;
/**
 * Factory function to create a CalendarScheduler from plugin context.
 *
 * @param config Scheduler configuration, passed straight to the constructor.
 * @returns {CalendarScheduler} A new, not yet started scheduler.
 */
function createCalendarScheduler(config) {
    return new CalendarScheduler(config);
}
//# sourceMappingURL=scheduler.js.map