feat: add local schedule cache + periodic sync to CalendarScheduler

- New ScheduleCache class: maintains today's full schedule locally
- CalendarBridgeClient.getDaySchedule(): fetch all slots for a date
- Scheduler now runs two intervals:
  - Heartbeat (60s): existing slot execution flow (unchanged)
  - Sync (5min): pulls full day schedule into local cache
- Exposes getScheduleCache() for tools and status reporting

This enables the plugin to detect slots assigned by other agents
between heartbeats and provides a complete local view of the schedule.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
operator
2026-04-18 17:45:32 +00:00
parent 9af8204376
commit 4a66a7cc90
5 changed files with 298 additions and 3 deletions

View File

@@ -169,6 +169,32 @@ export class CalendarBridgeClient {
return this.sendBoolean('POST', url, body);
}
/**
* Fetch the full day schedule for this agent.
*
* Unlike heartbeat() which only returns pending (NOT_STARTED/DEFERRED) slots,
* this returns ALL slots for the given date, enabling the plugin to maintain
* a complete local view of today's schedule.
*
* @param date Date string in YYYY-MM-DD format
* @returns Array of all slots for the day, or null if unreachable
*/
async getDaySchedule(date: string): Promise<CalendarSlotResponse[] | null> {
const url = `${this.baseUrl}/calendar/day?date=${encodeURIComponent(date)}`;
try {
const response = await this.fetchJson<{ slots: CalendarSlotResponse[] }>(url, {
method: 'GET',
headers: {
'X-Agent-ID': this.config.agentId,
'X-Claw-Identifier': this.config.clawIdentifier,
},
});
return response?.slots ?? null;
} catch {
return null;
}
}
// -------------------------------------------------------------------------
// Internal helpers
// -------------------------------------------------------------------------

View File

@@ -31,3 +31,4 @@
export * from './types';
export * from './calendar-bridge';
export * from './scheduler';
export * from './schedule-cache';

View File

@@ -0,0 +1,109 @@
/**
* Local cache of today's calendar schedule.
* Synced periodically from HF backend, checked locally for due slots.
*/
import type { CalendarSlotResponse } from "./types.js";
export class ScheduleCache {
private slots: Map<string, CalendarSlotResponse> = new Map();
private lastSyncAt: Date | null = null;
private cachedDate: string | null = null; // YYYY-MM-DD
/**
* Replace the cache with a fresh schedule from backend.
*/
sync(date: string, slots: CalendarSlotResponse[]): void {
// If date changed, clear old data
if (this.cachedDate !== date) {
this.slots.clear();
}
this.cachedDate = date;
// Merge: update existing slots, add new ones
const incomingIds = new Set<string>();
for (const slot of slots) {
const id = this.getSlotId(slot);
incomingIds.add(id);
this.slots.set(id, slot);
}
// Remove slots that no longer exist on backend (cancelled etc.)
for (const id of this.slots.keys()) {
if (!incomingIds.has(id)) {
this.slots.delete(id);
}
}
this.lastSyncAt = new Date();
}
/**
* Get slots that are due (scheduled_at <= now) and still pending.
*/
getDueSlots(now: Date): CalendarSlotResponse[] {
const results: CalendarSlotResponse[] = [];
for (const slot of this.slots.values()) {
if (slot.status !== "not_started" && slot.status !== "deferred") continue;
if (!slot.scheduled_at) continue;
const scheduledAt = this.parseScheduledTime(slot.scheduled_at);
if (scheduledAt && scheduledAt <= now) {
results.push(slot);
}
}
// Sort by priority descending
results.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
return results;
}
/**
* Update a slot locally (e.g., after status change).
*/
updateSlot(slotId: string, update: Partial<CalendarSlotResponse>): void {
const existing = this.slots.get(slotId);
if (existing) {
this.slots.set(slotId, { ...existing, ...update });
}
}
/**
* Remove a slot from cache.
*/
removeSlot(slotId: string): void {
this.slots.delete(slotId);
}
/**
* Get all cached slots.
*/
getAll(): CalendarSlotResponse[] {
return Array.from(this.slots.values());
}
/**
* Get cache metadata.
*/
getStatus(): { slotCount: number; lastSyncAt: string | null; cachedDate: string | null } {
return {
slotCount: this.slots.size,
lastSyncAt: this.lastSyncAt?.toISOString() ?? null,
cachedDate: this.cachedDate,
};
}
private getSlotId(slot: CalendarSlotResponse): string {
return slot.virtual_id ?? String(slot.id);
}
private parseScheduledTime(scheduledAt: string): Date | null {
// scheduled_at can be "HH:MM:SS" (time only) or full ISO
if (/^\d{2}:\d{2}(:\d{2})?$/.test(scheduledAt)) {
// Time-only: combine with cached date
if (!this.cachedDate) return null;
return new Date(`${this.cachedDate}T${scheduledAt}Z`);
}
const d = new Date(scheduledAt);
return isNaN(d.getTime()) ? null : d;
}
}

View File

@@ -19,6 +19,7 @@
import { writeFileSync, readFileSync, existsSync, mkdirSync } from 'fs';
import { join, dirname } from 'path';
import { CalendarBridgeClient } from './calendar-bridge';
import { ScheduleCache } from './schedule-cache';
import {
CalendarSlotResponse,
SlotStatus,
@@ -44,6 +45,8 @@ export interface CalendarSchedulerConfig {
};
/** Heartbeat interval in milliseconds (default: 60000) */
heartbeatIntervalMs?: number;
/** Schedule sync interval in milliseconds (default: 300000 = 5 min) */
syncIntervalMs?: number;
/** Enable verbose debug logging */
debug?: boolean;
/** Directory for state persistence (default: plugin data dir) */
@@ -95,8 +98,10 @@ interface SchedulerState {
currentSlot: CalendarSlotResponse | null;
/** Last heartbeat timestamp */
lastHeartbeatAt: Date | null;
/** Interval handle for cleanup */
/** Heartbeat interval handle */
intervalHandle: ReturnType<typeof setInterval> | null;
/** Schedule sync interval handle */
syncIntervalHandle: ReturnType<typeof setInterval> | null;
/** Set of slot IDs that have been deferred in current session */
deferredSlotIds: Set<string>;
/** Whether agent is currently processing a slot */
@@ -117,10 +122,13 @@ export class CalendarScheduler {
private config: Required<CalendarSchedulerConfig>;
private state: SchedulerState;
private stateFilePath: string;
/** Local cache of today's full schedule, synced periodically from backend */
private scheduleCache: ScheduleCache = new ScheduleCache();
constructor(config: CalendarSchedulerConfig) {
this.config = {
heartbeatIntervalMs: 60000, // 1 minute default
syncIntervalMs: 300_000, // 5 minutes default
debug: false,
stateDir: this.getDefaultStateDir(),
...config,
@@ -133,6 +141,7 @@ export class CalendarScheduler {
currentSlot: null,
lastHeartbeatAt: null,
intervalHandle: null,
syncIntervalHandle: null,
deferredSlotIds: new Set(),
isProcessing: false,
isRestartPending: false,
@@ -327,14 +336,21 @@ export class CalendarScheduler {
this.state.isRestartPending = false;
this.config.logger.info('Calendar scheduler started');
// Run initial heartbeat immediately
// Run initial sync + heartbeat immediately
this.runSync();
this.runHeartbeat();
// Schedule periodic heartbeats
// Schedule periodic heartbeats (slot execution checks)
this.state.intervalHandle = setInterval(
() => this.runHeartbeat(),
this.config.heartbeatIntervalMs
);
// Schedule periodic schedule sync (full day schedule refresh)
this.state.syncIntervalHandle = setInterval(
() => this.runSync(),
this.config.syncIntervalMs
);
}
/**
@@ -348,10 +364,41 @@ export class CalendarScheduler {
clearInterval(this.state.intervalHandle);
this.state.intervalHandle = null;
}
if (this.state.syncIntervalHandle) {
clearInterval(this.state.syncIntervalHandle);
this.state.syncIntervalHandle = null;
}
this.config.logger.info('Calendar scheduler stopped');
}
/**
* Sync today's full schedule from backend into local cache.
* Runs every syncIntervalMs (default: 5 min).
* Catches new slots assigned by other agents or the manager.
*/
async runSync(): Promise<void> {
if (!this.state.isRunning || this.state.isRestartPending) return;
const today = new Date().toISOString().slice(0, 10);
try {
const slots = await this.config.bridge.getDaySchedule(today);
if (slots) {
this.scheduleCache.sync(today, slots);
this.logDebug(`Schedule synced: ${slots.length} slots for ${today}`);
}
} catch (err) {
this.config.logger.warn(`Schedule sync failed: ${String(err)}`);
}
}
/**
* Get the local schedule cache (for status reporting / tools).
*/
getScheduleCache(): ScheduleCache {
return this.scheduleCache;
}
/**
* Execute a single heartbeat cycle.
* Fetches pending slots and handles execution logic.