Files
Dirigent/plugin/index.ts

1222 lines
51 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import { spawn, type ChildProcess } from "node:child_process";
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
import { evaluateDecision, resolvePolicy, type ChannelPolicy, type Decision, type DirigentConfig } from "./rules.js";
import { checkTurn, advanceTurn, resetTurn, onNewMessage, onSpeakerDone, initTurnOrder, getTurnDebugInfo } from "./turn-manager.js";
import { startModeratorPresence, stopModeratorPresence } from "./moderator-presence.js";
// ── No-Reply API child process lifecycle ──────────────────────────────
let noReplyProcess: ChildProcess | null = null;
function startNoReplyApi(logger: { info: (m: string) => void; warn: (m: string) => void }, pluginDir: string, port = 8787): void {
logger.info(`dirigent: startNoReplyApi called, pluginDir=${pluginDir}`);
if (noReplyProcess) {
logger.info("dirigent: no-reply API already running, skipping");
return;
}
const serverPath = path.resolve(pluginDir, "..", "no-reply-api", "server.mjs");
logger.info(`dirigent: resolved serverPath=${serverPath}`);
if (!fs.existsSync(serverPath)) {
logger.warn(`dirigent: no-reply API server not found at ${serverPath}, skipping`);
return;
}
logger.info(`dirigent: no-reply API server found, spawning process...`);
noReplyProcess = spawn(process.execPath, [serverPath], {
env: { ...process.env, PORT: String(port) },
stdio: ["ignore", "pipe", "pipe"],
detached: false,
});
noReplyProcess.stdout?.on("data", (d: Buffer) => logger.info(`dirigent: no-reply-api: ${d.toString().trim()}`));
noReplyProcess.stderr?.on("data", (d: Buffer) => logger.warn(`dirigent: no-reply-api: ${d.toString().trim()}`));
noReplyProcess.on("exit", (code, signal) => {
logger.info(`dirigent: no-reply API exited (code=${code}, signal=${signal})`);
noReplyProcess = null;
});
logger.info(`dirigent: no-reply API started (pid=${noReplyProcess.pid}, port=${port})`);
}
function stopNoReplyApi(logger: { info: (m: string) => void }): void {
if (!noReplyProcess) return;
logger.info("dirigent: stopping no-reply API");
noReplyProcess.kill("SIGTERM");
noReplyProcess = null;
}
type DiscordControlAction = "channel-private-create" | "channel-private-update" | "member-list";
type DecisionRecord = {
decision: Decision;
createdAt: number;
needsRestore?: boolean;
};
type PolicyState = {
filePath: string;
channelPolicies: Record<string, ChannelPolicy>;
};
type DebugConfig = {
enableDebugLogs?: boolean;
debugLogChannelIds?: string[];
};
const sessionDecision = new Map<string, DecisionRecord>();
const sessionAllowed = new Map<string, boolean>(); // Track if session was allowed to speak (true) or forced no-reply (false)
const sessionInjected = new Set<string>(); // Track which sessions have already injected the end marker
const sessionChannelId = new Map<string, string>(); // Track sessionKey -> channelId mapping
const sessionAccountId = new Map<string, string>(); // Track sessionKey -> accountId mapping
const sessionTurnHandled = new Set<string>(); // Track sessions where turn was already advanced in before_message_write
const MAX_SESSION_DECISIONS = 2000;
const DECISION_TTL_MS = 5 * 60 * 1000;
function buildEndMarkerInstruction(endSymbols: string[], isGroupChat: boolean, schedulingIdentifier: string): string {
const symbols = endSymbols.length > 0 ? endSymbols.join("") : "🔚";
let instruction = `Your response MUST end with ${symbols}. Exception: gateway keywords (e.g. NO_REPLY, HEARTBEAT_OK) must NOT include ${symbols}.`;
if (isGroupChat) {
instruction += `\n\nGroup chat rules: If this message is not relevant to you, does not need your response, or you have nothing valuable to add, reply with NO_REPLY. Do not speak just for the sake of speaking.`;
}
return instruction;
}
function buildSchedulingIdentifierInstruction(schedulingIdentifier: string): string {
return `\n\nScheduling identifier: "${schedulingIdentifier}". This identifier itself is meaningless — it carries no semantic content. When you receive a message containing <@YOUR_USER_ID> followed by the scheduling identifier, check recent chat history and decide whether you have something to say. If not, reply NO_REPLY.`;
}
const policyState: PolicyState = {
filePath: "",
channelPolicies: {},
};
function normalizeChannel(ctx: Record<string, unknown>): string {
const candidates = [ctx.commandSource, ctx.messageProvider, ctx.channelId, ctx.channel];
for (const c of candidates) {
if (typeof c === "string" && c.trim()) return c.trim().toLowerCase();
}
return "";
}
/**
* Extract the actual Discord channel ID from a conversationId or "to" field.
* OpenClaw uses format "channel:<snowflake>" for Discord conversations.
* Also tries event.to and event.metadata.to as fallbacks.
*/
function extractDiscordChannelId(ctx: Record<string, unknown>, event?: Record<string, unknown>): string | undefined {
const candidates: unknown[] = [
ctx.conversationId,
event?.to,
(event?.metadata as Record<string, unknown>)?.to,
];
for (const c of candidates) {
if (typeof c === "string" && c.trim()) {
const s = c.trim();
// Handle "channel:123456" format
if (s.startsWith("channel:")) {
const id = s.slice("channel:".length);
if (/^\d+$/.test(id)) return id;
}
// Handle "discord:channel:123456" format
if (s.startsWith("discord:channel:")) {
const id = s.slice("discord:channel:".length);
if (/^\d+$/.test(id)) return id;
}
// If it's a raw snowflake (all digits), use directly
if (/^\d{15,}$/.test(s)) return s;
}
}
return undefined;
}
function normalizeSender(event: Record<string, unknown>, ctx: Record<string, unknown>): string | undefined {
const direct = [ctx.senderId, ctx.from, event.from];
for (const v of direct) {
if (typeof v === "string" && v.trim()) return v.trim();
}
const meta = (event.metadata || ctx.metadata) as Record<string, unknown> | undefined;
if (!meta) return undefined;
const metaCandidates = [meta.senderId, meta.sender_id, meta.userId, meta.user_id];
for (const v of metaCandidates) {
if (typeof v === "string" && v.trim()) return v.trim();
}
return undefined;
}
function extractUntrustedConversationInfo(text: string): Record<string, unknown> | undefined {
const marker = "Conversation info (untrusted metadata):";
const idx = text.indexOf(marker);
if (idx < 0) return undefined;
const tail = text.slice(idx + marker.length);
const m = tail.match(/```json\s*([\s\S]*?)\s*```/i);
if (!m) return undefined;
try {
const parsed = JSON.parse(m[1]);
return parsed && typeof parsed === "object" ? (parsed as Record<string, unknown>) : undefined;
} catch {
return undefined;
}
}
function deriveDecisionInputFromPrompt(
prompt: string,
messageProvider?: string,
channelIdFromCtx?: string,
): {
channel: string;
channelId?: string;
senderId?: string;
content: string;
conv: Record<string, unknown>;
} {
const conv = extractUntrustedConversationInfo(prompt) || {};
const channel = (messageProvider || "").toLowerCase();
// Priority: ctx.channelId > conv.chat_id > conversation_label > conv.channel_id
let channelId = channelIdFromCtx;
if (!channelId) {
// Try chat_id field (format "channel:123456")
if (typeof conv.chat_id === "string" && conv.chat_id.startsWith("channel:")) {
channelId = conv.chat_id.slice("channel:".length);
}
// Try conversation_label (format "Guild #name channel id:123456")
if (!channelId && typeof conv.conversation_label === "string") {
const labelMatch = conv.conversation_label.match(/channel id:(\d+)/);
if (labelMatch) channelId = labelMatch[1];
}
// Try channel_id field directly
if (!channelId && typeof conv.channel_id === "string" && conv.channel_id) {
channelId = conv.channel_id;
}
}
const senderId =
(typeof conv.sender_id === "string" && conv.sender_id) ||
(typeof conv.sender === "string" && conv.sender) ||
undefined;
return { channel, channelId, senderId, content: prompt, conv };
}
function pruneDecisionMap(now = Date.now()) {
for (const [k, v] of sessionDecision.entries()) {
if (now - v.createdAt > DECISION_TTL_MS) sessionDecision.delete(k);
}
if (sessionDecision.size <= MAX_SESSION_DECISIONS) return;
const keys = sessionDecision.keys();
while (sessionDecision.size > MAX_SESSION_DECISIONS) {
const k = keys.next();
if (k.done) break;
sessionDecision.delete(k.value);
}
}
function getLivePluginConfig(api: OpenClawPluginApi, fallback: DirigentConfig): DirigentConfig {
const root = (api.config as Record<string, unknown>) || {};
const plugins = (root.plugins as Record<string, unknown>) || {};
const entries = (plugins.entries as Record<string, unknown>) || {};
// Support both "dirigent" and legacy "whispergate" config keys
const entry = (entries.dirigent as Record<string, unknown>) || (entries.whispergate as Record<string, unknown>) || {};
const cfg = (entry.config as Record<string, unknown>) || {};
if (Object.keys(cfg).length > 0) {
// Merge with defaults to ensure optional fields have values
return {
enableDiscordControlTool: true,
enableDirigentPolicyTool: true,
discordControlApiBaseUrl: "http://127.0.0.1:8790",
enableDebugLogs: false,
debugLogChannelIds: [],
schedulingIdentifier: "➡️",
...cfg,
} as DirigentConfig;
}
return fallback;
}
function resolvePoliciesPath(api: OpenClawPluginApi, config: DirigentConfig): string {
return api.resolvePath(config.channelPoliciesFile || "~/.openclaw/dirigent-channel-policies.json");
}
function ensurePolicyStateLoaded(api: OpenClawPluginApi, config: DirigentConfig) {
if (policyState.filePath) return;
const filePath = resolvePoliciesPath(api, config);
policyState.filePath = filePath;
try {
if (!fs.existsSync(filePath)) {
fs.mkdirSync(path.dirname(filePath), { recursive: true });
fs.writeFileSync(filePath, "{}\n", "utf8");
policyState.channelPolicies = {};
return;
}
const raw = fs.readFileSync(filePath, "utf8");
const parsed = JSON.parse(raw) as Record<string, ChannelPolicy>;
policyState.channelPolicies = parsed && typeof parsed === "object" ? parsed : {};
} catch (err) {
api.logger.warn(`dirigent: failed init policy file ${filePath}: ${String(err)}`);
policyState.channelPolicies = {};
}
}
/** Resolve agentId → Discord accountId from config bindings */
function resolveAccountId(api: OpenClawPluginApi, agentId: string): string | undefined {
const root = (api.config as Record<string, unknown>) || {};
const bindings = root.bindings as Array<Record<string, unknown>> | undefined;
if (!Array.isArray(bindings)) return undefined;
for (const b of bindings) {
if (b.agentId === agentId) {
const match = b.match as Record<string, unknown> | undefined;
if (match?.channel === "discord" && typeof match.accountId === "string") {
return match.accountId;
}
}
}
return undefined;
}
/**
* Get all Discord bot accountIds from config bindings.
*/
function getAllBotAccountIds(api: OpenClawPluginApi): string[] {
const root = (api.config as Record<string, unknown>) || {};
const bindings = root.bindings as Array<Record<string, unknown>> | undefined;
if (!Array.isArray(bindings)) return [];
const ids: string[] = [];
for (const b of bindings) {
const match = b.match as Record<string, unknown> | undefined;
if (match?.channel === "discord" && typeof match.accountId === "string") {
ids.push(match.accountId);
}
}
return ids;
}
/**
* Track which bot accountIds have been seen in each channel via message_received.
* Key: channelId, Value: Set of accountIds seen.
*/
const channelSeenAccounts = new Map<string, Set<string>>();
/**
* Record a bot accountId seen in a channel.
* Returns true if this is a new account for this channel (turn order should be updated).
*/
function recordChannelAccount(channelId: string, accountId: string): boolean {
let seen = channelSeenAccounts.get(channelId);
if (!seen) {
seen = new Set();
channelSeenAccounts.set(channelId, seen);
}
if (seen.has(accountId)) return false;
seen.add(accountId);
return true;
}
/**
* Get the list of bot accountIds seen in a channel.
* Only returns accounts that are also in the global bindings (actual bots).
*/
function getChannelBotAccountIds(api: OpenClawPluginApi, channelId: string): string[] {
const allBots = new Set(getAllBotAccountIds(api));
const seen = channelSeenAccounts.get(channelId);
if (!seen) return [];
return [...seen].filter(id => allBots.has(id));
}
/**
* Ensure turn order is initialized for a channel.
* Uses only bot accounts that have been seen in this channel.
*/
function ensureTurnOrder(api: OpenClawPluginApi, channelId: string): void {
const botAccounts = getChannelBotAccountIds(api, channelId);
if (botAccounts.length > 0) {
initTurnOrder(channelId, botAccounts);
}
}
/**
* Build agent identity string for injection into group chat prompts.
* Includes agent name, Discord accountId, and Discord userId.
*/
function buildAgentIdentity(api: OpenClawPluginApi, agentId: string): string | undefined {
const root = (api.config as Record<string, unknown>) || {};
const bindings = root.bindings as Array<Record<string, unknown>> | undefined;
const agents = ((root.agents as Record<string, unknown>)?.list as Array<Record<string, unknown>>) || [];
if (!Array.isArray(bindings)) return undefined;
// Find accountId for this agent
let accountId: string | undefined;
for (const b of bindings) {
if (b.agentId === agentId) {
const match = b.match as Record<string, unknown> | undefined;
if (match?.channel === "discord" && typeof match.accountId === "string") {
accountId = match.accountId;
break;
}
}
}
if (!accountId) return undefined;
// Find agent name
const agent = agents.find((a: Record<string, unknown>) => a.id === agentId);
const name = (agent?.name as string) || agentId;
// Resolve Discord userId from bot token
const discordUserId = resolveDiscordUserId(api, accountId);
let identity = `You are ${name} (Discord account: ${accountId}`;
if (discordUserId) {
identity += `, Discord userId: ${discordUserId}`;
}
identity += `).`;
return identity;
}
// --- Moderator bot helpers ---
/** Extract Discord user ID from a bot token (base64-encoded in first segment) */
function userIdFromToken(token: string): string | undefined {
try {
const segment = token.split(".")[0];
// Add padding
const padded = segment + "=".repeat((4 - (segment.length % 4)) % 4);
return Buffer.from(padded, "base64").toString("utf8");
} catch {
return undefined;
}
}
/** Resolve accountId → Discord user ID by reading the account's bot token from config */
function resolveDiscordUserId(api: OpenClawPluginApi, accountId: string): string | undefined {
const root = (api.config as Record<string, unknown>) || {};
const channels = (root.channels as Record<string, unknown>) || {};
const discord = (channels.discord as Record<string, unknown>) || {};
const accounts = (discord.accounts as Record<string, Record<string, unknown>>) || {};
const acct = accounts[accountId];
if (!acct?.token || typeof acct.token !== "string") return undefined;
return userIdFromToken(acct.token);
}
/** Get the moderator bot's Discord user ID from its token */
function getModeratorUserId(config: DirigentConfig): string | undefined {
if (!config.moderatorBotToken) return undefined;
return userIdFromToken(config.moderatorBotToken);
}
/** Send a message as the moderator bot via Discord REST API */
async function sendModeratorMessage(token: string, channelId: string, content: string, logger: { info: (msg: string) => void; warn: (msg: string) => void }): Promise<boolean> {
try {
const r = await fetch(`https://discord.com/api/v10/channels/${channelId}/messages`, {
method: "POST",
headers: {
"Authorization": `Bot ${token}`,
"Content-Type": "application/json",
},
body: JSON.stringify({ content }),
});
if (!r.ok) {
const text = await r.text();
logger.warn(`dirigent: moderator send failed (${r.status}): ${text}`);
return false;
}
logger.info(`dirigent: moderator message sent to channel=${channelId}`);
return true;
} catch (err) {
logger.warn(`dirigent: moderator send error: ${String(err)}`);
return false;
}
}
function persistPolicies(api: OpenClawPluginApi): void {
const filePath = policyState.filePath;
if (!filePath) throw new Error("policy file path not initialized");
const before = JSON.stringify(policyState.channelPolicies, null, 2) + "\n";
const tmp = `${filePath}.tmp`;
fs.mkdirSync(path.dirname(filePath), { recursive: true });
fs.writeFileSync(tmp, before, "utf8");
fs.renameSync(tmp, filePath);
api.logger.info(`dirigent: policy file persisted: ${filePath}`);
}
function pickDefined(input: Record<string, unknown>) {
const out: Record<string, unknown> = {};
for (const [k, v] of Object.entries(input)) {
if (v !== undefined) out[k] = v;
}
return out;
}
function shouldDebugLog(cfg: DebugConfig, channelId?: string): boolean {
if (!cfg.enableDebugLogs) return false;
const allow = Array.isArray(cfg.debugLogChannelIds) ? cfg.debugLogChannelIds : [];
if (allow.length === 0) return true;
if (!channelId) return true;
return allow.includes(channelId);
}
function debugCtxSummary(ctx: Record<string, unknown>, event: Record<string, unknown>) {
const meta = ((ctx.metadata || event.metadata || {}) as Record<string, unknown>) || {};
return {
sessionKey: typeof ctx.sessionKey === "string" ? ctx.sessionKey : undefined,
commandSource: typeof ctx.commandSource === "string" ? ctx.commandSource : undefined,
messageProvider: typeof ctx.messageProvider === "string" ? ctx.messageProvider : undefined,
channel: typeof ctx.channel === "string" ? ctx.channel : undefined,
channelId: typeof ctx.channelId === "string" ? ctx.channelId : undefined,
senderId: typeof ctx.senderId === "string" ? ctx.senderId : undefined,
from: typeof ctx.from === "string" ? ctx.from : undefined,
metaSenderId:
typeof meta.senderId === "string"
? meta.senderId
: typeof meta.sender_id === "string"
? meta.sender_id
: undefined,
metaUserId:
typeof meta.userId === "string"
? meta.userId
: typeof meta.user_id === "string"
? meta.user_id
: undefined,
};
}
export default {
id: "dirigent",
name: "Dirigent",
register(api: OpenClawPluginApi) {
// Merge pluginConfig with defaults (in case config is missing from openclaw.json)
const baseConfig = {
enableDiscordControlTool: true,
enableDirigentPolicyTool: true,
discordControlApiBaseUrl: "http://127.0.0.1:8790",
schedulingIdentifier: "➡️",
...(api.pluginConfig || {}),
} as DirigentConfig & {
enableDiscordControlTool: boolean;
discordControlApiBaseUrl: string;
discordControlApiToken?: string;
discordControlCallerId?: string;
enableDirigentPolicyTool: boolean;
};
const liveAtRegister = getLivePluginConfig(api, baseConfig as DirigentConfig);
ensurePolicyStateLoaded(api, liveAtRegister);
// Resolve plugin directory for locating sibling modules (no-reply-api/)
// Note: api.resolvePath(".") returns cwd, not script directory. Use import.meta.url instead.
const pluginDir = path.dirname(new URL(import.meta.url).pathname);
api.logger.info(`dirigent: pluginDir resolved from import.meta.url: ${pluginDir}`);
// Gateway lifecycle: start/stop no-reply API and moderator bot with the gateway
api.on("gateway_start", () => {
api.logger.info(`dirigent: gateway_start event received`);
// Check no-reply-api server file exists
const serverPath = path.resolve(pluginDir, "..", "no-reply-api", "server.mjs");
api.logger.info(`dirigent: checking no-reply-api server at ${serverPath}, exists=${fs.existsSync(serverPath)}`);
// Additional debug: list what's in the parent directory
const parentDir = path.resolve(pluginDir, "..");
try {
const entries = fs.readdirSync(parentDir);
api.logger.info(`dirigent: parent dir (${parentDir}) entries: ${JSON.stringify(entries)}`);
} catch (e) {
api.logger.warn(`dirigent: cannot read parent dir: ${String(e)}`);
}
startNoReplyApi(api.logger, pluginDir);
const live = getLivePluginConfig(api, baseConfig as DirigentConfig);
api.logger.info(`dirigent: config loaded, moderatorBotToken=${live.moderatorBotToken ? "[set]" : "[not set]"}`);
if (live.moderatorBotToken) {
api.logger.info("dirigent: starting moderator bot presence...");
startModeratorPresence(live.moderatorBotToken, api.logger);
api.logger.info("dirigent: moderator bot presence started");
} else {
api.logger.info("dirigent: moderator bot not starting - no moderatorBotToken in config");
}
});
api.on("gateway_stop", () => {
stopNoReplyApi(api.logger);
stopModeratorPresence();
api.logger.info("dirigent: gateway stopping, services shut down");
});
api.registerTool(
{
name: "dirigent_tools",
description: "Dirigent unified tool: Discord admin actions + in-memory policy management.",
parameters: {
type: "object",
additionalProperties: false,
properties: {
action: {
type: "string",
enum: ["channel-private-create", "channel-private-update", "member-list", "policy-get", "policy-set-channel", "policy-delete-channel", "turn-status", "turn-advance", "turn-reset"],
},
guildId: { type: "string" },
name: { type: "string" },
type: { type: "number" },
parentId: { type: "string" },
topic: { type: "string" },
position: { type: "number" },
nsfw: { type: "boolean" },
allowedUserIds: { type: "array", items: { type: "string" } },
allowedRoleIds: { type: "array", items: { type: "string" } },
allowMask: { type: "string" },
denyEveryoneMask: { type: "string" },
channelId: { type: "string" },
mode: { type: "string", enum: ["merge", "replace"] },
addUserIds: { type: "array", items: { type: "string" } },
addRoleIds: { type: "array", items: { type: "string" } },
removeTargetIds: { type: "array", items: { type: "string" } },
denyMask: { type: "string" },
limit: { type: "number" },
after: { type: "string" },
fields: { anyOf: [{ type: "string" }, { type: "array", items: { type: "string" } }] },
dryRun: { type: "boolean" },
listMode: { type: "string", enum: ["human-list", "agent-list"] },
humanList: { type: "array", items: { type: "string" } },
agentList: { type: "array", items: { type: "string" } },
endSymbols: { type: "array", items: { type: "string" } },
},
required: ["action"],
},
async execute(_id: string, params: Record<string, unknown>) {
const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & {
discordControlApiBaseUrl?: string;
discordControlApiToken?: string;
discordControlCallerId?: string;
enableDiscordControlTool?: boolean;
enableDirigentPolicyTool?: boolean;
};
ensurePolicyStateLoaded(api, live);
const action = String(params.action || "");
const discordActions = new Set(["channel-private-create", "channel-private-update", "member-list"]);
if (discordActions.has(action)) {
if (live.enableDiscordControlTool === false) {
return { content: [{ type: "text", text: "discord actions disabled by config" }], isError: true };
}
const baseUrl = (live.discordControlApiBaseUrl || "http://127.0.0.1:8790").replace(/\/$/, "");
const body = pickDefined({ ...params, action: action as DiscordControlAction });
const headers: Record<string, string> = { "Content-Type": "application/json" };
if (live.discordControlApiToken) headers.Authorization = `Bearer ${live.discordControlApiToken}`;
if (live.discordControlCallerId) headers["X-OpenClaw-Caller-Id"] = live.discordControlCallerId;
const r = await fetch(`${baseUrl}/v1/discord/action`, {
method: "POST",
headers,
body: JSON.stringify(body),
});
const text = await r.text();
if (!r.ok) {
return {
content: [{ type: "text", text: `dirigent_tools discord failed (${r.status}): ${text}` }],
isError: true,
};
}
return { content: [{ type: "text", text }] };
}
if (live.enableDirigentPolicyTool === false) {
return { content: [{ type: "text", text: "policy actions disabled by config" }], isError: true };
}
if (action === "policy-get") {
return {
content: [{ type: "text", text: JSON.stringify({ file: policyState.filePath, policies: policyState.channelPolicies }, null, 2) }],
};
}
if (action === "policy-set-channel") {
const channelId = String(params.channelId || "").trim();
if (!channelId) return { content: [{ type: "text", text: "channelId is required" }], isError: true };
const prev = JSON.parse(JSON.stringify(policyState.channelPolicies));
try {
const next: ChannelPolicy = {
listMode: (params.listMode as "human-list" | "agent-list" | undefined) || undefined,
humanList: Array.isArray(params.humanList) ? (params.humanList as string[]) : undefined,
agentList: Array.isArray(params.agentList) ? (params.agentList as string[]) : undefined,
endSymbols: Array.isArray(params.endSymbols) ? (params.endSymbols as string[]) : undefined,
};
policyState.channelPolicies[channelId] = pickDefined(next as unknown as Record<string, unknown>) as ChannelPolicy;
persistPolicies(api);
return { content: [{ type: "text", text: JSON.stringify({ ok: true, channelId, policy: policyState.channelPolicies[channelId] }) }] };
} catch (err) {
policyState.channelPolicies = prev;
return { content: [{ type: "text", text: `persist failed: ${String(err)}` }], isError: true };
}
}
if (action === "policy-delete-channel") {
const channelId = String(params.channelId || "").trim();
if (!channelId) return { content: [{ type: "text", text: "channelId is required" }], isError: true };
const prev = JSON.parse(JSON.stringify(policyState.channelPolicies));
try {
delete policyState.channelPolicies[channelId];
persistPolicies(api);
return { content: [{ type: "text", text: JSON.stringify({ ok: true, channelId, deleted: true }) }] };
} catch (err) {
policyState.channelPolicies = prev;
return { content: [{ type: "text", text: `persist failed: ${String(err)}` }], isError: true };
}
}
if (action === "turn-status") {
const channelId = String(params.channelId || "").trim();
if (!channelId) return { content: [{ type: "text", text: "channelId is required" }], isError: true };
return { content: [{ type: "text", text: JSON.stringify(getTurnDebugInfo(channelId), null, 2) }] };
}
if (action === "turn-advance") {
const channelId = String(params.channelId || "").trim();
if (!channelId) return { content: [{ type: "text", text: "channelId is required" }], isError: true };
const next = advanceTurn(channelId);
return { content: [{ type: "text", text: JSON.stringify({ ok: true, channelId, nextSpeaker: next, ...getTurnDebugInfo(channelId) }) }] };
}
if (action === "turn-reset") {
const channelId = String(params.channelId || "").trim();
if (!channelId) return { content: [{ type: "text", text: "channelId is required" }], isError: true };
resetTurn(channelId);
return { content: [{ type: "text", text: JSON.stringify({ ok: true, channelId, ...getTurnDebugInfo(channelId) }) }] };
}
return { content: [{ type: "text", text: `unsupported action: ${action}` }], isError: true };
},
},
{ optional: false },
);
api.on("message_received", async (event, ctx) => {
try {
const c = (ctx || {}) as Record<string, unknown>;
const e = (event || {}) as Record<string, unknown>;
// ctx.channelId is the platform name (e.g. "discord"), NOT the Discord channel snowflake.
// Extract the real Discord channel ID from conversationId or event.to.
const preChannelId = extractDiscordChannelId(c, e);
const livePre = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig;
if (shouldDebugLog(livePre, preChannelId)) {
api.logger.info(`dirigent: debug message_received preflight ctx=${JSON.stringify(debugCtxSummary(c, e))}`);
}
// Turn management on message received
if (preChannelId) {
ensureTurnOrder(api, preChannelId);
// event.from is often the channel target (e.g. "discord:channel:xxx"), NOT the sender.
// The actual sender ID is in event.metadata.senderId.
const metadata = (e as Record<string, unknown>).metadata as Record<string, unknown> | undefined;
const from = (typeof metadata?.senderId === "string" && metadata.senderId)
|| (typeof (e as Record<string, unknown>).from === "string" ? (e as Record<string, unknown>).from as string : "");
// Ignore moderator bot messages — they don't affect turn state
const moderatorUserId = getModeratorUserId(livePre);
if (moderatorUserId && from === moderatorUserId) {
if (shouldDebugLog(livePre, preChannelId)) {
api.logger.info(`dirigent: ignoring moderator message in channel=${preChannelId}`);
}
// Don't call onNewMessage — moderator messages are transparent to turn logic
} else {
const humanList = livePre.humanList || livePre.bypassUserIds || [];
const isHuman = humanList.includes(from);
const senderAccountId = typeof c.accountId === "string" ? c.accountId : undefined;
// Track which bot accounts are present in this channel
if (senderAccountId && senderAccountId !== "default") {
const isNew = recordChannelAccount(preChannelId, senderAccountId);
if (isNew) {
// Re-initialize turn order with updated channel membership
ensureTurnOrder(api, preChannelId);
api.logger.info(`dirigent: new account ${senderAccountId} seen in channel=${preChannelId}, turn order updated`);
}
}
onNewMessage(preChannelId, senderAccountId, isHuman);
if (shouldDebugLog(livePre, preChannelId)) {
api.logger.info(`dirigent: turn onNewMessage channel=${preChannelId} from=${from} isHuman=${isHuman} accountId=${senderAccountId ?? "unknown"}`);
}
}
}
} catch (err) {
api.logger.warn(`dirigent: message hook failed: ${String(err)}`);
}
});
api.on("before_model_resolve", async (event, ctx) => {
const key = ctx.sessionKey;
if (!key) return;
const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig;
ensurePolicyStateLoaded(api, live);
const prompt = ((event as Record<string, unknown>).prompt as string) || "";
if (live.enableDebugLogs) {
api.logger.info(
`dirigent: DEBUG_BEFORE_MODEL_RESOLVE ctx=${JSON.stringify({ sessionKey: ctx.sessionKey, messageProvider: ctx.messageProvider, agentId: ctx.agentId })} ` +
`promptPreview=${prompt.slice(0, 300)}`,
);
}
const derived = deriveDecisionInputFromPrompt(prompt, ctx.messageProvider, ctx.channelId);
// Fallback: extract channelId from sessionKey (format "agent:<id>:discord:channel:<channelId>")
if (!derived.channelId && key) {
const skMatch = key.match(/:channel:(\d+)$/);
if (skMatch) derived.channelId = skMatch[1];
}
// Only proceed if: discord channel AND prompt contains untrusted metadata
const hasConvMarker = prompt.includes("Conversation info (untrusted metadata):");
if (live.discordOnly !== false && (!hasConvMarker || derived.channel !== "discord")) return;
// Always save channelId and accountId mappings for use in later hooks
if (derived.channelId) {
sessionChannelId.set(key, derived.channelId);
}
const resolvedAccountId = resolveAccountId(api, ctx.agentId || "");
if (resolvedAccountId) {
sessionAccountId.set(key, resolvedAccountId);
}
let rec = sessionDecision.get(key);
if (!rec || Date.now() - rec.createdAt > DECISION_TTL_MS) {
if (rec) sessionDecision.delete(key);
const decision = evaluateDecision({
config: live,
channel: derived.channel,
channelId: derived.channelId,
channelPolicies: policyState.channelPolicies,
senderId: derived.senderId,
content: derived.content,
});
rec = { decision, createdAt: Date.now() };
sessionDecision.set(key, rec);
pruneDecisionMap();
if (shouldDebugLog(live, derived.channelId)) {
api.logger.info(
`dirigent: debug before_model_resolve recompute session=${key} ` +
`channel=${derived.channel} channelId=${derived.channelId ?? ""} senderId=${derived.senderId ?? ""} ` +
`convSenderId=${String((derived.conv as Record<string, unknown>).sender_id ?? "")} ` +
`convSender=${String((derived.conv as Record<string, unknown>).sender ?? "")} ` +
`convChannelId=${String((derived.conv as Record<string, unknown>).channel_id ?? "")} ` +
`decision=${decision.reason} shouldNoReply=${decision.shouldUseNoReply} shouldInject=${decision.shouldInjectEndMarkerPrompt}`,
);
}
}
// Turn-based check: ALWAYS check turn order regardless of evaluateDecision result.
// This ensures only the current speaker can respond even for human messages.
if (derived.channelId) {
ensureTurnOrder(api, derived.channelId);
const accountId = resolveAccountId(api, ctx.agentId || "");
if (accountId) {
const turnCheck = checkTurn(derived.channelId, accountId);
if (!turnCheck.allowed) {
// Forced no-reply - record this session as not allowed to speak
sessionAllowed.set(key, false);
api.logger.info(
`dirigent: turn gate blocked session=${key} accountId=${accountId} currentSpeaker=${turnCheck.currentSpeaker} reason=${turnCheck.reason}`,
);
return {
providerOverride: live.noReplyProvider,
modelOverride: live.noReplyModel,
};
}
// Allowed to speak - record this session as allowed
sessionAllowed.set(key, true);
}
}
if (!rec.decision.shouldUseNoReply) {
if (rec.needsRestore) {
sessionDecision.delete(key);
return {
providerOverride: undefined,
modelOverride: undefined,
};
}
return;
}
rec.needsRestore = true;
sessionDecision.set(key, rec);
if (live.enableDebugLogs) {
const prompt = ((event as Record<string, unknown>).prompt as string) || "";
const hasConvMarker = prompt.includes("Conversation info (untrusted metadata):");
api.logger.info(
`dirigent: DEBUG_NO_REPLY_TRIGGER session=${key} ` +
`channel=${derived.channel} channelId=${derived.channelId ?? ""} senderId=${derived.senderId ?? ""} ` +
`convSenderId=${String((derived.conv as Record<string, unknown>).sender_id ?? "")} ` +
`convSender=${String((derived.conv as Record<string, unknown>).sender ?? "")} ` +
`decision=${rec.decision.reason} ` +
`shouldNoReply=${rec.decision.shouldUseNoReply} shouldInject=${rec.decision.shouldInjectEndMarkerPrompt} ` +
`hasConvMarker=${hasConvMarker} promptLen=${prompt.length}`,
);
}
api.logger.info(
`dirigent: override model for session=${key}, provider=${live.noReplyProvider}, model=${live.noReplyModel}, reason=${rec.decision.reason}`,
);
return {
providerOverride: live.noReplyProvider,
modelOverride: live.noReplyModel,
};
});
api.on("before_prompt_build", async (event, ctx) => {
const key = ctx.sessionKey;
if (!key) return;
const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig;
ensurePolicyStateLoaded(api, live);
let rec = sessionDecision.get(key);
if (!rec || Date.now() - rec.createdAt > DECISION_TTL_MS) {
if (rec) sessionDecision.delete(key);
const prompt = ((event as Record<string, unknown>).prompt as string) || "";
const derived = deriveDecisionInputFromPrompt(prompt, ctx.messageProvider, ctx.channelId);
const decision = evaluateDecision({
config: live,
channel: derived.channel,
channelId: derived.channelId,
channelPolicies: policyState.channelPolicies,
senderId: derived.senderId,
content: derived.content,
});
rec = { decision, createdAt: Date.now() };
if (shouldDebugLog(live, derived.channelId)) {
api.logger.info(
`dirigent: debug before_prompt_build recompute session=${key} ` +
`channel=${derived.channel} channelId=${derived.channelId ?? ""} senderId=${derived.senderId ?? ""} ` +
`convSenderId=${String((derived.conv as Record<string, unknown>).sender_id ?? "")} ` +
`convSender=${String((derived.conv as Record<string, unknown>).sender ?? "")} ` +
`convChannelId=${String((derived.conv as Record<string, unknown>).channel_id ?? "")} ` +
`decision=${decision.reason} shouldNoReply=${decision.shouldUseNoReply} shouldInject=${decision.shouldInjectEndMarkerPrompt}`,
);
}
}
sessionDecision.delete(key);
// Only inject once per session (one-time injection)
if (sessionInjected.has(key)) {
if (shouldDebugLog(live, undefined)) {
api.logger.info(
`dirigent: debug before_prompt_build session=${key} inject skipped (already injected)`,
);
}
return;
}
if (!rec.decision.shouldInjectEndMarkerPrompt) {
if (shouldDebugLog(live, undefined)) {
api.logger.info(
`dirigent: debug before_prompt_build session=${key} inject=false reason=${rec.decision.reason}`,
);
}
return;
}
// Resolve end symbols from config/policy for dynamic instruction
const prompt = ((event as Record<string, unknown>).prompt as string) || "";
const derived = deriveDecisionInputFromPrompt(prompt, ctx.messageProvider, ctx.channelId);
const policy = resolvePolicy(live, derived.channelId, policyState.channelPolicies);
const isGroupChat = derived.conv.is_group_chat === true || derived.conv.is_group_chat === "true";
const schedulingId = live.schedulingIdentifier || "➡️";
const instruction = buildEndMarkerInstruction(policy.endSymbols, isGroupChat, schedulingId);
// Inject agent identity for group chats (includes userId now)
let identity = "";
if (isGroupChat && ctx.agentId) {
const idStr = buildAgentIdentity(api, ctx.agentId);
if (idStr) identity = idStr + "\n\n";
}
// Add scheduling identifier instruction for group chats
let schedulingInstruction = "";
if (isGroupChat) {
schedulingInstruction = buildSchedulingIdentifierInstruction(schedulingId);
}
// Mark session as injected (one-time injection)
sessionInjected.add(key);
api.logger.info(`dirigent: prepend end marker instruction for session=${key}, reason=${rec.decision.reason} isGroupChat=${isGroupChat}`);
return { prependContext: identity + instruction + schedulingInstruction };
});
// Register slash commands for Discord
api.registerCommand({
name: "dirigent",
description: "Dirigent channel policy management",
acceptsArgs: true,
handler: async (cmdCtx) => {
const args = cmdCtx.args || "";
const parts = args.trim().split(/\s+/);
const subCmd = parts[0] || "help";
if (subCmd === "help") {
return { text: `Dirigent commands:\n` +
`/dirigent status - Show current channel status\n` +
`/dirigent turn-status - Show turn-based speaking status\n` +
`/dirigent turn-advance - Manually advance turn\n` +
`/dirigent turn-reset - Reset turn order` };
}
if (subCmd === "status") {
return { text: JSON.stringify({ policies: policyState.channelPolicies }, null, 2) };
}
if (subCmd === "turn-status") {
const channelId = cmdCtx.channelId;
if (!channelId) return { text: "Cannot get channel ID", isError: true };
return { text: JSON.stringify(getTurnDebugInfo(channelId), null, 2) };
}
if (subCmd === "turn-advance") {
const channelId = cmdCtx.channelId;
if (!channelId) return { text: "Cannot get channel ID", isError: true };
const next = advanceTurn(channelId);
return { text: JSON.stringify({ ok: true, nextSpeaker: next }) };
}
if (subCmd === "turn-reset") {
const channelId = cmdCtx.channelId;
if (!channelId) return { text: "Cannot get channel ID", isError: true };
resetTurn(channelId);
return { text: JSON.stringify({ ok: true }) };
}
return { text: `Unknown subcommand: ${subCmd}`, isError: true };
},
});
// Handle NO_REPLY detection before message write
api.on("before_message_write", (event, ctx) => {
try {
api.logger.info(
`dirigent: DEBUG before_message_write eventKeys=${JSON.stringify(Object.keys(event ?? {}))} ctxKeys=${JSON.stringify(Object.keys(ctx ?? {}))}`,
);
let key = ctx.sessionKey;
let channelId: string | undefined;
let accountId: string | undefined;
if (key) {
channelId = sessionChannelId.get(key);
accountId = sessionAccountId.get(key);
}
let content = "";
const msg = (event as Record<string, unknown>).message as Record<string, unknown> | undefined;
if (msg) {
const role = msg.role as string | undefined;
if (role && role !== "assistant") return;
if (typeof msg.content === "string") {
content = msg.content;
} else if (Array.isArray(msg.content)) {
for (const part of msg.content) {
if (typeof part === "string") content += part;
else if (part && typeof part === "object" && typeof (part as Record<string, unknown>).text === "string") {
content += (part as Record<string, unknown>).text;
}
}
}
}
if (!content) {
content = ((event as Record<string, unknown>).content as string) || "";
}
api.logger.info(
`dirigent: DEBUG before_message_write session=${key ?? "undefined"} channel=${channelId ?? "undefined"} accountId=${accountId ?? "undefined"} contentType=${typeof content} content=${String(content).slice(0, 200)}`,
);
if (!key || !channelId || !accountId) return;
const currentTurn = getTurnDebugInfo(channelId);
if (currentTurn.currentSpeaker !== accountId) {
api.logger.info(
`dirigent: before_message_write skipping non-current-speaker session=${key} accountId=${accountId} currentSpeaker=${currentTurn.currentSpeaker}`,
);
return;
}
const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig;
ensurePolicyStateLoaded(api, live);
const policy = resolvePolicy(live, channelId, policyState.channelPolicies);
const trimmed = content.trim();
const isEmpty = trimmed.length === 0;
const isNoReply = /^NO_REPLY$/i.test(trimmed) || /^HEARTBEAT_OK$/i.test(trimmed);
const lastChar = trimmed.length > 0 ? Array.from(trimmed).pop() || "" : "";
const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar);
const wasNoReply = isEmpty || isNoReply;
const turnDebug = getTurnDebugInfo(channelId);
api.logger.info(
`dirigent: DEBUG turn state channel=${channelId} turnOrder=${JSON.stringify(turnDebug.turnOrder)} currentSpeaker=${turnDebug.currentSpeaker} noRepliedThisCycle=${JSON.stringify([...turnDebug.noRepliedThisCycle])}`,
);
const wasAllowed = sessionAllowed.get(key);
if (wasNoReply) {
api.logger.info(
`dirigent: DEBUG NO_REPLY detected session=${key} wasAllowed=${wasAllowed}`,
);
if (wasAllowed === undefined) return;
if (wasAllowed === false) {
sessionAllowed.delete(key);
api.logger.info(
`dirigent: before_message_write forced no-reply session=${key} channel=${channelId} - not advancing turn`,
);
return;
}
ensureTurnOrder(api, channelId);
const nextSpeaker = onSpeakerDone(channelId, accountId, true);
sessionAllowed.delete(key);
sessionTurnHandled.add(key);
api.logger.info(
`dirigent: before_message_write real no-reply session=${key} channel=${channelId} nextSpeaker=${nextSpeaker ?? "dormant"}`,
);
if (!nextSpeaker) {
if (shouldDebugLog(live, channelId)) {
api.logger.info(
`dirigent: before_message_write all agents no-reply, going dormant - no handoff`,
);
}
return;
}
// Trigger moderator handoff message using scheduling identifier format
if (live.moderatorBotToken) {
const nextUserId = resolveDiscordUserId(api, nextSpeaker);
if (nextUserId) {
const schedulingId = live.schedulingIdentifier || "➡️";
const handoffMsg = `<@${nextUserId}>${schedulingId}`;
void sendModeratorMessage(live.moderatorBotToken, channelId, handoffMsg, api.logger).catch((err) => {
api.logger.warn(`dirigent: before_message_write handoff failed: ${String(err)}`);
});
} else {
api.logger.warn(`dirigent: cannot resolve Discord userId for next speaker accountId=${nextSpeaker}`);
}
}
} else if (hasEndSymbol) {
ensureTurnOrder(api, channelId);
const nextSpeaker = onSpeakerDone(channelId, accountId, false);
sessionAllowed.delete(key);
sessionTurnHandled.add(key);
api.logger.info(
`dirigent: before_message_write end-symbol turn advance session=${key} channel=${channelId} nextSpeaker=${nextSpeaker ?? "dormant"}`,
);
} else {
api.logger.info(
`dirigent: before_message_write no turn action needed session=${key} channel=${channelId}`,
);
return;
}
} catch (err) {
api.logger.warn(`dirigent: before_message_write hook failed: ${String(err)}`);
}
});
// Turn advance: when an agent sends a message, check if it signals end of turn
api.on("message_sent", async (event, ctx) => {
try {
const key = ctx.sessionKey;
const c = (ctx || {}) as Record<string, unknown>;
const e = (event || {}) as Record<string, unknown>;
api.logger.info(
`dirigent: DEBUG message_sent RAW ctxKeys=${JSON.stringify(Object.keys(c))} eventKeys=${JSON.stringify(Object.keys(e))} ` +
`ctx.channelId=${String(c.channelId ?? "undefined")} ctx.conversationId=${String(c.conversationId ?? "undefined")} ` +
`ctx.accountId=${String(c.accountId ?? "undefined")} event.to=${String(e.to ?? "undefined")} ` +
`session=${key ?? "undefined"}`,
);
let channelId = extractDiscordChannelId(c, e);
if (!channelId && key) {
channelId = sessionChannelId.get(key);
}
if (!channelId && key) {
const skMatch = key.match(/:channel:(\d+)$/);
if (skMatch) channelId = skMatch[1];
}
const accountId = (ctx.accountId as string | undefined) || (key ? sessionAccountId.get(key) : undefined);
const content = (event.content as string) || "";
api.logger.info(
`dirigent: DEBUG message_sent RESOLVED session=${key ?? "undefined"} channelId=${channelId ?? "undefined"} accountId=${accountId ?? "undefined"} content=${content.slice(0, 100)}`,
);
if (!channelId || !accountId) return;
const live = getLivePluginConfig(api, baseConfig as DirigentConfig) as DirigentConfig & DebugConfig;
ensurePolicyStateLoaded(api, live);
const policy = resolvePolicy(live, channelId, policyState.channelPolicies);
const trimmed = content.trim();
const isEmpty = trimmed.length === 0;
const isNoReply = /^NO_REPLY$/i.test(trimmed) || /^HEARTBEAT_OK$/i.test(trimmed);
const lastChar = trimmed.length > 0 ? Array.from(trimmed).pop() || "" : "";
const hasEndSymbol = !!lastChar && policy.endSymbols.includes(lastChar);
const wasNoReply = isEmpty || isNoReply;
// Skip if turn was already advanced in before_message_write
if (key && sessionTurnHandled.has(key)) {
sessionTurnHandled.delete(key);
api.logger.info(
`dirigent: message_sent skipping turn advance (already handled in before_message_write) session=${key} channel=${channelId}`,
);
return;
}
if (wasNoReply || hasEndSymbol) {
const nextSpeaker = onSpeakerDone(channelId, accountId, wasNoReply);
const trigger = wasNoReply ? (isEmpty ? "empty" : "no_reply_keyword") : "end_symbol";
api.logger.info(
`dirigent: turn onSpeakerDone channel=${channelId} from=${accountId} next=${nextSpeaker ?? "dormant"} trigger=${trigger}`,
);
// Moderator handoff using scheduling identifier format
if (wasNoReply && nextSpeaker && live.moderatorBotToken) {
const nextUserId = resolveDiscordUserId(api, nextSpeaker);
if (nextUserId) {
const schedulingId = live.schedulingIdentifier || "➡️";
const handoffMsg = `<@${nextUserId}>${schedulingId}`;
sendModeratorMessage(live.moderatorBotToken, channelId, handoffMsg, api.logger);
} else {
api.logger.warn(`dirigent: cannot resolve Discord userId for next speaker accountId=${nextSpeaker}`);
}
}
}
} catch (err) {
api.logger.warn(`dirigent: message_sent hook failed: ${String(err)}`);
}
});
},
};