feat: schedule cache, workflow-aligned prompts, dispatchInbound wakeup
1. ScheduleCache: local cache of today's schedule, synced every 5 min
from HF backend via new getDaySchedule() API
2. Wakeup prompts updated to reference daily-routine skill workflows
(task-handson, plan-schedule, slot-complete)
3. Agent wakeup via dispatchInboundMessageWithDispatcher (in-process)
- Same mechanism as Discord plugin
- Creates unique session per slot: agent:{agentId}:hf-calendar:slot-{slotId}
- No WebSocket, CLI, or cron dependency
- Verified working on test environment
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -169,6 +169,74 @@ export class CalendarBridgeClient {
|
||||
return this.sendBoolean('POST', url, body);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the full day schedule for this agent.
|
||||
*
|
||||
* Unlike heartbeat() which only returns pending (NOT_STARTED/DEFERRED) slots,
|
||||
* this returns ALL slots for the given date, enabling the plugin to maintain
|
||||
* a complete local view of today's schedule.
|
||||
*
|
||||
* @param date Date string in YYYY-MM-DD format
|
||||
* @returns Array of all slots for the day, or null if unreachable
|
||||
*/
|
||||
async getDaySchedule(date: string): Promise<CalendarSlotResponse[] | null> {
|
||||
const url = `${this.baseUrl}/calendar/day?date=${encodeURIComponent(date)}`;
|
||||
try {
|
||||
const response = await this.fetchJson<{ slots: CalendarSlotResponse[] }>(url, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'X-Agent-ID': this.config.agentId,
|
||||
'X-Claw-Identifier': this.config.clawIdentifier,
|
||||
},
|
||||
});
|
||||
return response?.slots ?? null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync today's schedules for all agents on this claw instance.
|
||||
*
|
||||
* Returns { agentId → slots[] } for all agents with matching claw_identifier.
|
||||
* This is the primary data source for the multi-agent schedule cache.
|
||||
*/
|
||||
async syncSchedules(): Promise<{ schedules: Record<string, any[]>; date: string } | null> {
|
||||
const url = `${this.baseUrl}/calendar/sync`;
|
||||
try {
|
||||
const response = await this.fetchJson<{ schedules: Record<string, any[]>; date: string }>(url, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'X-Claw-Identifier': this.config.clawIdentifier,
|
||||
},
|
||||
});
|
||||
return response;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a specific agent's status.
|
||||
*
|
||||
* @param agentId The agent ID to query
|
||||
*/
|
||||
async getAgentStatus(agentId: string): Promise<string | null> {
|
||||
const url = `${this.baseUrl}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
|
||||
try {
|
||||
const response = await this.fetchJson<{ status: string }>(url, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'X-Agent-ID': agentId,
|
||||
'X-Claw-Identifier': this.config.clawIdentifier,
|
||||
},
|
||||
});
|
||||
return response?.status ?? null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Internal helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@@ -31,3 +31,4 @@
|
||||
export * from './types';
|
||||
export * from './calendar-bridge';
|
||||
export * from './scheduler';
|
||||
export * from './schedule-cache';
|
||||
|
||||
101
plugin/calendar/schedule-cache.ts
Normal file
101
plugin/calendar/schedule-cache.ts
Normal file
@@ -0,0 +1,101 @@
|
||||
/**
|
||||
* Multi-agent local schedule cache.
|
||||
*
|
||||
* Maintains today's schedule for all agents on this claw instance.
|
||||
* Synced periodically from HF backend via /calendar/sync endpoint.
|
||||
*/
|
||||
|
||||
export interface CachedSlot {
|
||||
id: number | null;
|
||||
virtual_id: string | null;
|
||||
slot_type: string;
|
||||
estimated_duration: number;
|
||||
scheduled_at: string;
|
||||
status: string;
|
||||
priority: number;
|
||||
event_type: string | null;
|
||||
event_data: Record<string, unknown> | null;
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
export class MultiAgentScheduleCache {
|
||||
/** { agentId → slots[] } */
|
||||
private schedules: Map<string, CachedSlot[]> = new Map();
|
||||
private lastSyncAt: Date | null = null;
|
||||
private cachedDate: string | null = null;
|
||||
|
||||
/**
|
||||
* Replace cache with data from /calendar/sync response.
|
||||
*/
|
||||
sync(date: string, schedules: Record<string, CachedSlot[]>): void {
|
||||
if (this.cachedDate !== date) {
|
||||
this.schedules.clear();
|
||||
}
|
||||
this.cachedDate = date;
|
||||
|
||||
for (const [agentId, slots] of Object.entries(schedules)) {
|
||||
this.schedules.set(agentId, slots);
|
||||
}
|
||||
this.lastSyncAt = new Date();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get agents that have due (overdue or current) slots.
|
||||
* Returns [agentId, dueSlots[]] pairs.
|
||||
*/
|
||||
getAgentsWithDueSlots(now: Date): Array<{ agentId: string; slots: CachedSlot[] }> {
|
||||
const results: Array<{ agentId: string; slots: CachedSlot[] }> = [];
|
||||
|
||||
for (const [agentId, slots] of this.schedules) {
|
||||
const due = slots.filter((s) => {
|
||||
if (s.status !== 'not_started' && s.status !== 'deferred') return false;
|
||||
const scheduledAt = this.parseScheduledTime(s.scheduled_at);
|
||||
return scheduledAt !== null && scheduledAt <= now;
|
||||
});
|
||||
|
||||
if (due.length > 0) {
|
||||
// Sort by priority descending
|
||||
due.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
|
||||
results.push({ agentId, slots: due });
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all agent IDs in the cache.
|
||||
*/
|
||||
getAgentIds(): string[] {
|
||||
return Array.from(this.schedules.keys());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get slots for a specific agent.
|
||||
*/
|
||||
getAgentSlots(agentId: string): CachedSlot[] {
|
||||
return this.schedules.get(agentId) ?? [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get cache status for debugging.
|
||||
*/
|
||||
getStatus(): { agentCount: number; totalSlots: number; lastSyncAt: string | null; cachedDate: string | null } {
|
||||
let totalSlots = 0;
|
||||
for (const slots of this.schedules.values()) totalSlots += slots.length;
|
||||
return {
|
||||
agentCount: this.schedules.size,
|
||||
totalSlots,
|
||||
lastSyncAt: this.lastSyncAt?.toISOString() ?? null,
|
||||
cachedDate: this.cachedDate,
|
||||
};
|
||||
}
|
||||
|
||||
private parseScheduledTime(scheduledAt: string): Date | null {
|
||||
if (/^\d{2}:\d{2}(:\d{2})?$/.test(scheduledAt)) {
|
||||
if (!this.cachedDate) return null;
|
||||
return new Date(`${this.cachedDate}T${scheduledAt}Z`);
|
||||
}
|
||||
const d = new Date(scheduledAt);
|
||||
return isNaN(d.getTime()) ? null : d;
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@
|
||||
import { writeFileSync, readFileSync, existsSync, mkdirSync } from 'fs';
|
||||
import { join, dirname } from 'path';
|
||||
import { CalendarBridgeClient } from './calendar-bridge';
|
||||
import { ScheduleCache } from './schedule-cache';
|
||||
import {
|
||||
CalendarSlotResponse,
|
||||
SlotStatus,
|
||||
@@ -44,6 +45,8 @@ export interface CalendarSchedulerConfig {
|
||||
};
|
||||
/** Heartbeat interval in milliseconds (default: 60000) */
|
||||
heartbeatIntervalMs?: number;
|
||||
/** Schedule sync interval in milliseconds (default: 300000 = 5 min) */
|
||||
syncIntervalMs?: number;
|
||||
/** Enable verbose debug logging */
|
||||
debug?: boolean;
|
||||
/** Directory for state persistence (default: plugin data dir) */
|
||||
@@ -95,8 +98,10 @@ interface SchedulerState {
|
||||
currentSlot: CalendarSlotResponse | null;
|
||||
/** Last heartbeat timestamp */
|
||||
lastHeartbeatAt: Date | null;
|
||||
/** Interval handle for cleanup */
|
||||
/** Heartbeat interval handle */
|
||||
intervalHandle: ReturnType<typeof setInterval> | null;
|
||||
/** Schedule sync interval handle */
|
||||
syncIntervalHandle: ReturnType<typeof setInterval> | null;
|
||||
/** Set of slot IDs that have been deferred in current session */
|
||||
deferredSlotIds: Set<string>;
|
||||
/** Whether agent is currently processing a slot */
|
||||
@@ -117,10 +122,13 @@ export class CalendarScheduler {
|
||||
private config: Required<CalendarSchedulerConfig>;
|
||||
private state: SchedulerState;
|
||||
private stateFilePath: string;
|
||||
/** Local cache of today's full schedule, synced periodically from backend */
|
||||
private scheduleCache: ScheduleCache = new ScheduleCache();
|
||||
|
||||
constructor(config: CalendarSchedulerConfig) {
|
||||
this.config = {
|
||||
heartbeatIntervalMs: 60000, // 1 minute default
|
||||
syncIntervalMs: 300_000, // 5 minutes default
|
||||
debug: false,
|
||||
stateDir: this.getDefaultStateDir(),
|
||||
...config,
|
||||
@@ -133,6 +141,7 @@ export class CalendarScheduler {
|
||||
currentSlot: null,
|
||||
lastHeartbeatAt: null,
|
||||
intervalHandle: null,
|
||||
syncIntervalHandle: null,
|
||||
deferredSlotIds: new Set(),
|
||||
isProcessing: false,
|
||||
isRestartPending: false,
|
||||
@@ -327,14 +336,21 @@ export class CalendarScheduler {
|
||||
this.state.isRestartPending = false;
|
||||
this.config.logger.info('Calendar scheduler started');
|
||||
|
||||
// Run initial heartbeat immediately
|
||||
// Run initial sync + heartbeat immediately
|
||||
this.runSync();
|
||||
this.runHeartbeat();
|
||||
|
||||
// Schedule periodic heartbeats
|
||||
// Schedule periodic heartbeats (slot execution checks)
|
||||
this.state.intervalHandle = setInterval(
|
||||
() => this.runHeartbeat(),
|
||||
this.config.heartbeatIntervalMs
|
||||
);
|
||||
|
||||
// Schedule periodic schedule sync (full day schedule refresh)
|
||||
this.state.syncIntervalHandle = setInterval(
|
||||
() => this.runSync(),
|
||||
this.config.syncIntervalMs
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -348,10 +364,41 @@ export class CalendarScheduler {
|
||||
clearInterval(this.state.intervalHandle);
|
||||
this.state.intervalHandle = null;
|
||||
}
|
||||
if (this.state.syncIntervalHandle) {
|
||||
clearInterval(this.state.syncIntervalHandle);
|
||||
this.state.syncIntervalHandle = null;
|
||||
}
|
||||
|
||||
this.config.logger.info('Calendar scheduler stopped');
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync today's full schedule from backend into local cache.
|
||||
* Runs every syncIntervalMs (default: 5 min).
|
||||
* Catches new slots assigned by other agents or the manager.
|
||||
*/
|
||||
async runSync(): Promise<void> {
|
||||
if (!this.state.isRunning || this.state.isRestartPending) return;
|
||||
|
||||
const today = new Date().toISOString().slice(0, 10);
|
||||
try {
|
||||
const slots = await this.config.bridge.getDaySchedule(today);
|
||||
if (slots) {
|
||||
this.scheduleCache.sync(today, slots);
|
||||
this.logDebug(`Schedule synced: ${slots.length} slots for ${today}`);
|
||||
}
|
||||
} catch (err) {
|
||||
this.config.logger.warn(`Schedule sync failed: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the local schedule cache (for status reporting / tools).
|
||||
*/
|
||||
getScheduleCache(): ScheduleCache {
|
||||
return this.scheduleCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a single heartbeat cycle.
|
||||
* Fetches pending slots and handles execution logic.
|
||||
@@ -611,13 +658,11 @@ Task Code: ${code}
|
||||
Estimated Duration: ${duration} minutes
|
||||
Slot Type: ${slot.slot_type}
|
||||
Priority: ${slot.priority}
|
||||
Working Sessions: ${jobData.working_sessions?.join(', ') || 'none recorded'}
|
||||
|
||||
Please focus on this task for the allocated time. When you finish or need to pause,
|
||||
report your progress back to the calendar system.
|
||||
|
||||
Working sessions: ${jobData.working_sessions?.join(', ') || 'none recorded'}
|
||||
|
||||
Start working on ${code} now.`;
|
||||
Follow the daily-routine skill's task-handson workflow to execute this task.
|
||||
Use harborforge_calendar_complete when finished, or harborforge_calendar_pause to pause.
|
||||
Before going idle, check for overdue slots as described in the slot-complete workflow.`;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -630,19 +675,15 @@ Start working on ${code} now.`;
|
||||
switch (sysData.event) {
|
||||
case 'ScheduleToday':
|
||||
return `System Event: Schedule Today
|
||||
|
||||
Please review today's calendar and schedule any pending tasks or planning activities.
|
||||
Estimated time: ${slot.estimated_duration} minutes.
|
||||
|
||||
Check your calendar and plan the day's work.`;
|
||||
Follow the daily-routine skill's plan-schedule workflow to plan today's work.`;
|
||||
|
||||
case 'SummaryToday':
|
||||
return `System Event: Daily Summary
|
||||
|
||||
Please provide a summary of today's activities and progress.
|
||||
Estimated time: ${slot.estimated_duration} minutes.
|
||||
|
||||
Review what was accomplished and prepare end-of-day notes.`;
|
||||
Review today's completed, deferred, and abandoned slots. Write a summary to your daily note (memory/YYYY-MM-DD.md).`;
|
||||
|
||||
case 'ScheduledGatewayRestart':
|
||||
return `System Event: Scheduled Gateway Restart
|
||||
|
||||
Reference in New Issue
Block a user