diff --git a/plugin/hooks/before-model-resolve.ts b/plugin/hooks/before-model-resolve.ts new file mode 100644 index 0000000..589bcc3 --- /dev/null +++ b/plugin/hooks/before-model-resolve.ts @@ -0,0 +1,168 @@ +import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import { evaluateDecision, type Decision, type DirigentConfig } from "../rules.js"; +import { checkTurn } from "../turn-manager.js"; +import { deriveDecisionInputFromPrompt } from "../decision-input.js"; + +type DebugConfig = { + enableDebugLogs?: boolean; + debugLogChannelIds?: string[]; +}; + +type DecisionRecord = { + decision: Decision; + createdAt: number; + needsRestore?: boolean; +}; + +type BeforeModelResolveDeps = { + api: OpenClawPluginApi; + baseConfig: DirigentConfig; + sessionDecision: Map; + sessionAllowed: Map; + sessionChannelId: Map; + sessionAccountId: Map; + policyState: { channelPolicies: Record }; + DECISION_TTL_MS: number; + ensurePolicyStateLoaded: (api: OpenClawPluginApi, config: DirigentConfig) => void; + getLivePluginConfig: (api: OpenClawPluginApi, fallback: DirigentConfig) => DirigentConfig; + resolveAccountId: (api: OpenClawPluginApi, agentId: string) => string | undefined; + pruneDecisionMap: () => void; + shouldDebugLog: (config: DirigentConfig & DebugConfig, channelId?: string) => boolean; + ensureTurnOrder: (api: OpenClawPluginApi, channelId: string) => void; +}; + +export function registerBeforeModelResolveHook(deps: BeforeModelResolveDeps): void { + const { + api, + baseConfig, + sessionDecision, + sessionAllowed, + sessionChannelId, + sessionAccountId, + policyState, + DECISION_TTL_MS, + ensurePolicyStateLoaded, + getLivePluginConfig, + resolveAccountId, + pruneDecisionMap, + shouldDebugLog, + ensureTurnOrder, + } = deps; + + api.on("before_model_resolve", async (event, ctx) => { + const key = ctx.sessionKey; + if (!key) return; + + const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig; + ensurePolicyStateLoaded(api, live); + + const prompt = ((event as Record).prompt as string) || ""; + + if (live.enableDebugLogs) { + api.logger.info( + `dirigent: DEBUG_BEFORE_MODEL_RESOLVE ctx=${JSON.stringify({ sessionKey: ctx.sessionKey, messageProvider: ctx.messageProvider, agentId: ctx.agentId })} ` + + `promptPreview=${prompt.slice(0, 300)}`, + ); + } + + const derived = deriveDecisionInputFromPrompt({ + prompt, + messageProvider: ctx.messageProvider, + sessionKey: key, + ctx: ctx as Record, + event: event as Record, + }); + + const hasConvMarker = prompt.includes("Conversation info (untrusted metadata):"); + if (live.discordOnly !== false && (!hasConvMarker || derived.channel !== "discord")) return; + + if (derived.channelId) { + sessionChannelId.set(key, derived.channelId); + } + const resolvedAccountId = resolveAccountId(api, ctx.agentId || ""); + if (resolvedAccountId) { + sessionAccountId.set(key, resolvedAccountId); + } + + let rec = sessionDecision.get(key); + if (!rec || Date.now() - rec.createdAt > DECISION_TTL_MS) { + if (rec) sessionDecision.delete(key); + const decision = evaluateDecision({ + config: live, + channel: derived.channel, + channelId: derived.channelId, + channelPolicies: policyState.channelPolicies as Record, + senderId: derived.senderId, + content: derived.content, + }); + rec = { decision, createdAt: Date.now() }; + sessionDecision.set(key, rec); + pruneDecisionMap(); + if (shouldDebugLog(live, derived.channelId)) { + api.logger.info( + `dirigent: debug before_model_resolve recompute session=${key} ` + + `channel=${derived.channel} channelId=${derived.channelId ?? ""} senderId=${derived.senderId ?? ""} ` + + `convSenderId=${String((derived.conv as Record).sender_id ?? "")} ` + + `convSender=${String((derived.conv as Record).sender ?? "")} ` + + `convChannelId=${String((derived.conv as Record).channel_id ?? "")} ` + + `decision=${decision.reason} shouldNoReply=${decision.shouldUseNoReply} shouldInject=${decision.shouldInjectEndMarkerPrompt}`, + ); + } + } + + if (derived.channelId) { + ensureTurnOrder(api, derived.channelId); + const accountId = resolveAccountId(api, ctx.agentId || ""); + if (accountId) { + const turnCheck = checkTurn(derived.channelId, accountId); + if (!turnCheck.allowed) { + sessionAllowed.set(key, false); + api.logger.info( + `dirigent: turn gate blocked session=${key} accountId=${accountId} currentSpeaker=${turnCheck.currentSpeaker} reason=${turnCheck.reason}`, + ); + return { + providerOverride: live.noReplyProvider, + modelOverride: live.noReplyModel, + }; + } + sessionAllowed.set(key, true); + } + } + + if (!rec.decision.shouldUseNoReply) { + if (rec.needsRestore) { + sessionDecision.delete(key); + return { + providerOverride: undefined, + modelOverride: undefined, + }; + } + return; + } + + rec.needsRestore = true; + sessionDecision.set(key, rec); + + if (live.enableDebugLogs) { + const hasConvMarker2 = prompt.includes("Conversation info (untrusted metadata):"); + api.logger.info( + `dirigent: DEBUG_NO_REPLY_TRIGGER session=${key} ` + + `channel=${derived.channel} channelId=${derived.channelId ?? ""} senderId=${derived.senderId ?? ""} ` + + `convSenderId=${String((derived.conv as Record).sender_id ?? "")} ` + + `convSender=${String((derived.conv as Record).sender ?? "")} ` + + `decision=${rec.decision.reason} ` + + `shouldNoReply=${rec.decision.shouldUseNoReply} shouldInject=${rec.decision.shouldInjectEndMarkerPrompt} ` + + `hasConvMarker=${hasConvMarker2} promptLen=${prompt.length}`, + ); + } + + api.logger.info( + `dirigent: override model for session=${key}, provider=${live.noReplyProvider}, model=${live.noReplyModel}, reason=${rec.decision.reason}`, + ); + + return { + providerOverride: live.noReplyProvider, + modelOverride: live.noReplyModel, + }; + }); +} diff --git a/plugin/hooks/message-sent.ts b/plugin/hooks/message-sent.ts new file mode 100644 index 0000000..2c87284 --- /dev/null +++ b/plugin/hooks/message-sent.ts @@ -0,0 +1,123 @@ +import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import { resolvePolicy, type DirigentConfig } from "../rules.js"; +import { onSpeakerDone, setWaitingForHuman } from "../turn-manager.js"; +import { extractDiscordChannelId, extractDiscordChannelIdFromSessionKey } from "../channel-resolver.js"; + +type DebugConfig = { + enableDebugLogs?: boolean; + debugLogChannelIds?: string[]; +}; + +type MessageSentDeps = { + api: OpenClawPluginApi; + baseConfig: DirigentConfig; + policyState: { channelPolicies: Record }; + sessionChannelId: Map; + sessionAccountId: Map; + sessionTurnHandled: Set; + ensurePolicyStateLoaded: (api: OpenClawPluginApi, config: DirigentConfig) => void; + getLivePluginConfig: (api: OpenClawPluginApi, fallback: DirigentConfig) => DirigentConfig; + resolveDiscordUserId: (api: OpenClawPluginApi, accountId: string) => string | undefined; + sendModeratorMessage: ( + botToken: string, + channelId: string, + content: string, + logger: { info: (m: string) => void; warn: (m: string) => void }, + ) => Promise; +}; + +export function registerMessageSentHook(deps: MessageSentDeps): void { + const { + api, + baseConfig, + policyState, + sessionChannelId, + sessionAccountId, + sessionTurnHandled, + ensurePolicyStateLoaded, + getLivePluginConfig, + resolveDiscordUserId, + sendModeratorMessage, + } = deps; + + api.on("message_sent", async (event, ctx) => { + try { + const key = ctx.sessionKey; + const c = (ctx || {}) as Record; + const e = (event || {}) as Record; + + api.logger.info( + `dirigent: DEBUG message_sent RAW ctxKeys=${JSON.stringify(Object.keys(c))} eventKeys=${JSON.stringify(Object.keys(e))} ` + + `ctx.channelId=${String(c.channelId ?? "undefined")} ctx.conversationId=${String(c.conversationId ?? "undefined")} ` + + `ctx.accountId=${String(c.accountId ?? "undefined")} event.to=${String(e.to ?? "undefined")} ` + + `session=${key ?? "undefined"}`, + ); + + let channelId = extractDiscordChannelId(c, e); + if (!channelId && key) { + channelId = sessionChannelId.get(key); + } + if (!channelId && key) { + channelId = extractDiscordChannelIdFromSessionKey(key); + } + const accountId = (ctx.accountId as string | undefined) || (key ? sessionAccountId.get(key) : undefined); + const content = (event.content as string) || ""; + + api.logger.info( + `dirigent: DEBUG message_sent RESOLVED session=${key ?? "undefined"} channelId=${channelId ?? "undefined"} accountId=${accountId ?? "undefined"} content=${content.slice(0, 100)}`, + ); + + if (!channelId || !accountId) return; + + const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig; + ensurePolicyStateLoaded(api, live); + const policy = resolvePolicy(live, channelId, policyState.channelPolicies as Record); + + const trimmed = content.trim(); + const isEmpty = trimmed.length === 0; + const isNoReply = /^NO_REPLY$/i.test(trimmed) || /^HEARTBEAT_OK$/i.test(trimmed); + const lastChar = trimmed.length > 0 ? Array.from(trimmed).pop() || "" : ""; + const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar); + const waitId = live.waitIdentifier || "👤"; + const hasWaitIdentifier = !!lastChar && lastChar === waitId; + const wasNoReply = isEmpty || isNoReply; + + if (key && sessionTurnHandled.has(key)) { + sessionTurnHandled.delete(key); + api.logger.info( + `dirigent: message_sent skipping turn advance (already handled in before_message_write) session=${key} channel=${channelId}`, + ); + return; + } + + if (hasWaitIdentifier) { + setWaitingForHuman(channelId); + api.logger.info( + `dirigent: message_sent wait-for-human triggered channel=${channelId} from=${accountId}`, + ); + return; + } + + if (wasNoReply || hasEndSymbol) { + const nextSpeaker = onSpeakerDone(channelId, accountId, wasNoReply); + const trigger = wasNoReply ? (isEmpty ? "empty" : "no_reply_keyword") : "end_symbol"; + api.logger.info( + `dirigent: turn onSpeakerDone channel=${channelId} from=${accountId} next=${nextSpeaker ?? "dormant"} trigger=${trigger}`, + ); + + if (wasNoReply && nextSpeaker && live.moderatorBotToken) { + const nextUserId = resolveDiscordUserId(api, nextSpeaker); + if (nextUserId) { + const schedulingId = live.schedulingIdentifier || "➡️"; + const handoffMsg = `<@${nextUserId}>${schedulingId}`; + await sendModeratorMessage(live.moderatorBotToken, channelId, handoffMsg, api.logger); + } else { + api.logger.warn(`dirigent: cannot resolve Discord userId for next speaker accountId=${nextSpeaker}`); + } + } + } + } catch (err) { + api.logger.warn(`dirigent: message_sent hook failed: ${String(err)}`); + } + }); +} diff --git a/plugin/index.ts b/plugin/index.ts index d0c73f4..20ba5cf 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -7,6 +7,8 @@ import { checkTurn, advanceTurn, resetTurn, onNewMessage, onSpeakerDone, initTur import { startModeratorPresence, stopModeratorPresence } from "./moderator-presence.js"; import { extractDiscordChannelId, extractDiscordChannelIdFromSessionKey } from "./channel-resolver.js"; import { deriveDecisionInputFromPrompt } from "./decision-input.js"; +import { registerBeforeModelResolveHook } from "./hooks/before-model-resolve.js"; +import { registerMessageSentHook } from "./hooks/message-sent.js"; // ── No-Reply API child process lifecycle ────────────────────────────── let noReplyProcess: ChildProcess | null = null; @@ -792,127 +794,21 @@ export default { } }); - api.on("before_model_resolve", async (event, ctx) => { - const key = ctx.sessionKey; - if (!key) return; - - const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig; - ensurePolicyStateLoaded(api, live); - - const prompt = ((event as Record).prompt as string) || ""; - - if (live.enableDebugLogs) { - api.logger.info( - `dirigent: DEBUG_BEFORE_MODEL_RESOLVE ctx=${JSON.stringify({ sessionKey: ctx.sessionKey, messageProvider: ctx.messageProvider, agentId: ctx.agentId })} ` + - `promptPreview=${prompt.slice(0, 300)}`, - ); - } - - const derived = deriveDecisionInputFromPrompt({ - prompt, - messageProvider: ctx.messageProvider, - sessionKey: key, - ctx: ctx as Record, - event: event as Record, - }); - // Only proceed if: discord channel AND prompt contains untrusted metadata - const hasConvMarker = prompt.includes("Conversation info (untrusted metadata):"); - if (live.discordOnly !== false && (!hasConvMarker || derived.channel !== "discord")) return; - - // Always save channelId and accountId mappings for use in later hooks - if (derived.channelId) { - sessionChannelId.set(key, derived.channelId); - } - const resolvedAccountId = resolveAccountId(api, ctx.agentId || ""); - if (resolvedAccountId) { - sessionAccountId.set(key, resolvedAccountId); - } - - let rec = sessionDecision.get(key); - if (!rec || Date.now() - rec.createdAt > DECISION_TTL_MS) { - if (rec) sessionDecision.delete(key); - const decision = evaluateDecision({ - config: live, - channel: derived.channel, - channelId: derived.channelId, - channelPolicies: policyState.channelPolicies, - senderId: derived.senderId, - content: derived.content, - }); - rec = { decision, createdAt: Date.now() }; - sessionDecision.set(key, rec); - pruneDecisionMap(); - if (shouldDebugLog(live, derived.channelId)) { - api.logger.info( - `dirigent: debug before_model_resolve recompute session=${key} ` + - `channel=${derived.channel} channelId=${derived.channelId ?? ""} senderId=${derived.senderId ?? ""} ` + - `convSenderId=${String((derived.conv as Record).sender_id ?? "")} ` + - `convSender=${String((derived.conv as Record).sender ?? "")} ` + - `convChannelId=${String((derived.conv as Record).channel_id ?? "")} ` + - `decision=${decision.reason} shouldNoReply=${decision.shouldUseNoReply} shouldInject=${decision.shouldInjectEndMarkerPrompt}`, - ); - } - } - - // Turn-based check: ALWAYS check turn order regardless of evaluateDecision result. - // This ensures only the current speaker can respond even for human messages. - if (derived.channelId) { - ensureTurnOrder(api, derived.channelId); - const accountId = resolveAccountId(api, ctx.agentId || ""); - if (accountId) { - const turnCheck = checkTurn(derived.channelId, accountId); - if (!turnCheck.allowed) { - // Forced no-reply - record this session as not allowed to speak - sessionAllowed.set(key, false); - api.logger.info( - `dirigent: turn gate blocked session=${key} accountId=${accountId} currentSpeaker=${turnCheck.currentSpeaker} reason=${turnCheck.reason}`, - ); - return { - providerOverride: live.noReplyProvider, - modelOverride: live.noReplyModel, - }; - } - // Allowed to speak - record this session as allowed - sessionAllowed.set(key, true); - } - } - - if (!rec.decision.shouldUseNoReply) { - if (rec.needsRestore) { - sessionDecision.delete(key); - return { - providerOverride: undefined, - modelOverride: undefined, - }; - } - return; - } - - rec.needsRestore = true; - sessionDecision.set(key, rec); - - if (live.enableDebugLogs) { - const prompt = ((event as Record).prompt as string) || ""; - const hasConvMarker = prompt.includes("Conversation info (untrusted metadata):"); - api.logger.info( - `dirigent: DEBUG_NO_REPLY_TRIGGER session=${key} ` + - `channel=${derived.channel} channelId=${derived.channelId ?? ""} senderId=${derived.senderId ?? ""} ` + - `convSenderId=${String((derived.conv as Record).sender_id ?? "")} ` + - `convSender=${String((derived.conv as Record).sender ?? "")} ` + - `decision=${rec.decision.reason} ` + - `shouldNoReply=${rec.decision.shouldUseNoReply} shouldInject=${rec.decision.shouldInjectEndMarkerPrompt} ` + - `hasConvMarker=${hasConvMarker} promptLen=${prompt.length}`, - ); - } - - api.logger.info( - `dirigent: override model for session=${key}, provider=${live.noReplyProvider}, model=${live.noReplyModel}, reason=${rec.decision.reason}`, - ); - - return { - providerOverride: live.noReplyProvider, - modelOverride: live.noReplyModel, - }; + registerBeforeModelResolveHook({ + api, + baseConfig: baseConfig as DirigentConfig, + sessionDecision, + sessionAllowed, + sessionChannelId, + sessionAccountId, + policyState, + DECISION_TTL_MS, + ensurePolicyStateLoaded, + getLivePluginConfig, + resolveAccountId, + pruneDecisionMap, + shouldDebugLog, + ensureTurnOrder, }); api.on("before_prompt_build", async (event, ctx) => { @@ -1206,87 +1102,17 @@ export default { }); // Turn advance: when an agent sends a message, check if it signals end of turn - api.on("message_sent", async (event, ctx) => { - try { - const key = ctx.sessionKey; - const c = (ctx || {}) as Record; - const e = (event || {}) as Record; - - api.logger.info( - `dirigent: DEBUG message_sent RAW ctxKeys=${JSON.stringify(Object.keys(c))} eventKeys=${JSON.stringify(Object.keys(e))} ` + - `ctx.channelId=${String(c.channelId ?? "undefined")} ctx.conversationId=${String(c.conversationId ?? "undefined")} ` + - `ctx.accountId=${String(c.accountId ?? "undefined")} event.to=${String(e.to ?? "undefined")} ` + - `session=${key ?? "undefined"}`, - ); - - let channelId = extractDiscordChannelId(c, e); - if (!channelId && key) { - channelId = sessionChannelId.get(key); - } - if (!channelId && key) { - channelId = extractDiscordChannelIdFromSessionKey(key); - } - const accountId = (ctx.accountId as string | undefined) || (key ? sessionAccountId.get(key) : undefined); - const content = (event.content as string) || ""; - - api.logger.info( - `dirigent: DEBUG message_sent RESOLVED session=${key ?? "undefined"} channelId=${channelId ?? "undefined"} accountId=${accountId ?? "undefined"} content=${content.slice(0, 100)}`, - ); - - if (!channelId || !accountId) return; - - const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig; - ensurePolicyStateLoaded(api, live); - const policy = resolvePolicy(live, channelId, policyState.channelPolicies); - - const trimmed = content.trim(); - const isEmpty = trimmed.length === 0; - const isNoReply = /^NO_REPLY$/i.test(trimmed) || /^HEARTBEAT_OK$/i.test(trimmed); - const lastChar = trimmed.length > 0 ? Array.from(trimmed).pop() || "" : ""; - const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar); - const waitId = live.waitIdentifier || "👤"; - const hasWaitIdentifier = !!lastChar && lastChar === waitId; - const wasNoReply = isEmpty || isNoReply; - - // Skip if turn was already advanced in before_message_write - if (key && sessionTurnHandled.has(key)) { - sessionTurnHandled.delete(key); - api.logger.info( - `dirigent: message_sent skipping turn advance (already handled in before_message_write) session=${key} channel=${channelId}`, - ); - return; - } - - // Wait identifier detection (fallback if not caught in before_message_write) - if (hasWaitIdentifier) { - setWaitingForHuman(channelId); - api.logger.info( - `dirigent: message_sent wait-for-human triggered channel=${channelId} from=${accountId}`, - ); - return; - } - - if (wasNoReply || hasEndSymbol) { - const nextSpeaker = onSpeakerDone(channelId, accountId, wasNoReply); - const trigger = wasNoReply ? (isEmpty ? "empty" : "no_reply_keyword") : "end_symbol"; - api.logger.info( - `dirigent: turn onSpeakerDone channel=${channelId} from=${accountId} next=${nextSpeaker ?? "dormant"} trigger=${trigger}`, - ); - // Moderator handoff using scheduling identifier format - if (wasNoReply && nextSpeaker && live.moderatorBotToken) { - const nextUserId = resolveDiscordUserId(api, nextSpeaker); - if (nextUserId) { - const schedulingId = live.schedulingIdentifier || "➡️"; - const handoffMsg = `<@${nextUserId}>${schedulingId}`; - sendModeratorMessage(live.moderatorBotToken, channelId, handoffMsg, api.logger); - } else { - api.logger.warn(`dirigent: cannot resolve Discord userId for next speaker accountId=${nextSpeaker}`); - } - } - } - } catch (err) { - api.logger.warn(`dirigent: message_sent hook failed: ${String(err)}`); - } + registerMessageSentHook({ + api, + baseConfig: baseConfig as DirigentConfig, + policyState, + sessionChannelId, + sessionAccountId, + sessionTurnHandled, + ensurePolicyStateLoaded, + getLivePluginConfig, + resolveDiscordUserId, + sendModeratorMessage, }); }, };