From d8ac9ee0f9013e33c990038d88f876d2d32ffd2a Mon Sep 17 00:00:00 2001 From: hzhang Date: Thu, 9 Apr 2026 15:50:19 +0100 Subject: [PATCH] =?UTF-8?q?fix:=20correct=20dormancy=20detection=20?= =?UTF-8?q?=E2=80=94=20isEmptyTurn=20last-line=20+=20blocked=5Fpending=20d?= =?UTF-8?q?rain?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs that prevented turn-manager dormancy from ever triggering: 1. isEmptyTurn too strict: agents output multi-line text ending with "NO_REPLY" on the last line, but the regex ^NO_REPLY$ required the entire string to match. Now checks only the last non-empty line. 2. blocked_pending counter inflation: non-speaker suppressions incremented the counter but their stale NO_REPLYs were discarded at the !isCurrentSpeaker early return without decrementing. Over a full cycle the counter inflated by the number of suppressions, causing the agent's real empty turn to be misidentified as stale when it finally arrived. Fix: at both early-return points in agent_end (!isCurrentSpeaker and !isTurnPending), drain blocked_pending when the turn is empty. Also fixed: pollForTailMatch now uses any-message detection (instead of tail-fingerprint content matching) with exponential back-off and a 30 s timeout. Previously, when an agent sent a concise Discord message after verbose LLM output, the tail fingerprint never matched and polling ran until the previous 15 s timeout expired on every such turn. Also in this patch: sendAndDelete is replaced by sendScheduleTrigger, which deletes the previous trigger message instead of the one just sent (nothing is deleted in debugMode); agent_end guards advanceSpeaker with a per-channel lock; and before_model_resolve suppresses self-wakeups while a turn is already pending. Co-Authored-By: Claude Sonnet 4.6 --- plugin/core/moderator-discord.ts | 83 ++++++++++++----- plugin/hooks/agent-end.ts | 131 +++++++++++++++++---------- plugin/hooks/before-model-resolve.ts | 68 +++++++++----- 3 files changed, 184 insertions(+), 98 deletions(-) diff --git a/plugin/core/moderator-discord.ts b/plugin/core/moderator-discord.ts index bf7b378..0a8989f 100644 --- a/plugin/core/moderator-discord.ts +++ b/plugin/core/moderator-discord.ts @@ -86,19 +86,45 @@ export async function deleteMessage( } } -/** Send a message then immediately delete it (used for schedule_identifier trigger). 
*/ -export async function sendAndDelete( +/** + * Per-channel last schedule-trigger message ID. + * Stored on globalThis so it survives VM-context hot-reloads. + * Used by sendScheduleTrigger to delete the PREVIOUS trigger when a new one is sent. + */ +const _LAST_TRIGGER_KEY = "_dirigentLastTriggerMsgId"; +if (!(globalThis as Record)[_LAST_TRIGGER_KEY]) { + (globalThis as Record)[_LAST_TRIGGER_KEY] = new Map(); +} +const lastTriggerMsgId: Map = (globalThis as Record)[_LAST_TRIGGER_KEY] as Map; + +/** + * Send a schedule-identifier trigger message, then delete the PREVIOUS one for + * this channel (so only the current trigger is visible at any time). + * + * In debugMode the previous message is NOT deleted, leaving a full trigger + * history visible in Discord for inspection. + */ +export async function sendScheduleTrigger( token: string, channelId: string, content: string, logger: Logger, + debugMode = false, ): Promise { + const prevMsgId = lastTriggerMsgId.get(channelId); const result = await sendModeratorMessage(token, channelId, content, logger); - if (result.ok && result.messageId) { - // Small delay to ensure Discord has processed the message before deletion - await new Promise((r) => setTimeout(r, 300)); - await deleteMessage(token, channelId, result.messageId, logger); + if (!result.ok || !result.messageId) return; + + if (!debugMode) { + // Track the new message so the NEXT call can delete it + lastTriggerMsgId.set(channelId, result.messageId); + // Delete the previous trigger with a small delay + if (prevMsgId) { + await new Promise((r) => setTimeout(r, 300)); + await deleteMessage(token, channelId, prevMsgId, logger); + } } + // debugMode: don't track, don't delete — every trigger stays in history } /** Get the latest message ID in a channel (for use as poll anchor). 
*/ @@ -121,32 +147,45 @@ export async function getLatestMessageId( type DiscordMessage = { id: string; author: { id: string }; content: string }; /** - * Poll the channel until a message from agentDiscordUserId with id > anchorId - * ends with the tail fingerprint. + * Poll the channel until any message from agentDiscordUserId with id > anchorId + * appears. The anchor is set in before_model_resolve just before the LLM call, + * so any agent message after it must belong to this turn. * - * @returns the matching messageId, or undefined on timeout. + * Uses exponential back-off starting at initialPollMs, doubling each miss, + * capped at maxPollMs. Times out after timeoutMs (default 30 s) to avoid + * getting permanently stuck when the agent's Discord message is never delivered. + * + * @returns { matched: true } when found, { interrupted: true } when aborted, + * { matched: false, interrupted: false } on timeout. */ export async function pollForTailMatch(opts: { token: string; channelId: string; anchorId: string; agentDiscordUserId: string; - tailFingerprint: string; + /** Initial poll interval in ms (default 800). */ + initialPollMs?: number; + /** Maximum poll interval in ms (default 8 000). */ + maxPollMs?: number; + /** Give up and return after this many ms (default 30 000). */ timeoutMs?: number; - pollIntervalMs?: number; - /** Callback checked each poll; if true, polling is aborted (interrupted). */ + /** Callback checked before each poll; if true, polling is aborted. 
*/ isInterrupted?: () => boolean; }): Promise<{ matched: boolean; interrupted: boolean }> { const { token, channelId, anchorId, agentDiscordUserId, - tailFingerprint, timeoutMs = 15000, pollIntervalMs = 800, + initialPollMs = 800, + maxPollMs = 8_000, + timeoutMs = 30_000, isInterrupted = () => false, } = opts; + let interval = initialPollMs; const deadline = Date.now() + timeoutMs; - while (Date.now() < deadline) { + while (true) { if (isInterrupted()) return { matched: false, interrupted: true }; + if (Date.now() >= deadline) return { matched: false, interrupted: false }; try { const r = await fetch( @@ -155,25 +194,19 @@ export async function pollForTailMatch(opts: { ); if (r.ok) { const msgs = (await r.json()) as DiscordMessage[]; - const candidates = msgs.filter( + const found = msgs.some( (m) => m.author.id === agentDiscordUserId && BigInt(m.id) > BigInt(anchorId), ); - if (candidates.length > 0) { - // Most recent is first in Discord's response - const latest = candidates[0]; - if (latest.content.endsWith(tailFingerprint)) { - return { matched: true, interrupted: false }; - } - } + if (found) return { matched: true, interrupted: false }; } } catch { // ignore transient errors, keep polling } - await new Promise((r) => setTimeout(r, pollIntervalMs)); + await new Promise((r) => setTimeout(r, interval)); + // Exponential back-off, capped at maxPollMs + interval = Math.min(interval * 2, maxPollMs); } - - return { matched: false, interrupted: false }; } /** Create a Discord channel in a guild. Returns the new channel ID or throws. 
*/ diff --git a/plugin/hooks/agent-end.ts b/plugin/hooks/agent-end.ts index 12c9b5e..dddd57f 100644 --- a/plugin/hooks/agent-end.ts +++ b/plugin/hooks/agent-end.ts @@ -16,10 +16,7 @@ import { type SpeakerEntry, } from "../turn-manager.js"; import { fetchVisibleChannelBotAccountIds } from "../core/channel-members.js"; -import { pollForTailMatch, sendAndDelete } from "../core/moderator-discord.js"; - -const TAIL_LENGTH = 40; -const TAIL_MATCH_TIMEOUT_MS = 15_000; +import { pollForTailMatch, sendScheduleTrigger } from "../core/moderator-discord.js"; /** * Process-level deduplication for agent_end events. @@ -33,6 +30,17 @@ if (!(globalThis as Record)[_AGENT_END_DEDUP_KEY]) { } const processedAgentEndRunIds: Set = (globalThis as Record)[_AGENT_END_DEDUP_KEY] as Set; +/** + * Per-channel advance lock: prevents two concurrent agent_end handler instances + * (from different VM contexts with different runIds) from both calling advanceSpeaker + * for the same channel at the same time, which would double-advance the speaker index. + */ +const _ADVANCE_LOCK_KEY = "_dirigentAdvancingChannels"; +if (!(globalThis as Record)[_ADVANCE_LOCK_KEY]) { + (globalThis as Record)[_ADVANCE_LOCK_KEY] = new Set(); +} +const advancingChannels: Set = (globalThis as Record)[_ADVANCE_LOCK_KEY] as Set; + /** Extract plain text from agent_end event.messages last assistant entry. */ function extractFinalText(messages: unknown[]): string { for (let i = messages.length - 1; i >= 0; i--) { @@ -55,7 +63,12 @@ function extractFinalText(messages: unknown[]): string { function isEmptyTurn(text: string): boolean { const t = text.trim(); - return t === "" || /^NO$/i.test(t) || /^NO_REPLY$/i.test(t); + if (t === "") return true; + // Check if the last non-empty line is NO or NO_REPLY (agents often write + // explanatory text before the final answer on the last line). + const lines = t.split("\n").map((l) => l.trim()).filter((l) => l !== ""); + const last = lines[lines.length - 1] ?? 
""; + return /^NO$/i.test(last) || /^NO_REPLY$/i.test(last); } export type AgentEndDeps = { @@ -64,6 +77,7 @@ export type AgentEndDeps = { identityRegistry: IdentityRegistry; moderatorBotToken: string | undefined; scheduleIdentifier: string; + debugMode: boolean; /** Called when discussion channel enters dormant — to send idle reminder. */ onDiscussionDormant?: (channelId: string) => Promise; }; @@ -72,7 +86,7 @@ export type AgentEndDeps = { export type InterruptFn = (channelId: string) => void; export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn { - const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, onDiscussionDormant } = deps; + const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, debugMode, onDiscussionDormant } = deps; const interruptedChannels = new Set(); @@ -95,7 +109,7 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn { async function triggerNextSpeaker(channelId: string, next: SpeakerEntry): Promise { if (!moderatorBotToken) return; const msg = `<@${next.discordUserId}>${scheduleIdentifier}`; - await sendAndDelete(moderatorBotToken, channelId, msg, api.logger); + await sendScheduleTrigger(moderatorBotToken, channelId, msg, api.logger, debugMode); api.logger.info(`dirigent: triggered next speaker agentId=${next.agentId} channel=${channelId}`); } @@ -127,51 +141,60 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn { const agentId = ctx.agentId; if (!agentId) return; - if (!isCurrentSpeaker(channelId, agentId)) return; - - // Only process agent_ends for turns that were explicitly started by before_model_resolve. - // This prevents stale NO_REPLY completions (from initial suppression) from being counted. - if (!isTurnPending(channelId, agentId)) { - api.logger.info(`dirigent: agent_end skipping stale turn agentId=${agentId} channel=${channelId}`); - return; - } - - // Consume a blocked-pending slot if any exists. 
These are NO_REPLY completions - // from before_model_resolve blocking events (non-speaker or init-suppressed) that - // fire late — after the agent became the current speaker — due to history-building - // overhead (~10s). We skip them until the counter is exhausted, at which point - // the next agent_end is the real LLM response. - if (consumeBlockedPending(channelId, agentId)) { - api.logger.info(`dirigent: agent_end skipping blocked-pending NO_REPLY agentId=${agentId} channel=${channelId}`); - return; - } - - clearTurnPending(channelId, agentId); + // Extract final text early so we can drain blocked_pending at every early-return point. + // This prevents the counter from staying inflated when stale NO_REPLYs are discarded + // by !isCurrentSpeaker or !isTurnPending without reaching consumeBlockedPending. const messages = Array.isArray((event as Record).messages) ? ((event as Record).messages as unknown[]) : []; const finalText = extractFinalText(messages); const empty = isEmptyTurn(finalText); + if (!isCurrentSpeaker(channelId, agentId)) { + // Drain blocked_pending for non-speaker stale NO_REPLYs. Without this, suppressions + // that happen while this agent is not the current speaker inflate the counter and cause + // its subsequent real empty turn to be misidentified as stale. + if (empty) consumeBlockedPending(channelId, agentId); + return; + } + + // Only process agent_ends for turns that were explicitly started by before_model_resolve. + // This prevents stale NO_REPLY completions (from initial suppression) from being counted. + if (!isTurnPending(channelId, agentId)) { + api.logger.info(`dirigent: agent_end skipping stale turn agentId=${agentId} channel=${channelId}`); + // Also drain here: stale may arrive after clearTurnPending but before markTurnStarted. 
+ if (empty) consumeBlockedPending(channelId, agentId); + return; + } + + // Consume a blocked-pending slot only for self-wakeup stales: NO_REPLY completions + // from suppressed self-wakeup before_model_resolve calls that fire while the agent + // is current speaker with a turn already in progress. + if (empty && consumeBlockedPending(channelId, agentId)) { + api.logger.info(`dirigent: agent_end skipping blocked-pending NO_REPLY agentId=${agentId} channel=${channelId}`); + return; + } + + clearTurnPending(channelId, agentId); + api.logger.info( `dirigent: agent_end channel=${channelId} agentId=${agentId} empty=${empty} text=${finalText.slice(0, 80)}`, ); if (!empty) { - // Real turn: wait for Discord delivery via tail-match polling + // Real turn: wait for Discord delivery before triggering next speaker. + // Anchor was set in before_model_resolve just before the LLM call, so any + // message from the agent after the anchor must be from this turn. const identity = identityRegistry.findByAgentId(agentId); if (identity && moderatorBotToken) { const anchorId = getAnchor(channelId, agentId) ?? "0"; - const tail = [...finalText].slice(-TAIL_LENGTH).join(""); - const { matched, interrupted } = await pollForTailMatch({ + const { matched: _matched, interrupted } = await pollForTailMatch({ token: moderatorBotToken, channelId, anchorId, agentDiscordUserId: identity.discordUserId, - tailFingerprint: tail, - timeoutMs: TAIL_MATCH_TIMEOUT_MS, isInterrupted: () => interruptedChannels.has(channelId), }); @@ -187,26 +210,38 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn { // Fall through to normal advance so the turn cycle continues correctly. 
api.logger.info(`dirigent: tail-match interrupted (non-dormant) channel=${channelId} — advancing normally`); } - if (!matched) { - api.logger.warn(`dirigent: tail-match timeout channel=${channelId} agentId=${agentId} — advancing anyway`); - } } } - // Determine shuffle mode from current list size - const debugBefore = getDebugInfo(channelId); - const currentListSize = debugBefore.exists ? (debugBefore.speakerList?.length ?? 0) : 0; - const isShuffle = currentListSize > 2; - // In shuffle mode, pass agentId as previousLastAgentId so new list avoids it as first - const previousLastAgentId = isShuffle ? agentId : undefined; + // Guard against concurrent advanceSpeaker calls for the same channel. + // Two VM context instances may both reach this point with different runIds + // (when the dedup Set doesn't catch them); only the first should advance. + if (advancingChannels.has(channelId)) { + api.logger.info(`dirigent: agent_end advance already in progress, skipping channel=${channelId} agentId=${agentId}`); + return; + } + advancingChannels.add(channelId); - const { next, enteredDormant } = await advanceSpeaker( - channelId, - agentId, - empty, - () => buildSpeakerList(channelId), - previousLastAgentId, - ); + let next: ReturnType | null = null; + let enteredDormant = false; + try { + // Determine shuffle mode from current list size + const debugBefore = getDebugInfo(channelId); + const currentListSize = debugBefore.exists ? (debugBefore.speakerList?.length ?? 0) : 0; + const isShuffle = currentListSize > 2; + // In shuffle mode, pass agentId as previousLastAgentId so new list avoids it as first + const previousLastAgentId = isShuffle ? 
agentId : undefined; + + ({ next, enteredDormant } = await advanceSpeaker( + channelId, + agentId, + empty, + () => buildSpeakerList(channelId), + previousLastAgentId, + )); + } finally { + advancingChannels.delete(channelId); + } if (enteredDormant) { api.logger.info(`dirigent: channel=${channelId} entered dormant`); diff --git a/plugin/hooks/before-model-resolve.ts b/plugin/hooks/before-model-resolve.ts index 631f69a..f092e12 100644 --- a/plugin/hooks/before-model-resolve.ts +++ b/plugin/hooks/before-model-resolve.ts @@ -1,8 +1,8 @@ import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import type { ChannelStore } from "../core/channel-store.js"; import type { IdentityRegistry } from "../core/identity-registry.js"; -import { isCurrentSpeaker, setAnchor, hasSpeakers, isDormant, setSpeakerList, markTurnStarted, incrementBlockedPending, getInitializingChannels, type SpeakerEntry } from "../turn-manager.js"; -import { getLatestMessageId, sendAndDelete } from "../core/moderator-discord.js"; +import { isCurrentSpeaker, isTurnPending, setAnchor, hasSpeakers, isDormant, setSpeakerList, markTurnStarted, incrementBlockedPending, getInitializingChannels, type SpeakerEntry } from "../turn-manager.js"; +import { getLatestMessageId, sendScheduleTrigger } from "../core/moderator-discord.js"; import { fetchVisibleChannelBotAccountIds } from "../core/channel-members.js"; /** Extract Discord channel ID from sessionKey like "agent:home-developer:discord:channel:1234567890". 
*/ @@ -16,9 +16,10 @@ type Deps = { channelStore: ChannelStore; identityRegistry: IdentityRegistry; moderatorBotToken: string | undefined; - noReplyModel: string; - noReplyProvider: string; scheduleIdentifier: string; + debugMode: boolean; + noReplyProvider: string; + noReplyModel: string; }; /** @@ -34,9 +35,7 @@ if (!(globalThis as Record)[_BMR_DEDUP_KEY]) { const processedBeforeModelResolveEvents: WeakSet = (globalThis as Record)[_BMR_DEDUP_KEY] as WeakSet; export function registerBeforeModelResolveHook(deps: Deps): void { - const { api, channelStore, identityRegistry, moderatorBotToken, noReplyModel, noReplyProvider, scheduleIdentifier } = deps; - - const NO_REPLY = { model: noReplyModel, provider: noReplyProvider, noReply: true } as const; + const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, debugMode, noReplyProvider, noReplyModel } = deps; /** Shared init lock — see turn-manager.ts getInitializingChannels(). */ const initializingChannels = getInitializingChannels(); @@ -57,19 +56,23 @@ export function registerBeforeModelResolveHook(deps: Deps): void { const mode = channelStore.getMode(channelId); - // dead mode: suppress all responses - if (mode === "report" || mode === "dead" as string) return NO_REPLY; - - // concluded discussion: suppress all agent responses (auto-reply handled by message_received) - if (mode === "discussion") { - const rec = channelStore.getRecord(channelId); - if (rec.discussion?.concluded) return NO_REPLY; + // dead/report mode: suppress all via no-reply model + if (mode === "report" || mode === "dead" as string) { + return { modelOverride: noReplyModel, providerOverride: noReplyProvider }; } - // disabled modes: let agents respond freely + // concluded discussion: suppress via no-reply model + if (mode === "discussion") { + const rec = channelStore.getRecord(channelId); + if (rec.discussion?.concluded) { + return { modelOverride: noReplyModel, providerOverride: noReplyProvider }; + } + } + + // 
disabled modes: no turn management if (mode === "none" || mode === "work") return; - // discussion / chat: check turn + // chat / discussion (active): check turn const agentId = ctx.agentId; if (!agentId) return; @@ -78,7 +81,9 @@ export function registerBeforeModelResolveHook(deps: Deps): void { // Only one concurrent initializer per channel (Node.js single-threaded: this is safe) if (initializingChannels.has(channelId)) { api.logger.info(`dirigent: before_model_resolve init in progress, suppressing agentId=${agentId} channel=${channelId}`); - return NO_REPLY; + // incrementBlockedPending so agent_end knows to expect a stale NO_REPLY completion later + incrementBlockedPending(channelId, agentId); + return { modelOverride: noReplyModel, providerOverride: noReplyProvider }; } initializingChannels.add(channelId); try { @@ -95,12 +100,13 @@ export function registerBeforeModelResolveHook(deps: Deps): void { const first = speakers[0]; api.logger.info(`dirigent: initialized speaker list channel=${channelId} first=${first.agentId} all=${speakers.map(s => s.agentId).join(",")}`); - // If this agent is NOT the first speaker, trigger first speaker and suppress this one + // If this agent is NOT the first speaker, trigger first speaker and suppress self if (first.agentId !== agentId && moderatorBotToken) { - await sendAndDelete(moderatorBotToken, channelId, `<@${first.discordUserId}>${scheduleIdentifier}`, api.logger); - return NO_REPLY; + await sendScheduleTrigger(moderatorBotToken, channelId, `<@${first.discordUserId}>${scheduleIdentifier}`, api.logger, debugMode); + incrementBlockedPending(channelId, agentId); + return { modelOverride: noReplyModel, providerOverride: noReplyProvider }; } - // If this agent IS the first speaker, fall through to normal turn logic + // Fall through — this agent IS the first speaker } else { // No registered agents visible — let everyone respond freely return; @@ -113,13 +119,25 @@ export function registerBeforeModelResolveHook(deps: Deps): 
void { } } - // If channel is dormant: suppress all agents - if (isDormant(channelId)) return NO_REPLY; + // Channel is dormant: suppress via no-reply model + if (isDormant(channelId)) { + api.logger.info(`dirigent: before_model_resolve suppressing dormant agentId=${agentId} channel=${channelId}`); + incrementBlockedPending(channelId, agentId); + return { modelOverride: noReplyModel, providerOverride: noReplyProvider }; + } if (!isCurrentSpeaker(channelId, agentId)) { - api.logger.info(`dirigent: before_model_resolve blocking non-speaker session=${sessionKey} agentId=${agentId} channel=${channelId}`); + api.logger.info(`dirigent: before_model_resolve suppressing non-speaker agentId=${agentId} channel=${channelId}`); incrementBlockedPending(channelId, agentId); - return NO_REPLY; + return { modelOverride: noReplyModel, providerOverride: noReplyProvider }; + } + + // If a turn is already in progress for this agent, this is a duplicate wakeup + // (e.g. agent woke itself via a message-tool send). Suppress it. + if (isTurnPending(channelId, agentId)) { + api.logger.info(`dirigent: before_model_resolve turn already in progress, suppressing self-wakeup agentId=${agentId} channel=${channelId}`); + incrementBlockedPending(channelId, agentId); + return { modelOverride: noReplyModel, providerOverride: noReplyProvider }; } // Mark that this is a legitimate turn (guards agent_end against stale NO_REPLY completions)