import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
import type { ChannelStore } from "../core/channel-store.js";
import type { IdentityRegistry } from "../core/identity-registry.js";
import { parseDiscordChannelId } from "./before-model-resolve.js";
import {
  isCurrentSpeaker,
  getAnchor,
  advanceSpeaker,
  wakeFromDormant,
  isDormant,
  hasSpeakers,
  getDebugInfo,
  isTurnPending,
  clearTurnPending,
  consumeBlockedPending,
  type SpeakerEntry,
} from "../turn-manager.js";
import { fetchVisibleChannelBotAccountIds } from "../core/channel-members.js";
import { pollForTailMatch, sendScheduleTrigger } from "../core/moderator-discord.js";

/**
 * Return the string Set stored on `globalThis` under `key`, creating and
 * storing a new empty Set first when that slot is missing or falsy.
 *
 * The slot is checked by truthiness only (no `instanceof`), so whatever
 * truthy value already sits under `key` is returned as-is.
 */
function getProcessWideSet(key: string): Set<string> {
  const registry = globalThis as Record<string, unknown>;
  if (!registry[key]) {
    registry[key] = new Set<string>();
  }
  return registry[key] as Set<string>;
}

/**
 * Process-level deduplication for agent_end events.
 * OpenClaw hot-reloads plugin modules (re-imports), stacking duplicate handlers.
 * Using globalThis ensures the dedup Set survives module reloads and is shared
 * by all handler instances in the same process.
 */
const _AGENT_END_DEDUP_KEY = "_dirigentProcessedAgentEndRunIds";
const processedAgentEndRunIds: Set<string> = getProcessWideSet(_AGENT_END_DEDUP_KEY);

/**
 * Per-channel advance lock: prevents two concurrent agent_end handler instances
 * (from different VM contexts with different runIds) from both calling advanceSpeaker
 * for the same channel at the same time, which would double-advance the speaker index.
 */
const _ADVANCE_LOCK_KEY = "_dirigentAdvancingChannels";
const advancingChannels: Set<string> = getProcessWideSet(_ADVANCE_LOCK_KEY);
/**
 * Extract plain text from agent_end event.messages last assistant entry.
 *
 * Scans `messages` from the end and stops at the first entry whose `role` is
 * `"assistant"`:
 *  - string `content` is returned unchanged;
 *  - array `content` yields the concatenation (no separator) of every part
 *    with `type === "text"` and a string `text`; other parts are ignored;
 *  - any other `content` yields `""` (earlier assistant entries are NOT consulted).
 *
 * @param messages Raw message list; entries that are null/undefined or not
 *   assistant messages are skipped.
 * @returns The extracted text, or `""` when there is no assistant entry.
 */
function extractFinalText(messages: unknown[]): string {
  for (let i = messages.length - 1; i >= 0; i--) {
    const msg = messages[i] as Record<string, unknown> | null | undefined;
    if (!msg || msg.role !== "assistant") continue;

    const { content } = msg;
    if (typeof content === "string") return content;
    if (!Array.isArray(content)) return "";

    return content
      .map((part: unknown): string => {
        const p = part as Record<string, unknown> | null | undefined;
        return p?.type === "text" && typeof p.text === "string" ? p.text : "";
      })
      .join("");
  }
  return "";
}

/** Matches a line that is exactly NO or NO_REPLY, case-insensitively. */
const EMPTY_TURN_MARKER = /^NO(?:_REPLY)?$/i;

/**
 * Decide whether an agent's final text counts as an empty turn.
 *
 * A turn is empty when the text is blank, or when its last non-empty line
 * (after trimming) is exactly `NO` or `NO_REPLY` in any letter case. Only the
 * last line is inspected because agents often write explanatory text before
 * the final answer on the last line.
 *
 * @param text Final assistant text for the turn.
 * @returns `true` when the turn carries no real reply.
 */
function isEmptyTurn(text: string): boolean {
  const trimmed = text.trim();
  if (trimmed === "") return true;

  // `trimmed` ends on a non-whitespace character, so the segment after the
  // final "\n" is guaranteed to be the last non-empty line.
  const lastLine = trimmed.slice(trimmed.lastIndexOf("\n") + 1).trim();
  return EMPTY_TURN_MARKER.test(lastLine);
}

/** Dependencies injected into {@link registerAgentEndHook}. */
export type AgentEndDeps = {
  /** Plugin API handle; used to subscribe to `agent_end` and for logging. */
  api: OpenClawPluginApi;
  /** Source of the per-channel mode consulted before handling a turn. */
  channelStore: ChannelStore;
  /** Maps agent ids to identities (including the Discord user id). */
  identityRegistry: IdentityRegistry;
  /** Moderator bot token; when undefined no speaker list is built and no trigger is sent. */
  moderatorBotToken: string | undefined;
  /** Text appended right after the speaker mention in the schedule trigger message. */
  scheduleIdentifier: string;
  /** Forwarded to the schedule-trigger sender. */
  debugMode: boolean;
  /** Called when discussion channel enters dormant — to send idle reminder. */
  onDiscussionDormant?: (channelId: string) => Promise<void>;
};
/** Exposed so message-received can interrupt an in-progress tail-match wait. */
export type InterruptFn = (channelId: string) => void;

/** How long a channel's interrupt flag stays raised after the most recent interrupt() call. */
const INTERRUPT_FLAG_TTL_MS = 5000;

/** Size threshold above which the oldest remembered runId is evicted. */
const MAX_TRACKED_RUN_IDS = 500;

/**
 * Register the `agent_end` handler that drives speaker rotation for a channel.
 *
 * For each `agent_end` event the handler:
 *  1. drops runIds that were already processed in this process;
 *  2. ignores sessions without a Discord channel id, channels in mode
 *     `none` / `work` / `report`, channels without speakers, and events
 *     without an agent id;
 *  3. discards completions from agents that are not the current speaker or
 *     have no pending turn (draining one blocked-pending slot if the text is
 *     an empty turn);
 *  4. for a non-empty turn, waits for the agent's message to appear in the
 *     channel (tail match) unless interrupted;
 *  5. advances the speaker under a per-channel lock, then either reports
 *     dormancy or triggers the next speaker.
 *
 * Errors thrown inside the handler are logged as warnings and not rethrown.
 *
 * @param deps Injected collaborators and configuration.
 * @returns A function that flags a channel as interrupted. The flag is
 *   cleared {@link INTERRUPT_FLAG_TTL_MS} ms after the most recent call for
 *   that channel.
 */
export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn {
  const {
    api,
    channelStore,
    identityRegistry,
    moderatorBotToken,
    scheduleIdentifier,
    debugMode,
    onDiscussionDormant,
  } = deps;

  const interruptedChannels = new Set<string>();
  const interruptExpiryTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /** Raise the interrupt flag for `channelId` and (re)start its expiry timer. */
  function interrupt(channelId: string): void {
    interruptedChannels.add(channelId);

    // Reset any earlier timer so it cannot clear a flag raised by this newer call.
    const pendingExpiry = interruptExpiryTimers.get(channelId);
    if (pendingExpiry !== undefined) clearTimeout(pendingExpiry);

    interruptExpiryTimers.set(
      channelId,
      setTimeout(() => {
        interruptedChannels.delete(channelId);
        interruptExpiryTimers.delete(channelId);
      }, INTERRUPT_FLAG_TTL_MS),
    );
  }

  /**
   * Build the speaker list from the bot accounts visible in the channel,
   * keeping only agents that have a registered identity.
   * Returns an empty list when no moderator bot token is configured.
   */
  async function buildSpeakerList(channelId: string): Promise<SpeakerEntry[]> {
    if (!moderatorBotToken) return [];
    const agentIds = await fetchVisibleChannelBotAccountIds(api, channelId, identityRegistry);
    const result: SpeakerEntry[] = [];
    for (const agentId of agentIds) {
      const identity = identityRegistry.findByAgentId(agentId);
      if (identity) result.push({ agentId, discordUserId: identity.discordUserId });
    }
    return result;
  }

  /** Send the schedule trigger that mentions `next`; no-op without a moderator bot token. */
  async function triggerNextSpeaker(channelId: string, next: SpeakerEntry): Promise<void> {
    if (!moderatorBotToken) return;
    const msg = `<@${next.discordUserId}>${scheduleIdentifier}`;
    await sendScheduleTrigger(moderatorBotToken, channelId, msg, api.logger, debugMode);
    api.logger.info(`dirigent: triggered next speaker agentId=${next.agentId} channel=${channelId}`);
  }

  api.on("agent_end", async (event, ctx) => {
    try {
      const payload = event as Record<string, unknown>;

      // Deduplicate: skip if this runId was already processed by another handler
      // instance (can happen when OpenClaw hot-reloads the plugin in the same process).
      const runId = payload.runId as string | undefined;
      if (runId) {
        if (processedAgentEndRunIds.has(runId)) return;
        processedAgentEndRunIds.add(runId);
        // Evict the oldest entry to prevent unbounded growth.
        if (processedAgentEndRunIds.size > MAX_TRACKED_RUN_IDS) {
          const oldest = processedAgentEndRunIds.values().next().value;
          if (oldest) processedAgentEndRunIds.delete(oldest);
        }
      }

      const sessionKey = ctx.sessionKey;
      if (!sessionKey) return;

      const channelId = parseDiscordChannelId(sessionKey);
      if (!channelId) return;

      const mode = channelStore.getMode(channelId);
      if (mode === "none" || mode === "work" || mode === "report") return;

      if (!hasSpeakers(channelId)) return;

      const agentId = ctx.agentId;
      if (!agentId) return;

      // Extract final text early so we can drain blocked_pending at every early-return point.
      // This prevents the counter from staying inflated when stale NO_REPLYs are discarded
      // by !isCurrentSpeaker or !isTurnPending without reaching consumeBlockedPending.
      const messages = Array.isArray(payload.messages) ? (payload.messages as unknown[]) : [];
      const finalText = extractFinalText(messages);
      const empty = isEmptyTurn(finalText);

      if (!isCurrentSpeaker(channelId, agentId)) {
        // Drain blocked_pending for non-speaker stale NO_REPLYs. Without this, suppressions
        // that happen while this agent is not the current speaker inflate the counter and cause
        // its subsequent real empty turn to be misidentified as stale.
        if (empty) consumeBlockedPending(channelId, agentId);
        return;
      }

      // Only process agent_ends for turns that were explicitly started by before_model_resolve.
      // This prevents stale NO_REPLY completions (from initial suppression) from being counted.
      if (!isTurnPending(channelId, agentId)) {
        api.logger.info(`dirigent: agent_end skipping stale turn agentId=${agentId} channel=${channelId}`);
        // Also drain here: stale may arrive after clearTurnPending but before markTurnStarted.
        if (empty) consumeBlockedPending(channelId, agentId);
        return;
      }

      // Consume a blocked-pending slot only for self-wakeup stales: NO_REPLY completions
      // from suppressed self-wakeup before_model_resolve calls that fire while the agent
      // is current speaker with a turn already in progress.
      if (empty && consumeBlockedPending(channelId, agentId)) {
        api.logger.info(`dirigent: agent_end skipping blocked-pending NO_REPLY agentId=${agentId} channel=${channelId}`);
        return;
      }

      clearTurnPending(channelId, agentId);

      api.logger.info(
        `dirigent: agent_end channel=${channelId} agentId=${agentId} empty=${empty} text=${finalText.slice(0, 80)}`,
      );

      if (!empty) {
        // Real turn: wait for Discord delivery before triggering next speaker.
        // Anchor was set in before_model_resolve just before the LLM call, so any
        // message from the agent after the anchor must be from this turn.
        const identity = identityRegistry.findByAgentId(agentId);
        if (identity && moderatorBotToken) {
          const anchorId = getAnchor(channelId, agentId) ?? "0";
          const { interrupted } = await pollForTailMatch({
            token: moderatorBotToken,
            channelId,
            anchorId,
            agentDiscordUserId: identity.discordUserId,
            isInterrupted: () => interruptedChannels.has(channelId),
          });

          if (interrupted) {
            if (isDormant(channelId)) {
              // Channel is dormant: a new external message woke it — restart from first speaker
              api.logger.info(`dirigent: tail-match interrupted (dormant) channel=${channelId} — waking`);
              const first = wakeFromDormant(channelId);
              if (first) await triggerNextSpeaker(channelId, first);
              return;
            }
            // Not dormant: interrupt was a spurious trigger (e.g. moderator bot message).
            // Fall through to normal advance so the turn cycle continues correctly.
            api.logger.info(`dirigent: tail-match interrupted (non-dormant) channel=${channelId} — advancing normally`);
          }
        }
      }

      // Guard against concurrent advanceSpeaker calls for the same channel.
      // Two VM context instances may both reach this point with different runIds
      // (when the dedup Set doesn't catch them); only the first should advance.
      if (advancingChannels.has(channelId)) {
        api.logger.info(`dirigent: agent_end advance already in progress, skipping channel=${channelId} agentId=${agentId}`);
        return;
      }
      advancingChannels.add(channelId);

      // Typed from advanceSpeaker's own result so it follows that signature.
      let next: Awaited<ReturnType<typeof advanceSpeaker>>["next"] | null = null;
      let enteredDormant = false;
      try {
        // Determine shuffle mode from current list size
        const debugBefore = getDebugInfo(channelId);
        const currentListSize = debugBefore.exists ? (debugBefore.speakerList?.length ?? 0) : 0;
        const isShuffle = currentListSize > 2;
        // In shuffle mode, pass agentId as previousLastAgentId so new list avoids it as first
        const previousLastAgentId = isShuffle ? agentId : undefined;

        ({ next, enteredDormant } = await advanceSpeaker(
          channelId,
          agentId,
          empty,
          () => buildSpeakerList(channelId),
          previousLastAgentId,
        ));
      } finally {
        // Always release the per-channel lock, even when advanceSpeaker throws.
        advancingChannels.delete(channelId);
      }

      if (enteredDormant) {
        api.logger.info(`dirigent: channel=${channelId} entered dormant`);
        if (mode === "discussion") {
          // Optional chaining short-circuits the whole call chain when no callback was supplied.
          await onDiscussionDormant?.(channelId).catch((err) => {
            api.logger.warn(`dirigent: onDiscussionDormant failed: ${String(err)}`);
          });
        }
        return;
      }

      if (next) await triggerNextSpeaker(channelId, next);
    } catch (err) {
      api.logger.warn(`dirigent: agent_end hook error: ${String(err)}`);
    }
  });

  return interrupt;
}