/** * MultiAgentSchedulerHandle — runtime façade that backs the public * `harborforge_calendar_*` tools when the plugin runs in multi-agent sync * mode. * * Background * ---------- * The old single-agent path used `CalendarScheduler` which kept a "current * slot" cursor and exposed `isRunning() / completeCurrentSlot() / abortCurrentSlot() / …`. * In multi-agent mode the plugin doesn't own a single cursor — one plugin * instance services every agent on the claw — so the previous code stubbed the * `calendarScheduler` variable to `{ stop() }`. That made every tool fail * with `calendarScheduler. is not a function`. * * This handle restores the same surface area (`isRunning / getCurrentSlot / * completeCurrentSlot / …`) but resolves the "current slot" per caller via * the agentId/sessionKey supplied by the OpenClaw tool-factory context. The * scheduler records the slot it just dispatched to each agent in * {@link recordWoken}; the tool resolves the caller, looks up the last woken * slot, and PATCHes the backend via the shared bridge. * * Tools must pass the calling agentId (from `OpenClawPluginToolContext.agentId`) * into every method. The handle does not consult `process.env.AGENT_ID` — the * gateway sets that to the host's primary agent which is meaningless in * multi-agent mode. */ import type { CalendarBridgeClient } from './calendar-bridge.js'; import type { MultiAgentScheduleCache, CachedSlot } from './schedule-cache.js'; import { SlotStatus } from './types.js'; export interface MultiAgentSchedulerHandleParams { bridge: CalendarBridgeClient; cache: MultiAgentScheduleCache; /** setInterval handles cleared on stop() */ syncHandle: ReturnType; checkHandle: ReturnType; logger: { info: (...args: unknown[]) => void; warn: (...args: unknown[]) => void; error: (...args: unknown[]) => void; }; } /** Last slot we dispatched to an agent. Used as the implicit "current slot". */ export interface WokenSlot { agentId: string; slotId: number | null; virtualId: string | null; scheduledAt: string | null; slotType: string | null; estimatedDuration: number | null; wokenAt: string; } /** Public surface — mirrors the relevant subset of the old single-agent scheduler. */ export class MultiAgentSchedulerHandle { private readonly bridge: CalendarBridgeClient; private readonly cache: MultiAgentScheduleCache; private readonly syncHandle: ReturnType; private readonly checkHandle: ReturnType; private readonly logger: MultiAgentSchedulerHandleParams['logger']; private readonly woken: Map = new Map(); private stopped = false; constructor(params: MultiAgentSchedulerHandleParams) { this.bridge = params.bridge; this.cache = params.cache; this.syncHandle = params.syncHandle; this.checkHandle = params.checkHandle; this.logger = params.logger; } // ---------- lifecycle ---------- /** True while the sync/check intervals are still ticking. */ isRunning(): boolean { return !this.stopped; } /** Always false in multi-agent mode — there is no per-instance work queue. */ isProcessing(): boolean { return false; } stop(): void { if (this.stopped) return; this.stopped = true; clearInterval(this.syncHandle); clearInterval(this.checkHandle); this.logger.info('Calendar scheduler stopped (multi-agent mode)'); } // ---------- wake bookkeeping ---------- /** Record that we just dispatched a slot to `agentId`. */ recordWoken(agentId: string, slot: CachedSlot): void { this.woken.set(agentId, { agentId, slotId: typeof slot.id === 'number' ? slot.id : null, virtualId: typeof slot.virtual_id === 'string' ? slot.virtual_id : null, scheduledAt: typeof slot.scheduled_at === 'string' ? slot.scheduled_at : null, slotType: typeof slot.slot_type === 'string' ? slot.slot_type : null, estimatedDuration: typeof slot.estimated_duration === 'number' ? slot.estimated_duration : null, wokenAt: new Date().toISOString(), }); } // ---------- per-agent reads ---------- /** * The slot most recently dispatched to `agentId`, or null if we never woke * them (e.g. tool called outside a wakeup). Callers can fall back to * scanning {@link cachedSlotsFor} for `not_started`/`deferred` slots if they * want a heuristic "current". */ getWokenSlot(agentId: string | null | undefined): WokenSlot | null { if (!agentId) return null; return this.woken.get(agentId) ?? null; } /** Today's cached slots for an agent (whatever runSync last pulled). */ cachedSlotsFor(agentId: string | null | undefined): CachedSlot[] { if (!agentId) return []; return this.cache.getAgentSlots(agentId); } /** * Implicit "current" slot: the last woken slot if we know about it, * otherwise the highest-priority `not_started`/`deferred`/`ongoing` cached * slot for the agent (so a tool called between sync windows still finds * something sensible). */ resolveCurrentSlot(agentId: string | null | undefined): WokenSlot | null { const woken = this.getWokenSlot(agentId); if (woken) return woken; if (!agentId) return null; const slots = this.cache.getAgentSlots(agentId) .filter((s) => s.status === 'not_started' || s.status === 'deferred' || s.status === 'ongoing') .sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0)); const top = slots[0]; if (!top) return null; return { agentId, slotId: typeof top.id === 'number' ? top.id : null, virtualId: typeof top.virtual_id === 'string' ? top.virtual_id : null, scheduledAt: typeof top.scheduled_at === 'string' ? top.scheduled_at : null, slotType: typeof top.slot_type === 'string' ? top.slot_type : null, estimatedDuration: typeof top.estimated_duration === 'number' ? top.estimated_duration : null, wokenAt: 'inferred-from-cache', }; } // ---------- per-agent writes ---------- async completeSlot(agentId: string, actualDurationMinutes: number): Promise { return this.transition(agentId, SlotStatus.FINISHED, { actual_duration: actualDurationMinutes }); } async abortSlot(agentId: string, reason?: string): Promise { if (reason) this.logger.info(`Aborting slot for ${agentId}: ${reason}`); return this.transition(agentId, SlotStatus.ABORTED); } async pauseSlot(agentId: string): Promise { return this.transition(agentId, SlotStatus.PAUSED); } /** Resume puts the slot back into `ongoing` so it isn't picked up as `not_started`. */ async resumeSlot(agentId: string): Promise { return this.transition(agentId, SlotStatus.ONGOING); } private async transition( agentId: string, status: SlotStatus, extra: { actual_duration?: number } = {} ): Promise { const slot = this.resolveCurrentSlot(agentId); if (!slot) { return { ok: false, error: `No tracked slot for agent '${agentId}'` }; } if (slot.slotId == null && slot.virtualId == null) { return { ok: false, error: 'Resolved slot has neither id nor virtual_id' }; } const update = { status, ...extra }; try { if (slot.slotId != null) { const ok = await this.bridge.updateSlotAs(agentId, slot.slotId, update); if (!ok) return { ok: false, error: `Backend rejected slot ${slot.slotId} → ${status}` }; // Clear the wake cursor only on terminal transitions so re-wakes don't // re-trigger the same slot mutation. if (status === SlotStatus.FINISHED || status === SlotStatus.ABORTED) { this.woken.delete(agentId); } return { ok: true, slot, status }; } // virtual slot path: bridge has updateVirtualSlot const materialised = await this.bridge.updateVirtualSlot(slot.virtualId!, update); if (!materialised) { return { ok: false, error: `Backend rejected virtual slot ${slot.virtualId}` }; } if (status === SlotStatus.FINISHED || status === SlotStatus.ABORTED) { this.woken.delete(agentId); } return { ok: true, slot, status, materialised }; } catch (err: unknown) { const msg = (err as { message?: string } | undefined)?.message ?? String(err); return { ok: false, error: msg }; } } // ---------- gateway-restart parity (no-op in multi-agent mode) ---------- /** Multi-agent scheduler does not persist a `ScheduledGatewayRestart` flag. */ isRestartPending(): boolean { return false; } getStateFilePath(): string | null { return null; } getState(): { mode: 'multi-agent'; agents: number; lastWoken: WokenSlot[] } { return { mode: 'multi-agent', agents: this.cache.getStatus().agentCount, lastWoken: Array.from(this.woken.values()), }; } } export interface SlotMutationResult { ok: boolean; error?: string; status?: SlotStatus; slot?: WokenSlot; /** Present only when a virtual slot was materialised. */ materialised?: unknown; }