feat: rewrite plugin as v2 with globalThis-based turn management
Complete rewrite of the Dirigent plugin turn management system to work correctly with OpenClaw's VM-context-per-session architecture: - All turn state stored on globalThis (persists across VM context hot-reloads) - Hooks registered unconditionally on every api instance; event-level dedup (runId Set for agent_end, WeakSet for before_model_resolve) prevents double-processing - Gateway lifecycle events (gateway_start/stop) guarded once via globalThis flag - Shared initializingChannels lock prevents concurrent channel init across VM contexts in message_received and before_model_resolve - New ChannelStore and IdentityRegistry replace old policy/session-state modules - Added agent_end hook with tail-match polling for Discord delivery confirmation - Added web control page, padded-cell auto-scan, discussion tool support - Removed obsolete v1 modules: channel-resolver, channel-modes, discussion-service, session-state, turn-bootstrap, policy/store, rules, decision-input Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
221
plugin/hooks/agent-end.ts
Normal file
221
plugin/hooks/agent-end.ts
Normal file
@@ -0,0 +1,221 @@
|
||||
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
||||
import type { ChannelStore } from "../core/channel-store.js";
|
||||
import type { IdentityRegistry } from "../core/identity-registry.js";
|
||||
import { parseDiscordChannelId } from "./before-model-resolve.js";
|
||||
import {
|
||||
isCurrentSpeaker,
|
||||
getAnchor,
|
||||
advanceSpeaker,
|
||||
wakeFromDormant,
|
||||
hasSpeakers,
|
||||
getDebugInfo,
|
||||
isTurnPending,
|
||||
clearTurnPending,
|
||||
consumeBlockedPending,
|
||||
type SpeakerEntry,
|
||||
} from "../turn-manager.js";
|
||||
import { fetchVisibleChannelBotAccountIds } from "../core/channel-members.js";
|
||||
import { pollForTailMatch, sendAndDelete } from "../core/moderator-discord.js";
|
||||
|
||||
const TAIL_LENGTH = 40;
|
||||
const TAIL_MATCH_TIMEOUT_MS = 15_000;
|
||||
|
||||
/**
|
||||
* Process-level deduplication for agent_end events.
|
||||
* OpenClaw hot-reloads plugin modules (re-imports), stacking duplicate handlers.
|
||||
* Using globalThis ensures the dedup Set survives module reloads and is shared
|
||||
* by all handler instances in the same process.
|
||||
*/
|
||||
const _AGENT_END_DEDUP_KEY = "_dirigentProcessedAgentEndRunIds";
|
||||
if (!(globalThis as Record<string, unknown>)[_AGENT_END_DEDUP_KEY]) {
|
||||
(globalThis as Record<string, unknown>)[_AGENT_END_DEDUP_KEY] = new Set<string>();
|
||||
}
|
||||
const processedAgentEndRunIds: Set<string> = (globalThis as Record<string, unknown>)[_AGENT_END_DEDUP_KEY] as Set<string>;
|
||||
|
||||
/** Extract plain text from agent_end event.messages last assistant entry. */
|
||||
function extractFinalText(messages: unknown[]): string {
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const msg = messages[i] as Record<string, unknown> | null;
|
||||
if (!msg || msg.role !== "assistant") continue;
|
||||
const content = msg.content;
|
||||
if (typeof content === "string") return content;
|
||||
if (Array.isArray(content)) {
|
||||
let text = "";
|
||||
for (const part of content) {
|
||||
const p = part as Record<string, unknown>;
|
||||
if (p?.type === "text" && typeof p.text === "string") text += p.text;
|
||||
}
|
||||
return text;
|
||||
}
|
||||
break;
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
function isEmptyTurn(text: string): boolean {
|
||||
const t = text.trim();
|
||||
return t === "" || /^NO$/i.test(t) || /^NO_REPLY$/i.test(t);
|
||||
}
|
||||
|
||||
export type AgentEndDeps = {
|
||||
api: OpenClawPluginApi;
|
||||
channelStore: ChannelStore;
|
||||
identityRegistry: IdentityRegistry;
|
||||
moderatorBotToken: string | undefined;
|
||||
scheduleIdentifier: string;
|
||||
/** Called when discussion channel enters dormant — to send idle reminder. */
|
||||
onDiscussionDormant?: (channelId: string) => Promise<void>;
|
||||
};
|
||||
|
||||
/** Exposed so message-received can interrupt an in-progress tail-match wait. */
|
||||
export type InterruptFn = (channelId: string) => void;
|
||||
|
||||
export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn {
|
||||
const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, onDiscussionDormant } = deps;
|
||||
|
||||
const interruptedChannels = new Set<string>();
|
||||
|
||||
function interrupt(channelId: string): void {
|
||||
interruptedChannels.add(channelId);
|
||||
setTimeout(() => interruptedChannels.delete(channelId), 5000);
|
||||
}
|
||||
|
||||
async function buildSpeakerList(channelId: string): Promise<SpeakerEntry[]> {
|
||||
if (!moderatorBotToken) return [];
|
||||
const agentIds = await fetchVisibleChannelBotAccountIds(api, channelId, identityRegistry);
|
||||
const result: SpeakerEntry[] = [];
|
||||
for (const agentId of agentIds) {
|
||||
const identity = identityRegistry.findByAgentId(agentId);
|
||||
if (identity) result.push({ agentId, discordUserId: identity.discordUserId });
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
async function triggerNextSpeaker(channelId: string, next: SpeakerEntry): Promise<void> {
|
||||
if (!moderatorBotToken) return;
|
||||
const msg = `<@${next.discordUserId}>${scheduleIdentifier}`;
|
||||
await sendAndDelete(moderatorBotToken, channelId, msg, api.logger);
|
||||
api.logger.info(`dirigent: triggered next speaker agentId=${next.agentId} channel=${channelId}`);
|
||||
}
|
||||
|
||||
api.on("agent_end", async (event, ctx) => {
|
||||
try {
|
||||
// Deduplicate: skip if this runId was already processed by another handler
|
||||
// instance (can happen when OpenClaw hot-reloads the plugin in the same process).
|
||||
const runId = (event as Record<string, unknown>).runId as string | undefined;
|
||||
if (runId) {
|
||||
if (processedAgentEndRunIds.has(runId)) return;
|
||||
processedAgentEndRunIds.add(runId);
|
||||
// Evict old entries to prevent unbounded growth (keep last 500)
|
||||
if (processedAgentEndRunIds.size > 500) {
|
||||
const oldest = processedAgentEndRunIds.values().next().value;
|
||||
if (oldest) processedAgentEndRunIds.delete(oldest);
|
||||
}
|
||||
}
|
||||
|
||||
const sessionKey = ctx.sessionKey;
|
||||
if (!sessionKey) return;
|
||||
|
||||
const channelId = parseDiscordChannelId(sessionKey);
|
||||
if (!channelId) return;
|
||||
|
||||
const mode = channelStore.getMode(channelId);
|
||||
if (mode === "none" || mode === "work" || mode === "report") return;
|
||||
|
||||
if (!hasSpeakers(channelId)) return;
|
||||
|
||||
const agentId = ctx.agentId;
|
||||
if (!agentId) return;
|
||||
if (!isCurrentSpeaker(channelId, agentId)) return;
|
||||
|
||||
// Only process agent_ends for turns that were explicitly started by before_model_resolve.
|
||||
// This prevents stale NO_REPLY completions (from initial suppression) from being counted.
|
||||
if (!isTurnPending(channelId, agentId)) {
|
||||
api.logger.info(`dirigent: agent_end skipping stale turn agentId=${agentId} channel=${channelId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Consume a blocked-pending slot if any exists. These are NO_REPLY completions
|
||||
// from before_model_resolve blocking events (non-speaker or init-suppressed) that
|
||||
// fire late — after the agent became the current speaker — due to history-building
|
||||
// overhead (~10s). We skip them until the counter is exhausted, at which point
|
||||
// the next agent_end is the real LLM response.
|
||||
if (consumeBlockedPending(channelId, agentId)) {
|
||||
api.logger.info(`dirigent: agent_end skipping blocked-pending NO_REPLY agentId=${agentId} channel=${channelId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
clearTurnPending(channelId, agentId);
|
||||
|
||||
const messages = Array.isArray((event as Record<string, unknown>).messages)
|
||||
? ((event as Record<string, unknown>).messages as unknown[])
|
||||
: [];
|
||||
const finalText = extractFinalText(messages);
|
||||
const empty = isEmptyTurn(finalText);
|
||||
|
||||
api.logger.info(
|
||||
`dirigent: agent_end channel=${channelId} agentId=${agentId} empty=${empty} text=${finalText.slice(0, 80)}`,
|
||||
);
|
||||
|
||||
if (!empty) {
|
||||
// Real turn: wait for Discord delivery via tail-match polling
|
||||
const identity = identityRegistry.findByAgentId(agentId);
|
||||
if (identity && moderatorBotToken) {
|
||||
const anchorId = getAnchor(channelId, agentId) ?? "0";
|
||||
const tail = [...finalText].slice(-TAIL_LENGTH).join("");
|
||||
|
||||
const { matched, interrupted } = await pollForTailMatch({
|
||||
token: moderatorBotToken,
|
||||
channelId,
|
||||
anchorId,
|
||||
agentDiscordUserId: identity.discordUserId,
|
||||
tailFingerprint: tail,
|
||||
timeoutMs: TAIL_MATCH_TIMEOUT_MS,
|
||||
isInterrupted: () => interruptedChannels.has(channelId),
|
||||
});
|
||||
|
||||
if (interrupted) {
|
||||
api.logger.info(`dirigent: tail-match interrupted channel=${channelId} — wake-from-dormant`);
|
||||
const first = wakeFromDormant(channelId);
|
||||
if (first) await triggerNextSpeaker(channelId, first);
|
||||
return;
|
||||
}
|
||||
if (!matched) {
|
||||
api.logger.warn(`dirigent: tail-match timeout channel=${channelId} agentId=${agentId} — advancing anyway`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Determine shuffle mode from current list size
|
||||
const debugBefore = getDebugInfo(channelId);
|
||||
const currentListSize = debugBefore.exists ? (debugBefore.speakerList?.length ?? 0) : 0;
|
||||
const isShuffle = currentListSize > 2;
|
||||
// In shuffle mode, pass agentId as previousLastAgentId so new list avoids it as first
|
||||
const previousLastAgentId = isShuffle ? agentId : undefined;
|
||||
|
||||
const { next, enteredDormant } = await advanceSpeaker(
|
||||
channelId,
|
||||
agentId,
|
||||
empty,
|
||||
() => buildSpeakerList(channelId),
|
||||
previousLastAgentId,
|
||||
);
|
||||
|
||||
if (enteredDormant) {
|
||||
api.logger.info(`dirigent: channel=${channelId} entered dormant`);
|
||||
if (mode === "discussion") {
|
||||
await onDiscussionDormant?.(channelId).catch((err) => {
|
||||
api.logger.warn(`dirigent: onDiscussionDormant failed: ${String(err)}`);
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (next) await triggerNextSpeaker(channelId, next);
|
||||
} catch (err) {
|
||||
api.logger.warn(`dirigent: agent_end hook error: ${String(err)}`);
|
||||
}
|
||||
});
|
||||
|
||||
return interrupt;
|
||||
}
|
||||
Reference in New Issue
Block a user