feat: schedule cache, workflow-aligned prompts, dispatchInbound wakeup

1. ScheduleCache: local cache of today's schedule, synced every 5 min
   from HF backend via new getDaySchedule() API

2. Wakeup prompts updated to reference daily-routine skill workflows
   (task-handson, plan-schedule, slot-complete)

3. Agent wakeup via dispatchInboundMessageWithDispatcher (in-process)
   - Same mechanism as Discord plugin
   - Creates unique session per slot: agent:{agentId}:hf-calendar:slot-{slotId}
   - No WebSocket, CLI, or cron dependency
   - Verified working on test environment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
operator
2026-04-19 08:10:39 +00:00
parent 248adfaafd
commit 0dd70ec2da
6 changed files with 349 additions and 54 deletions

View File

@@ -169,6 +169,32 @@ export class CalendarBridgeClient {
return this.sendBoolean('POST', url, body);
}
/**
* Fetch the full day schedule for this agent.
*
* Unlike heartbeat() which only returns pending (NOT_STARTED/DEFERRED) slots,
* this returns ALL slots for the given date, enabling the plugin to maintain
* a complete local view of today's schedule.
*
* @param date Date string in YYYY-MM-DD format
* @returns Array of all slots for the day, or null if unreachable
*/
async getDaySchedule(date: string): Promise<CalendarSlotResponse[] | null> {
const url = `${this.baseUrl}/calendar/day?date=${encodeURIComponent(date)}`;
try {
const response = await this.fetchJson<{ slots: CalendarSlotResponse[] }>(url, {
method: 'GET',
headers: {
'X-Agent-ID': this.config.agentId,
'X-Claw-Identifier': this.config.clawIdentifier,
},
});
return response?.slots ?? null;
} catch {
return null;
}
}
// -------------------------------------------------------------------------
// Internal helpers
// -------------------------------------------------------------------------

View File

@@ -31,3 +31,4 @@
export * from './types';
export * from './calendar-bridge';
export * from './scheduler';
export * from './schedule-cache';

View File

@@ -0,0 +1,109 @@
/**
* Local cache of today's calendar schedule.
* Synced periodically from HF backend, checked locally for due slots.
*/
import type { CalendarSlotResponse } from "./types.js";
export class ScheduleCache {
private slots: Map<string, CalendarSlotResponse> = new Map();
private lastSyncAt: Date | null = null;
private cachedDate: string | null = null; // YYYY-MM-DD
/**
* Replace the cache with a fresh schedule from backend.
*/
sync(date: string, slots: CalendarSlotResponse[]): void {
// If date changed, clear old data
if (this.cachedDate !== date) {
this.slots.clear();
}
this.cachedDate = date;
// Merge: update existing slots, add new ones
const incomingIds = new Set<string>();
for (const slot of slots) {
const id = this.getSlotId(slot);
incomingIds.add(id);
this.slots.set(id, slot);
}
// Remove slots that no longer exist on backend (cancelled etc.)
for (const id of this.slots.keys()) {
if (!incomingIds.has(id)) {
this.slots.delete(id);
}
}
this.lastSyncAt = new Date();
}
/**
* Get slots that are due (scheduled_at <= now) and still pending.
*/
getDueSlots(now: Date): CalendarSlotResponse[] {
const results: CalendarSlotResponse[] = [];
for (const slot of this.slots.values()) {
if (slot.status !== "not_started" && slot.status !== "deferred") continue;
if (!slot.scheduled_at) continue;
const scheduledAt = this.parseScheduledTime(slot.scheduled_at);
if (scheduledAt && scheduledAt <= now) {
results.push(slot);
}
}
// Sort by priority descending
results.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
return results;
}
/**
* Update a slot locally (e.g., after status change).
*/
updateSlot(slotId: string, update: Partial<CalendarSlotResponse>): void {
const existing = this.slots.get(slotId);
if (existing) {
this.slots.set(slotId, { ...existing, ...update });
}
}
/**
* Remove a slot from cache.
*/
removeSlot(slotId: string): void {
this.slots.delete(slotId);
}
/**
* Get all cached slots.
*/
getAll(): CalendarSlotResponse[] {
return Array.from(this.slots.values());
}
/**
* Get cache metadata.
*/
getStatus(): { slotCount: number; lastSyncAt: string | null; cachedDate: string | null } {
return {
slotCount: this.slots.size,
lastSyncAt: this.lastSyncAt?.toISOString() ?? null,
cachedDate: this.cachedDate,
};
}
private getSlotId(slot: CalendarSlotResponse): string {
return slot.virtual_id ?? String(slot.id);
}
private parseScheduledTime(scheduledAt: string): Date | null {
// scheduled_at can be "HH:MM:SS" (time only) or full ISO
if (/^\d{2}:\d{2}(:\d{2})?$/.test(scheduledAt)) {
// Time-only: combine with cached date
if (!this.cachedDate) return null;
return new Date(`${this.cachedDate}T${scheduledAt}Z`);
}
const d = new Date(scheduledAt);
return isNaN(d.getTime()) ? null : d;
}
}

View File

@@ -19,6 +19,7 @@
import { writeFileSync, readFileSync, existsSync, mkdirSync } from 'fs';
import { join, dirname } from 'path';
import { CalendarBridgeClient } from './calendar-bridge';
import { ScheduleCache } from './schedule-cache';
import {
CalendarSlotResponse,
SlotStatus,
@@ -44,6 +45,8 @@ export interface CalendarSchedulerConfig {
};
/** Heartbeat interval in milliseconds (default: 60000) */
heartbeatIntervalMs?: number;
/** Schedule sync interval in milliseconds (default: 300000 = 5 min) */
syncIntervalMs?: number;
/** Enable verbose debug logging */
debug?: boolean;
/** Directory for state persistence (default: plugin data dir) */
@@ -95,8 +98,10 @@ interface SchedulerState {
currentSlot: CalendarSlotResponse | null;
/** Last heartbeat timestamp */
lastHeartbeatAt: Date | null;
/** Interval handle for cleanup */
/** Heartbeat interval handle */
intervalHandle: ReturnType<typeof setInterval> | null;
/** Schedule sync interval handle */
syncIntervalHandle: ReturnType<typeof setInterval> | null;
/** Set of slot IDs that have been deferred in current session */
deferredSlotIds: Set<string>;
/** Whether agent is currently processing a slot */
@@ -117,10 +122,13 @@ export class CalendarScheduler {
private config: Required<CalendarSchedulerConfig>;
private state: SchedulerState;
private stateFilePath: string;
/** Local cache of today's full schedule, synced periodically from backend */
private scheduleCache: ScheduleCache = new ScheduleCache();
constructor(config: CalendarSchedulerConfig) {
this.config = {
heartbeatIntervalMs: 60000, // 1 minute default
syncIntervalMs: 300_000, // 5 minutes default
debug: false,
stateDir: this.getDefaultStateDir(),
...config,
@@ -133,6 +141,7 @@ export class CalendarScheduler {
currentSlot: null,
lastHeartbeatAt: null,
intervalHandle: null,
syncIntervalHandle: null,
deferredSlotIds: new Set(),
isProcessing: false,
isRestartPending: false,
@@ -327,14 +336,21 @@ export class CalendarScheduler {
this.state.isRestartPending = false;
this.config.logger.info('Calendar scheduler started');
// Run initial heartbeat immediately
// Run initial sync + heartbeat immediately
this.runSync();
this.runHeartbeat();
// Schedule periodic heartbeats
// Schedule periodic heartbeats (slot execution checks)
this.state.intervalHandle = setInterval(
() => this.runHeartbeat(),
this.config.heartbeatIntervalMs
);
// Schedule periodic schedule sync (full day schedule refresh)
this.state.syncIntervalHandle = setInterval(
() => this.runSync(),
this.config.syncIntervalMs
);
}
/**
@@ -348,10 +364,41 @@ export class CalendarScheduler {
clearInterval(this.state.intervalHandle);
this.state.intervalHandle = null;
}
if (this.state.syncIntervalHandle) {
clearInterval(this.state.syncIntervalHandle);
this.state.syncIntervalHandle = null;
}
this.config.logger.info('Calendar scheduler stopped');
}
/**
* Sync today's full schedule from backend into local cache.
* Runs every syncIntervalMs (default: 5 min).
* Catches new slots assigned by other agents or the manager.
*/
async runSync(): Promise<void> {
if (!this.state.isRunning || this.state.isRestartPending) return;
const today = new Date().toISOString().slice(0, 10);
try {
const slots = await this.config.bridge.getDaySchedule(today);
if (slots) {
this.scheduleCache.sync(today, slots);
this.logDebug(`Schedule synced: ${slots.length} slots for ${today}`);
}
} catch (err) {
this.config.logger.warn(`Schedule sync failed: ${String(err)}`);
}
}
/**
* Get the local schedule cache (for status reporting / tools).
*/
getScheduleCache(): ScheduleCache {
return this.scheduleCache;
}
/**
* Execute a single heartbeat cycle.
* Fetches pending slots and handles execution logic.
@@ -611,13 +658,11 @@ Task Code: ${code}
Estimated Duration: ${duration} minutes
Slot Type: ${slot.slot_type}
Priority: ${slot.priority}
Working Sessions: ${jobData.working_sessions?.join(', ') || 'none recorded'}
Please focus on this task for the allocated time. When you finish or need to pause,
report your progress back to the calendar system.
Working sessions: ${jobData.working_sessions?.join(', ') || 'none recorded'}
Start working on ${code} now.`;
Follow the daily-routine skill's task-handson workflow to execute this task.
Use harborforge_calendar_complete when finished, or harborforge_calendar_pause to pause.
Before going idle, check for overdue slots as described in the slot-complete workflow.`;
}
/**
@@ -630,19 +675,15 @@ Start working on ${code} now.`;
switch (sysData.event) {
case 'ScheduleToday':
return `System Event: Schedule Today
Please review today's calendar and schedule any pending tasks or planning activities.
Estimated time: ${slot.estimated_duration} minutes.
Check your calendar and plan the day's work.`;
Follow the daily-routine skill's plan-schedule workflow to plan today's work.`;
case 'SummaryToday':
return `System Event: Daily Summary
Please provide a summary of today's activities and progress.
Estimated time: ${slot.estimated_duration} minutes.
Review what was accomplished and prepare end-of-day notes.`;
Review today's completed, deferred, and abandoned slots. Write a summary to your daily note (memory/YYYY-MM-DD.md).`;
case 'ScheduledGatewayRestart':
return `System Event: Scheduled Gateway Restart

View File

@@ -68,6 +68,13 @@ export default {
return getPluginConfig(api);
}
/** Resolve agent ID from env, config, or fallback. */
function resolveAgentId(): string {
if (process.env.AGENT_ID) return process.env.AGENT_ID;
const cfg = api.runtime?.config?.loadConfig?.();
return cfg?.agents?.list?.[0]?.id ?? cfg?.agents?.defaults?.id ?? 'unknown';
}
/**
* Get the monitor bridge client if monitor_port is configured.
*/
@@ -168,7 +175,7 @@ export default {
// Fallback: query backend for agent status
const live = resolveConfig();
const agentId = process.env.AGENT_ID || 'unknown';
const agentId = resolveAgentId();
try {
const response = await fetch(`${live.backendUrl}/calendar/agent/status?agent_id=${agentId}`, {
headers: {
@@ -188,56 +195,54 @@ export default {
}
/**
* Wake/spawn agent with task context for slot execution.
* This is the callback invoked by CalendarScheduler when a slot is ready.
* Wake agent via dispatchInboundMessage — same mechanism used by Discord plugin.
* Direct in-process call, no WebSocket or CLI needed.
*/
async function wakeAgent(context: AgentWakeContext): Promise<boolean> {
logger.info(`Waking agent for slot: ${context.taskDescription}`);
const agentId = resolveAgentId();
const slotId = context.slot.id ?? context.slot.virtual_id ?? 'unknown';
const sessionKey = `agent:${agentId}:hf-wakeup`;
try {
// Method 1: Use OpenClaw spawn API if available (preferred)
if (api.spawn) {
const result = await api.spawn({
task: context.prompt,
timeoutSeconds: context.slot.estimated_duration * 60, // Convert to seconds
});
// Dynamic import — resolved at runtime by OpenClaw's module system
const sdkPath = 'openclaw/plugin-sdk/reply-runtime';
const { dispatchInboundMessageWithDispatcher } = await import(
/* webpackIgnore: true */ sdkPath
);
if (result?.sessionId) {
logger.info(`Agent spawned for calendar slot: session=${result.sessionId}`);
// Track session completion
trackSessionCompletion(result.sessionId, context);
return true;
}
const cfg = api.runtime?.config?.loadConfig?.();
if (!cfg) {
logger.error('Cannot load OpenClaw config for dispatch');
return false;
}
// Method 2: Send notification/alert to wake agent (fallback)
// This relies on the agent's heartbeat to check for notifications
logger.warn('OpenClaw spawn API not available, using notification fallback');
const wakeupMessage = `You are woken up by slot ${slotId}. Follow the \`hf-wakeup\` workflow of skill \`hf-hangman-lab\` to proceed. Only reply \`WAKEUP_OK\` in this session.`;
// Send calendar wakeup notification via backend
const live = resolveConfig();
const agentId = process.env.AGENT_ID || 'unknown';
const notifyResponse = await fetch(`${live.backendUrl}/calendar/agent/notify`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Agent-ID': agentId,
'X-Claw-Identifier': live.identifier || hostname(),
const result = await dispatchInboundMessageWithDispatcher({
ctx: {
Body: wakeupMessage,
SessionKey: sessionKey,
From: 'harborforge-calendar',
Provider: 'harborforge',
},
cfg,
dispatcherOptions: {
deliver: async (payload: any) => {
const text = (payload.text || '').trim();
logger.info(`Agent wakeup reply for slot ${slotId}: ${text.slice(0, 100)}`);
},
},
body: JSON.stringify({
agent_id: agentId,
message: context.prompt,
slot_id: context.slot.id || context.slot.virtual_id,
task_description: context.taskDescription,
}),
});
return notifyResponse.ok;
logger.info(`Agent dispatched for slot ${slotId}: ${result?.status || 'ok'}`);
return true;
} catch (err) {
logger.error('Failed to wake agent:', err);
} catch (err: any) {
const msg = err?.message || err?.code || String(err);
const stack = err?.stack?.split('\n').slice(0, 3).join(' | ') || '';
logger.error(`Failed to dispatch agent for slot: ${msg} ${stack}`);
return false;
}
}
@@ -279,7 +284,8 @@ export default {
*/
function startCalendarScheduler(): void {
const live = resolveConfig();
const agentId = process.env.AGENT_ID || 'unknown';
const agentId = resolveAgentId();
logger.info(`Calendar scheduler agent: ${agentId}`);
// Create calendar bridge client
const calendarBridge = createCalendarBridgeClient(