feat: wake agent via cron service instead of WebSocket/spawn
Use the same mechanism as dreaming/memory-core: acquire cron service from gateway_start event, create one-shot cron job with wakeMode: "now" and deleteAfterRun: true. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,10 +7,20 @@ export function registerGatewayStartHook(api: any, deps: {
|
|||||||
pushMetaToMonitor: () => Promise<void>;
|
pushMetaToMonitor: () => Promise<void>;
|
||||||
startCalendarScheduler: () => void;
|
startCalendarScheduler: () => void;
|
||||||
setMetaPushInterval: (handle: ReturnType<typeof setInterval>) => void;
|
setMetaPushInterval: (handle: ReturnType<typeof setInterval>) => void;
|
||||||
|
onCronServiceAvailable?: (cron: any) => void;
|
||||||
}) {
|
}) {
|
||||||
const { logger, pushMetaToMonitor, startCalendarScheduler, setMetaPushInterval } = deps;
|
const { logger, pushMetaToMonitor, startCalendarScheduler, setMetaPushInterval } = deps;
|
||||||
|
|
||||||
api.on('gateway_start', () => {
|
api.on('gateway_start', (event?: any) => {
|
||||||
|
// Extract cron service from gateway startup event (same as memory-core dreaming)
|
||||||
|
if (event && deps.onCronServiceAvailable) {
|
||||||
|
const context = event?.context;
|
||||||
|
const cron = context?.cron ?? context?.deps?.cron;
|
||||||
|
if (cron && typeof cron.add === 'function') {
|
||||||
|
deps.onCronServiceAvailable(cron);
|
||||||
|
logger.info('HarborForge: cron service acquired from gateway_start');
|
||||||
|
}
|
||||||
|
}
|
||||||
logger.info('HarborForge plugin active');
|
logger.info('HarborForge plugin active');
|
||||||
const live = getPluginConfig(api);
|
const live = getPluginConfig(api);
|
||||||
|
|
||||||
|
|||||||
115
plugin/index.ts
115
plugin/index.ts
@@ -187,100 +187,46 @@ export default {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cron service reference, acquired from gateway_start event
|
||||||
|
let cronService: any = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wake agent via gateway WebSocket API.
|
* Wake agent via cron service — same mechanism used by dreaming/memory-core.
|
||||||
* Uses callGateway("agent") to trigger an agent turn — the same mechanism
|
* Creates a one-shot cron job with wakeMode: "now" and deleteAfterRun: true.
|
||||||
* used by sessions_spawn and cron internally.
|
|
||||||
*/
|
*/
|
||||||
async function wakeAgent(context: AgentWakeContext): Promise<boolean> {
|
async function wakeAgent(context: AgentWakeContext): Promise<boolean> {
|
||||||
logger.info(`Waking agent for slot: ${context.taskDescription}`);
|
logger.info(`Waking agent for slot: ${context.taskDescription}`);
|
||||||
|
|
||||||
const agentId = process.env.AGENT_ID || 'unknown';
|
if (!cronService) {
|
||||||
|
logger.error('Cron service not available — cannot wake agent');
|
||||||
try {
|
|
||||||
// Connect to gateway via WebSocket and trigger an agent turn.
|
|
||||||
// Uses the same gateway RPC that sessions_spawn and cron use internally.
|
|
||||||
const cfg = api.runtime?.config?.loadConfig?.() ?? {};
|
|
||||||
const gwCfg = cfg.gateway ?? {};
|
|
||||||
const gwPort = gwCfg.port ?? 18789;
|
|
||||||
const gwToken = gwCfg.auth?.token ?? '';
|
|
||||||
const gatewayUrl = `ws://127.0.0.1:${gwPort}`;
|
|
||||||
|
|
||||||
const result = await new Promise<{ sessionId?: string; error?: string }>((resolve, reject) => {
|
|
||||||
const timeout = setTimeout(() => {
|
|
||||||
try { client.close(); } catch {}
|
|
||||||
reject(new Error('Gateway connection timeout'));
|
|
||||||
}, 15000);
|
|
||||||
|
|
||||||
// Pass token via Authorization header in the upgrade request
|
|
||||||
const wsUrl = gwToken ? `${gatewayUrl}?token=${encodeURIComponent(gwToken)}` : gatewayUrl;
|
|
||||||
const client = new (globalThis as any).WebSocket(wsUrl);
|
|
||||||
|
|
||||||
client.onerror = (err: any) => {
|
|
||||||
clearTimeout(timeout);
|
|
||||||
reject(err?.error || err);
|
|
||||||
};
|
|
||||||
|
|
||||||
client.onopen = () => {
|
|
||||||
// Gateway uses "connect" method with auth in params
|
|
||||||
client.send(JSON.stringify({
|
|
||||||
jsonrpc: '2.0',
|
|
||||||
method: 'connect',
|
|
||||||
params: {
|
|
||||||
clientName: 'harbor-forge-calendar',
|
|
||||||
mode: 'backend',
|
|
||||||
...(gwToken ? { auth: { token: gwToken } } : {}),
|
|
||||||
},
|
|
||||||
id: 1,
|
|
||||||
}));
|
|
||||||
};
|
|
||||||
|
|
||||||
let helloAcked = false;
|
|
||||||
|
|
||||||
client.onmessage = (ev: any) => {
|
|
||||||
try {
|
|
||||||
const msg = JSON.parse(typeof ev.data === 'string' ? ev.data : ev.data.toString());
|
|
||||||
|
|
||||||
if (!helloAcked && msg.id === 1) {
|
|
||||||
helloAcked = true;
|
|
||||||
client.send(JSON.stringify({
|
|
||||||
jsonrpc: '2.0',
|
|
||||||
method: 'agent',
|
|
||||||
params: {
|
|
||||||
message: context.prompt,
|
|
||||||
agentId,
|
|
||||||
timeoutSeconds: context.slot.estimated_duration * 60,
|
|
||||||
},
|
|
||||||
id: 2,
|
|
||||||
}));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg.id === 2) {
|
|
||||||
clearTimeout(timeout);
|
|
||||||
try { client.close(); } catch {}
|
|
||||||
if (msg.error) {
|
|
||||||
resolve({ error: msg.error.message || JSON.stringify(msg.error) });
|
|
||||||
} else {
|
|
||||||
resolve({ sessionId: msg.result?.sessionId || 'ok' });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// ignore parse errors
|
|
||||||
}
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
if (result.error) {
|
|
||||||
logger.error(`Gateway agent call failed: ${result.error}`);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info(`Agent woken via gateway for slot: session=${result.sessionId}`);
|
const agentId = process.env.AGENT_ID || 'unknown';
|
||||||
|
const slotId = context.slot.id ?? context.slot.virtual_id ?? 'unknown';
|
||||||
|
|
||||||
|
try {
|
||||||
|
await cronService.add({
|
||||||
|
name: `hf-calendar-slot-${slotId}`,
|
||||||
|
description: `[managed-by=harbor-forge.calendar] Slot ${slotId}: ${context.taskDescription}`,
|
||||||
|
deleteAfterRun: true,
|
||||||
|
wakeMode: 'now',
|
||||||
|
agentId,
|
||||||
|
payload: {
|
||||||
|
kind: 'agentTurn',
|
||||||
|
message: context.prompt,
|
||||||
|
timeoutSeconds: context.slot.estimated_duration * 60,
|
||||||
|
},
|
||||||
|
delivery: {
|
||||||
|
mode: 'none',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
logger.info(`Agent wake cron job created for slot ${slotId}`);
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error('Failed to wake agent via gateway:', err);
|
logger.error('Failed to create wake cron job:', err);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -363,6 +309,9 @@ export default {
|
|||||||
setMetaPushInterval(handle) {
|
setMetaPushInterval(handle) {
|
||||||
metaPushInterval = handle;
|
metaPushInterval = handle;
|
||||||
},
|
},
|
||||||
|
onCronServiceAvailable(cron) {
|
||||||
|
cronService = cron;
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
registerGatewayStopHook(api, {
|
registerGatewayStopHook(api, {
|
||||||
|
|||||||
Reference in New Issue
Block a user