diff --git a/plugin/calendar/calendar-bridge.ts b/plugin/calendar/calendar-bridge.ts index 1db6b7f..5277f99 100644 --- a/plugin/calendar/calendar-bridge.ts +++ b/plugin/calendar/calendar-bridge.ts @@ -110,6 +110,23 @@ export class CalendarBridgeClient { return this.sendBoolean('PATCH', url, update); } + /** + * Same as {@link updateSlot} but overrides the `X-Agent-ID` header for a + * single call. Used by the multi-agent scheduler handle where the bridge + * client is shared across agents and the constructor agentId is `'unused'`. + * + * Backend identifies the slot purely by `slotId`; the header is informational + * for audit. Passing the calling agent's id keeps audit/log lines correct. + */ + async updateSlotAs( + agentId: string, + slotId: number, + update: SlotAgentUpdate + ): Promise { + const url = `${this.baseUrl}/calendar/slots/${slotId}/agent-update`; + return this.sendBooleanAs(agentId, 'PATCH', url, update); + } + /** * Update a virtual (plan-generated) slot's status after agent execution. * @@ -264,6 +281,15 @@ export class CalendarBridgeClient { } private async sendBoolean(method: 'POST' | 'PATCH', url: string, body: unknown): Promise { + return this.sendBooleanAs(this.config.agentId, method, url, body); + } + + private async sendBooleanAs( + agentId: string, + method: 'POST' | 'PATCH', + url: string, + body: unknown + ): Promise { const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), this.timeoutMs); @@ -272,7 +298,7 @@ export class CalendarBridgeClient { method, headers: { 'Content-Type': 'application/json', - 'X-Agent-ID': this.config.agentId, + 'X-Agent-ID': agentId, 'X-Claw-Identifier': this.config.clawIdentifier, }, body: JSON.stringify(body), diff --git a/plugin/calendar/multi-agent-handle.ts b/plugin/calendar/multi-agent-handle.ts new file mode 100644 index 0000000..cca9c93 --- /dev/null +++ b/plugin/calendar/multi-agent-handle.ts @@ -0,0 +1,240 @@ +/** + * MultiAgentSchedulerHandle — runtime façade that backs the public + * `harborforge_calendar_*` tools when the plugin runs in multi-agent sync + * mode. + * + * Background + * ---------- + * The old single-agent path used `CalendarScheduler` which kept a "current + * slot" cursor and exposed `isRunning() / completeCurrentSlot() / abortCurrentSlot() / …`. + * In multi-agent mode the plugin doesn't own a single cursor — one plugin + * instance services every agent on the claw — so the previous code stubbed the + * `calendarScheduler` variable to `{ stop() }`. That made every tool fail + * with `calendarScheduler. is not a function`. + * + * This handle restores the same surface area (`isRunning / getCurrentSlot / + * completeCurrentSlot / …`) but resolves the "current slot" per caller via + * the agentId/sessionKey supplied by the OpenClaw tool-factory context. The + * scheduler records the slot it just dispatched to each agent in + * {@link recordWoken}; the tool resolves the caller, looks up the last woken + * slot, and PATCHes the backend via the shared bridge. + * + * Tools must pass the calling agentId (from `OpenClawPluginToolContext.agentId`) + * into every method. The handle does not consult `process.env.AGENT_ID` — the + * gateway sets that to the host's primary agent which is meaningless in + * multi-agent mode. + */ + +import type { CalendarBridgeClient } from './calendar-bridge.js'; +import type { MultiAgentScheduleCache, CachedSlot } from './schedule-cache.js'; +import { SlotStatus } from './types.js'; + +export interface MultiAgentSchedulerHandleParams { + bridge: CalendarBridgeClient; + cache: MultiAgentScheduleCache; + /** setInterval handles cleared on stop() */ + syncHandle: ReturnType; + checkHandle: ReturnType; + logger: { + info: (...args: unknown[]) => void; + warn: (...args: unknown[]) => void; + error: (...args: unknown[]) => void; + }; +} + +/** Last slot we dispatched to an agent. Used as the implicit "current slot". */ +export interface WokenSlot { + agentId: string; + slotId: number | null; + virtualId: string | null; + scheduledAt: string | null; + slotType: string | null; + estimatedDuration: number | null; + wokenAt: string; +} + +/** Public surface — mirrors the relevant subset of the old single-agent scheduler. */ +export class MultiAgentSchedulerHandle { + private readonly bridge: CalendarBridgeClient; + private readonly cache: MultiAgentScheduleCache; + private readonly syncHandle: ReturnType; + private readonly checkHandle: ReturnType; + private readonly logger: MultiAgentSchedulerHandleParams['logger']; + private readonly woken: Map = new Map(); + private stopped = false; + + constructor(params: MultiAgentSchedulerHandleParams) { + this.bridge = params.bridge; + this.cache = params.cache; + this.syncHandle = params.syncHandle; + this.checkHandle = params.checkHandle; + this.logger = params.logger; + } + + // ---------- lifecycle ---------- + + /** True while the sync/check intervals are still ticking. */ + isRunning(): boolean { + return !this.stopped; + } + + /** Always false in multi-agent mode — there is no per-instance work queue. */ + isProcessing(): boolean { + return false; + } + + stop(): void { + if (this.stopped) return; + this.stopped = true; + clearInterval(this.syncHandle); + clearInterval(this.checkHandle); + this.logger.info('Calendar scheduler stopped (multi-agent mode)'); + } + + // ---------- wake bookkeeping ---------- + + /** Record that we just dispatched a slot to `agentId`. */ + recordWoken(agentId: string, slot: CachedSlot): void { + this.woken.set(agentId, { + agentId, + slotId: typeof slot.id === 'number' ? slot.id : null, + virtualId: typeof slot.virtual_id === 'string' ? slot.virtual_id : null, + scheduledAt: typeof slot.scheduled_at === 'string' ? slot.scheduled_at : null, + slotType: typeof slot.slot_type === 'string' ? slot.slot_type : null, + estimatedDuration: + typeof slot.estimated_duration === 'number' ? slot.estimated_duration : null, + wokenAt: new Date().toISOString(), + }); + } + + // ---------- per-agent reads ---------- + + /** + * The slot most recently dispatched to `agentId`, or null if we never woke + * them (e.g. tool called outside a wakeup). Callers can fall back to + * scanning {@link cachedSlotsFor} for `not_started`/`deferred` slots if they + * want a heuristic "current". + */ + getWokenSlot(agentId: string | null | undefined): WokenSlot | null { + if (!agentId) return null; + return this.woken.get(agentId) ?? null; + } + + /** Today's cached slots for an agent (whatever runSync last pulled). */ + cachedSlotsFor(agentId: string | null | undefined): CachedSlot[] { + if (!agentId) return []; + return this.cache.getAgentSlots(agentId); + } + + /** + * Implicit "current" slot: the last woken slot if we know about it, + * otherwise the highest-priority `not_started`/`deferred`/`ongoing` cached + * slot for the agent (so a tool called between sync windows still finds + * something sensible). + */ + resolveCurrentSlot(agentId: string | null | undefined): WokenSlot | null { + const woken = this.getWokenSlot(agentId); + if (woken) return woken; + if (!agentId) return null; + const slots = this.cache.getAgentSlots(agentId) + .filter((s) => s.status === 'not_started' || s.status === 'deferred' || s.status === 'ongoing') + .sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0)); + const top = slots[0]; + if (!top) return null; + return { + agentId, + slotId: typeof top.id === 'number' ? top.id : null, + virtualId: typeof top.virtual_id === 'string' ? top.virtual_id : null, + scheduledAt: typeof top.scheduled_at === 'string' ? top.scheduled_at : null, + slotType: typeof top.slot_type === 'string' ? top.slot_type : null, + estimatedDuration: + typeof top.estimated_duration === 'number' ? top.estimated_duration : null, + wokenAt: 'inferred-from-cache', + }; + } + + // ---------- per-agent writes ---------- + + async completeSlot(agentId: string, actualDurationMinutes: number): Promise { + return this.transition(agentId, SlotStatus.FINISHED, { actual_duration: actualDurationMinutes }); + } + + async abortSlot(agentId: string, reason?: string): Promise { + if (reason) this.logger.info(`Aborting slot for ${agentId}: ${reason}`); + return this.transition(agentId, SlotStatus.ABORTED); + } + + async pauseSlot(agentId: string): Promise { + return this.transition(agentId, SlotStatus.PAUSED); + } + + /** Resume puts the slot back into `ongoing` so it isn't picked up as `not_started`. */ + async resumeSlot(agentId: string): Promise { + return this.transition(agentId, SlotStatus.ONGOING); + } + + private async transition( + agentId: string, + status: SlotStatus, + extra: { actual_duration?: number } = {} + ): Promise { + const slot = this.resolveCurrentSlot(agentId); + if (!slot) { + return { ok: false, error: `No tracked slot for agent '${agentId}'` }; + } + if (slot.slotId == null && slot.virtualId == null) { + return { ok: false, error: 'Resolved slot has neither id nor virtual_id' }; + } + const update = { status, ...extra }; + try { + if (slot.slotId != null) { + const ok = await this.bridge.updateSlotAs(agentId, slot.slotId, update); + if (!ok) return { ok: false, error: `Backend rejected slot ${slot.slotId} → ${status}` }; + // Clear the wake cursor only on terminal transitions so re-wakes don't + // re-trigger the same slot mutation. + if (status === SlotStatus.FINISHED || status === SlotStatus.ABORTED) { + this.woken.delete(agentId); + } + return { ok: true, slot, status }; + } + // virtual slot path: bridge has updateVirtualSlot + const materialised = await this.bridge.updateVirtualSlot(slot.virtualId!, update); + if (!materialised) { + return { ok: false, error: `Backend rejected virtual slot ${slot.virtualId}` }; + } + if (status === SlotStatus.FINISHED || status === SlotStatus.ABORTED) { + this.woken.delete(agentId); + } + return { ok: true, slot, status, materialised }; + } catch (err: unknown) { + const msg = (err as { message?: string } | undefined)?.message ?? String(err); + return { ok: false, error: msg }; + } + } + + // ---------- gateway-restart parity (no-op in multi-agent mode) ---------- + + /** Multi-agent scheduler does not persist a `ScheduledGatewayRestart` flag. */ + isRestartPending(): boolean { + return false; + } + getStateFilePath(): string | null { + return null; + } + getState(): { mode: 'multi-agent'; agents: number; lastWoken: WokenSlot[] } { + return { + mode: 'multi-agent', + agents: this.cache.getStatus().agentCount, + lastWoken: Array.from(this.woken.values()), + }; + } +} + +export interface SlotMutationResult { + ok: boolean; + error?: string; + status?: SlotStatus; + slot?: WokenSlot; + /** Present only when a virtual slot was materialised. */ + materialised?: unknown; +} diff --git a/plugin/index.ts b/plugin/index.ts index 1510d05..253acb0 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -14,6 +14,7 @@ import { hostname, freemem, totalmem, uptime, loadavg, platform } from 'node:os'; import { definePluginEntry } from 'openclaw/plugin-sdk/plugin-entry'; import { MultiAgentScheduleCache } from './calendar/schedule-cache.js'; +import { MultiAgentSchedulerHandle } from './calendar/multi-agent-handle.js'; import { getPluginConfig } from './core/config.js'; import { MonitorBridgeClient, type OpenClawMeta } from './core/monitor-bridge.js'; import type { OpenClawAgentInfo } from './core/openclaw-agents.js'; @@ -21,9 +22,7 @@ import { registerGatewayStartHook } from './hooks/gateway-start.js'; import { registerGatewayStopHook } from './hooks/gateway-stop.js'; import { createCalendarBridgeClient, - createCalendarScheduler, CalendarScheduler, - AgentWakeContext, } from './calendar/index.js'; interface PluginAPI { @@ -109,7 +108,7 @@ function register(api: PluginAPI): void { }, openclaw: { version: api.runtime?.version || api.version || 'unknown', - pluginVersion: '0.3.1', // Bumped for PLG-CAL-004 + pluginVersion: '0.3.2', // Bumped for PLG-CAL-004 }, timestamp: new Date().toISOString(), }; @@ -118,8 +117,13 @@ function register(api: PluginAPI): void { // Periodic metadata push interval handle let metaPushInterval: ReturnType | null = null; - // Calendar scheduler instance - let calendarScheduler: CalendarScheduler | null = null; + // Calendar scheduler instance. + // + // In multi-agent sync mode (the only path today) this is a + // {@link MultiAgentSchedulerHandle}. The legacy `CalendarScheduler` type + // is retained in the union for compatibility with the typed-only single- + // agent path that may be reintroduced later. + let calendarScheduler: MultiAgentSchedulerHandle | CalendarScheduler | null = null; /** * Push OpenClaw metadata to the Monitor bridge. @@ -143,7 +147,7 @@ function register(api: PluginAPI): void { const meta: OpenClawMeta = { version: api.runtime?.version || api.version || 'unknown', - plugin_version: '0.3.1', + plugin_version: '0.3.2', agents: agentNames.map(name => ({ name })), }; @@ -280,37 +284,10 @@ function register(api: PluginAPI): void { } } - /** - * Track session completion and update slot status accordingly. - */ - function trackSessionCompletion(sessionId: string, context: AgentWakeContext): void { - // Poll for session completion (simplified approach) - // In production, this would use webhooks or event streaming - const pollInterval = 30000; // 30 seconds - const maxDuration = context.slot.estimated_duration * 60 * 1000; // Convert to ms - const startTime = Date.now(); - - const poll = async () => { - if (!calendarScheduler) return; - - const elapsed = Date.now() - startTime; - - // Check if session is complete (would use actual API in production) - // For now, estimate completion based on duration - if (elapsed >= maxDuration) { - // Assume completion - const actualMinutes = Math.round(elapsed / 60000); - await calendarScheduler.completeCurrentSlot(actualMinutes); - return; - } - - // Continue polling - setTimeout(poll, pollInterval); - }; - - // Start polling - setTimeout(poll, pollInterval); - } + // (trackSessionCompletion removed — legacy single-agent poll loop that + // called calendarScheduler.completeCurrentSlot. The multi-agent path + // closes slots via the harborforge_calendar_complete tool, driven by + // the agent itself, not by a timer.) /** * Initialize and start the calendar scheduler. @@ -381,6 +358,13 @@ function register(api: PluginAPI): void { // Wake the agent with the slot context inlined const ok = await wakeAgent(agentId, fresh); if (ok) { + // Top slot is the one inlined in the wakeup message; record it as + // the agent's "current" so harborforge_calendar_complete/abort/… + // can resolve a slot without an explicit param. + const top = fresh[0]; + if (top && calendarScheduler instanceof MultiAgentSchedulerHandle) { + calendarScheduler.recordWoken(agentId, top); + } for (const s of fresh) { const key = `${agentId}::${s.id ?? s.virtual_id ?? s.scheduled_at}`; wakedSlotKeys.add(key); @@ -400,14 +384,14 @@ function register(api: PluginAPI): void { const syncHandle = setInterval(runSyncReset, SYNC_INTERVAL_MS); const checkHandle = setInterval(runCheck, CHECK_INTERVAL_MS); - // Store handles for cleanup (reuse calendarScheduler variable) - (calendarScheduler as any) = { - stop() { - clearInterval(syncHandle); - clearInterval(checkHandle); - logger.info('Calendar scheduler stopped'); - }, - }; + // Install the multi-agent handle so calendar tools resolve per-caller. + calendarScheduler = new MultiAgentSchedulerHandle({ + bridge: calendarBridge, + cache: scheduleCache, + syncHandle, + checkHandle, + logger, + }); logger.info('Calendar scheduler started (multi-agent sync mode)'); } @@ -444,7 +428,7 @@ function register(api: PluginAPI): void { }); // Tool: plugin status - api.registerTool(() => ({ + api.registerTool((ctx) => ({ name: 'harborforge_status', description: 'Get HarborForge plugin status and current telemetry snapshot', parameters: { @@ -463,13 +447,27 @@ function register(api: PluginAPI): void { : { connected: false, error: 'Monitor bridge unreachable' }; } - // Get calendar scheduler status - const calendarStatus = calendarScheduler ? { - running: calendarScheduler.isRunning(), - processing: calendarScheduler.isProcessing(), - currentSlot: calendarScheduler.getCurrentSlot(), - isRestartPending: calendarScheduler.isRestartPending(), - } : null; + // Get calendar scheduler status. In multi-agent mode `currentSlot` + // depends on the caller, so look it up via ctx.agentId. + const callerAgentId = ctx?.agentId ?? resolveAgentId(); + const calendarStatus = calendarScheduler + ? calendarScheduler instanceof MultiAgentSchedulerHandle + ? { + running: calendarScheduler.isRunning(), + processing: calendarScheduler.isProcessing(), + mode: 'multi-agent', + callerAgentId, + currentSlot: calendarScheduler.resolveCurrentSlot(callerAgentId), + isRestartPending: calendarScheduler.isRestartPending(), + } + : { + running: calendarScheduler.isRunning(), + processing: calendarScheduler.isProcessing(), + mode: 'single-agent', + currentSlot: calendarScheduler.getCurrentSlot(), + isRestartPending: calendarScheduler.isRestartPending(), + } + : null; return { enabled: live.enabled !== false, @@ -528,7 +526,7 @@ function register(api: PluginAPI): void { })); // Tool: calendar slot management - api.registerTool(() => ({ + api.registerTool((ctx) => ({ name: 'harborforge_calendar_status', description: 'Get current calendar scheduler status and pending slots', parameters: { @@ -539,10 +537,24 @@ function register(api: PluginAPI): void { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } - + const callerAgentId = ctx?.agentId ?? resolveAgentId(); + if (calendarScheduler instanceof MultiAgentSchedulerHandle) { + return { + running: calendarScheduler.isRunning(), + processing: calendarScheduler.isProcessing(), + mode: 'multi-agent', + callerAgentId, + currentSlot: calendarScheduler.resolveCurrentSlot(callerAgentId), + agentSlots: calendarScheduler.cachedSlotsFor(callerAgentId), + state: calendarScheduler.getState(), + isRestartPending: calendarScheduler.isRestartPending(), + stateFilePath: calendarScheduler.getStateFilePath(), + }; + } return { running: calendarScheduler.isRunning(), processing: calendarScheduler.isProcessing(), + mode: 'single-agent', currentSlot: calendarScheduler.getCurrentSlot(), state: calendarScheduler.getState(), isRestartPending: calendarScheduler.isRestartPending(), @@ -552,7 +564,7 @@ function register(api: PluginAPI): void { })); // Tool: complete current slot (for agent to report completion) - api.registerTool(() => ({ + api.registerTool((ctx) => ({ name: 'harborforge_calendar_complete', description: 'Complete the current calendar slot with actual duration', parameters: { @@ -569,14 +581,20 @@ function register(api: PluginAPI): void { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } - + if (calendarScheduler instanceof MultiAgentSchedulerHandle) { + const agentId = ctx?.agentId ?? resolveAgentId(); + const res = await calendarScheduler.completeSlot(agentId, params.actualDurationMinutes); + return res.ok + ? { success: true, message: 'Slot completed', slot: res.slot } + : { error: res.error }; + } await calendarScheduler.completeCurrentSlot(params.actualDurationMinutes); return { success: true, message: 'Slot completed' }; }, })); // Tool: abort current slot (for agent to report failure) - api.registerTool(() => ({ + api.registerTool((ctx) => ({ name: 'harborforge_calendar_abort', description: 'Abort the current calendar slot', parameters: { @@ -592,14 +610,20 @@ function register(api: PluginAPI): void { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } - + if (calendarScheduler instanceof MultiAgentSchedulerHandle) { + const agentId = ctx?.agentId ?? resolveAgentId(); + const res = await calendarScheduler.abortSlot(agentId, params.reason); + return res.ok + ? { success: true, message: 'Slot aborted', slot: res.slot } + : { error: res.error }; + } await calendarScheduler.abortCurrentSlot(params.reason); return { success: true, message: 'Slot aborted' }; }, })); // Tool: pause current slot - api.registerTool(() => ({ + api.registerTool((ctx) => ({ name: 'harborforge_calendar_pause', description: 'Pause the current calendar slot', parameters: { @@ -610,14 +634,20 @@ function register(api: PluginAPI): void { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } - + if (calendarScheduler instanceof MultiAgentSchedulerHandle) { + const agentId = ctx?.agentId ?? resolveAgentId(); + const res = await calendarScheduler.pauseSlot(agentId); + return res.ok + ? { success: true, message: 'Slot paused', slot: res.slot } + : { error: res.error }; + } await calendarScheduler.pauseCurrentSlot(); return { success: true, message: 'Slot paused' }; }, })); // Tool: resume current slot - api.registerTool(() => ({ + api.registerTool((ctx) => ({ name: 'harborforge_calendar_resume', description: 'Resume the paused calendar slot', parameters: { @@ -628,7 +658,13 @@ function register(api: PluginAPI): void { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } - + if (calendarScheduler instanceof MultiAgentSchedulerHandle) { + const agentId = ctx?.agentId ?? resolveAgentId(); + const res = await calendarScheduler.resumeSlot(agentId); + return res.ok + ? { success: true, message: 'Slot resumed', slot: res.slot } + : { error: res.error }; + } await calendarScheduler.resumeCurrentSlot(); return { success: true, message: 'Slot resumed' }; }, diff --git a/plugin/package.json b/plugin/package.json index eb3d4fb..cc77f67 100644 --- a/plugin/package.json +++ b/plugin/package.json @@ -1,6 +1,6 @@ { "name": "harbor-forge-plugin", - "version": "0.2.0", + "version": "0.3.2", "description": "OpenClaw plugin for HarborForge monitor bridge and CLI integration", "type": "module", "main": "dist/index.js",