diff --git a/plugin/turn-manager.ts b/plugin/turn-manager.ts index 0c4d737..fed88e3 100644 --- a/plugin/turn-manager.ts +++ b/plugin/turn-manager.ts @@ -55,8 +55,22 @@ export function getInitializingChannels(): Set { return _G._tmInitializingChannels as Set; } +/** + * Maximum blocked-pending entries tracked per agent per channel. + * Caps the drain time: in a busy channel many messages can arrive during a non-speaker + * turn, each incrementing the counter. Without a cap the counter grows unboundedly. + * Also applied retroactively in markTurnStarted to recover from accumulated debt. + */ +const MAX_BLOCKED_PENDING = 3; + export function markTurnStarted(channelId: string, agentId: string): void { pendingTurns().add(`${channelId}:${agentId}`); + // Cap existing blocked-pending at MAX to recover from accumulated debt + // (can occur when many messages arrive during a long non-speaker period). + const bpc = blockedPendingCounts(); + const key = `${channelId}:${agentId}`; + const current = bpc.get(key) ?? 0; + if (current > MAX_BLOCKED_PENDING) bpc.set(key, MAX_BLOCKED_PENDING); } export function isTurnPending(channelId: string, agentId: string): boolean { @@ -74,10 +88,12 @@ export function clearTurnPending(channelId: string, agentId: string): void { * causing false empty-turn detection. We count them and skip one per agent_end * until the count reaches zero, at which point the next agent_end is real. */ + export function incrementBlockedPending(channelId: string, agentId: string): void { const bpc = blockedPendingCounts(); const key = `${channelId}:${agentId}`; - bpc.set(key, (bpc.get(key) ?? 0) + 1); + const current = bpc.get(key) ?? 0; + if (current < MAX_BLOCKED_PENDING) bpc.set(key, current + 1); } /** Returns true (and decrements) if this agent_end should be treated as a stale blocked completion. */