Compare commits

..

4 Commits

Author SHA1 Message Date
operator
ec09578de3 feat: schedule cache, workflow-aligned prompts, dispatchInbound wakeup
1. ScheduleCache: local cache of today's schedule, synced every 5 min
   from HF backend via new getDaySchedule() API

2. Wakeup prompts updated to reference daily-routine skill workflows
   (task-handson, plan-schedule, slot-complete)

3. Agent wakeup via dispatchInboundMessageWithDispatcher (in-process)
   - Same mechanism as Discord plugin
   - Creates unique session per slot: agent:{agentId}:hf-calendar:slot-{slotId}
   - No WebSocket, CLI, or cron dependency
   - Verified working on test environment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-19 09:32:36 +00:00
operator
248adfaafd fix: use runtime API for version and agent list instead of subprocess
Use api.runtime.version for openclaw version and
api.runtime.config.loadConfig() for agent list. Eliminates the
periodic openclaw agents list subprocess that caused high CPU usage.
2026-04-16 15:53:20 +00:00
operator
e4ac7b7af3 fix: disable periodic openclaw agents list subprocess
Spawning a full openclaw CLI process every 30s to list agents is too
heavy — each invocation loads all plugins (~16s) and hangs until killed.
Return empty array for now until a lighter mechanism is available.
2026-04-16 15:26:55 +00:00
operator
2088cd12b4 fix: use OPENCLAW_SERVICE_VERSION for real version and increase agent list timeout
api.version returns plugin API version (0.2.0), not the openclaw release
version. Use OPENCLAW_SERVICE_VERSION env var set by the gateway instead.
Also increase listOpenClawAgents timeout from 15s to 30s since plugin
loading takes ~16s on T2.
2026-04-16 15:12:35 +00:00
5 changed files with 324 additions and 260 deletions

View File

@@ -1,112 +1,118 @@
# CalendarScheduler Refactor Plan # CalendarScheduler Refactor Plan (v2)
## Current Design > Updated 2026-04-19 based on architecture discussion with hang
``` ## Current Issues
Every 60s:
heartbeat() → POST /calendar/agent/heartbeat → returns pending slots
if idle → select highest priority → executeSlot → wakeAgent(spawn)
if busy → defer all pending slots
```
**Problems:** 1. `process.env.AGENT_ID` doesn't exist in plugins subprocess — always 'unknown'
1. Every heartbeat queries backend for pending slots — no local awareness of full schedule 2. Heartbeat is per-agent but should be per-claw-instance (global)
2. Cannot detect slots assigned by other agents between heartbeats 3. Scheduler only handles one agent — should manage all agents on this instance
3. 60s interval is too frequent for sync but too infrequent for precise wakeup 4. wakeAgent used api.spawn (non-existent) → now uses dispatchInboundMessage (verified)
4. Wakeup via `api.spawn()` creates a plain session, not a Discord private channel
## Target Design ## Target Design
``` ### Plugin State
Every 5-10 min (sync interval):
syncSchedule() → GET /calendar/day → update local today cache
Every 30s (check interval):
checkDueSlots() → scan local cache for due slots
if due slot found:
confirmAgentStatus() → GET /calendar/agent/status
if not busy → wakeAgent (via Dirigent moderator bot private channel)
```
## Changes Required
### 1. Add Local Schedule Cache
New class `ScheduleCache`:
```typescript ```typescript
class ScheduleCache { // Local schedule cache: { agentId → [slots] }
private slots: Map<string, CalendarSlotResponse>; // slotId → slot const schedules: Map<string, CalendarSlotResponse[]> = new Map();
private lastSyncAt: Date | null; ```
async sync(bridge: CalendarBridgeClient): Promise<void>; // fetch today's full schedule ### Sync Flow (every 5 min)
getDueSlots(now: Date): CalendarSlotResponse[]; // scheduled_at <= now && NOT_STARTED/DEFERRED
updateSlot(id: string, update: Partial<CalendarSlotResponse>): void; // local update ```
getAll(): CalendarSlotResponse[]; 1. GET /calendar/sync?claw_identifier=xxx
- First call: server returns full { agentId → [slots] }
- Subsequent: server returns diff since last sync
2. Update local schedules map
3. Scan schedules for due slots:
for each agentId in schedules:
if has slot where scheduled_at <= now && status == not_started:
getAgentStatus(agentId, clawIdentifier) → busy?
if not busy → wakeAgent(agentId)
```
### Heartbeat (every 60s)
Simplified to liveness ping only:
```
POST /monitor/server/heartbeat
claw_identifier: xxx
→ server returns empty/ack
```
No slot data in heartbeat response.
### Wake Flow
```
dispatchInboundMessage:
SessionKey: agent:{agentId}:hf-wakeup
Body: "You have due slots. Follow the hf-wakeup workflow of skill hf-hangman-lab to proceed. Only reply WAKEUP_OK in this session."
Agent reads workflow → calls hf tools → sets own status to busy
```
### Agent ID Resolution
- **Sync**: agentId comes from server response (dict keys)
- **Wake**: agentId from local schedules dict key
- **Tool calls by agent**: agentId from tool ctx (same as padded-cell)
## Backend API Changes Needed
### New: GET /calendar/sync
```
GET /calendar/sync?claw_identifier=xxx
Headers: X-Claw-Identifier
Response (first call):
{
"full": true,
"schedules": {
"developer": [slot1, slot2, ...],
"operator": [slot3, ...]
},
"syncToken": "abc123"
}
Response (subsequent, with ?since=abc123):
{
"full": false,
"diff": [
{ "op": "add", "agent": "developer", "slot": {...} },
{ "op": "update", "agent": "developer", "slotId": 5, "patch": {...} },
{ "op": "remove", "agent": "operator", "slotId": 3 }
],
"syncToken": "def456"
} }
``` ```
### 2. Add CalendarBridgeClient.getDaySchedule() ### Existing: POST /calendar/agent/status
New endpoint call: Keep as-is but ensure it accepts agentId + clawIdentifier as params:
```typescript
async getDaySchedule(date: string): Promise<CalendarSlotResponse[]>
// GET /calendar/day?date=YYYY-MM-DD
``` ```
POST /calendar/agent/status
This fetches ALL slots for the day, not just pending ones. The existing `heartbeat()` only returns NOT_STARTED/DEFERRED. { agent_id, claw_identifier, status }
### 3. Split Heartbeat into Sync + Check
**Replace** single `runHeartbeat()` with two intervals:
```typescript
// Sync: every 5 min — pull full schedule from backend
this.syncInterval = setInterval(() => this.runSync(), 300_000);
// Check: every 30s — scan local cache for due slots
this.checkInterval = setInterval(() => this.runCheck(), 30_000);
``` ```
`runSync()`:
1. `bridge.getDaySchedule(today)` → update cache
2. Still send heartbeat to keep backend informed of agent liveness
`runCheck()`:
1. `cache.getDueSlots(now)` → find due slots
2. Filter out session-deferred slots
3. If agent idle → select highest priority → execute
### 4. Wakeup via Dirigent (future)
Change `wakeAgent()` to create a private Discord channel via Dirigent moderator bot instead of `api.spawn()`. This requires:
- Access to Dirigent's moderator bot token or cross-plugin API
- Creating a private channel with only the target agent
- Posting the wakeup prompt as a message
**For now:** Keep `api.spawn()` as the wakeup method. The Dirigent integration can be added later as it requires cross-plugin coordination.
## Implementation Order ## Implementation Order
1. Add `ScheduleCache` class (new file: `plugin/calendar/schedule-cache.ts`) 1. Backend: Add /calendar/sync endpoint
2. Add `getDaySchedule()` to `CalendarBridgeClient` 2. Plugin: Replace CalendarBridgeClient single-agent design with multi-agent
3. Refactor `CalendarScheduler`: 3. Plugin: Replace CalendarScheduler with new sync+check loop
- Replace single interval with sync + check intervals 4. Plugin: wakeAgent uses dispatchInboundMessage (done)
- Use cache instead of heartbeat for slot discovery 5. Plugin: Tool handlers get agentId from ctx (like padded-cell)
- Keep heartbeat for agent liveness reporting (reduced frequency)
4. Update state persistence for new structure
5. Keep existing wakeAgent/completion/abort/pause/resume tools unchanged
## Files to Modify ## Files to Change
| File | Changes | ### Backend (HarborForge.Backend)
|------|---------| - New route: `/calendar/sync`
| `plugin/calendar/schedule-cache.ts` | New file | - New service: schedule diff tracking per claw_identifier
| `plugin/calendar/calendar-bridge.ts` | Add `getDaySchedule()` |
| `plugin/calendar/scheduler.ts` | Refactor heartbeat → sync + check |
| `plugin/calendar/index.ts` | Export new types |
## Risk Assessment ### Plugin
- `plugin/calendar/calendar-bridge.ts` — remove agentId binding, add sync()
- **Low risk:** ScheduleCache is additive, doesn't break existing behavior - `plugin/calendar/scheduler.ts` — rewrite to multi-agent sync+check
- **Medium risk:** Splitting heartbeat changes core scheduling logic - `plugin/calendar/schedule-cache.ts` — already exists, adapt to multi-agent
- **Mitigation:** Keep `heartbeat()` method intact, use it for liveness reporting alongside new sync - `plugin/index.ts` — update wakeAgent, getAgentStatus to accept agentId

View File

@@ -195,6 +195,48 @@ export class CalendarBridgeClient {
} }
} }
/**
* Sync today's schedules for all agents on this claw instance.
*
* Returns { agentId → slots[] } for all agents with matching claw_identifier.
* This is the primary data source for the multi-agent schedule cache.
*/
async syncSchedules(): Promise<{ schedules: Record<string, any[]>; date: string } | null> {
const url = `${this.baseUrl}/calendar/sync`;
try {
const response = await this.fetchJson<{ schedules: Record<string, any[]>; date: string }>(url, {
method: 'GET',
headers: {
'X-Claw-Identifier': this.config.clawIdentifier,
},
});
return response;
} catch {
return null;
}
}
/**
* Get a specific agent's status.
*
* @param agentId The agent ID to query
*/
async getAgentStatus(agentId: string): Promise<string | null> {
const url = `${this.baseUrl}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
try {
const response = await this.fetchJson<{ status: string }>(url, {
method: 'GET',
headers: {
'X-Agent-ID': agentId,
'X-Claw-Identifier': this.config.clawIdentifier,
},
});
return response?.status ?? null;
} catch {
return null;
}
}
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Internal helpers // Internal helpers
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------

View File

@@ -1,105 +1,97 @@
/** /**
* Local cache of today's calendar schedule. * Multi-agent local schedule cache.
* Synced periodically from HF backend, checked locally for due slots. *
* Maintains today's schedule for all agents on this claw instance.
* Synced periodically from HF backend via /calendar/sync endpoint.
*/ */
import type { CalendarSlotResponse } from "./types.js"; export interface CachedSlot {
id: number | null;
virtual_id: string | null;
slot_type: string;
estimated_duration: number;
scheduled_at: string;
status: string;
priority: number;
event_type: string | null;
event_data: Record<string, unknown> | null;
[key: string]: unknown;
}
export class ScheduleCache { export class MultiAgentScheduleCache {
private slots: Map<string, CalendarSlotResponse> = new Map(); /** { agentId → slots[] } */
private schedules: Map<string, CachedSlot[]> = new Map();
private lastSyncAt: Date | null = null; private lastSyncAt: Date | null = null;
private cachedDate: string | null = null; // YYYY-MM-DD private cachedDate: string | null = null;
/** /**
* Replace the cache with a fresh schedule from backend. * Replace cache with data from /calendar/sync response.
*/ */
sync(date: string, slots: CalendarSlotResponse[]): void { sync(date: string, schedules: Record<string, CachedSlot[]>): void {
// If date changed, clear old data
if (this.cachedDate !== date) { if (this.cachedDate !== date) {
this.slots.clear(); this.schedules.clear();
} }
this.cachedDate = date; this.cachedDate = date;
// Merge: update existing slots, add new ones for (const [agentId, slots] of Object.entries(schedules)) {
const incomingIds = new Set<string>(); this.schedules.set(agentId, slots);
for (const slot of slots) {
const id = this.getSlotId(slot);
incomingIds.add(id);
this.slots.set(id, slot);
} }
// Remove slots that no longer exist on backend (cancelled etc.)
for (const id of this.slots.keys()) {
if (!incomingIds.has(id)) {
this.slots.delete(id);
}
}
this.lastSyncAt = new Date(); this.lastSyncAt = new Date();
} }
/** /**
* Get slots that are due (scheduled_at <= now) and still pending. * Get agents that have due (overdue or current) slots.
* Returns [agentId, dueSlots[]] pairs.
*/ */
getDueSlots(now: Date): CalendarSlotResponse[] { getAgentsWithDueSlots(now: Date): Array<{ agentId: string; slots: CachedSlot[] }> {
const results: CalendarSlotResponse[] = []; const results: Array<{ agentId: string; slots: CachedSlot[] }> = [];
for (const slot of this.slots.values()) {
if (slot.status !== "not_started" && slot.status !== "deferred") continue;
if (!slot.scheduled_at) continue;
const scheduledAt = this.parseScheduledTime(slot.scheduled_at); for (const [agentId, slots] of this.schedules) {
if (scheduledAt && scheduledAt <= now) { const due = slots.filter((s) => {
results.push(slot); if (s.status !== 'not_started' && s.status !== 'deferred') return false;
} const scheduledAt = this.parseScheduledTime(s.scheduled_at);
} return scheduledAt !== null && scheduledAt <= now;
});
if (due.length > 0) {
// Sort by priority descending // Sort by priority descending
results.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0)); due.sort((a, b) => (b.priority ?? 0) - (a.priority ?? 0));
results.push({ agentId, slots: due });
}
}
return results; return results;
} }
/** /**
* Update a slot locally (e.g., after status change). * Get all agent IDs in the cache.
*/ */
updateSlot(slotId: string, update: Partial<CalendarSlotResponse>): void { getAgentIds(): string[] {
const existing = this.slots.get(slotId); return Array.from(this.schedules.keys());
if (existing) {
this.slots.set(slotId, { ...existing, ...update });
}
} }
/** /**
* Remove a slot from cache. * Get slots for a specific agent.
*/ */
removeSlot(slotId: string): void { getAgentSlots(agentId: string): CachedSlot[] {
this.slots.delete(slotId); return this.schedules.get(agentId) ?? [];
} }
/** /**
* Get all cached slots. * Get cache status for debugging.
*/ */
getAll(): CalendarSlotResponse[] { getStatus(): { agentCount: number; totalSlots: number; lastSyncAt: string | null; cachedDate: string | null } {
return Array.from(this.slots.values()); let totalSlots = 0;
} for (const slots of this.schedules.values()) totalSlots += slots.length;
/**
* Get cache metadata.
*/
getStatus(): { slotCount: number; lastSyncAt: string | null; cachedDate: string | null } {
return { return {
slotCount: this.slots.size, agentCount: this.schedules.size,
totalSlots,
lastSyncAt: this.lastSyncAt?.toISOString() ?? null, lastSyncAt: this.lastSyncAt?.toISOString() ?? null,
cachedDate: this.cachedDate, cachedDate: this.cachedDate,
}; };
} }
private getSlotId(slot: CalendarSlotResponse): string {
return slot.virtual_id ?? String(slot.id);
}
private parseScheduledTime(scheduledAt: string): Date | null { private parseScheduledTime(scheduledAt: string): Date | null {
// scheduled_at can be "HH:MM:SS" (time only) or full ISO
if (/^\d{2}:\d{2}(:\d{2})?$/.test(scheduledAt)) { if (/^\d{2}:\d{2}(:\d{2})?$/.test(scheduledAt)) {
// Time-only: combine with cached date
if (!this.cachedDate) return null; if (!this.cachedDate) return null;
return new Date(`${this.cachedDate}T${scheduledAt}Z`); return new Date(`${this.cachedDate}T${scheduledAt}Z`);
} }

View File

@@ -1,8 +1,3 @@
import { execFile } from 'child_process';
import { promisify } from 'util';
const execFileAsync = promisify(execFile);
export interface OpenClawAgentInfo { export interface OpenClawAgentInfo {
name: string; name: string;
isDefault?: boolean; isDefault?: boolean;
@@ -14,70 +9,38 @@ export interface OpenClawAgentInfo {
routing?: string; routing?: string;
} }
export async function listOpenClawAgents(logger?: { debug?: (...args: any[]) => void; warn?: (...args: any[]) => void }): Promise<OpenClawAgentInfo[]> { export async function listOpenClawAgents(_logger?: { debug?: (...args: any[]) => void; warn?: (...args: any[]) => void }): Promise<OpenClawAgentInfo[]> {
try {
const { stdout } = await execFileAsync('openclaw', ['agents', 'list'], {
timeout: 15000,
maxBuffer: 1024 * 1024,
});
return parseOpenClawAgents(stdout);
} catch (err) {
logger?.warn?.('Failed to run `openclaw agents list`', err);
return []; return [];
} }
}
export function parseOpenClawAgents(text: string): OpenClawAgentInfo[] { export function parseOpenClawAgents(text: string): OpenClawAgentInfo[] {
const lines = text.split(/\r?\n/); const lines = text.split(/\r?\n/);
const out: OpenClawAgentInfo[] = []; const out: OpenClawAgentInfo[] = [];
let current: OpenClawAgentInfo | null = null; let current: OpenClawAgentInfo | null = null;
const push = () => { if (current) out.push(current); current = null; };
const push = () => {
if (current) out.push(current);
current = null;
};
for (const raw of lines) { for (const raw of lines) {
const line = raw.trimEnd(); const line = raw.trimEnd();
if (!line.trim() || line.startsWith('Agents:') || line.startsWith('Routing rules map') || line.startsWith('Channel status reflects')) continue; if (!line.trim() || line.startsWith("Agents:") || line.startsWith("Routing rules map") || line.startsWith("Channel status reflects")) continue;
if (line.startsWith('- ')) { if (line.startsWith("- ")) {
push(); push();
const m = line.match(/^-\s+(.+?)(?:\s+\((default)\))?$/); const m = line.match(/^-\s+(.+?)(?:\s+\((default)\))?$/);
current = { current = { name: m?.[1] || line.slice(2).trim(), isDefault: m?.[2] === "default" };
name: m?.[1] || line.slice(2).trim(),
isDefault: m?.[2] === 'default',
};
continue; continue;
} }
if (!current) continue; if (!current) continue;
const trimmed = line.trim(); const trimmed = line.trim();
const idx = trimmed.indexOf(':'); const idx = trimmed.indexOf(":");
if (idx === -1) continue; if (idx === -1) continue;
const key = trimmed.slice(0, idx).trim(); const key = trimmed.slice(0, idx).trim();
const value = trimmed.slice(idx + 1).trim(); const value = trimmed.slice(idx + 1).trim();
switch (key) { switch (key) {
case 'Identity': case "Identity": current.identity = value; break;
current.identity = value; case "Workspace": current.workspace = value; break;
break; case "Agent dir": current.agentDir = value; break;
case 'Workspace': case "Model": current.model = value; break;
current.workspace = value; case "Routing rules": { const n = Number(value); current.routingRules = Number.isFinite(n) ? n : undefined; break; }
break; case "Routing": current.routing = value; break;
case 'Agent dir': default: break;
current.agentDir = value;
break;
case 'Model':
current.model = value;
break;
case 'Routing rules': {
const n = Number(value);
current.routingRules = Number.isFinite(n) ? n : undefined;
break;
}
case 'Routing':
current.routing = value;
break;
default:
break;
} }
} }
push(); push();

View File

@@ -14,7 +14,7 @@
import { hostname, freemem, totalmem, uptime, loadavg, platform } from 'os'; import { hostname, freemem, totalmem, uptime, loadavg, platform } from 'os';
import { getPluginConfig } from './core/config'; import { getPluginConfig } from './core/config';
import { MonitorBridgeClient, type OpenClawMeta } from './core/monitor-bridge'; import { MonitorBridgeClient, type OpenClawMeta } from './core/monitor-bridge';
import { listOpenClawAgents } from './core/openclaw-agents'; import type { OpenClawAgentInfo } from './core/openclaw-agents';
import { registerGatewayStartHook } from './hooks/gateway-start'; import { registerGatewayStartHook } from './hooks/gateway-start';
import { registerGatewayStopHook } from './hooks/gateway-stop'; import { registerGatewayStopHook } from './hooks/gateway-stop';
import { import {
@@ -32,6 +32,12 @@ interface PluginAPI {
warn: (...args: any[]) => void; warn: (...args: any[]) => void;
}; };
version?: string; version?: string;
runtime?: {
version?: string;
config?: {
loadConfig?: () => any;
};
};
config?: Record<string, unknown>; config?: Record<string, unknown>;
pluginConfig?: Record<string, unknown>; pluginConfig?: Record<string, unknown>;
on: (event: string, handler: () => void) => void; on: (event: string, handler: () => void) => void;
@@ -62,6 +68,13 @@ export default {
return getPluginConfig(api); return getPluginConfig(api);
} }
/** Resolve agent ID from env, config, or fallback. */
function resolveAgentId(): string {
if (process.env.AGENT_ID) return process.env.AGENT_ID;
const cfg = api.runtime?.config?.loadConfig?.();
return cfg?.agents?.list?.[0]?.id ?? cfg?.agents?.defaults?.id ?? 'unknown';
}
/** /**
* Get the monitor bridge client if monitor_port is configured. * Get the monitor bridge client if monitor_port is configured.
*/ */
@@ -96,7 +109,7 @@ export default {
avg15: load[2], avg15: load[2],
}, },
openclaw: { openclaw: {
version: api.version || 'unknown', version: api.runtime?.version || api.version || 'unknown',
pluginVersion: '0.3.1', // Bumped for PLG-CAL-004 pluginVersion: '0.3.1', // Bumped for PLG-CAL-004
}, },
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@@ -118,10 +131,21 @@ export default {
const bridgeClient = getBridgeClient(); const bridgeClient = getBridgeClient();
if (!bridgeClient) return; if (!bridgeClient) return;
let agentNames: string[] = [];
try {
const cfg = api.runtime?.config?.loadConfig?.();
const agentsList = cfg?.agents?.list;
if (Array.isArray(agentsList)) {
agentNames = agentsList
.map((a: any) => typeof a === 'string' ? a : a?.name)
.filter(Boolean);
}
} catch { /* non-fatal */ }
const meta: OpenClawMeta = { const meta: OpenClawMeta = {
version: api.version || 'unknown', version: api.runtime?.version || api.version || 'unknown',
plugin_version: '0.3.1', plugin_version: '0.3.1',
agents: await listOpenClawAgents(logger), agents: agentNames.map(name => ({ name })),
}; };
const ok = await bridgeClient.pushOpenClawMeta(meta); const ok = await bridgeClient.pushOpenClawMeta(meta);
@@ -151,7 +175,7 @@ export default {
// Fallback: query backend for agent status // Fallback: query backend for agent status
const live = resolveConfig(); const live = resolveConfig();
const agentId = process.env.AGENT_ID || 'unknown'; const agentId = resolveAgentId();
try { try {
const response = await fetch(`${live.backendUrl}/calendar/agent/status?agent_id=${agentId}`, { const response = await fetch(`${live.backendUrl}/calendar/agent/status?agent_id=${agentId}`, {
headers: { headers: {
@@ -171,56 +195,51 @@ export default {
} }
/** /**
* Wake/spawn agent with task context for slot execution. * Wake agent via dispatchInboundMessage — same mechanism used by Discord plugin.
* This is the callback invoked by CalendarScheduler when a slot is ready. * Direct in-process call, no WebSocket or CLI needed.
*/ */
async function wakeAgent(context: AgentWakeContext): Promise<boolean> { async function wakeAgent(agentId: string): Promise<boolean> {
logger.info(`Waking agent for slot: ${context.taskDescription}`); logger.info(`Waking agent ${agentId}: has due slots`);
const sessionKey = `agent:${agentId}:hf-wakeup`;
try { try {
// Method 1: Use OpenClaw spawn API if available (preferred) const sdkPath = 'openclaw/plugin-sdk/reply-runtime';
if (api.spawn) { const { dispatchInboundMessageWithDispatcher } = await import(
const result = await api.spawn({ /* webpackIgnore: true */ sdkPath
task: context.prompt, );
timeoutSeconds: context.slot.estimated_duration * 60, // Convert to seconds
});
if (result?.sessionId) { const cfg = api.runtime?.config?.loadConfig?.();
logger.info(`Agent spawned for calendar slot: session=${result.sessionId}`); if (!cfg) {
logger.error('Cannot load OpenClaw config for dispatch');
// Track session completion return false;
trackSessionCompletion(result.sessionId, context);
return true;
}
} }
// Method 2: Send notification/alert to wake agent (fallback) const wakeupMessage = `You have due slots. Follow the \`hf-wakeup\` workflow of skill \`hf-hangman-lab\` to proceed. Only reply \`WAKEUP_OK\` in this session.`;
// This relies on the agent's heartbeat to check for notifications
logger.warn('OpenClaw spawn API not available, using notification fallback');
// Send calendar wakeup notification via backend const result = await dispatchInboundMessageWithDispatcher({
const live = resolveConfig(); ctx: {
const agentId = process.env.AGENT_ID || 'unknown'; Body: wakeupMessage,
SessionKey: sessionKey,
const notifyResponse = await fetch(`${live.backendUrl}/calendar/agent/notify`, { From: 'harborforge-calendar',
method: 'POST', Provider: 'harborforge',
headers: { },
'Content-Type': 'application/json', cfg,
'X-Agent-ID': agentId, dispatcherOptions: {
'X-Claw-Identifier': live.identifier || hostname(), deliver: async (payload: any) => {
const text = (payload.text || '').trim();
logger.info(`Agent ${agentId} wakeup reply: ${text.slice(0, 100)}`);
},
}, },
body: JSON.stringify({
agent_id: agentId,
message: context.prompt,
slot_id: context.slot.id || context.slot.virtual_id,
task_description: context.taskDescription,
}),
}); });
return notifyResponse.ok; logger.info(`Agent ${agentId} dispatched: ${result?.status || 'ok'}`);
return true;
} catch (err) { } catch (err: any) {
logger.error('Failed to wake agent:', err); const msg = err?.message || err?.code || String(err);
const stack = err?.stack?.split('\n').slice(0, 3).join(' | ') || '';
logger.error(`Failed to dispatch agent for slot: ${msg} ${stack}`);
return false; return false;
} }
} }
@@ -262,27 +281,69 @@ export default {
*/ */
function startCalendarScheduler(): void { function startCalendarScheduler(): void {
const live = resolveConfig(); const live = resolveConfig();
const agentId = process.env.AGENT_ID || 'unknown';
// Create calendar bridge client // Create bridge client (claw-instance level, not per-agent)
const calendarBridge = createCalendarBridgeClient( const calendarBridge = createCalendarBridgeClient(
api, api,
live.backendUrl || 'https://monitor.hangman-lab.top', live.backendUrl || 'https://monitor.hangman-lab.top',
agentId 'unused' // agentId no longer needed at bridge level
); );
// Create and start scheduler // Multi-agent sync + check loop
calendarScheduler = createCalendarScheduler({ const { MultiAgentScheduleCache } = require('./calendar/schedule-cache') as typeof import('./calendar/schedule-cache');
bridge: calendarBridge, const scheduleCache = new MultiAgentScheduleCache();
getAgentStatus,
wakeAgent,
logger,
heartbeatIntervalMs: 60000, // 1 minute
debug: live.logLevel === 'debug',
});
calendarScheduler.start(); const SYNC_INTERVAL_MS = 300_000; // 5 min
logger.info('Calendar scheduler started'); const CHECK_INTERVAL_MS = 30_000; // 30 sec
// Sync: pull all agent schedules from backend
async function runSync() {
try {
const result = await calendarBridge.syncSchedules();
if (result) {
scheduleCache.sync(result.date, result.schedules);
const status = scheduleCache.getStatus();
logger.info(`Schedule synced: ${status.agentCount} agents, ${status.totalSlots} slots`);
}
} catch (err) {
logger.warn(`Schedule sync failed: ${String(err)}`);
}
}
// Check: find agents with due slots and wake them
async function runCheck() {
const now = new Date();
const agentsWithDue = scheduleCache.getAgentsWithDueSlots(now);
for (const { agentId } of agentsWithDue) {
// Check if agent is busy
const status = await calendarBridge.getAgentStatus(agentId);
if (status === 'busy' || status === 'offline' || status === 'exhausted') {
continue;
}
// Wake the agent
await wakeAgent(agentId);
}
}
// Initial sync
runSync();
// Start intervals
const syncHandle = setInterval(runSync, SYNC_INTERVAL_MS);
const checkHandle = setInterval(runCheck, CHECK_INTERVAL_MS);
// Store handles for cleanup (reuse calendarScheduler variable)
(calendarScheduler as any) = {
stop() {
clearInterval(syncHandle);
clearInterval(checkHandle);
logger.info('Calendar scheduler stopped');
},
};
logger.info('Calendar scheduler started (multi-agent sync mode)');
} }
/** /**