diff --git a/REFACTOR_PLAN.md b/REFACTOR_PLAN.md new file mode 100644 index 0000000..1b2f63c --- /dev/null +++ b/REFACTOR_PLAN.md @@ -0,0 +1,112 @@ +# CalendarScheduler Refactor Plan + +## Current Design + +``` +Every 60s: + heartbeat() → POST /calendar/agent/heartbeat → returns pending slots + if idle → select highest priority → executeSlot → wakeAgent(spawn) + if busy → defer all pending slots +``` + +**Problems:** +1. Every heartbeat queries backend for pending slots — no local awareness of full schedule +2. Cannot detect slots assigned by other agents between heartbeats +3. 60s interval is too frequent for sync but too infrequent for precise wakeup +4. Wakeup via `api.spawn()` creates a plain session, not a Discord private channel + +## Target Design + +``` +Every 5-10 min (sync interval): + syncSchedule() → GET /calendar/day → update local today cache + +Every 30s (check interval): + checkDueSlots() → scan local cache for due slots + if due slot found: + confirmAgentStatus() → GET /calendar/agent/status + if not busy → wakeAgent (via Dirigent moderator bot private channel) +``` + +## Changes Required + +### 1. Add Local Schedule Cache + +New class `ScheduleCache`: +```typescript +class ScheduleCache { + private slots: Map; // slotId → slot + private lastSyncAt: Date | null; + + async sync(bridge: CalendarBridgeClient): Promise; // fetch today's full schedule + getDueSlots(now: Date): CalendarSlotResponse[]; // scheduled_at <= now && NOT_STARTED/DEFERRED + updateSlot(id: string, update: Partial): void; // local update + getAll(): CalendarSlotResponse[]; +} +``` + +### 2. Add CalendarBridgeClient.getDaySchedule() + +New endpoint call: +```typescript +async getDaySchedule(date: string): Promise +// GET /calendar/day?date=YYYY-MM-DD +``` + +This fetches ALL slots for the day, not just pending ones. The existing `heartbeat()` only returns NOT_STARTED/DEFERRED. + +### 3. Split Heartbeat into Sync + Check + +**Replace** single `runHeartbeat()` with two intervals: + +```typescript +// Sync: every 5 min — pull full schedule from backend +this.syncInterval = setInterval(() => this.runSync(), 300_000); + +// Check: every 30s — scan local cache for due slots +this.checkInterval = setInterval(() => this.runCheck(), 30_000); +``` + +`runSync()`: +1. `bridge.getDaySchedule(today)` → update cache +2. Still send heartbeat to keep backend informed of agent liveness + +`runCheck()`: +1. `cache.getDueSlots(now)` → find due slots +2. Filter out session-deferred slots +3. If agent idle → select highest priority → execute + +### 4. Wakeup via Dirigent (future) + +Change `wakeAgent()` to create a private Discord channel via Dirigent moderator bot instead of `api.spawn()`. This requires: +- Access to Dirigent's moderator bot token or cross-plugin API +- Creating a private channel with only the target agent +- Posting the wakeup prompt as a message + +**For now:** Keep `api.spawn()` as the wakeup method. The Dirigent integration can be added later as it requires cross-plugin coordination. + +## Implementation Order + +1. Add `ScheduleCache` class (new file: `plugin/calendar/schedule-cache.ts`) +2. Add `getDaySchedule()` to `CalendarBridgeClient` +3. Refactor `CalendarScheduler`: + - Replace single interval with sync + check intervals + - Use cache instead of heartbeat for slot discovery + - Keep heartbeat for agent liveness reporting (reduced frequency) +4. Update state persistence for new structure +5. Keep existing wakeAgent/completion/abort/pause/resume tools unchanged + +## Files to Modify + +| File | Changes | +|------|---------| +| `plugin/calendar/schedule-cache.ts` | New file | +| `plugin/calendar/calendar-bridge.ts` | Add `getDaySchedule()` | +| `plugin/calendar/scheduler.ts` | Refactor heartbeat → sync + check | +| `plugin/calendar/index.ts` | Export new types | + +## Risk Assessment + +- **Low risk:** ScheduleCache is additive, doesn't break existing behavior +- **Medium risk:** Splitting heartbeat changes core scheduling logic +- **Mitigation:** Keep `heartbeat()` method intact, use it for liveness reporting alongside new sync diff --git a/plugin/calendar/calendar-bridge.ts b/plugin/calendar/calendar-bridge.ts index dca5cf5..4b92c27 100644 --- a/plugin/calendar/calendar-bridge.ts +++ b/plugin/calendar/calendar-bridge.ts @@ -169,6 +169,32 @@ export class CalendarBridgeClient { return this.sendBoolean('POST', url, body); } + /** + * Fetch the full day schedule for this agent. + * + * Unlike heartbeat() which only returns pending (NOT_STARTED/DEFERRED) slots, + * this returns ALL slots for the given date, enabling the plugin to maintain + * a complete local view of today's schedule. + * + * @param date Date string in YYYY-MM-DD format + * @returns Array of all slots for the day, or null if unreachable + */ + async getDaySchedule(date: string): Promise { + const url = `${this.baseUrl}/calendar/day?date=${encodeURIComponent(date)}`; + try { + const response = await this.fetchJson<{ slots: CalendarSlotResponse[] }>(url, { + method: 'GET', + headers: { + 'X-Agent-ID': this.config.agentId, + 'X-Claw-Identifier': this.config.clawIdentifier, + }, + }); + return response?.slots ?? null; + } catch { + return null; + } + } + // ------------------------------------------------------------------------- // Internal helpers // ------------------------------------------------------------------------- diff --git a/plugin/calendar/index.ts b/plugin/calendar/index.ts index 8d2bed7..a829ccf 100644 --- a/plugin/calendar/index.ts +++ b/plugin/calendar/index.ts @@ -31,3 +31,4 @@ export * from './types'; export * from './calendar-bridge'; export * from './scheduler'; +export * from './schedule-cache'; diff --git a/plugin/calendar/schedule-cache.ts b/plugin/calendar/schedule-cache.ts new file mode 100644 index 0000000..9f8bfe6 --- /dev/null +++ b/plugin/calendar/schedule-cache.ts @@ -0,0 +1,109 @@ +/** + * Local cache of today's calendar schedule. + * Synced periodically from HF backend, checked locally for due slots. + */ + +import type { CalendarSlotResponse } from "./types.js"; + +export class ScheduleCache { + private slots: Map = new Map(); + private lastSyncAt: Date | null = null; + private cachedDate: string | null = null; // YYYY-MM-DD + + /** + * Replace the cache with a fresh schedule from backend. + */ + sync(date: string, slots: CalendarSlotResponse[]): void { + // If date changed, clear old data + if (this.cachedDate !== date) { + this.slots.clear(); + } + this.cachedDate = date; + + // Merge: update existing slots, add new ones + const incomingIds = new Set(); + for (const slot of slots) { + const id = this.getSlotId(slot); + incomingIds.add(id); + this.slots.set(id, slot); + } + + // Remove slots that no longer exist on backend (cancelled etc.) + for (const id of this.slots.keys()) { + if (!incomingIds.has(id)) { + this.slots.delete(id); + } + } + + this.lastSyncAt = new Date(); + } + + /** + * Get slots that are due (scheduled_at <= now) and still pending. + */ + getDueSlots(now: Date): CalendarSlotResponse[] { + const results: CalendarSlotResponse[] = []; + for (const slot of this.slots.values()) { + if (slot.status !== "not_started" && slot.status !== "deferred") continue; + if (!slot.scheduled_at) continue; + + const scheduledAt = this.parseScheduledTime(slot.scheduled_at); + if (scheduledAt && scheduledAt <= now) { + results.push(slot); + } + } + // Sort by priority descending + results.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0)); + return results; + } + + /** + * Update a slot locally (e.g., after status change). + */ + updateSlot(slotId: string, update: Partial): void { + const existing = this.slots.get(slotId); + if (existing) { + this.slots.set(slotId, { ...existing, ...update }); + } + } + + /** + * Remove a slot from cache. + */ + removeSlot(slotId: string): void { + this.slots.delete(slotId); + } + + /** + * Get all cached slots. + */ + getAll(): CalendarSlotResponse[] { + return Array.from(this.slots.values()); + } + + /** + * Get cache metadata. + */ + getStatus(): { slotCount: number; lastSyncAt: string | null; cachedDate: string | null } { + return { + slotCount: this.slots.size, + lastSyncAt: this.lastSyncAt?.toISOString() ?? null, + cachedDate: this.cachedDate, + }; + } + + private getSlotId(slot: CalendarSlotResponse): string { + return slot.virtual_id ?? String(slot.id); + } + + private parseScheduledTime(scheduledAt: string): Date | null { + // scheduled_at can be "HH:MM:SS" (time only) or full ISO + if (/^\d{2}:\d{2}(:\d{2})?$/.test(scheduledAt)) { + // Time-only: combine with cached date + if (!this.cachedDate) return null; + return new Date(`${this.cachedDate}T${scheduledAt}Z`); + } + const d = new Date(scheduledAt); + return isNaN(d.getTime()) ? null : d; + } +} diff --git a/plugin/calendar/scheduler.ts b/plugin/calendar/scheduler.ts index bb0675f..e77f5de 100644 --- a/plugin/calendar/scheduler.ts +++ b/plugin/calendar/scheduler.ts @@ -19,6 +19,7 @@ import { writeFileSync, readFileSync, existsSync, mkdirSync } from 'fs'; import { join, dirname } from 'path'; import { CalendarBridgeClient } from './calendar-bridge'; +import { ScheduleCache } from './schedule-cache'; import { CalendarSlotResponse, SlotStatus, @@ -44,6 +45,8 @@ export interface CalendarSchedulerConfig { }; /** Heartbeat interval in milliseconds (default: 60000) */ heartbeatIntervalMs?: number; + /** Schedule sync interval in milliseconds (default: 300000 = 5 min) */ + syncIntervalMs?: number; /** Enable verbose debug logging */ debug?: boolean; /** Directory for state persistence (default: plugin data dir) */ @@ -95,8 +98,10 @@ interface SchedulerState { currentSlot: CalendarSlotResponse | null; /** Last heartbeat timestamp */ lastHeartbeatAt: Date | null; - /** Interval handle for cleanup */ + /** Heartbeat interval handle */ intervalHandle: ReturnType | null; + /** Schedule sync interval handle */ + syncIntervalHandle: ReturnType | null; /** Set of slot IDs that have been deferred in current session */ deferredSlotIds: Set; /** Whether agent is currently processing a slot */ @@ -117,10 +122,13 @@ export class CalendarScheduler { private config: Required; private state: SchedulerState; private stateFilePath: string; + /** Local cache of today's full schedule, synced periodically from backend */ + private scheduleCache: ScheduleCache = new ScheduleCache(); constructor(config: CalendarSchedulerConfig) { this.config = { heartbeatIntervalMs: 60000, // 1 minute default + syncIntervalMs: 300_000, // 5 minutes default debug: false, stateDir: this.getDefaultStateDir(), ...config, @@ -133,6 +141,7 @@ export class CalendarScheduler { currentSlot: null, lastHeartbeatAt: null, intervalHandle: null, + syncIntervalHandle: null, deferredSlotIds: new Set(), isProcessing: false, isRestartPending: false, @@ -327,14 +336,21 @@ export class CalendarScheduler { this.state.isRestartPending = false; this.config.logger.info('Calendar scheduler started'); - // Run initial heartbeat immediately + // Run initial sync + heartbeat immediately + this.runSync(); this.runHeartbeat(); - // Schedule periodic heartbeats + // Schedule periodic heartbeats (slot execution checks) this.state.intervalHandle = setInterval( () => this.runHeartbeat(), this.config.heartbeatIntervalMs ); + + // Schedule periodic schedule sync (full day schedule refresh) + this.state.syncIntervalHandle = setInterval( + () => this.runSync(), + this.config.syncIntervalMs + ); } /** @@ -348,10 +364,41 @@ export class CalendarScheduler { clearInterval(this.state.intervalHandle); this.state.intervalHandle = null; } + if (this.state.syncIntervalHandle) { + clearInterval(this.state.syncIntervalHandle); + this.state.syncIntervalHandle = null; + } this.config.logger.info('Calendar scheduler stopped'); } + /** + * Sync today's full schedule from backend into local cache. + * Runs every syncIntervalMs (default: 5 min). + * Catches new slots assigned by other agents or the manager. + */ + async runSync(): Promise { + if (!this.state.isRunning || this.state.isRestartPending) return; + + const today = new Date().toISOString().slice(0, 10); + try { + const slots = await this.config.bridge.getDaySchedule(today); + if (slots) { + this.scheduleCache.sync(today, slots); + this.logDebug(`Schedule synced: ${slots.length} slots for ${today}`); + } + } catch (err) { + this.config.logger.warn(`Schedule sync failed: ${String(err)}`); + } + } + + /** + * Get the local schedule cache (for status reporting / tools). + */ + getScheduleCache(): ScheduleCache { + return this.scheduleCache; + } + /** * Execute a single heartbeat cycle. * Fetches pending slots and handles execution logic. @@ -611,13 +658,11 @@ Task Code: ${code} Estimated Duration: ${duration} minutes Slot Type: ${slot.slot_type} Priority: ${slot.priority} +Working Sessions: ${jobData.working_sessions?.join(', ') || 'none recorded'} -Please focus on this task for the allocated time. When you finish or need to pause, -report your progress back to the calendar system. - -Working sessions: ${jobData.working_sessions?.join(', ') || 'none recorded'} - -Start working on ${code} now.`; +Follow the daily-routine skill's task-handson workflow to execute this task. +Use harborforge_calendar_complete when finished, or harborforge_calendar_pause to pause. +Before going idle, check for overdue slots as described in the slot-complete workflow.`; } /** @@ -630,19 +675,15 @@ Start working on ${code} now.`; switch (sysData.event) { case 'ScheduleToday': return `System Event: Schedule Today - -Please review today's calendar and schedule any pending tasks or planning activities. Estimated time: ${slot.estimated_duration} minutes. -Check your calendar and plan the day's work.`; +Follow the daily-routine skill's plan-schedule workflow to plan today's work.`; case 'SummaryToday': return `System Event: Daily Summary - -Please provide a summary of today's activities and progress. Estimated time: ${slot.estimated_duration} minutes. -Review what was accomplished and prepare end-of-day notes.`; +Review today's completed, deferred, and abandoned slots. Write a summary to your daily note (memory/YYYY-MM-DD.md).`; case 'ScheduledGatewayRestart': return `System Event: Scheduled Gateway Restart diff --git a/plugin/index.ts b/plugin/index.ts index ecc0287..3e63cd4 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -188,56 +188,52 @@ export default { } /** - * Wake/spawn agent with task context for slot execution. - * This is the callback invoked by CalendarScheduler when a slot is ready. + * Wake agent via dispatchInboundMessage — same mechanism used by Discord plugin. + * Direct in-process call, no WebSocket or CLI needed. */ async function wakeAgent(context: AgentWakeContext): Promise { logger.info(`Waking agent for slot: ${context.taskDescription}`); + const agentId = process.env.AGENT_ID || 'unknown'; + const slotId = context.slot.id ?? context.slot.virtual_id ?? 'unknown'; + const sessionKey = `agent:${agentId}:hf-calendar:slot-${slotId}`; + try { - // Method 1: Use OpenClaw spawn API if available (preferred) - if (api.spawn) { - const result = await api.spawn({ - task: context.prompt, - timeoutSeconds: context.slot.estimated_duration * 60, // Convert to seconds - }); + // Dynamic import — resolved at runtime by OpenClaw's module system + const sdkPath = 'openclaw/plugin-sdk/reply-runtime'; + const { dispatchInboundMessageWithDispatcher } = await import( + /* webpackIgnore: true */ sdkPath + ); - if (result?.sessionId) { - logger.info(`Agent spawned for calendar slot: session=${result.sessionId}`); - - // Track session completion - trackSessionCompletion(result.sessionId, context); - return true; - } + const cfg = api.runtime?.config?.loadConfig?.(); + if (!cfg) { + logger.error('Cannot load OpenClaw config for dispatch'); + return false; } - // Method 2: Send notification/alert to wake agent (fallback) - // This relies on the agent's heartbeat to check for notifications - logger.warn('OpenClaw spawn API not available, using notification fallback'); - - // Send calendar wakeup notification via backend - const live = resolveConfig(); - const agentId = process.env.AGENT_ID || 'unknown'; - - const notifyResponse = await fetch(`${live.backendUrl}/calendar/agent/notify`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'X-Agent-ID': agentId, - 'X-Claw-Identifier': live.identifier || hostname(), + const result = await dispatchInboundMessageWithDispatcher({ + ctx: { + Body: context.prompt, + SessionKey: sessionKey, + From: 'harborforge-calendar', + Provider: 'harborforge', + }, + cfg, + dispatcherOptions: { + deliver: async (payload: any) => { + // No delivery — agent works silently + logger.info(`Agent reply for slot ${slotId}: ${(payload.text || '').slice(0, 100)}`); + }, }, - body: JSON.stringify({ - agent_id: agentId, - message: context.prompt, - slot_id: context.slot.id || context.slot.virtual_id, - task_description: context.taskDescription, - }), }); - return notifyResponse.ok; + logger.info(`Agent dispatched for slot ${slotId}: ${result?.status || 'ok'}`); + return true; - } catch (err) { - logger.error('Failed to wake agent:', err); + } catch (err: any) { + const msg = err?.message || err?.code || String(err); + const stack = err?.stack?.split('\n').slice(0, 3).join(' | ') || ''; + logger.error(`Failed to dispatch agent for slot: ${msg} ${stack}`); return false; } }