diff --git a/plugin/hooks/gateway-start.ts b/plugin/hooks/gateway-start.ts index 26aee53..de3f6ab 100644 --- a/plugin/hooks/gateway-start.ts +++ b/plugin/hooks/gateway-start.ts @@ -7,10 +7,20 @@ export function registerGatewayStartHook(api: any, deps: { pushMetaToMonitor: () => Promise; startCalendarScheduler: () => void; setMetaPushInterval: (handle: ReturnType) => void; + onCronServiceAvailable?: (cron: any) => void; }) { const { logger, pushMetaToMonitor, startCalendarScheduler, setMetaPushInterval } = deps; - api.on('gateway_start', () => { + api.on('gateway_start', (event?: any) => { + // Extract cron service from gateway startup event (same as memory-core dreaming) + if (event && deps.onCronServiceAvailable) { + const context = event?.context; + const cron = context?.cron ?? context?.deps?.cron; + if (cron && typeof cron.add === 'function') { + deps.onCronServiceAvailable(cron); + logger.info('HarborForge: cron service acquired from gateway_start'); + } + } logger.info('HarborForge plugin active'); const live = getPluginConfig(api); diff --git a/plugin/index.ts b/plugin/index.ts index 2a66a23..48be1fd 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -187,100 +187,46 @@ export default { return null; } + // Cron service reference, acquired from gateway_start event + let cronService: any = null; + /** - * Wake agent via gateway WebSocket API. - * Uses callGateway("agent") to trigger an agent turn — the same mechanism - * used by sessions_spawn and cron internally. + * Wake agent via cron service — same mechanism used by dreaming/memory-core. + * Creates a one-shot cron job with wakeMode: "now" and deleteAfterRun: true. */ async function wakeAgent(context: AgentWakeContext): Promise { logger.info(`Waking agent for slot: ${context.taskDescription}`); + if (!cronService) { + logger.error('Cron service not available — cannot wake agent'); + return false; + } + const agentId = process.env.AGENT_ID || 'unknown'; + const slotId = context.slot.id ?? context.slot.virtual_id ?? 'unknown'; try { - // Connect to gateway via WebSocket and trigger an agent turn. - // Uses the same gateway RPC that sessions_spawn and cron use internally. - const cfg = api.runtime?.config?.loadConfig?.() ?? {}; - const gwCfg = cfg.gateway ?? {}; - const gwPort = gwCfg.port ?? 18789; - const gwToken = gwCfg.auth?.token ?? ''; - const gatewayUrl = `ws://127.0.0.1:${gwPort}`; - - const result = await new Promise<{ sessionId?: string; error?: string }>((resolve, reject) => { - const timeout = setTimeout(() => { - try { client.close(); } catch {} - reject(new Error('Gateway connection timeout')); - }, 15000); - - // Pass token via Authorization header in the upgrade request - const wsUrl = gwToken ? `${gatewayUrl}?token=${encodeURIComponent(gwToken)}` : gatewayUrl; - const client = new (globalThis as any).WebSocket(wsUrl); - - client.onerror = (err: any) => { - clearTimeout(timeout); - reject(err?.error || err); - }; - - client.onopen = () => { - // Gateway uses "connect" method with auth in params - client.send(JSON.stringify({ - jsonrpc: '2.0', - method: 'connect', - params: { - clientName: 'harbor-forge-calendar', - mode: 'backend', - ...(gwToken ? { auth: { token: gwToken } } : {}), - }, - id: 1, - })); - }; - - let helloAcked = false; - - client.onmessage = (ev: any) => { - try { - const msg = JSON.parse(typeof ev.data === 'string' ? ev.data : ev.data.toString()); - - if (!helloAcked && msg.id === 1) { - helloAcked = true; - client.send(JSON.stringify({ - jsonrpc: '2.0', - method: 'agent', - params: { - message: context.prompt, - agentId, - timeoutSeconds: context.slot.estimated_duration * 60, - }, - id: 2, - })); - return; - } - - if (msg.id === 2) { - clearTimeout(timeout); - try { client.close(); } catch {} - if (msg.error) { - resolve({ error: msg.error.message || JSON.stringify(msg.error) }); - } else { - resolve({ sessionId: msg.result?.sessionId || 'ok' }); - } - } - } catch { - // ignore parse errors - } - }; + await cronService.add({ + name: `hf-calendar-slot-${slotId}`, + description: `[managed-by=harbor-forge.calendar] Slot ${slotId}: ${context.taskDescription}`, + deleteAfterRun: true, + wakeMode: 'now', + agentId, + payload: { + kind: 'agentTurn', + message: context.prompt, + timeoutSeconds: context.slot.estimated_duration * 60, + }, + delivery: { + mode: 'none', + }, }); - if (result.error) { - logger.error(`Gateway agent call failed: ${result.error}`); - return false; - } - - logger.info(`Agent woken via gateway for slot: session=${result.sessionId}`); + logger.info(`Agent wake cron job created for slot ${slotId}`); return true; } catch (err) { - logger.error('Failed to wake agent via gateway:', err); + logger.error('Failed to create wake cron job:', err); return false; } } @@ -363,6 +309,9 @@ export default { setMetaPushInterval(handle) { metaPushInterval = handle; }, + onCronServiceAvailable(cron) { + cronService = cron; + }, }); registerGatewayStopHook(api, {