/**
 * Turn Manager (v2)
 *
 * Per-channel state machine governing who speaks when.
 * Called from before_model_resolve (check turn) and agent_end (advance turn).
 */

/** One participant in a channel's speaking rotation. */
export type SpeakerEntry = {
  agentId: string;
  discordUserId: string;
};

/** Mutable turn-taking state kept for a single channel. */
type ChannelTurnState = {
  /** Ordered rotation for the current cycle. */
  speakerList: SpeakerEntry[];
  /** Index into speakerList of the speaker whose turn it is. */
  currentIndex: number;
  /** Tracks which agents sent empty turns in the current cycle. */
  emptyThisCycle: Set<string>;
  /** Tracks which agents completed a turn at all this cycle. */
  completedThisCycle: Set<string>;
  /** While true, getCurrentSpeaker reports no speaker for the channel. */
  dormant: boolean;
  /** Discord message ID recorded at before_model_resolve, used as poll anchor. */
  anchorMessageId: Map<string, string>; // agentId → messageId
};

/**
 * All mutable state is stored on globalThis so it persists across VM-context
 * hot-reloads within the same gateway process. OpenClaw re-imports this module
 * in a fresh isolated VM context on each reload, but all contexts share the real
 * globalThis object because they run in the same Node.js process.
 */
const _G = globalThis as Record<string, unknown>;

/** Composite key used by the per-(channel, agent) stores below. */
function turnKey(channelId: string, agentId: string): string {
  return `${channelId}:${agentId}`;
}

/** Lazily created store: channelId → turn state. */
function channelStates(): Map<string, ChannelTurnState> {
  if (!(_G._tmChannelStates instanceof Map)) {
    _G._tmChannelStates = new Map<string, ChannelTurnState>();
  }
  return _G._tmChannelStates as Map<string, ChannelTurnState>;
}

/** Lazily created store: set of "channelId:agentId" keys with a turn in flight. */
function pendingTurns(): Set<string> {
  if (!(_G._tmPendingTurns instanceof Set)) {
    _G._tmPendingTurns = new Set<string>();
  }
  return _G._tmPendingTurns as Set<string>;
}

/** Lazily created store: "channelId:agentId" → count of blocked completions still expected. */
function blockedPendingCounts(): Map<string, number> {
  if (!(_G._tmBlockedPendingCounts instanceof Map)) {
    _G._tmBlockedPendingCounts = new Map<string, number>();
  }
  return _G._tmBlockedPendingCounts as Map<string, number>;
}

/**
 * Shared initialization lock: prevents multiple concurrent VM contexts from
 * simultaneously initializing the same channel's speaker list.
 * Used by both before_model_resolve and message_received hooks.
 *
 * NOTE(review): the element type is assumed to be the channel ID string;
 * the callers that add/remove entries are not visible in this module.
 */
export function getInitializingChannels(): Set<string> {
  if (!(_G._tmInitializingChannels instanceof Set)) {
    _G._tmInitializingChannels = new Set<string>();
  }
  return _G._tmInitializingChannels as Set<string>;
}

/**
 * Maximum blocked-pending entries tracked per agent per channel.
 * Caps the drain time: in a busy channel many messages can arrive during a non-speaker
 * turn, each incrementing the counter. Without a cap the counter grows unboundedly.
 * Also applied retroactively in markTurnStarted to recover from accumulated debt.
 */
const MAX_BLOCKED_PENDING = 3;

/**
 * Mark that `agentId` has a turn in flight in `channelId`.
 * Also clamps an over-limit blocked-pending counter down to MAX_BLOCKED_PENDING.
 */
export function markTurnStarted(channelId: string, agentId: string): void {
  const key = turnKey(channelId, agentId);
  pendingTurns().add(key);

  // Cap existing blocked-pending at MAX to recover from accumulated debt
  // (can occur when many messages arrive during a long non-speaker period).
  const counts = blockedPendingCounts();
  if ((counts.get(key) ?? 0) > MAX_BLOCKED_PENDING) {
    counts.set(key, MAX_BLOCKED_PENDING);
  }
}

/** Whether markTurnStarted was called for this pair without a later clearTurnPending. */
export function isTurnPending(channelId: string, agentId: string): boolean {
  return pendingTurns().has(turnKey(channelId, agentId));
}

/** Remove the in-flight marker set by markTurnStarted. */
export function clearTurnPending(channelId: string, agentId: string): void {
  pendingTurns().delete(turnKey(channelId, agentId));
}

/**
 * Counts NO_REPLY completions currently in-flight for an agent that was
 * blocked (non-speaker or init-suppressed). These completions take ~10s to
 * arrive (history-building overhead) and may arrive after markTurnStarted,
 * causing false empty-turn detection. We count them and skip one per agent_end
 * until the count reaches zero, at which point the next agent_end is real.
 *
 * The counter never exceeds MAX_BLOCKED_PENDING; increments past the cap are dropped.
 */
export function incrementBlockedPending(channelId: string, agentId: string): void {
  const counts = blockedPendingCounts();
  const key = turnKey(channelId, agentId);
  const current = counts.get(key) ?? 0;
  if (current < MAX_BLOCKED_PENDING) counts.set(key, current + 1);
}

/** Returns true (and decrements) if this agent_end should be treated as a stale blocked completion. */
export function consumeBlockedPending(channelId: string, agentId: string): boolean {
  const counts = blockedPendingCounts();
  const key = turnKey(channelId, agentId);
  const count = counts.get(key) ?? 0;
  if (count <= 0) return false;

  // Drop the entry once drained instead of leaving a zero behind in the
  // process-wide map; every reader treats a missing key as 0.
  if (count === 1) counts.delete(key);
  else counts.set(key, count - 1);
  return true;
}

/** Forget any blocked-pending count for this pair. */
export function resetBlockedPending(channelId: string, agentId: string): void {
  blockedPendingCounts().delete(turnKey(channelId, agentId));
}

/** Look up a channel's state without creating it. */
function getState(channelId: string): ChannelTurnState | undefined {
  return channelStates().get(channelId);
}

/** Look up a channel's state, creating an empty non-dormant one if absent. */
function ensureState(channelId: string): ChannelTurnState {
  const states = channelStates();
  let state = states.get(channelId);
  if (!state) {
    state = {
      speakerList: [],
      currentIndex: 0,
      emptyThisCycle: new Set<string>(),
      completedThisCycle: new Set<string>(),
      dormant: false,
      anchorMessageId: new Map<string, string>(),
    };
    states.set(channelId, state);
  }
  return state;
}

/**
 * Replace the speaker list (called at cycle boundaries and on init).
 * Resets the turn pointer to the first speaker; cycle tracking and the
 * dormant flag are left untouched.
 */
export function setSpeakerList(channelId: string, speakers: SpeakerEntry[]): void {
  const state = ensureState(channelId);
  state.speakerList = speakers;
  state.currentIndex = 0;
}

/** Get the currently active speaker, or null if dormant / list empty. */
export function getCurrentSpeaker(channelId: string): SpeakerEntry | null {
  const state = getState(channelId);
  if (!state || state.dormant || state.speakerList.length === 0) return null;
  return state.speakerList[state.currentIndex] ?? null;
}

/** Check if a given agentId is the current speaker. */
export function isCurrentSpeaker(channelId: string, agentId: string): boolean {
  return getCurrentSpeaker(channelId)?.agentId === agentId;
}

/** Record the Discord anchor message ID for an agent's upcoming turn. */
export function setAnchor(channelId: string, agentId: string, messageId: string): void {
  ensureState(channelId).anchorMessageId.set(agentId, messageId);
}

/** Anchor message ID previously stored by setAnchor, or undefined if none. */
export function getAnchor(channelId: string, agentId: string): string | undefined {
  return getState(channelId)?.anchorMessageId.get(agentId);
}

/**
 * Advance the speaker after a turn completes.
 * Returns the new current speaker (or null if dormant).
 *
 * Mid-cycle this only moves the pointer. When the completed turn belonged to
 * the last speaker in the list, `rebuildFn` is awaited and the cycle is
 * closed: if every agent that completed a turn this cycle was empty and the
 * rebuilt list contains no agent absent from the old list, the channel
 * becomes dormant; otherwise a new cycle starts at index 0.
 *
 * @param channelId - channel whose rotation is advanced
 * @param agentId - agent whose turn just completed
 * @param isEmpty - whether the completed turn was an empty turn
 * @param rebuildFn - async function that fetches current Discord members and
 *   returns a new SpeakerEntry[]. Called at cycle boundaries.
 * @param previousLastAgentId - for shuffle mode: the last speaker of the
 *   previous cycle (cannot become the new first speaker). When omitted the
 *   rebuilt list is used in the order returned.
 * @returns the next speaker and whether this call put the channel into dormant
 */
export async function advanceSpeaker(
  channelId: string,
  agentId: string,
  isEmpty: boolean,
  rebuildFn: () => Promise<SpeakerEntry[]>,
  previousLastAgentId?: string,
): Promise<{ next: SpeakerEntry | null; enteredDormant: boolean }> {
  const state = ensureState(channelId);

  // Record this turn
  state.completedThisCycle.add(agentId);
  if (isEmpty) state.emptyThisCycle.add(agentId);

  // An empty list also counts as a boundary (0 >= -1), which triggers a rebuild.
  const wasLastInCycle = state.currentIndex >= state.speakerList.length - 1;

  if (!wasLastInCycle) {
    // Middle of cycle — just advance pointer
    state.currentIndex++;
    state.dormant = false;
    return { next: state.speakerList[state.currentIndex] ?? null, enteredDormant: false };
  }

  // === Cycle boundary ===
  const newSpeakers = await rebuildFn();

  const previousAgentIds = new Set(state.speakerList.map((sp) => sp.agentId));
  const hasNewAgents = newSpeakers.some((sp) => !previousAgentIds.has(sp.agentId));

  const allEmpty =
    state.completedThisCycle.size > 0 &&
    [...state.completedThisCycle].every((id) => state.emptyThisCycle.has(id));

  // Reset cycle tracking
  state.emptyThisCycle = new Set<string>();
  state.completedThisCycle = new Set<string>();

  if (allEmpty && !hasNewAgents) {
    // Enter dormant
    state.speakerList = newSpeakers;
    state.currentIndex = 0;
    state.dormant = true;
    return { next: null, enteredDormant: true };
  }

  // Continue with updated list (apply shuffle if caller provides previousLastAgentId)
  state.speakerList =
    previousLastAgentId != null ? shuffleList(newSpeakers, previousLastAgentId) : newSpeakers;
  state.currentIndex = 0;
  state.dormant = false;

  return { next: state.speakerList[0] ?? null, enteredDormant: false };
}

/**
 * Wake the channel from dormant.
 * Resets the pointer and cycle tracking, and returns the new first speaker
 * (null when the channel has no state or an empty list).
 */
export function wakeFromDormant(channelId: string): SpeakerEntry | null {
  const state = getState(channelId);
  if (!state) return null;
  state.dormant = false;
  state.currentIndex = 0;
  state.emptyThisCycle = new Set<string>();
  state.completedThisCycle = new Set<string>();
  return state.speakerList[0] ?? null;
}

/** Whether the channel is currently dormant (false when it has no state). */
export function isDormant(channelId: string): boolean {
  return getState(channelId)?.dormant ?? false;
}

/** Whether the channel has a non-empty speaker list. */
export function hasSpeakers(channelId: string): boolean {
  return (getState(channelId)?.speakerList.length ?? 0) > 0;
}

/**
 * Shuffle a speaker list. Constraint: previousLastAgentId cannot be new first speaker.
 *
 * Lists of length 0 or 1 are returned as-is (same array reference); longer
 * lists are copied and shuffled with Fisher–Yates, leaving the input untouched.
 */
export function shuffleList(list: SpeakerEntry[], previousLastAgentId?: string): SpeakerEntry[] {
  if (list.length <= 1) return list;

  const arr = [...list];
  for (let i = arr.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    [arr[i], arr[j]] = [arr[j], arr[i]];
  }

  // If the forbidden agent landed first, trade places with a random later slot
  // (arr.length >= 2 is guaranteed by the early return above).
  if (previousLastAgentId && arr[0]?.agentId === previousLastAgentId) {
    const swapIdx = 1 + Math.floor(Math.random() * (arr.length - 1));
    [arr[0], arr[swapIdx]] = [arr[swapIdx], arr[0]];
  }
  return arr;
}

/**
 * Snapshot of a channel's state for diagnostics. Reports the raw pointer
 * target as currentSpeaker even while dormant (unlike getCurrentSpeaker).
 */
export function getDebugInfo(channelId: string) {
  const state = getState(channelId);
  if (!state) return { exists: false };
  return {
    exists: true,
    speakerList: state.speakerList.map((sp) => sp.agentId),
    currentIndex: state.currentIndex,
    currentSpeaker: state.speakerList[state.currentIndex]?.agentId ?? null,
    dormant: state.dormant,
    emptyThisCycle: [...state.emptyThisCycle],
    completedThisCycle: [...state.completedThisCycle],
  };
}

/**
 * Remove a channel's turn state entirely (e.g. when archived).
 * Only the per-channel state entry is deleted; pending-turn markers and
 * blocked-pending counters for the channel are not touched here.
 */
export function clearChannel(channelId: string): void {
  channelStates().delete(channelId);
}