Files
Dirigent/plugin/turn-manager.ts
hzhang c40b756bec fix: cap blocked-pending counter to prevent unbounded drain loops
In busy channels, many messages arrive during a non-speaker turn,
each incrementing the blocked-pending counter. Without a cap the
counter grows faster than it drains, causing the speaker to spin
indefinitely consuming NO_REPLY completions.

Cap at MAX_BLOCKED_PENDING=3 in both incrementBlockedPending and
markTurnStarted (retroactive cap to recover from accumulated debt).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 08:11:44 +01:00

285 lines
9.7 KiB
TypeScript

/**
* Turn Manager (v2)
*
* Per-channel state machine governing who speaks when.
* Called from before_model_resolve (check turn) and agent_end (advance turn).
*/
export type SpeakerEntry = {
agentId: string;
discordUserId: string;
};
type ChannelTurnState = {
speakerList: SpeakerEntry[];
currentIndex: number;
/** Tracks which agents sent empty turns in the current cycle. */
emptyThisCycle: Set<string>;
/** Tracks which agents completed a turn at all this cycle. */
completedThisCycle: Set<string>;
dormant: boolean;
/** Discord message ID recorded at before_model_resolve, used as poll anchor. */
anchorMessageId: Map<string, string>; // agentId → messageId
};
/**
* All mutable state is stored on globalThis so it persists across VM-context
* hot-reloads within the same gateway process. OpenClaw re-imports this module
* in a fresh isolated VM context on each reload, but all contexts share the real
* globalThis object because they run in the same Node.js process.
*/
const _G = globalThis as Record<string, unknown>;
function channelStates(): Map<string, ChannelTurnState> {
if (!(_G._tmChannelStates instanceof Map)) _G._tmChannelStates = new Map<string, ChannelTurnState>();
return _G._tmChannelStates as Map<string, ChannelTurnState>;
}
function pendingTurns(): Set<string> {
if (!(_G._tmPendingTurns instanceof Set)) _G._tmPendingTurns = new Set<string>();
return _G._tmPendingTurns as Set<string>;
}
function blockedPendingCounts(): Map<string, number> {
if (!(_G._tmBlockedPendingCounts instanceof Map)) _G._tmBlockedPendingCounts = new Map<string, number>();
return _G._tmBlockedPendingCounts as Map<string, number>;
}
/**
* Shared initialization lock: prevents multiple concurrent VM contexts from
* simultaneously initializing the same channel's speaker list.
* Used by both before_model_resolve and message_received hooks.
*/
export function getInitializingChannels(): Set<string> {
if (!(_G._tmInitializingChannels instanceof Set)) _G._tmInitializingChannels = new Set<string>();
return _G._tmInitializingChannels as Set<string>;
}
/**
* Maximum blocked-pending entries tracked per agent per channel.
* Caps the drain time: in a busy channel many messages can arrive during a non-speaker
* turn, each incrementing the counter. Without a cap the counter grows unboundedly.
* Also applied retroactively in markTurnStarted to recover from accumulated debt.
*/
const MAX_BLOCKED_PENDING = 3;
export function markTurnStarted(channelId: string, agentId: string): void {
pendingTurns().add(`${channelId}:${agentId}`);
// Cap existing blocked-pending at MAX to recover from accumulated debt
// (can occur when many messages arrive during a long non-speaker period).
const bpc = blockedPendingCounts();
const key = `${channelId}:${agentId}`;
const current = bpc.get(key) ?? 0;
if (current > MAX_BLOCKED_PENDING) bpc.set(key, MAX_BLOCKED_PENDING);
}
export function isTurnPending(channelId: string, agentId: string): boolean {
return pendingTurns().has(`${channelId}:${agentId}`);
}
export function clearTurnPending(channelId: string, agentId: string): void {
pendingTurns().delete(`${channelId}:${agentId}`);
}
/**
* Counts NO_REPLY completions currently in-flight for an agent that was
* blocked (non-speaker or init-suppressed). These completions take ~10s to
* arrive (history-building overhead) and may arrive after markTurnStarted,
* causing false empty-turn detection. We count them and skip one per agent_end
* until the count reaches zero, at which point the next agent_end is real.
*/
export function incrementBlockedPending(channelId: string, agentId: string): void {
const bpc = blockedPendingCounts();
const key = `${channelId}:${agentId}`;
const current = bpc.get(key) ?? 0;
if (current < MAX_BLOCKED_PENDING) bpc.set(key, current + 1);
}
/** Returns true (and decrements) if this agent_end should be treated as a stale blocked completion. */
export function consumeBlockedPending(channelId: string, agentId: string): boolean {
const bpc = blockedPendingCounts();
const key = `${channelId}:${agentId}`;
const count = bpc.get(key) ?? 0;
if (count <= 0) return false;
bpc.set(key, count - 1);
return true;
}
export function resetBlockedPending(channelId: string, agentId: string): void {
blockedPendingCounts().delete(`${channelId}:${agentId}`);
}
function getState(channelId: string): ChannelTurnState | undefined {
return channelStates().get(channelId);
}
function ensureState(channelId: string): ChannelTurnState {
const cs = channelStates();
let s = cs.get(channelId);
if (!s) {
s = {
speakerList: [],
currentIndex: 0,
emptyThisCycle: new Set(),
completedThisCycle: new Set(),
dormant: false,
anchorMessageId: new Map(),
};
cs.set(channelId, s);
}
return s;
}
/** Replace the speaker list (called at cycle boundaries and on init). */
export function setSpeakerList(channelId: string, speakers: SpeakerEntry[]): void {
const s = ensureState(channelId);
s.speakerList = speakers;
s.currentIndex = 0;
}
/** Get the currently active speaker, or null if dormant / list empty. */
export function getCurrentSpeaker(channelId: string): SpeakerEntry | null {
const s = getState(channelId);
if (!s || s.dormant || s.speakerList.length === 0) return null;
return s.speakerList[s.currentIndex] ?? null;
}
/** Check if a given agentId is the current speaker. */
export function isCurrentSpeaker(channelId: string, agentId: string): boolean {
const speaker = getCurrentSpeaker(channelId);
return speaker?.agentId === agentId;
}
/** Record the Discord anchor message ID for an agent's upcoming turn. */
export function setAnchor(channelId: string, agentId: string, messageId: string): void {
const s = ensureState(channelId);
s.anchorMessageId.set(agentId, messageId);
}
export function getAnchor(channelId: string, agentId: string): string | undefined {
return getState(channelId)?.anchorMessageId.get(agentId);
}
/**
* Advance the speaker after a turn completes.
* Returns the new current speaker (or null if dormant).
*
* @param isEmpty - whether the completed turn was an empty turn
* @param rebuildFn - async function that fetches current Discord members and
* returns a new SpeakerEntry[]. Called at cycle boundaries.
* @param previousLastAgentId - for shuffle mode: the last speaker of the
* previous cycle (cannot become the new first speaker).
*/
export async function advanceSpeaker(
channelId: string,
agentId: string,
isEmpty: boolean,
rebuildFn: () => Promise<SpeakerEntry[]>,
previousLastAgentId?: string,
): Promise<{ next: SpeakerEntry | null; enteredDormant: boolean }> {
const s = ensureState(channelId);
// Record this turn
s.completedThisCycle.add(agentId);
if (isEmpty) s.emptyThisCycle.add(agentId);
const wasLastInCycle = s.currentIndex >= s.speakerList.length - 1;
if (!wasLastInCycle) {
// Middle of cycle — just advance pointer
s.currentIndex++;
s.dormant = false;
return { next: s.speakerList[s.currentIndex] ?? null, enteredDormant: false };
}
// === Cycle boundary ===
const newSpeakers = await rebuildFn();
const previousAgentIds = new Set(s.speakerList.map((sp) => sp.agentId));
const hasNewAgents = newSpeakers.some((sp) => !previousAgentIds.has(sp.agentId));
const allEmpty =
s.completedThisCycle.size > 0 &&
[...s.completedThisCycle].every((id) => s.emptyThisCycle.has(id));
// Reset cycle tracking
s.emptyThisCycle = new Set();
s.completedThisCycle = new Set();
if (allEmpty && !hasNewAgents) {
// Enter dormant
s.speakerList = newSpeakers;
s.currentIndex = 0;
s.dormant = true;
return { next: null, enteredDormant: true };
}
// Continue with updated list (apply shuffle if caller provides previousLastAgentId)
s.speakerList = previousLastAgentId != null
? shuffleList(newSpeakers, previousLastAgentId)
: newSpeakers;
s.currentIndex = 0;
s.dormant = false;
return { next: s.speakerList[0] ?? null, enteredDormant: false };
}
/**
* Wake the channel from dormant.
* Returns the new first speaker.
*/
export function wakeFromDormant(channelId: string): SpeakerEntry | null {
const s = getState(channelId);
if (!s) return null;
s.dormant = false;
s.currentIndex = 0;
s.emptyThisCycle = new Set();
s.completedThisCycle = new Set();
return s.speakerList[0] ?? null;
}
export function isDormant(channelId: string): boolean {
return getState(channelId)?.dormant ?? false;
}
export function hasSpeakers(channelId: string): boolean {
const s = getState(channelId);
return (s?.speakerList.length ?? 0) > 0;
}
/**
* Shuffle a speaker list. Constraint: previousLastAgentId cannot be new first speaker.
*/
export function shuffleList(list: SpeakerEntry[], previousLastAgentId?: string): SpeakerEntry[] {
if (list.length <= 1) return list;
const arr = [...list];
for (let i = arr.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[arr[i], arr[j]] = [arr[j], arr[i]];
}
if (previousLastAgentId && arr[0].agentId === previousLastAgentId && arr.length > 1) {
const swapIdx = 1 + Math.floor(Math.random() * (arr.length - 1));
[arr[0], arr[swapIdx]] = [arr[swapIdx], arr[0]];
}
return arr;
}
export function getDebugInfo(channelId: string) {
const s = getState(channelId);
if (!s) return { exists: false };
return {
exists: true,
speakerList: s.speakerList.map((sp) => sp.agentId),
currentIndex: s.currentIndex,
currentSpeaker: s.speakerList[s.currentIndex]?.agentId ?? null,
dormant: s.dormant,
emptyThisCycle: [...s.emptyThisCycle],
completedThisCycle: [...s.completedThisCycle],
};
}
/** Remove a channel's turn state entirely (e.g. when archived). */
export function clearChannel(channelId: string): void {
channelStates().delete(channelId);
}