Merge pull request 'fix: advance turn in before_message_write to prevent race condition' (#10) from fix/turn-advance-race-condition into main

Reviewed-on: nav/WhisperGate#10
This commit was merged in pull request #10.
This commit is contained in:
h z
2026-03-03 03:47:35 +00:00
2 changed files with 190 additions and 100 deletions

View File

@@ -28,6 +28,7 @@ const sessionAllowed = new Map<string, boolean>(); // Track if session was allow
const sessionInjected = new Set<string>(); // Track which sessions have already injected the end marker const sessionInjected = new Set<string>(); // Track which sessions have already injected the end marker
const sessionChannelId = new Map<string, string>(); // Track sessionKey -> channelId mapping const sessionChannelId = new Map<string, string>(); // Track sessionKey -> channelId mapping
const sessionAccountId = new Map<string, string>(); // Track sessionKey -> accountId mapping const sessionAccountId = new Map<string, string>(); // Track sessionKey -> accountId mapping
const sessionTurnHandled = new Set<string>(); // Track sessions where turn was already advanced in before_message_write
const MAX_SESSION_DECISIONS = 2000; const MAX_SESSION_DECISIONS = 2000;
const DECISION_TTL_MS = 5 * 60 * 1000; const DECISION_TTL_MS = 5 * 60 * 1000;
function buildEndMarkerInstruction(endSymbols: string[], isGroupChat: boolean): string { function buildEndMarkerInstruction(endSymbols: string[], isGroupChat: boolean): string {
@@ -931,9 +932,14 @@ export default {
} }
// Extract content from event.message (AgentMessage) // Extract content from event.message (AgentMessage)
// Only process assistant messages — before_message_write fires for both
// user (incoming) and assistant (outgoing) messages. Incoming messages may
// contain end symbols from OTHER agents, which would incorrectly advance the turn.
let content = ""; let content = "";
const msg = (event as Record<string, unknown>).message as Record<string, unknown> | undefined; const msg = (event as Record<string, unknown>).message as Record<string, unknown> | undefined;
if (msg) { if (msg) {
const role = msg.role as string | undefined;
if (role && role !== "assistant") return;
// AgentMessage may have content as string or nested // AgentMessage may have content as string or nested
if (typeof msg.content === "string") { if (typeof msg.content === "string") {
content = msg.content; content = msg.content;
@@ -959,10 +965,27 @@ export default {
if (!key || !channelId || !accountId) return; if (!key || !channelId || !accountId) return;
// Only the current speaker should advance the turn.
// Other agents also trigger before_message_write (for incoming messages or forced no-reply),
// but they must not affect turn state.
const currentTurn = getTurnDebugInfo(channelId);
if (currentTurn.currentSpeaker !== accountId) {
api.logger.info(
`whispergate: before_message_write skipping non-current-speaker session=${key} accountId=${accountId} currentSpeaker=${currentTurn.currentSpeaker}`,
);
return;
}
const live = getLivePluginConfig(api, baseConfig as WhisperGateConfig) as WhisperGateConfig & DebugConfig; const live = getLivePluginConfig(api, baseConfig as WhisperGateConfig) as WhisperGateConfig & DebugConfig;
ensurePolicyStateLoaded(api, live);
const policy = resolvePolicy(live, channelId, policyState.channelPolicies);
const trimmed = content.trim(); const trimmed = content.trim();
const isNoReply = /^NO_REPLY$/i.test(trimmed); const isEmpty = trimmed.length === 0;
const isNoReply = /^NO_REPLY$/i.test(trimmed) || /^HEARTBEAT_OK$/i.test(trimmed);
const lastChar = trimmed.length > 0 ? Array.from(trimmed).pop() || "" : "";
const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar);
const wasNoReply = isEmpty || isNoReply;
// Log turn state for debugging // Log turn state for debugging
const turnDebug = getTurnDebugInfo(channelId); const turnDebug = getTurnDebugInfo(channelId);
@@ -970,15 +993,10 @@ export default {
`whispergate: DEBUG turn state channel=${channelId} turnOrder=${JSON.stringify(turnDebug.turnOrder)} currentSpeaker=${turnDebug.currentSpeaker} noRepliedThisCycle=${JSON.stringify([...turnDebug.noRepliedThisCycle])}`, `whispergate: DEBUG turn state channel=${channelId} turnOrder=${JSON.stringify(turnDebug.turnOrder)} currentSpeaker=${turnDebug.currentSpeaker} noRepliedThisCycle=${JSON.stringify([...turnDebug.noRepliedThisCycle])}`,
); );
if (!isNoReply) {
api.logger.info(
`whispergate: before_message_write content is not NO_REPLY, skipping channel=${channelId}`,
);
return;
}
// Check if this session was forced no-reply or allowed to speak // Check if this session was forced no-reply or allowed to speak
const wasAllowed = sessionAllowed.get(key); const wasAllowed = sessionAllowed.get(key);
if (wasNoReply) {
api.logger.info( api.logger.info(
`whispergate: DEBUG NO_REPLY detected session=${key} wasAllowed=${wasAllowed}`, `whispergate: DEBUG NO_REPLY detected session=${key} wasAllowed=${wasAllowed}`,
); );
@@ -995,10 +1013,10 @@ export default {
} }
// Allowed to speak (current speaker) but chose NO_REPLY - advance turn // Allowed to speak (current speaker) but chose NO_REPLY - advance turn
ensureTurnOrder(api, channelId); ensureTurnOrder(api, channelId, live);
const nextSpeaker = onSpeakerDone(channelId, accountId, true); const nextSpeaker = onSpeakerDone(channelId, accountId, true);
sessionAllowed.delete(key); sessionAllowed.delete(key);
sessionTurnHandled.add(key);
api.logger.info( api.logger.info(
`whispergate: before_message_write real no-reply session=${key} channel=${channelId} nextSpeaker=${nextSpeaker ?? "dormant"}`, `whispergate: before_message_write real no-reply session=${key} channel=${channelId} nextSpeaker=${nextSpeaker ?? "dormant"}`,
@@ -1026,6 +1044,24 @@ export default {
api.logger.warn(`whispergate: cannot resolve Discord userId for next speaker accountId=${nextSpeaker}`); api.logger.warn(`whispergate: cannot resolve Discord userId for next speaker accountId=${nextSpeaker}`);
} }
} }
} else if (hasEndSymbol) {
// End symbol detected — advance turn NOW (before message is broadcast to other agents)
// This prevents the race condition where other agents receive the message
// before message_sent fires and advances the turn.
ensureTurnOrder(api, channelId, live);
const nextSpeaker = onSpeakerDone(channelId, accountId, false);
sessionAllowed.delete(key);
sessionTurnHandled.add(key);
api.logger.info(
`whispergate: before_message_write end-symbol turn advance session=${key} channel=${channelId} nextSpeaker=${nextSpeaker ?? "dormant"}`,
);
} else {
api.logger.info(
`whispergate: before_message_write no turn action needed session=${key} channel=${channelId}`,
);
return;
}
} catch (err) { } catch (err) {
api.logger.warn(`whispergate: before_message_write hook failed: ${String(err)}`); api.logger.warn(`whispergate: before_message_write hook failed: ${String(err)}`);
} }
@@ -1079,6 +1115,15 @@ export default {
const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar); const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar);
const wasNoReply = isEmpty || isNoReply; const wasNoReply = isEmpty || isNoReply;
// Skip if turn was already advanced in before_message_write
if (key && sessionTurnHandled.has(key)) {
sessionTurnHandled.delete(key);
api.logger.info(
`whispergate: message_sent skipping turn advance (already handled in before_message_write) session=${key} channel=${channelId}`,
);
return;
}
if (wasNoReply || hasEndSymbol) { if (wasNoReply || hasEndSymbol) {
const nextSpeaker = onSpeakerDone(channelId, accountId, wasNoReply); const nextSpeaker = onSpeakerDone(channelId, accountId, wasNoReply);
const trigger = wasNoReply ? (isEmpty ? "empty" : "no_reply_keyword") : "end_symbol"; const trigger = wasNoReply ? (isEmpty ? "empty" : "no_reply_keyword") : "end_symbol";

View File

@@ -28,6 +28,7 @@ const sessionAllowed = new Map<string, boolean>(); // Track if session was allow
const sessionInjected = new Set<string>(); // Track which sessions have already injected the end marker const sessionInjected = new Set<string>(); // Track which sessions have already injected the end marker
const sessionChannelId = new Map<string, string>(); // Track sessionKey -> channelId mapping const sessionChannelId = new Map<string, string>(); // Track sessionKey -> channelId mapping
const sessionAccountId = new Map<string, string>(); // Track sessionKey -> accountId mapping const sessionAccountId = new Map<string, string>(); // Track sessionKey -> accountId mapping
const sessionTurnHandled = new Set<string>(); // Track sessions where turn was already advanced in before_message_write
const MAX_SESSION_DECISIONS = 2000; const MAX_SESSION_DECISIONS = 2000;
const DECISION_TTL_MS = 5 * 60 * 1000; const DECISION_TTL_MS = 5 * 60 * 1000;
function buildEndMarkerInstruction(endSymbols: string[], isGroupChat: boolean): string { function buildEndMarkerInstruction(endSymbols: string[], isGroupChat: boolean): string {
@@ -931,9 +932,14 @@ export default {
} }
// Extract content from event.message (AgentMessage) // Extract content from event.message (AgentMessage)
// Only process assistant messages — before_message_write fires for both
// user (incoming) and assistant (outgoing) messages. Incoming messages may
// contain end symbols from OTHER agents, which would incorrectly advance the turn.
let content = ""; let content = "";
const msg = (event as Record<string, unknown>).message as Record<string, unknown> | undefined; const msg = (event as Record<string, unknown>).message as Record<string, unknown> | undefined;
if (msg) { if (msg) {
const role = msg.role as string | undefined;
if (role && role !== "assistant") return;
// AgentMessage may have content as string or nested // AgentMessage may have content as string or nested
if (typeof msg.content === "string") { if (typeof msg.content === "string") {
content = msg.content; content = msg.content;
@@ -959,10 +965,27 @@ export default {
if (!key || !channelId || !accountId) return; if (!key || !channelId || !accountId) return;
// Only the current speaker should advance the turn.
// Other agents also trigger before_message_write (for incoming messages or forced no-reply),
// but they must not affect turn state.
const currentTurn = getTurnDebugInfo(channelId);
if (currentTurn.currentSpeaker !== accountId) {
api.logger.info(
`whispergate: before_message_write skipping non-current-speaker session=${key} accountId=${accountId} currentSpeaker=${currentTurn.currentSpeaker}`,
);
return;
}
const live = getLivePluginConfig(api, baseConfig as WhisperGateConfig) as WhisperGateConfig & DebugConfig; const live = getLivePluginConfig(api, baseConfig as WhisperGateConfig) as WhisperGateConfig & DebugConfig;
ensurePolicyStateLoaded(api, live);
const policy = resolvePolicy(live, channelId, policyState.channelPolicies);
const trimmed = content.trim(); const trimmed = content.trim();
const isNoReply = /^NO_REPLY$/i.test(trimmed); const isEmpty = trimmed.length === 0;
const isNoReply = /^NO_REPLY$/i.test(trimmed) || /^HEARTBEAT_OK$/i.test(trimmed);
const lastChar = trimmed.length > 0 ? Array.from(trimmed).pop() || "" : "";
const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar);
const wasNoReply = isEmpty || isNoReply;
// Log turn state for debugging // Log turn state for debugging
const turnDebug = getTurnDebugInfo(channelId); const turnDebug = getTurnDebugInfo(channelId);
@@ -970,15 +993,10 @@ export default {
`whispergate: DEBUG turn state channel=${channelId} turnOrder=${JSON.stringify(turnDebug.turnOrder)} currentSpeaker=${turnDebug.currentSpeaker} noRepliedThisCycle=${JSON.stringify([...turnDebug.noRepliedThisCycle])}`, `whispergate: DEBUG turn state channel=${channelId} turnOrder=${JSON.stringify(turnDebug.turnOrder)} currentSpeaker=${turnDebug.currentSpeaker} noRepliedThisCycle=${JSON.stringify([...turnDebug.noRepliedThisCycle])}`,
); );
if (!isNoReply) {
api.logger.info(
`whispergate: before_message_write content is not NO_REPLY, skipping channel=${channelId}`,
);
return;
}
// Check if this session was forced no-reply or allowed to speak // Check if this session was forced no-reply or allowed to speak
const wasAllowed = sessionAllowed.get(key); const wasAllowed = sessionAllowed.get(key);
if (wasNoReply) {
api.logger.info( api.logger.info(
`whispergate: DEBUG NO_REPLY detected session=${key} wasAllowed=${wasAllowed}`, `whispergate: DEBUG NO_REPLY detected session=${key} wasAllowed=${wasAllowed}`,
); );
@@ -995,10 +1013,10 @@ export default {
} }
// Allowed to speak (current speaker) but chose NO_REPLY - advance turn // Allowed to speak (current speaker) but chose NO_REPLY - advance turn
ensureTurnOrder(api, channelId); ensureTurnOrder(api, channelId, live);
const nextSpeaker = onSpeakerDone(channelId, accountId, true); const nextSpeaker = onSpeakerDone(channelId, accountId, true);
sessionAllowed.delete(key); sessionAllowed.delete(key);
sessionTurnHandled.add(key);
api.logger.info( api.logger.info(
`whispergate: before_message_write real no-reply session=${key} channel=${channelId} nextSpeaker=${nextSpeaker ?? "dormant"}`, `whispergate: before_message_write real no-reply session=${key} channel=${channelId} nextSpeaker=${nextSpeaker ?? "dormant"}`,
@@ -1026,6 +1044,24 @@ export default {
api.logger.warn(`whispergate: cannot resolve Discord userId for next speaker accountId=${nextSpeaker}`); api.logger.warn(`whispergate: cannot resolve Discord userId for next speaker accountId=${nextSpeaker}`);
} }
} }
} else if (hasEndSymbol) {
// End symbol detected — advance turn NOW (before message is broadcast to other agents)
// This prevents the race condition where other agents receive the message
// before message_sent fires and advances the turn.
ensureTurnOrder(api, channelId, live);
const nextSpeaker = onSpeakerDone(channelId, accountId, false);
sessionAllowed.delete(key);
sessionTurnHandled.add(key);
api.logger.info(
`whispergate: before_message_write end-symbol turn advance session=${key} channel=${channelId} nextSpeaker=${nextSpeaker ?? "dormant"}`,
);
} else {
api.logger.info(
`whispergate: before_message_write no turn action needed session=${key} channel=${channelId}`,
);
return;
}
} catch (err) { } catch (err) {
api.logger.warn(`whispergate: before_message_write hook failed: ${String(err)}`); api.logger.warn(`whispergate: before_message_write hook failed: ${String(err)}`);
} }
@@ -1079,6 +1115,15 @@ export default {
const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar); const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar);
const wasNoReply = isEmpty || isNoReply; const wasNoReply = isEmpty || isNoReply;
// Skip if turn was already advanced in before_message_write
if (key && sessionTurnHandled.has(key)) {
sessionTurnHandled.delete(key);
api.logger.info(
`whispergate: message_sent skipping turn advance (already handled in before_message_write) session=${key} channel=${channelId}`,
);
return;
}
if (wasNoReply || hasEndSymbol) { if (wasNoReply || hasEndSymbol) {
const nextSpeaker = onSpeakerDone(channelId, accountId, wasNoReply); const nextSpeaker = onSpeakerDone(channelId, accountId, wasNoReply);
const trigger = wasNoReply ? (isEmpty ? "empty" : "no_reply_keyword") : "end_symbol"; const trigger = wasNoReply ? (isEmpty ? "empty" : "no_reply_keyword") : "end_symbol";