import * as fs from 'fs'; import * as path from 'path'; import { EventEmitter } from 'events'; export type AgentState = 'idle' | 'busy' | 'focus' | 'freeze' | 'pre-freeze' | 'pre-freeze-focus'; export interface AgentStatus { agentId: string; state: AgentState; workflow: string | null; activeSessions: string[]; lastSessions: string[]; updatedAt: number; } export interface GlobalStatus { restartScheduledBy: string | null; restartSession: string | null; restartStatus: 'idle' | 'waiting' | 'restarting' | 'rollback'; updatedAt: number; } export interface StatusManagerOptions { dataDir?: string; persistenceInterval?: number; } /** * Manages agent states and global restart status */ export class StatusManager extends EventEmitter { private agents: Map = new Map(); private global: GlobalStatus; private dataDir: string; private persistenceInterval: number; private persistenceTimer: NodeJS.Timeout | null = null; constructor(options: StatusManagerOptions = {}) { super(); this.dataDir = options.dataDir || path.join(process.env.HOME || '.', '.paddedcell'); this.persistenceInterval = options.persistenceInterval || 5000; this.global = { restartScheduledBy: null, restartSession: null, restartStatus: 'idle', updatedAt: Date.now(), }; this.ensureDataDir(); this.loadFromDisk(); this.startPersistence(); } private ensureDataDir(): void { if (!fs.existsSync(this.dataDir)) { fs.mkdirSync(this.dataDir, { recursive: true, mode: 0o700 }); } } private getAgentFilePath(agentId: string): string { return path.join(this.dataDir, `agent_${agentId}.json`); } private getGlobalFilePath(): string { return path.join(this.dataDir, 'global.json'); } private loadFromDisk(): void { // Load global status const globalPath = this.getGlobalFilePath(); if (fs.existsSync(globalPath)) { try { const data = fs.readFileSync(globalPath, 'utf8'); this.global = JSON.parse(data); } catch (err) { console.error('Failed to load global status:', err); } } // Load agent statuses try { const files = fs.readdirSync(this.dataDir); for (const file of files) { if (file.startsWith('agent_') && file.endsWith('.json')) { try { const data = fs.readFileSync(path.join(this.dataDir, file), 'utf8'); const agent = JSON.parse(data) as AgentStatus; this.agents.set(agent.agentId, agent); } catch (err) { console.error(`Failed to load agent status from ${file}:`, err); } } } } catch (err) { console.error('Failed to read data directory:', err); } } private saveToDisk(): void { // Save global status try { fs.writeFileSync( this.getGlobalFilePath(), JSON.stringify(this.global, null, 2), { mode: 0o600 } ); } catch (err) { console.error('Failed to save global status:', err); } // Save agent statuses for (const [agentId, agent] of this.agents) { try { fs.writeFileSync( this.getAgentFilePath(agentId), JSON.stringify(agent, null, 2), { mode: 0o600 } ); } catch (err) { console.error(`Failed to save agent status for ${agentId}:`, err); } } } private startPersistence(): void { this.persistenceTimer = setInterval(() => { this.saveToDisk(); }, this.persistenceInterval); } stopPersistence(): void { if (this.persistenceTimer) { clearInterval(this.persistenceTimer); this.persistenceTimer = null; } this.saveToDisk(); } // Agent state management getOrCreateAgent(agentId: string): AgentStatus { if (!this.agents.has(agentId)) { const agent: AgentStatus = { agentId, state: 'idle', workflow: null, activeSessions: [], lastSessions: [], updatedAt: Date.now(), }; this.agents.set(agentId, agent); this.emit('agentCreated', agent); } return this.agents.get(agentId)!; } getAgent(agentId: string): AgentStatus | undefined { return this.agents.get(agentId); } getAllAgents(): AgentStatus[] { return Array.from(this.agents.values()); } onMessageStart(session: string, agentId: string): void { const agent = this.getOrCreateAgent(agentId); // Don't update state for heartbeat sessions if (this.isHeartbeatSession(session)) { return; } if (agent.state === 'idle') { agent.state = 'busy'; } if (!agent.activeSessions.includes(session)) { agent.activeSessions.push(session); } agent.updatedAt = Date.now(); this.emit('stateChanged', agent); } onMessageEnd(session: string, agentId: string): void { const agent = this.getOrCreateAgent(agentId); // Remove from active sessions agent.activeSessions = agent.activeSessions.filter(s => s !== session); // Add to last sessions if not already there if (!agent.lastSessions.includes(session)) { agent.lastSessions.unshift(session); if (agent.lastSessions.length > 10) { agent.lastSessions = agent.lastSessions.slice(0, 10); } } // State transitions if (agent.activeSessions.length === 0) { if (agent.state === 'busy') { agent.state = 'idle'; } else if (agent.state === 'pre-freeze' || agent.state === 'pre-freeze-focus') { agent.state = 'freeze'; } } agent.updatedAt = Date.now(); this.emit('stateChanged', agent); // Check if all agents are frozen (for restart completion) if (agent.state === 'freeze') { this.checkAllFrozen(); } } setWorkflow(agentId: string, workflow: string | null): void { const agent = this.getOrCreateAgent(agentId); agent.workflow = workflow; if (workflow) { agent.state = 'focus'; } else { // Transition from focus to idle or busy if (agent.activeSessions.length === 0) { agent.state = 'idle'; } else { agent.state = 'busy'; } } agent.updatedAt = Date.now(); this.emit('stateChanged', agent); } isHeartbeatSession(session: string): boolean { // Check if session is a heartbeat session // This can be customized based on naming convention or metadata return session.includes('heartbeat') || session.includes('poll'); } // Query restart logic queryRestart(requesterAgentId: string, requesterSessionKey: string): 'OK' | 'NOT_READY' | 'ALREADY_SCHEDULED' { // Check if restart is already scheduled if (this.global.restartStatus !== 'idle') { // If same agent is requesting, allow continuation if (this.global.restartScheduledBy === requesterAgentId) { return this.allAgentsFrozen() ? 'OK' : 'NOT_READY'; } return 'ALREADY_SCHEDULED'; } // Schedule restart this.global.restartScheduledBy = requesterAgentId; this.global.restartSession = requesterSessionKey; this.global.restartStatus = 'waiting'; this.global.updatedAt = Date.now(); // Transition agents to freeze/pre-freeze states for (const [agentId, agent] of this.agents) { if (agentId === requesterAgentId) { // Don't freeze the requester agent continue; } switch (agent.state) { case 'idle': agent.state = 'freeze'; break; case 'busy': agent.state = 'pre-freeze'; break; case 'focus': agent.state = 'pre-freeze-focus'; // Notify agent to prepare for restart this.emit('preparingRestart', agent); break; } agent.updatedAt = Date.now(); this.emit('stateChanged', agent); } this.saveToDisk(); // Check if all are frozen immediately if (this.allAgentsFrozen()) { return 'OK'; } return 'NOT_READY'; } allAgentsFrozen(): boolean { for (const [agentId, agent] of this.agents) { // Skip the agent that scheduled the restart if (agentId === this.global.restartScheduledBy) { continue; } if (agent.state !== 'freeze') { return false; } } return true; } private checkAllFrozen(): void { if (this.allAgentsFrozen() && this.global.restartStatus === 'waiting') { this.emit('allFrozen'); } } // Restart completion completeRestart(success: boolean, log?: string): void { if (success) { this.global.restartStatus = 'idle'; // Unfreeze all agents for (const agent of this.agents.values()) { if (agent.state === 'freeze') { // Restore previous state from lastSessions agent.state = agent.activeSessions.length > 0 ? 'busy' : 'idle'; agent.updatedAt = Date.now(); this.emit('stateChanged', agent); this.emit('unfrozen', agent); } } this.emit('restartCompleted'); } else { this.global.restartStatus = 'rollback'; this.emit('restartFailed', log); } this.global.restartScheduledBy = null; this.global.restartSession = null; this.global.updatedAt = Date.now(); this.saveToDisk(); } // Global status getters getGlobalStatus(): GlobalStatus { return { ...this.global }; } isRestartScheduled(): boolean { return this.global.restartStatus !== 'idle'; } // For focus mode: check if agent should respond shouldRespond(agentId: string, session: string): boolean { const agent = this.getAgent(agentId); if (!agent) return true; // In focus mode, only respond to workflow sessions if (agent.state === 'focus' || agent.state === 'pre-freeze-focus') { return agent.workflow !== null && session.includes(agent.workflow); } // In freeze/pre-freeze states, don't accept new messages if (agent.state === 'freeze' || agent.state === 'pre-freeze') { return false; } return true; } getBusyMessage(agentId: string): string { const agent = this.getAgent(agentId); if (!agent) return '在忙,无法应答'; switch (agent.state) { case 'focus': case 'pre-freeze-focus': return '当前处于专注模式,无法应答非工作流消息'; case 'freeze': case 'pre-freeze': return '系统正在准备重启,请稍后再试'; case 'busy': return '正在处理其他消息,请稍后再试'; default: return '在忙,无法应答'; } } }