/** * HarborForge Plugin for OpenClaw * * Provides monitor-related tools and exposes OpenClaw metadata * for the HarborForge Monitor bridge (via monitor_port). * * Also integrates with HarborForge Calendar system to wake agents * for scheduled tasks (PLG-CAL-002, PLG-CAL-004). * * Sidecar architecture has been removed. Telemetry data is now * served directly by the plugin when Monitor queries via the * local monitor_port communication path. */ import { hostname, freemem, totalmem, uptime, loadavg, platform } from 'node:os'; import { definePluginEntry } from 'openclaw/plugin-sdk/plugin-entry'; import { MultiAgentScheduleCache } from './calendar/schedule-cache.js'; import { MultiAgentSchedulerHandle } from './calendar/multi-agent-handle.js'; import { getPluginConfig } from './core/config.js'; import { MonitorBridgeClient, type OpenClawMeta } from './core/monitor-bridge.js'; import type { OpenClawAgentInfo } from './core/openclaw-agents.js'; import { registerGatewayStartHook } from './hooks/gateway-start.js'; import { registerGatewayStopHook } from './hooks/gateway-stop.js'; import { createCalendarBridgeClient, CalendarScheduler, } from './calendar/index.js'; // --------------------------------------------------------------------------- // Module-scope calendar scheduler singleton. // // `register()` is called multiple times per gateway boot — once per agent // (we see 5 `HarborForge plugin registered` lines for 5 agents on dind-t2). // `gateway_start` only fires once, so before this lift the // `startCalendarScheduler()` setup ran inside ONE closure while four other // closures kept their own `calendarScheduler = null`. Whichever of the five // tool registrations the gateway picked at call time was effectively a coin // flip, and four times out of five `harborforge_calendar_status` returned // `Calendar scheduler not running` even though the scheduler was active. // // Keeping the singleton at module scope removes the per-`register()` shadow: // the scheduler is started once, every closure reads the same binding, and // `startCalendarScheduler()` is idempotent so duplicate `gateway_start` // firings are harmless. // --------------------------------------------------------------------------- let calendarScheduler: MultiAgentSchedulerHandle | CalendarScheduler | null = null; interface PluginAPI { logger: { info: (...args: any[]) => void; error: (...args: any[]) => void; debug: (...args: any[]) => void; warn: (...args: any[]) => void; }; version?: string; runtime?: { version?: string; config?: { loadConfig?: () => any; }; }; config?: Record; pluginConfig?: Record; on: (event: string, handler: () => void) => void; registerTool: (factory: (ctx: any) => any) => void; /** Spawn a sub-agent with task context (OpenClaw 2.1+) */ spawn?: (options: { agentId?: string; task: string; model?: string; timeoutSeconds?: number; }) => Promise<{ sessionId: string; status: string }>; /** Get current agent status */ getAgentStatus?: () => Promise<{ status: string } | null>; } /** * Coerce a tool execute() return value into the MCP `{ content: [...] }` * shape that the openclaw Codex tool dispatcher requires. * * Background: openclaw's `convertToolContents()` does `result.content.reduce(...)` * to compute total text length before flattening. Every HF tool here returned a * bare object (`{ running, processing, currentSlot, ... }`) which has no * `.content` field, so `undefined.reduce` threw and every call to * `harborforge_*` from a Codex-harness agent surfaced as the cryptic * `Cannot read properties of undefined (reading 'reduce')`. The fix is to * wrap every tool's execute return; doing it at the `registerTool` boundary * keeps each tool body unchanged. */ function ensureMcpContentShape(result: unknown): { content: Array<{ type: 'text'; text: string }> } { if ( result && typeof result === 'object' && Array.isArray((result as { content?: unknown }).content) ) { return result as { content: Array<{ type: 'text'; text: string }> }; } const text = typeof result === 'string' ? result : JSON.stringify(result, null, 2); return { content: [{ type: 'text', text }] }; } function register(api: PluginAPI): void { const logger = api.logger || { info: (...args: any[]) => console.log('[HarborForge]', ...args), error: (...args: any[]) => console.error('[HarborForge]', ...args), debug: (...args: any[]) => console.debug('[HarborForge]', ...args), warn: (...args: any[]) => console.warn('[HarborForge]', ...args), }; // Wrap api.registerTool so every tool's execute() return is coerced into // the MCP `{ content: [...] }` shape openclaw expects. See // `ensureMcpContentShape` above. const _origRegisterTool = api.registerTool.bind(api); api.registerTool = (factory: (ctx: any) => any) => { _origRegisterTool((ctx: any) => { const def = factory(ctx); if (!def || typeof def.execute !== 'function') return def; const origExecute = def.execute; return { ...def, execute: async (...args: any[]) => ensureMcpContentShape(await origExecute(...args)), }; }); }; function resolveConfig() { return getPluginConfig(api); } /** Resolve agent ID from env, config, or fallback. */ function resolveAgentId(): string { if (process.env.AGENT_ID) return process.env.AGENT_ID; const cfg = api.runtime?.config?.loadConfig?.(); return cfg?.agents?.list?.[0]?.id ?? cfg?.agents?.defaults?.id ?? 'unknown'; } /** * Get the monitor bridge client if monitor_port is configured. */ function getBridgeClient(): MonitorBridgeClient | null { const live = resolveConfig(); const port = live.monitor_port; if (!port || port <= 0) return null; return new MonitorBridgeClient(port); } /** * Collect current system telemetry snapshot. * This data is exposed to the Monitor bridge when it queries the plugin. */ function collectTelemetry() { const live = resolveConfig(); const load = loadavg(); return { identifier: live.identifier || hostname(), platform: platform(), hostname: hostname(), uptime: uptime(), memory: { total: totalmem(), free: freemem(), used: totalmem() - freemem(), usagePercent: ((totalmem() - freemem()) / totalmem()) * 100, }, load: { avg1: load[0], avg5: load[1], avg15: load[2], }, openclaw: { version: api.runtime?.version || api.version || 'unknown', pluginVersion: '0.3.4', // Bumped for PLG-CAL-004 }, timestamp: new Date().toISOString(), }; } // Periodic metadata push interval handle let metaPushInterval: ReturnType | null = null; // (calendarScheduler is module-scope — see top of file for the why. // Tools and lifecycle hooks all reference the same binding so the // multi-register/single-start mismatch can't shadow them again.) /** * Push OpenClaw metadata to the Monitor bridge. * This enriches Monitor heartbeats with OpenClaw version/plugin/agent info. * Failures are non-fatal — Monitor continues to work without this data. */ async function pushMetaToMonitor() { const bridgeClient = getBridgeClient(); if (!bridgeClient) return; let agentNames: string[] = []; try { const cfg = api.runtime?.config?.loadConfig?.(); const agentsList = cfg?.agents?.list; if (Array.isArray(agentsList)) { agentNames = agentsList .map((a: any) => typeof a === 'string' ? a : a?.name) .filter(Boolean); } } catch { /* non-fatal */ } const meta: OpenClawMeta = { version: api.runtime?.version || api.version || 'unknown', plugin_version: '0.3.4', agents: agentNames.map(name => ({ name })), }; const ok = await bridgeClient.pushOpenClawMeta(meta); if (ok) { logger.debug('pushed OpenClaw metadata to Monitor bridge'); } else { logger.debug('Monitor bridge unreachable for metadata push (non-fatal)'); } } /** * Get current agent status from OpenClaw. * Falls back to querying backend if OpenClaw API unavailable. */ async function getAgentStatus(): Promise<'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | null> { // Try OpenClaw API first (if available) if (api.getAgentStatus) { try { const status = await api.getAgentStatus(); if (status?.status) { return status.status as 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline'; } } catch (err) { logger.debug('Failed to get agent status from OpenClaw API:', err); } } // Fallback: query backend for agent status const live = resolveConfig(); const agentId = resolveAgentId(); try { const response = await fetch(`${live.backendUrl}/calendar/agent/status?agent_id=${agentId}`, { headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': live.identifier || hostname(), }, }); if (response.ok) { const data = await response.json(); return data.status; } } catch (err) { logger.debug('Failed to get agent status from backend:', err); } return null; } /** * Wake agent via dispatchInboundMessage — same mechanism used by Discord plugin. * Direct in-process call, no WebSocket or CLI needed. */ async function wakeAgent( agentId: string, dueSlots?: Array<{ id?: number | null; virtual_id?: string | null; event_data?: any; scheduled_at?: string; priority?: number; slot_type?: string; [k: string]: unknown; }> ): Promise { logger.info(`Waking agent ${agentId}: has due slots`); const sessionKey = `agent:${agentId}:hf-wakeup`; try { const sdkPath = 'openclaw/plugin-sdk/reply-runtime'; const { dispatchInboundMessageWithDispatcher } = await import( /* webpackIgnore: true */ sdkPath ); // api.config first (current public API). Fall back to deprecated // runtime.config.loadConfig() for older host versions. Both should // contain agents.list / channels for dispatch routing. const cfg = (api as any).config ?? api.runtime?.config?.loadConfig?.(); if (!cfg) { logger.error('Cannot load OpenClaw config for dispatch'); return false; } // Inline the highest-priority due slot's context so the agent does // not need a second round-trip to harborforge_calendar_status. The // agent can read event_data.task_code / task_title etc. directly. let slotBlock = ''; const top = dueSlots && dueSlots.length ? dueSlots[0] : undefined; if (top) { slotBlock = `\n\nMatching slot:\n\`\`\`json\n${JSON.stringify( { slot_id: top.id ?? null, virtual_id: top.virtual_id ?? null, scheduled_at: top.scheduled_at ?? null, priority: top.priority ?? null, slot_type: top.slot_type ?? null, event_data: top.event_data ?? null, }, null, 2 )}\n\`\`\``; } // First-line ack `WAKEUP_OK` is the plugin's ack-receipt token; the // agent MUST then continue in the same session and drive the // `hf-wakeup` workflow to completion (calendar_status → task fetch → // sub-workflow → calendar_complete/abort). Without that continuation // the scheduler keeps re-waking every 30s because the slot stays // `not_started` forever. const wakeupMessage = `You have due slots. **First line of your reply MUST be exactly ` + `\`WAKEUP_OK\`** so the plugin records the ack. Then, **in this ` + `same session**, drive the \`hf-wakeup\` workflow of skill ` + `\`hf-hangman-lab\` to completion — read slot context, call the ` + `harborforge_calendar_* tools, route to the right sub-workflow, ` + `and finish with harborforge_calendar_complete or abort. Do NOT ` + `stop after the ack — the scheduler will re-wake you every 30s ` + `until the slot transitions out of \`not_started\`.${slotBlock}`; const result = await dispatchInboundMessageWithDispatcher({ ctx: { Body: wakeupMessage, SessionKey: sessionKey, From: 'harborforge-calendar', Provider: 'harborforge', }, cfg, dispatcherOptions: { deliver: async (payload: any) => { const text = (payload.text || '').trim(); logger.info(`Agent ${agentId} wakeup reply: ${text.slice(0, 100)}`); }, }, }); logger.info(`Agent ${agentId} dispatched: ${result?.status || 'ok'}`); return true; } catch (err: any) { const msg = err?.message || err?.code || String(err); const stack = err?.stack?.split('\n').slice(0, 3).join(' | ') || ''; logger.error(`Failed to dispatch agent for slot: ${msg} ${stack}`); return false; } } // (trackSessionCompletion removed — legacy single-agent poll loop that // called calendarScheduler.completeCurrentSlot. The multi-agent path // closes slots via the harborforge_calendar_complete tool, driven by // the agent itself, not by a timer.) /** * Initialize and start the calendar scheduler. * * Idempotent — `gateway_start` may fire once per `register()` invocation * (the host calls `register` per agent), and we only want one set of * sync/check intervals across the whole process. */ function startCalendarScheduler(): void { if (calendarScheduler) { logger.info('Calendar scheduler already started, skipping duplicate gateway_start'); return; } const live = resolveConfig(); // Create bridge client (claw-instance level, not per-agent) const calendarBridge = createCalendarBridgeClient( api, live.backendUrl || 'https://monitor.hangman-lab.top', 'unused' // agentId no longer needed at bridge level ); // Multi-agent sync + check loop const scheduleCache = new MultiAgentScheduleCache(); const SYNC_INTERVAL_MS = 300_000; // 5 min const CHECK_INTERVAL_MS = 30_000; // 30 sec // Sync: pull all agent schedules from backend async function runSync() { try { const result = await calendarBridge.syncSchedules(); if (result) { scheduleCache.sync(result.date, result.schedules); const status = scheduleCache.getStatus(); logger.info(`Schedule synced: ${status.agentCount} agents, ${status.totalSlots} slots`); } } catch (err) { logger.warn(`Schedule sync failed: ${String(err)}`); } } // Cross-plugin exposure: agent status lookup for other plugins // (currently Fabric.OpenclawPlugin uses this to skip delivering // `announce` channel messages to busy agents — see DIALECTIC-V2 // design doc, Phase 1). Backed by calendarBridge.getAgentStatus // with a small TTL cache to avoid hammering the HF backend. type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline'; const HF_STATUS_CACHE_TTL_MS = 30_000; const hfStatusCache = new Map(); const _G = globalThis as Record; _G['__hfAgentStatus'] = { async get(agentId: string): Promise { if (!agentId) return undefined; const cached = hfStatusCache.get(agentId); if (cached && Date.now() - cached.at < HF_STATUS_CACHE_TTL_MS) { return cached.status; } try { const status = await calendarBridge.getAgentStatus(agentId); if (status) { const typed = status as HfStatus; hfStatusCache.set(agentId, { status: typed, at: Date.now() }); return typed; } } catch { /* fall through to cached-or-undefined */ } return cached?.status; }, }; // Track wakes already dispatched for a slot in the current sync // window — the simplified inline scheduler does not PATCH slot // status server-side, so without dedupe the check loop re-wakes // the same slot every 30s. Set is cleared by runSync (fresh wake // budget per sync). const wakedSlotKeys = new Set(); // Check: find agents with due slots and wake them async function runCheck() { const now = new Date(); const agentsWithDue = scheduleCache.getAgentsWithDueSlots(now); for (const { agentId, slots } of agentsWithDue) { // Filter out slots we've already woken this sync window const fresh = slots.filter((s) => { const key = `${agentId}::${s.id ?? s.virtual_id ?? s.scheduled_at}`; if (wakedSlotKeys.has(key)) return false; return true; }); if (fresh.length === 0) continue; // Check if agent is busy (best effort; backend may 405 the GET // — treat unknown as not-busy so wakeup still fires) let status: string | null = null; try { status = await calendarBridge.getAgentStatus(agentId); } catch { status = null; } if (status === 'busy' || status === 'offline' || status === 'exhausted') { continue; } // Wake the agent with the slot context inlined const ok = await wakeAgent(agentId, fresh); if (ok) { // Top slot is the one inlined in the wakeup message; record it as // the agent's "current" so harborforge_calendar_complete/abort/… // can resolve a slot without an explicit param. const top = fresh[0]; if (top && calendarScheduler instanceof MultiAgentSchedulerHandle) { calendarScheduler.recordWoken(agentId, top); } for (const s of fresh) { const key = `${agentId}::${s.id ?? s.virtual_id ?? s.scheduled_at}`; wakedSlotKeys.add(key); } } } } // Initial sync (also resets the wake-dedupe window) const runSyncReset = async () => { wakedSlotKeys.clear(); await runSync(); }; runSyncReset(); // Start intervals const syncHandle = setInterval(runSyncReset, SYNC_INTERVAL_MS); const checkHandle = setInterval(runCheck, CHECK_INTERVAL_MS); // Install the multi-agent handle so calendar tools resolve per-caller. calendarScheduler = new MultiAgentSchedulerHandle({ bridge: calendarBridge, cache: scheduleCache, syncHandle, checkHandle, logger, }); logger.info('Calendar scheduler started (multi-agent sync mode)'); } /** * Stop the calendar scheduler. */ function stopCalendarScheduler(): void { if (calendarScheduler) { calendarScheduler.stop(); calendarScheduler = null; logger.info('Calendar scheduler stopped'); } } registerGatewayStartHook(api, { logger, pushMetaToMonitor, startCalendarScheduler, setMetaPushInterval(handle) { metaPushInterval = handle; }, }); registerGatewayStopHook(api, { logger, getMetaPushInterval() { return metaPushInterval; }, clearMetaPushInterval() { metaPushInterval = null; }, stopCalendarScheduler, }); // Tool: plugin status api.registerTool((ctx) => ({ name: 'harborforge_status', description: 'Get HarborForge plugin status and current telemetry snapshot', parameters: { type: 'object', properties: {}, }, async execute() { const live = resolveConfig(); const bridgeClient = getBridgeClient(); let monitorBridge = null; if (bridgeClient) { const health = await bridgeClient.health(); monitorBridge = health ? { connected: true, ...health } : { connected: false, error: 'Monitor bridge unreachable' }; } // Get calendar scheduler status. In multi-agent mode `currentSlot` // depends on the caller, so look it up via ctx.agentId. const callerAgentId = ctx?.agentId ?? resolveAgentId(); const calendarStatus = calendarScheduler ? calendarScheduler instanceof MultiAgentSchedulerHandle ? { running: calendarScheduler.isRunning(), processing: calendarScheduler.isProcessing(), mode: 'multi-agent', callerAgentId, currentSlot: calendarScheduler.resolveCurrentSlot(callerAgentId), isRestartPending: calendarScheduler.isRestartPending(), } : { running: calendarScheduler.isRunning(), processing: calendarScheduler.isProcessing(), mode: 'single-agent', currentSlot: calendarScheduler.getCurrentSlot(), isRestartPending: calendarScheduler.isRestartPending(), } : null; return { enabled: live.enabled !== false, config: { backendUrl: live.backendUrl, identifier: live.identifier || hostname(), monitorPort: live.monitor_port ?? null, reportIntervalSec: live.reportIntervalSec, hasApiKey: Boolean(live.apiKey), }, monitorBridge, calendar: calendarStatus, telemetry: collectTelemetry(), }; }, })); // Tool: telemetry snapshot (for Monitor bridge queries) api.registerTool(() => ({ name: 'harborforge_telemetry', description: 'Get current system telemetry data for HarborForge Monitor', parameters: { type: 'object', properties: {}, }, async execute() { return collectTelemetry(); }, })); // Tool: query Monitor bridge for host hardware telemetry api.registerTool(() => ({ name: 'harborforge_monitor_telemetry', description: 'Query HarborForge Monitor bridge for host hardware telemetry (CPU, memory, disk, etc.)', parameters: { type: 'object', properties: {}, }, async execute() { const bridgeClient = getBridgeClient(); if (!bridgeClient) { return { error: 'Monitor bridge not configured (monitor_port not set or 0)', }; } const data = await bridgeClient.telemetry(); if (!data) { return { error: 'Monitor bridge unreachable', }; } return data; }, })); // Tool: calendar slot management api.registerTool((ctx) => ({ name: 'harborforge_calendar_status', description: 'Get current calendar scheduler status and pending slots', parameters: { type: 'object', properties: {}, }, async execute() { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } const callerAgentId = ctx?.agentId ?? resolveAgentId(); if (calendarScheduler instanceof MultiAgentSchedulerHandle) { return { running: calendarScheduler.isRunning(), processing: calendarScheduler.isProcessing(), mode: 'multi-agent', callerAgentId, currentSlot: calendarScheduler.resolveCurrentSlot(callerAgentId), agentSlots: calendarScheduler.cachedSlotsFor(callerAgentId), state: calendarScheduler.getState(), isRestartPending: calendarScheduler.isRestartPending(), stateFilePath: calendarScheduler.getStateFilePath(), }; } return { running: calendarScheduler.isRunning(), processing: calendarScheduler.isProcessing(), mode: 'single-agent', currentSlot: calendarScheduler.getCurrentSlot(), state: calendarScheduler.getState(), isRestartPending: calendarScheduler.isRestartPending(), stateFilePath: calendarScheduler.getStateFilePath(), }; }, })); // Tool: complete current slot (for agent to report completion) api.registerTool((ctx) => ({ name: 'harborforge_calendar_complete', description: 'Complete the current calendar slot with actual duration', parameters: { type: 'object', properties: { actualDurationMinutes: { type: 'number', description: 'Actual time spent on the task in minutes', }, }, required: ['actualDurationMinutes'], }, async execute(params: { actualDurationMinutes: number }) { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } if (calendarScheduler instanceof MultiAgentSchedulerHandle) { const agentId = ctx?.agentId ?? resolveAgentId(); const res = await calendarScheduler.completeSlot(agentId, params.actualDurationMinutes); return res.ok ? { success: true, message: 'Slot completed', slot: res.slot } : { error: res.error }; } await calendarScheduler.completeCurrentSlot(params.actualDurationMinutes); return { success: true, message: 'Slot completed' }; }, })); // Tool: abort current slot (for agent to report failure) api.registerTool((ctx) => ({ name: 'harborforge_calendar_abort', description: 'Abort the current calendar slot', parameters: { type: 'object', properties: { reason: { type: 'string', description: 'Reason for aborting', }, }, }, async execute(params: { reason?: string }) { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } if (calendarScheduler instanceof MultiAgentSchedulerHandle) { const agentId = ctx?.agentId ?? resolveAgentId(); const res = await calendarScheduler.abortSlot(agentId, params.reason); return res.ok ? { success: true, message: 'Slot aborted', slot: res.slot } : { error: res.error }; } await calendarScheduler.abortCurrentSlot(params.reason); return { success: true, message: 'Slot aborted' }; }, })); // Tool: pause current slot api.registerTool((ctx) => ({ name: 'harborforge_calendar_pause', description: 'Pause the current calendar slot', parameters: { type: 'object', properties: {}, }, async execute() { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } if (calendarScheduler instanceof MultiAgentSchedulerHandle) { const agentId = ctx?.agentId ?? resolveAgentId(); const res = await calendarScheduler.pauseSlot(agentId); return res.ok ? { success: true, message: 'Slot paused', slot: res.slot } : { error: res.error }; } await calendarScheduler.pauseCurrentSlot(); return { success: true, message: 'Slot paused' }; }, })); // Tool: resume current slot api.registerTool((ctx) => ({ name: 'harborforge_calendar_resume', description: 'Resume the paused calendar slot', parameters: { type: 'object', properties: {}, }, async execute() { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } if (calendarScheduler instanceof MultiAgentSchedulerHandle) { const agentId = ctx?.agentId ?? resolveAgentId(); const res = await calendarScheduler.resumeSlot(agentId); return res.ok ? { success: true, message: 'Slot resumed', slot: res.slot } : { error: res.error }; } await calendarScheduler.resumeCurrentSlot(); return { success: true, message: 'Slot resumed' }; }, })); // Tool: check ScheduledGatewayRestart status api.registerTool(() => ({ name: 'harborforge_restart_status', description: 'Check if a gateway restart is pending (PLG-CAL-004)', parameters: { type: 'object', properties: {}, }, async execute() { if (!calendarScheduler) { return { error: 'Calendar scheduler not running' }; } const isPending = calendarScheduler.isRestartPending(); const stateFilePath = calendarScheduler.getStateFilePath(); return { isRestartPending: isPending, stateFilePath: stateFilePath, message: isPending ? 'A gateway restart has been scheduled. The scheduler has been paused.' : 'No gateway restart is pending.', }; }, })); logger.info('HarborForge plugin registered (id: harbor-forge)'); } // HarborForge's local PluginAPI is broader than the standard OpenClawPluginApi // (it surfaces optional `version`/`runtime`/`spawn` accessors that older // OpenClaw builds exposed). The cast at the definePluginEntry boundary // acknowledges that gap — the runtime api object is whatever the gateway // passes us, and each access is guarded with optional chaining / fallbacks. export default definePluginEntry({ id: 'harbor-forge', name: 'HarborForge', description: 'HarborForge plugin for OpenClaw - project management, monitoring, and CLI integration', register: register as (api: any) => void, });