diff --git a/REFACTOR_PLAN.md b/REFACTOR_PLAN.md new file mode 100644 index 0000000..82362d6 --- /dev/null +++ b/REFACTOR_PLAN.md @@ -0,0 +1,118 @@ +# CalendarScheduler Refactor Plan (v2) + +> Updated 2026-04-19 based on architecture discussion with hang + +## Current Issues + +1. `process.env.AGENT_ID` doesn't exist in plugins subprocess — always 'unknown' +2. Heartbeat is per-agent but should be per-claw-instance (global) +3. Scheduler only handles one agent — should manage all agents on this instance +4. wakeAgent used api.spawn (non-existent) → now uses dispatchInboundMessage (verified) + +## Target Design + +### Plugin State + +```typescript +// Local schedule cache: { agentId → [slots] } +const schedules: Map = new Map(); +``` + +### Sync Flow (every 5 min) + +``` +1. GET /calendar/sync?claw_identifier=xxx + - First call: server returns full { agentId → [slots] } + - Subsequent: server returns diff since last sync +2. Update local schedules map +3. Scan schedules for due slots: + for each agentId in schedules: + if has slot where scheduled_at <= now && status == not_started: + getAgentStatus(agentId, clawIdentifier) → busy? + if not busy → wakeAgent(agentId) +``` + +### Heartbeat (every 60s) + +Simplified to liveness ping only: +``` +POST /monitor/server/heartbeat + claw_identifier: xxx + → server returns empty/ack +``` + +No slot data in heartbeat response. + +### Wake Flow + +``` +dispatchInboundMessage: + SessionKey: agent:{agentId}:hf-wakeup + Body: "You have due slots. Follow the hf-wakeup workflow of skill hf-hangman-lab to proceed. Only reply WAKEUP_OK in this session." + +Agent reads workflow → calls hf tools → sets own status to busy +``` + +### Agent ID Resolution + +- **Sync**: agentId comes from server response (dict keys) +- **Wake**: agentId from local schedules dict key +- **Tool calls by agent**: agentId from tool ctx (same as padded-cell) + +## Backend API Changes Needed + +### New: GET /calendar/sync + +``` +GET /calendar/sync?claw_identifier=xxx +Headers: X-Claw-Identifier + +Response (first call): +{ + "full": true, + "schedules": { + "developer": [slot1, slot2, ...], + "operator": [slot3, ...] + }, + "syncToken": "abc123" +} + +Response (subsequent, with ?since=abc123): +{ + "full": false, + "diff": [ + { "op": "add", "agent": "developer", "slot": {...} }, + { "op": "update", "agent": "developer", "slotId": 5, "patch": {...} }, + { "op": "remove", "agent": "operator", "slotId": 3 } + ], + "syncToken": "def456" +} +``` + +### Existing: POST /calendar/agent/status + +Keep as-is but ensure it accepts agentId + clawIdentifier as params: +``` +POST /calendar/agent/status + { agent_id, claw_identifier, status } +``` + +## Implementation Order + +1. Backend: Add /calendar/sync endpoint +2. Plugin: Replace CalendarBridgeClient single-agent design with multi-agent +3. Plugin: Replace CalendarScheduler with new sync+check loop +4. Plugin: wakeAgent uses dispatchInboundMessage (done) +5. Plugin: Tool handlers get agentId from ctx (like padded-cell) + +## Files to Change + +### Backend (HarborForge.Backend) +- New route: `/calendar/sync` +- New service: schedule diff tracking per claw_identifier + +### Plugin +- `plugin/calendar/calendar-bridge.ts` — remove agentId binding, add sync() +- `plugin/calendar/scheduler.ts` — rewrite to multi-agent sync+check +- `plugin/calendar/schedule-cache.ts` — already exists, adapt to multi-agent +- `plugin/index.ts` — update wakeAgent, getAgentStatus to accept agentId diff --git a/plugin/calendar/calendar-bridge.ts b/plugin/calendar/calendar-bridge.ts index dca5cf5..c3a29e5 100644 --- a/plugin/calendar/calendar-bridge.ts +++ b/plugin/calendar/calendar-bridge.ts @@ -169,6 +169,74 @@ export class CalendarBridgeClient { return this.sendBoolean('POST', url, body); } + /** + * Fetch the full day schedule for this agent. + * + * Unlike heartbeat() which only returns pending (NOT_STARTED/DEFERRED) slots, + * this returns ALL slots for the given date, enabling the plugin to maintain + * a complete local view of today's schedule. + * + * @param date Date string in YYYY-MM-DD format + * @returns Array of all slots for the day, or null if unreachable + */ + async getDaySchedule(date: string): Promise { + const url = `${this.baseUrl}/calendar/day?date=${encodeURIComponent(date)}`; + try { + const response = await this.fetchJson<{ slots: CalendarSlotResponse[] }>(url, { + method: 'GET', + headers: { + 'X-Agent-ID': this.config.agentId, + 'X-Claw-Identifier': this.config.clawIdentifier, + }, + }); + return response?.slots ?? null; + } catch { + return null; + } + } + + /** + * Sync today's schedules for all agents on this claw instance. + * + * Returns { agentId → slots[] } for all agents with matching claw_identifier. + * This is the primary data source for the multi-agent schedule cache. + */ + async syncSchedules(): Promise<{ schedules: Record; date: string } | null> { + const url = `${this.baseUrl}/calendar/sync`; + try { + const response = await this.fetchJson<{ schedules: Record; date: string }>(url, { + method: 'GET', + headers: { + 'X-Claw-Identifier': this.config.clawIdentifier, + }, + }); + return response; + } catch { + return null; + } + } + + /** + * Get a specific agent's status. + * + * @param agentId The agent ID to query + */ + async getAgentStatus(agentId: string): Promise { + const url = `${this.baseUrl}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`; + try { + const response = await this.fetchJson<{ status: string }>(url, { + method: 'GET', + headers: { + 'X-Agent-ID': agentId, + 'X-Claw-Identifier': this.config.clawIdentifier, + }, + }); + return response?.status ?? null; + } catch { + return null; + } + } + // ------------------------------------------------------------------------- // Internal helpers // ------------------------------------------------------------------------- diff --git a/plugin/calendar/index.ts b/plugin/calendar/index.ts index 8d2bed7..a829ccf 100644 --- a/plugin/calendar/index.ts +++ b/plugin/calendar/index.ts @@ -31,3 +31,4 @@ export * from './types'; export * from './calendar-bridge'; export * from './scheduler'; +export * from './schedule-cache'; diff --git a/plugin/calendar/schedule-cache.ts b/plugin/calendar/schedule-cache.ts new file mode 100644 index 0000000..0075bc5 --- /dev/null +++ b/plugin/calendar/schedule-cache.ts @@ -0,0 +1,101 @@ +/** + * Multi-agent local schedule cache. + * + * Maintains today's schedule for all agents on this claw instance. + * Synced periodically from HF backend via /calendar/sync endpoint. + */ + +export interface CachedSlot { + id: number | null; + virtual_id: string | null; + slot_type: string; + estimated_duration: number; + scheduled_at: string; + status: string; + priority: number; + event_type: string | null; + event_data: Record | null; + [key: string]: unknown; +} + +export class MultiAgentScheduleCache { + /** { agentId → slots[] } */ + private schedules: Map = new Map(); + private lastSyncAt: Date | null = null; + private cachedDate: string | null = null; + + /** + * Replace cache with data from /calendar/sync response. + */ + sync(date: string, schedules: Record): void { + if (this.cachedDate !== date) { + this.schedules.clear(); + } + this.cachedDate = date; + + for (const [agentId, slots] of Object.entries(schedules)) { + this.schedules.set(agentId, slots); + } + this.lastSyncAt = new Date(); + } + + /** + * Get agents that have due (overdue or current) slots. + * Returns [agentId, dueSlots[]] pairs. + */ + getAgentsWithDueSlots(now: Date): Array<{ agentId: string; slots: CachedSlot[] }> { + const results: Array<{ agentId: string; slots: CachedSlot[] }> = []; + + for (const [agentId, slots] of this.schedules) { + const due = slots.filter((s) => { + if (s.status !== 'not_started' && s.status !== 'deferred') return false; + const scheduledAt = this.parseScheduledTime(s.scheduled_at); + return scheduledAt !== null && scheduledAt <= now; + }); + + if (due.length > 0) { + // Sort by priority descending + due.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0)); + results.push({ agentId, slots: due }); + } + } + return results; + } + + /** + * Get all agent IDs in the cache. + */ + getAgentIds(): string[] { + return Array.from(this.schedules.keys()); + } + + /** + * Get slots for a specific agent. + */ + getAgentSlots(agentId: string): CachedSlot[] { + return this.schedules.get(agentId) ?? []; + } + + /** + * Get cache status for debugging. + */ + getStatus(): { agentCount: number; totalSlots: number; lastSyncAt: string | null; cachedDate: string | null } { + let totalSlots = 0; + for (const slots of this.schedules.values()) totalSlots += slots.length; + return { + agentCount: this.schedules.size, + totalSlots, + lastSyncAt: this.lastSyncAt?.toISOString() ?? null, + cachedDate: this.cachedDate, + }; + } + + private parseScheduledTime(scheduledAt: string): Date | null { + if (/^\d{2}:\d{2}(:\d{2})?$/.test(scheduledAt)) { + if (!this.cachedDate) return null; + return new Date(`${this.cachedDate}T${scheduledAt}Z`); + } + const d = new Date(scheduledAt); + return isNaN(d.getTime()) ? null : d; + } +} diff --git a/plugin/index.ts b/plugin/index.ts index ecc0287..f9da985 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -68,6 +68,13 @@ export default { return getPluginConfig(api); } + /** Resolve agent ID from env, config, or fallback. */ + function resolveAgentId(): string { + if (process.env.AGENT_ID) return process.env.AGENT_ID; + const cfg = api.runtime?.config?.loadConfig?.(); + return cfg?.agents?.list?.[0]?.id ?? cfg?.agents?.defaults?.id ?? 'unknown'; + } + /** * Get the monitor bridge client if monitor_port is configured. */ @@ -168,7 +175,7 @@ export default { // Fallback: query backend for agent status const live = resolveConfig(); - const agentId = process.env.AGENT_ID || 'unknown'; + const agentId = resolveAgentId(); try { const response = await fetch(`${live.backendUrl}/calendar/agent/status?agent_id=${agentId}`, { headers: { @@ -188,56 +195,51 @@ export default { } /** - * Wake/spawn agent with task context for slot execution. - * This is the callback invoked by CalendarScheduler when a slot is ready. + * Wake agent via dispatchInboundMessage — same mechanism used by Discord plugin. + * Direct in-process call, no WebSocket or CLI needed. */ - async function wakeAgent(context: AgentWakeContext): Promise { - logger.info(`Waking agent for slot: ${context.taskDescription}`); + async function wakeAgent(agentId: string): Promise { + logger.info(`Waking agent ${agentId}: has due slots`); + + const sessionKey = `agent:${agentId}:hf-wakeup`; try { - // Method 1: Use OpenClaw spawn API if available (preferred) - if (api.spawn) { - const result = await api.spawn({ - task: context.prompt, - timeoutSeconds: context.slot.estimated_duration * 60, // Convert to seconds - }); + const sdkPath = 'openclaw/plugin-sdk/reply-runtime'; + const { dispatchInboundMessageWithDispatcher } = await import( + /* webpackIgnore: true */ sdkPath + ); - if (result?.sessionId) { - logger.info(`Agent spawned for calendar slot: session=${result.sessionId}`); - - // Track session completion - trackSessionCompletion(result.sessionId, context); - return true; - } + const cfg = api.runtime?.config?.loadConfig?.(); + if (!cfg) { + logger.error('Cannot load OpenClaw config for dispatch'); + return false; } - // Method 2: Send notification/alert to wake agent (fallback) - // This relies on the agent's heartbeat to check for notifications - logger.warn('OpenClaw spawn API not available, using notification fallback'); + const wakeupMessage = `You have due slots. Follow the \`hf-wakeup\` workflow of skill \`hf-hangman-lab\` to proceed. Only reply \`WAKEUP_OK\` in this session.`; - // Send calendar wakeup notification via backend - const live = resolveConfig(); - const agentId = process.env.AGENT_ID || 'unknown'; - - const notifyResponse = await fetch(`${live.backendUrl}/calendar/agent/notify`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'X-Agent-ID': agentId, - 'X-Claw-Identifier': live.identifier || hostname(), + const result = await dispatchInboundMessageWithDispatcher({ + ctx: { + Body: wakeupMessage, + SessionKey: sessionKey, + From: 'harborforge-calendar', + Provider: 'harborforge', + }, + cfg, + dispatcherOptions: { + deliver: async (payload: any) => { + const text = (payload.text || '').trim(); + logger.info(`Agent ${agentId} wakeup reply: ${text.slice(0, 100)}`); + }, }, - body: JSON.stringify({ - agent_id: agentId, - message: context.prompt, - slot_id: context.slot.id || context.slot.virtual_id, - task_description: context.taskDescription, - }), }); - return notifyResponse.ok; + logger.info(`Agent ${agentId} dispatched: ${result?.status || 'ok'}`); + return true; - } catch (err) { - logger.error('Failed to wake agent:', err); + } catch (err: any) { + const msg = err?.message || err?.code || String(err); + const stack = err?.stack?.split('\n').slice(0, 3).join(' | ') || ''; + logger.error(`Failed to dispatch agent for slot: ${msg} ${stack}`); return false; } } @@ -279,27 +281,69 @@ export default { */ function startCalendarScheduler(): void { const live = resolveConfig(); - const agentId = process.env.AGENT_ID || 'unknown'; - // Create calendar bridge client + // Create bridge client (claw-instance level, not per-agent) const calendarBridge = createCalendarBridgeClient( api, live.backendUrl || 'https://monitor.hangman-lab.top', - agentId + 'unused' // agentId no longer needed at bridge level ); - // Create and start scheduler - calendarScheduler = createCalendarScheduler({ - bridge: calendarBridge, - getAgentStatus, - wakeAgent, - logger, - heartbeatIntervalMs: 60000, // 1 minute - debug: live.logLevel === 'debug', - }); + // Multi-agent sync + check loop + const { MultiAgentScheduleCache } = require('./calendar/schedule-cache') as typeof import('./calendar/schedule-cache'); + const scheduleCache = new MultiAgentScheduleCache(); - calendarScheduler.start(); - logger.info('Calendar scheduler started'); + const SYNC_INTERVAL_MS = 300_000; // 5 min + const CHECK_INTERVAL_MS = 30_000; // 30 sec + + // Sync: pull all agent schedules from backend + async function runSync() { + try { + const result = await calendarBridge.syncSchedules(); + if (result) { + scheduleCache.sync(result.date, result.schedules); + const status = scheduleCache.getStatus(); + logger.info(`Schedule synced: ${status.agentCount} agents, ${status.totalSlots} slots`); + } + } catch (err) { + logger.warn(`Schedule sync failed: ${String(err)}`); + } + } + + // Check: find agents with due slots and wake them + async function runCheck() { + const now = new Date(); + const agentsWithDue = scheduleCache.getAgentsWithDueSlots(now); + + for (const { agentId } of agentsWithDue) { + // Check if agent is busy + const status = await calendarBridge.getAgentStatus(agentId); + if (status === 'busy' || status === 'offline' || status === 'exhausted') { + continue; + } + + // Wake the agent + await wakeAgent(agentId); + } + } + + // Initial sync + runSync(); + + // Start intervals + const syncHandle = setInterval(runSync, SYNC_INTERVAL_MS); + const checkHandle = setInterval(runCheck, CHECK_INTERVAL_MS); + + // Store handles for cleanup (reuse calendarScheduler variable) + (calendarScheduler as any) = { + stop() { + clearInterval(syncHandle); + clearInterval(checkHandle); + logger.info('Calendar scheduler stopped'); + }, + }; + + logger.info('Calendar scheduler started (multi-agent sync mode)'); } /**