- Replace standalone no-reply-api Docker service with unified sidecar (services/main.mjs) that routes /no-reply/* and /moderator/* and starts/stops with openclaw-gateway - Add moderator Discord Gateway client (services/moderator/index.mjs) for real-time MESSAGE_CREATE push instead of polling; notifies plugin via HTTP callback - Add plugin HTTP routes (plugin/web/dirigent-api.ts) for moderator → plugin callbacks (wake-from-dormant, interrupt tail-match) - Fix tool registration format: AgentTool requires execute: not handler:; factory form for tools needing ctx - Rename no-reply-process.ts → sidecar-process.ts, startNoReplyApi → startSideCar - Remove dead config fields from openclaw.plugin.json (humanList, agentList, listMode, channelPoliciesFile, endSymbols, waitIdentifier, multiMessage*, bypassUserIds, etc.) - Rename noReplyPort → sideCarPort - Remove docker-compose.yml, dev-up/down scripts, package-plugin.mjs, test-no-reply-api.mjs - Update install.mjs: clean dist before build, copy services/, drop dead config writes - Update README, Makefile, smoke script for new architecture Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
273 lines
12 KiB
TypeScript
273 lines
12 KiB
TypeScript
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
|
|
import type { ChannelStore } from "../core/channel-store.js";
|
|
import type { IdentityRegistry } from "../core/identity-registry.js";
|
|
import { parseDiscordChannelId } from "./before-model-resolve.js";
|
|
import {
|
|
isCurrentSpeaker,
|
|
getAnchor,
|
|
advanceSpeaker,
|
|
wakeFromDormant,
|
|
isDormant,
|
|
hasSpeakers,
|
|
getDebugInfo,
|
|
isTurnPending,
|
|
clearTurnPending,
|
|
consumeBlockedPending,
|
|
type SpeakerEntry,
|
|
} from "../turn-manager.js";
|
|
import { fetchVisibleChannelBotAccountIds } from "../core/channel-members.js";
|
|
import { pollForTailMatch, sendScheduleTrigger } from "../core/moderator-discord.js";
|
|
|
|
/**
|
|
* Process-level deduplication for agent_end events.
|
|
* OpenClaw hot-reloads plugin modules (re-imports), stacking duplicate handlers.
|
|
* Using globalThis ensures the dedup Set survives module reloads and is shared
|
|
* by all handler instances in the same process.
|
|
*/
|
|
const _AGENT_END_DEDUP_KEY = "_dirigentProcessedAgentEndRunIds";
|
|
if (!(globalThis as Record<string, unknown>)[_AGENT_END_DEDUP_KEY]) {
|
|
(globalThis as Record<string, unknown>)[_AGENT_END_DEDUP_KEY] = new Set<string>();
|
|
}
|
|
const processedAgentEndRunIds: Set<string> = (globalThis as Record<string, unknown>)[_AGENT_END_DEDUP_KEY] as Set<string>;
|
|
|
|
/**
|
|
* Per-channel advance lock: prevents two concurrent agent_end handler instances
|
|
* (from different VM contexts with different runIds) from both calling advanceSpeaker
|
|
* for the same channel at the same time, which would double-advance the speaker index.
|
|
*/
|
|
const _ADVANCE_LOCK_KEY = "_dirigentAdvancingChannels";
|
|
if (!(globalThis as Record<string, unknown>)[_ADVANCE_LOCK_KEY]) {
|
|
(globalThis as Record<string, unknown>)[_ADVANCE_LOCK_KEY] = new Set<string>();
|
|
}
|
|
const advancingChannels: Set<string> = (globalThis as Record<string, unknown>)[_ADVANCE_LOCK_KEY] as Set<string>;
|
|
|
|
/** Extract plain text from agent_end event.messages last assistant entry. */
|
|
function extractFinalText(messages: unknown[]): string {
|
|
for (let i = messages.length - 1; i >= 0; i--) {
|
|
const msg = messages[i] as Record<string, unknown> | null;
|
|
if (!msg || msg.role !== "assistant") continue;
|
|
const content = msg.content;
|
|
if (typeof content === "string") return content;
|
|
if (Array.isArray(content)) {
|
|
let text = "";
|
|
for (const part of content) {
|
|
const p = part as Record<string, unknown>;
|
|
if (p?.type === "text" && typeof p.text === "string") text += p.text;
|
|
}
|
|
return text;
|
|
}
|
|
break;
|
|
}
|
|
return "";
|
|
}
|
|
|
|
function isEmptyTurn(text: string): boolean {
|
|
const t = text.trim();
|
|
if (t === "") return true;
|
|
// Check if the last non-empty line is NO or NO_REPLY (agents often write
|
|
// explanatory text before the final answer on the last line).
|
|
const lines = t.split("\n").map((l) => l.trim()).filter((l) => l !== "");
|
|
const last = lines[lines.length - 1] ?? "";
|
|
return /^NO$/i.test(last) || /^NO_REPLY$/i.test(last);
|
|
}
|
|
|
|
export type AgentEndDeps = {
|
|
api: OpenClawPluginApi;
|
|
channelStore: ChannelStore;
|
|
identityRegistry: IdentityRegistry;
|
|
moderatorBotToken: string | undefined;
|
|
scheduleIdentifier: string;
|
|
debugMode: boolean;
|
|
/** Called when discussion channel enters dormant — to send idle reminder. */
|
|
onDiscussionDormant?: (channelId: string) => Promise<void>;
|
|
};
|
|
|
|
/** Exposed so message-received can interrupt an in-progress tail-match wait. */
|
|
export type InterruptFn = (channelId: string) => void;
|
|
|
|
export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn {
|
|
const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, debugMode, onDiscussionDormant } = deps;
|
|
|
|
const interruptedChannels = new Set<string>();
|
|
|
|
function interrupt(channelId: string): void {
|
|
interruptedChannels.add(channelId);
|
|
setTimeout(() => interruptedChannels.delete(channelId), 5000);
|
|
}
|
|
|
|
async function buildSpeakerList(channelId: string): Promise<SpeakerEntry[]> {
|
|
if (!moderatorBotToken) return [];
|
|
const agentIds = await fetchVisibleChannelBotAccountIds(api, channelId, identityRegistry);
|
|
const result: SpeakerEntry[] = [];
|
|
for (const agentId of agentIds) {
|
|
const identity = identityRegistry.findByAgentId(agentId);
|
|
if (identity) result.push({ agentId, discordUserId: identity.discordUserId });
|
|
}
|
|
return result;
|
|
}
|
|
|
|
async function triggerNextSpeaker(channelId: string, next: SpeakerEntry): Promise<void> {
|
|
if (!moderatorBotToken) return;
|
|
const msg = `<@${next.discordUserId}>${scheduleIdentifier}`;
|
|
await sendScheduleTrigger(moderatorBotToken, channelId, msg, api.logger, debugMode);
|
|
api.logger.info(`dirigent: triggered next speaker agentId=${next.agentId} channel=${channelId}`);
|
|
}
|
|
|
|
api.on("agent_end", async (event, ctx) => {
|
|
try {
|
|
// Deduplicate: skip if this runId was already processed by another handler
|
|
// instance (can happen when OpenClaw hot-reloads the plugin in the same process).
|
|
const runId = (event as Record<string, unknown>).runId as string | undefined;
|
|
if (runId) {
|
|
if (processedAgentEndRunIds.has(runId)) return;
|
|
processedAgentEndRunIds.add(runId);
|
|
// Evict old entries to prevent unbounded growth (keep last 500)
|
|
if (processedAgentEndRunIds.size > 500) {
|
|
const oldest = processedAgentEndRunIds.values().next().value;
|
|
if (oldest) processedAgentEndRunIds.delete(oldest);
|
|
}
|
|
}
|
|
|
|
const sessionKey = ctx.sessionKey;
|
|
if (!sessionKey) return;
|
|
|
|
const channelId = parseDiscordChannelId(sessionKey);
|
|
if (!channelId) return;
|
|
|
|
const mode = channelStore.getMode(channelId);
|
|
if (mode === "none" || mode === "work" || mode === "report") return;
|
|
|
|
if (!hasSpeakers(channelId)) return;
|
|
|
|
const agentId = ctx.agentId;
|
|
if (!agentId) return;
|
|
|
|
// Extract final text early so we can drain blocked_pending at every early-return point.
|
|
// This prevents the counter from staying inflated when stale NO_REPLYs are discarded
|
|
// by !isCurrentSpeaker or !isTurnPending without reaching consumeBlockedPending.
|
|
const messages = Array.isArray((event as Record<string, unknown>).messages)
|
|
? ((event as Record<string, unknown>).messages as unknown[])
|
|
: [];
|
|
const finalText = extractFinalText(messages);
|
|
const empty = isEmptyTurn(finalText);
|
|
|
|
if (!isCurrentSpeaker(channelId, agentId)) {
|
|
// Drain blocked_pending for non-speaker stale NO_REPLYs. Without this, suppressions
|
|
// that happen while this agent is not the current speaker inflate the counter and cause
|
|
// its subsequent real empty turn to be misidentified as stale.
|
|
if (empty) consumeBlockedPending(channelId, agentId);
|
|
return;
|
|
}
|
|
|
|
// Only process agent_ends for turns that were explicitly started by before_model_resolve.
|
|
// This prevents stale NO_REPLY completions (from initial suppression) from being counted.
|
|
if (!isTurnPending(channelId, agentId)) {
|
|
api.logger.info(`dirigent: agent_end skipping stale turn agentId=${agentId} channel=${channelId}`);
|
|
// Also drain here: stale may arrive after clearTurnPending but before markTurnStarted.
|
|
if (empty) consumeBlockedPending(channelId, agentId);
|
|
return;
|
|
}
|
|
|
|
// Consume a blocked-pending slot only for self-wakeup stales: NO_REPLY completions
|
|
// from suppressed self-wakeup before_model_resolve calls that fire while the agent
|
|
// is current speaker with a turn already in progress.
|
|
if (empty && consumeBlockedPending(channelId, agentId)) {
|
|
api.logger.info(`dirigent: agent_end skipping blocked-pending NO_REPLY agentId=${agentId} channel=${channelId}`);
|
|
return;
|
|
}
|
|
|
|
api.logger.info(
|
|
`dirigent: agent_end channel=${channelId} agentId=${agentId} empty=${empty} text=${finalText.slice(0, 80)}`,
|
|
);
|
|
|
|
if (!empty) {
|
|
// Real turn: wait for Discord delivery before triggering next speaker.
|
|
// Anchor was set in before_model_resolve just before the LLM call, so any
|
|
// message from the agent after the anchor must be from this turn.
|
|
// NOTE: clearTurnPending is intentionally deferred until after pollForTailMatch
|
|
// returns. While waiting, isTurnPending remains true so that any re-trigger of
|
|
// this agent is correctly treated as a self-wakeup (suppressed), preventing it
|
|
// from starting a second real turn during the tail-match window.
|
|
const identity = identityRegistry.findByAgentId(agentId);
|
|
if (identity && moderatorBotToken) {
|
|
const anchorId = getAnchor(channelId, agentId) ?? "0";
|
|
|
|
const { matched: _matched, interrupted } = await pollForTailMatch({
|
|
token: moderatorBotToken,
|
|
channelId,
|
|
anchorId,
|
|
agentDiscordUserId: identity.discordUserId,
|
|
isInterrupted: () => interruptedChannels.has(channelId),
|
|
});
|
|
|
|
if (interrupted) {
|
|
if (isDormant(channelId)) {
|
|
// Channel is dormant: a new external message woke it — restart from first speaker
|
|
api.logger.info(`dirigent: tail-match interrupted (dormant) channel=${channelId} — waking`);
|
|
clearTurnPending(channelId, agentId);
|
|
const first = wakeFromDormant(channelId);
|
|
if (first) await triggerNextSpeaker(channelId, first);
|
|
return;
|
|
}
|
|
// Not dormant: interrupt was a spurious trigger (e.g. moderator bot message).
|
|
// Fall through to normal advance so the turn cycle continues correctly.
|
|
api.logger.info(`dirigent: tail-match interrupted (non-dormant) channel=${channelId} — advancing normally`);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Guard against concurrent advanceSpeaker calls for the same channel.
|
|
// Two VM context instances may both reach this point with different runIds
|
|
// (when the dedup Set doesn't catch them); only the first should advance.
|
|
if (advancingChannels.has(channelId)) {
|
|
api.logger.info(`dirigent: agent_end advance already in progress, skipping channel=${channelId} agentId=${agentId}`);
|
|
return;
|
|
}
|
|
advancingChannels.add(channelId);
|
|
|
|
let next: ReturnType<typeof import("../turn-manager.js").getCurrentSpeaker> | null = null;
|
|
let enteredDormant = false;
|
|
try {
|
|
// Determine shuffle mode from current list size
|
|
const debugBefore = getDebugInfo(channelId);
|
|
const currentListSize = debugBefore.exists ? (debugBefore.speakerList?.length ?? 0) : 0;
|
|
const isShuffle = currentListSize > 2;
|
|
// In shuffle mode, pass agentId as previousLastAgentId so new list avoids it as first
|
|
const previousLastAgentId = isShuffle ? agentId : undefined;
|
|
|
|
({ next, enteredDormant } = await advanceSpeaker(
|
|
channelId,
|
|
agentId,
|
|
empty,
|
|
() => buildSpeakerList(channelId),
|
|
previousLastAgentId,
|
|
));
|
|
} finally {
|
|
advancingChannels.delete(channelId);
|
|
}
|
|
|
|
// Clear turn pending AFTER advanceSpeaker completes. This ensures isTurnPending
|
|
// remains true during the async rebuildFn window at cycle boundaries, preventing
|
|
// re-triggers from starting a second real turn while currentIndex is still at the
|
|
// outgoing speaker's position.
|
|
clearTurnPending(channelId, agentId);
|
|
|
|
if (enteredDormant) {
|
|
api.logger.info(`dirigent: channel=${channelId} entered dormant`);
|
|
if (mode === "discussion") {
|
|
await onDiscussionDormant?.(channelId).catch((err) => {
|
|
api.logger.warn(`dirigent: onDiscussionDormant failed: ${String(err)}`);
|
|
});
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (next) await triggerNextSpeaker(channelId, next);
|
|
} catch (err) {
|
|
api.logger.warn(`dirigent: agent_end hook error: ${String(err)}`);
|
|
}
|
|
});
|
|
|
|
return interrupt;
|
|
}
|