refactor: new design — sidecar services, moderator Gateway client, tool execute API

- Replace standalone no-reply-api Docker service with unified sidecar (services/main.mjs)
  that routes /no-reply/* and /moderator/* and starts/stops with openclaw-gateway
- Add moderator Discord Gateway client (services/moderator/index.mjs) for real-time
  MESSAGE_CREATE push instead of polling; notifies plugin via HTTP callback
- Add plugin HTTP routes (plugin/web/dirigent-api.ts) for moderator → plugin callbacks
  (wake-from-dormant, interrupt tail-match)
- Fix tool registration format: AgentTool requires execute: not handler:; factory form
  for tools needing ctx
- Rename no-reply-process.ts → sidecar-process.ts, startNoReplyApi → startSideCar
- Remove dead config fields from openclaw.plugin.json (humanList, agentList, listMode,
  channelPoliciesFile, endSymbols, waitIdentifier, multiMessage*, bypassUserIds, etc.)
- Rename noReplyPort → sideCarPort
- Remove docker-compose.yml, dev-up/down scripts, package-plugin.mjs, test-no-reply-api.mjs
- Update install.mjs: clean dist before build, copy services/, drop dead config writes
- Update README, Makefile, smoke script for new architecture

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
h z
2026-04-10 08:07:59 +01:00
parent d8ac9ee0f9
commit 32dc9a4233
28 changed files with 1310 additions and 900 deletions

View File

@@ -1,51 +0,0 @@
import fs from "node:fs";
import path from "node:path";
import { spawn, type ChildProcess } from "node:child_process";
let noReplyProcess: ChildProcess | null = null;
export function startNoReplyApi(
logger: { info: (m: string) => void; warn: (m: string) => void },
pluginDir: string,
port = 8787,
): void {
logger.info(`dirigent: startNoReplyApi called, pluginDir=${pluginDir}`);
if (noReplyProcess) {
logger.info("dirigent: no-reply API already running, skipping");
return;
}
const serverPath = path.resolve(pluginDir, "no-reply-api", "server.mjs");
logger.info(`dirigent: resolved serverPath=${serverPath}`);
if (!fs.existsSync(serverPath)) {
logger.warn(`dirigent: no-reply API server not found at ${serverPath}, skipping`);
return;
}
logger.info("dirigent: no-reply API server found, spawning process...");
noReplyProcess = spawn(process.execPath, [serverPath], {
env: { ...process.env, PORT: String(port) },
stdio: ["ignore", "pipe", "pipe"],
detached: false,
});
noReplyProcess.stdout?.on("data", (d: Buffer) => logger.info(`dirigent: no-reply-api: ${d.toString().trim()}`));
noReplyProcess.stderr?.on("data", (d: Buffer) => logger.warn(`dirigent: no-reply-api: ${d.toString().trim()}`));
noReplyProcess.on("exit", (code, signal) => {
logger.info(`dirigent: no-reply API exited (code=${code}, signal=${signal})`);
noReplyProcess = null;
});
logger.info(`dirigent: no-reply API started (pid=${noReplyProcess.pid}, port=${port})`);
}
export function stopNoReplyApi(logger: { info: (m: string) => void }): void {
if (!noReplyProcess) return;
logger.info("dirigent: stopping no-reply API");
noReplyProcess.kill("SIGTERM");
noReplyProcess = null;
}

View File

@@ -0,0 +1,119 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { spawn, type ChildProcess } from "node:child_process";
let noReplyProcess: ChildProcess | null = null;
const LOCK_FILE = path.join(os.tmpdir(), "dirigent-sidecar.lock");
function readLock(): { pid: number } | null {
try {
const raw = fs.readFileSync(LOCK_FILE, "utf8").trim();
return { pid: Number(raw) };
} catch {
return null;
}
}
function writeLock(pid: number): void {
try { fs.writeFileSync(LOCK_FILE, String(pid)); } catch { /* ignore */ }
}
function clearLock(): void {
try { fs.unlinkSync(LOCK_FILE); } catch { /* ignore */ }
}
function isLockHeld(): boolean {
const lock = readLock();
if (!lock) return false;
try {
process.kill(lock.pid, 0);
return true;
} catch {
return false;
}
}
export function startSideCar(
logger: { info: (m: string) => void; warn: (m: string) => void },
pluginDir: string,
port = 8787,
moderatorToken?: string,
pluginApiToken?: string,
gatewayPort?: number,
debugMode?: boolean,
): void {
logger.info(`dirigent: startSideCar called, pluginDir=${pluginDir}`);
if (noReplyProcess) {
logger.info("dirigent: no-reply API already running (local ref), skipping");
return;
}
if (isLockHeld()) {
logger.info("dirigent: no-reply API already running (lock file), skipping");
return;
}
// services/main.mjs lives alongside the plugin directory in the distribution
const serverPath = path.resolve(pluginDir, "services", "main.mjs");
logger.info(`dirigent: resolved serverPath=${serverPath}`);
if (!fs.existsSync(serverPath)) {
logger.warn(`dirigent: services/main.mjs not found at ${serverPath}, skipping`);
return;
}
logger.info("dirigent: services/main.mjs found, spawning process...");
// Build plugin API URL from gateway port, or use a default
const pluginApiUrl = gatewayPort
? `http://127.0.0.1:${gatewayPort}`
: "http://127.0.0.1:18789";
const env: NodeJS.ProcessEnv = {
...process.env,
SERVICES_PORT: String(port),
PLUGIN_API_URL: pluginApiUrl,
};
if (moderatorToken) {
env.MODERATOR_TOKEN = moderatorToken;
}
if (pluginApiToken) {
env.PLUGIN_API_TOKEN = pluginApiToken;
}
if (debugMode !== undefined) {
env.DEBUG_MODE = debugMode ? "true" : "false";
}
noReplyProcess = spawn(process.execPath, [serverPath], {
env,
stdio: ["ignore", "pipe", "pipe"],
detached: false,
});
if (noReplyProcess.pid) {
writeLock(noReplyProcess.pid);
}
noReplyProcess.stdout?.on("data", (d: Buffer) => logger.info(`dirigent: services: ${d.toString().trim()}`));
noReplyProcess.stderr?.on("data", (d: Buffer) => logger.warn(`dirigent: services: ${d.toString().trim()}`));
noReplyProcess.on("exit", (code, signal) => {
logger.info(`dirigent: services exited (code=${code}, signal=${signal})`);
clearLock();
noReplyProcess = null;
});
logger.info(`dirigent: services started (pid=${noReplyProcess.pid}, port=${port})`);
}
export function stopSideCar(logger: { info: (m: string) => void }): void {
if (!noReplyProcess) return;
logger.info("dirigent: stopping sidecar");
noReplyProcess.kill("SIGTERM");
noReplyProcess = null;
clearLock();
}

View File

@@ -176,8 +176,6 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn {
return;
}
clearTurnPending(channelId, agentId);
api.logger.info(
`dirigent: agent_end channel=${channelId} agentId=${agentId} empty=${empty} text=${finalText.slice(0, 80)}`,
);
@@ -186,6 +184,10 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn {
// Real turn: wait for Discord delivery before triggering next speaker.
// Anchor was set in before_model_resolve just before the LLM call, so any
// message from the agent after the anchor must be from this turn.
// NOTE: clearTurnPending is intentionally deferred until after pollForTailMatch
// returns. While waiting, isTurnPending remains true so that any re-trigger of
// this agent is correctly treated as a self-wakeup (suppressed), preventing it
// from starting a second real turn during the tail-match window.
const identity = identityRegistry.findByAgentId(agentId);
if (identity && moderatorBotToken) {
const anchorId = getAnchor(channelId, agentId) ?? "0";
@@ -202,6 +204,7 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn {
if (isDormant(channelId)) {
// Channel is dormant: a new external message woke it — restart from first speaker
api.logger.info(`dirigent: tail-match interrupted (dormant) channel=${channelId} — waking`);
clearTurnPending(channelId, agentId);
const first = wakeFromDormant(channelId);
if (first) await triggerNextSpeaker(channelId, first);
return;
@@ -243,6 +246,12 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn {
advancingChannels.delete(channelId);
}
// Clear turn pending AFTER advanceSpeaker completes. This ensures isTurnPending
// remains true during the async rebuildFn window at cycle boundaries, preventing
// re-triggers from starting a second real turn while currentIndex is still at the
// outgoing speaker's position.
clearTurnPending(channelId, agentId);
if (enteredDormant) {
api.logger.info(`dirigent: channel=${channelId} entered dormant`);
if (mode === "discussion") {

View File

@@ -3,7 +3,7 @@ import type { ChannelStore } from "../core/channel-store.js";
import type { IdentityRegistry } from "../core/identity-registry.js";
import { parseDiscordChannelId } from "./before-model-resolve.js";
import { isDormant, wakeFromDormant, isCurrentSpeaker, hasSpeakers, setSpeakerList, getInitializingChannels, type SpeakerEntry } from "../turn-manager.js";
import { sendAndDelete, sendModeratorMessage, userIdFromBotToken } from "../core/moderator-discord.js";
import { sendScheduleTrigger, userIdFromBotToken } from "../core/moderator-discord.js";
import { fetchVisibleChannelBotAccountIds } from "../core/channel-members.js";
import type { InterruptFn } from "./agent-end.js";
@@ -14,24 +14,17 @@ type Deps = {
moderatorBotToken: string | undefined;
scheduleIdentifier: string;
interruptTailMatch: InterruptFn;
debugMode: boolean;
/**
* When true, the moderator service handles wake-from-dormant and
* interrupt-tail-match via HTTP callback. This hook only runs speaker-list
* initialization in that case.
*/
moderatorHandlesMessages?: boolean;
};
/**
* Process-level dedup for concluded-discussion auto-replies.
* Multiple agent VM contexts all fire message_received for the same incoming message;
* only the first should send the "This discussion is closed" reply.
* Keyed on channelId:messageId; evicted after 500 entries.
*/
const _CONCLUDED_REPLY_DEDUP_KEY = "_dirigentConcludedReplyDedup";
if (!(globalThis as Record<string, unknown>)[_CONCLUDED_REPLY_DEDUP_KEY]) {
(globalThis as Record<string, unknown>)[_CONCLUDED_REPLY_DEDUP_KEY] = new Set<string>();
}
const concludedReplyDedup: Set<string> = (globalThis as Record<string, unknown>)[_CONCLUDED_REPLY_DEDUP_KEY] as Set<string>;
export function registerMessageReceivedHook(deps: Deps): void {
const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, interruptTailMatch } = deps;
// Derive the moderator bot's own Discord user ID so we can skip self-messages
// from waking dormant channels (idle reminders must not re-trigger the cycle).
const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, interruptTailMatch, debugMode, moderatorHandlesMessages } = deps;
const moderatorBotUserId = moderatorBotToken ? userIdFromBotToken(moderatorBotToken) : undefined;
api.on("message_received", async (event, ctx) => {
@@ -42,23 +35,19 @@ export function registerMessageReceivedHook(deps: Deps): void {
// Extract Discord channel ID from ctx or event metadata
let channelId: string | undefined;
// ctx.channelId may be bare "1234567890" or "channel:1234567890"
if (typeof c.channelId === "string") {
const bare = c.channelId.match(/^(\d+)$/)?.[1] ?? c.channelId.match(/:(\d+)$/)?.[1];
if (bare) channelId = bare;
}
// fallback: sessionKey (per-session api instances)
if (!channelId && typeof c.sessionKey === "string") {
channelId = parseDiscordChannelId(c.sessionKey);
}
// fallback: metadata.to / originatingTo = "channel:1234567890"
if (!channelId) {
const metadata = e.metadata as Record<string, unknown> | undefined;
const to = String(metadata?.to ?? metadata?.originatingTo ?? "");
const toMatch = to.match(/:(\d+)$/);
if (toMatch) channelId = toMatch[1];
}
// fallback: conversation_info
if (!channelId) {
const metadata = e.metadata as Record<string, unknown> | undefined;
const convInfo = metadata?.conversation_info as Record<string, unknown> | undefined;
@@ -69,45 +58,12 @@ export function registerMessageReceivedHook(deps: Deps): void {
const mode = channelStore.getMode(channelId);
// dead: suppress routing entirely (OpenClaw handles no-route automatically,
// but we handle archived auto-reply here)
if (mode === "report") return;
// archived: auto-reply via moderator (deduped — only one agent instance should reply)
if (mode === "discussion") {
const rec = channelStore.getRecord(channelId);
if (rec.discussion?.concluded && moderatorBotToken) {
const metadata = e.metadata as Record<string, unknown> | undefined;
const convInfo = metadata?.conversation_info as Record<string, unknown> | undefined;
const incomingMsgId = String(
convInfo?.message_id ??
metadata?.message_id ??
metadata?.messageId ??
e.id ?? "",
);
const dedupKey = `${channelId}:${incomingMsgId}`;
if (!concludedReplyDedup.has(dedupKey)) {
concludedReplyDedup.add(dedupKey);
if (concludedReplyDedup.size > 500) {
const oldest = concludedReplyDedup.values().next().value;
if (oldest) concludedReplyDedup.delete(oldest);
}
await sendModeratorMessage(
moderatorBotToken, channelId,
"This discussion is closed and no longer active.",
api.logger,
).catch(() => undefined);
}
return;
}
}
if (mode === "none" || mode === "work") return;
// chat / discussion (active): initialize speaker list on first message if needed
// ── Speaker-list initialization (always runs, even with moderator service) ──
const initializingChannels = getInitializingChannels();
if (!hasSpeakers(channelId) && moderatorBotToken) {
// Guard against concurrent initialization from multiple VM contexts
if (initializingChannels.has(channelId)) {
api.logger.info(`dirigent: message_received init in progress, skipping channel=${channelId}`);
return;
@@ -126,7 +82,7 @@ export function registerMessageReceivedHook(deps: Deps): void {
setSpeakerList(channelId, speakers);
const first = speakers[0];
api.logger.info(`dirigent: initialized speaker list channel=${channelId} speakers=${speakers.map(s => s.agentId).join(",")}`);
await sendAndDelete(moderatorBotToken, channelId, `<@${first.discordUserId}>${scheduleIdentifier}`, api.logger);
await sendScheduleTrigger(moderatorBotToken, channelId, `<@${first.discordUserId}>${scheduleIdentifier}`, api.logger, debugMode);
return;
}
} finally {
@@ -134,8 +90,8 @@ export function registerMessageReceivedHook(deps: Deps): void {
}
}
// chat / discussion (active): check if this is an external message
// that should interrupt an in-progress tail-match or wake dormant
// ── Wake / interrupt (skipped when moderator service handles it via HTTP callback) ──
if (moderatorHandlesMessages) return;
const senderId = String(
(e.metadata as Record<string, unknown>)?.senderId ??
@@ -143,7 +99,6 @@ export function registerMessageReceivedHook(deps: Deps): void {
e.from ?? "",
);
// Identify the sender: is it the current speaker's Discord account?
const currentSpeakerIsThisSender = (() => {
if (!senderId) return false;
const entry = identityRegistry.findByDiscordUserId(senderId);
@@ -152,17 +107,16 @@ export function registerMessageReceivedHook(deps: Deps): void {
})();
if (!currentSpeakerIsThisSender) {
// Non-current-speaker posted — interrupt any tail-match in progress
interruptTailMatch(channelId);
api.logger.info(`dirigent: message_received interrupt tail-match channel=${channelId} senderId=${senderId}`);
if (senderId !== moderatorBotUserId) {
interruptTailMatch(channelId);
api.logger.info(`dirigent: message_received interrupt tail-match channel=${channelId} senderId=${senderId}`);
}
// Wake from dormant if needed — but ignore the moderator bot's own messages
// (e.g. idle reminder) to prevent it from immediately re-waking the channel.
if (isDormant(channelId) && moderatorBotToken && senderId !== moderatorBotUserId) {
const first = wakeFromDormant(channelId);
if (first) {
const msg = `<@${first.discordUserId}>${scheduleIdentifier}`;
await sendAndDelete(moderatorBotToken, channelId, msg, api.logger);
await sendScheduleTrigger(moderatorBotToken, channelId, msg, api.logger, debugMode);
api.logger.info(`dirigent: woke dormant channel=${channelId} first speaker=${first.agentId}`);
}
}

View File

@@ -4,8 +4,7 @@ import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
import { IdentityRegistry } from "./core/identity-registry.js";
import { ChannelStore } from "./core/channel-store.js";
import { scanPaddedCell } from "./core/padded-cell.js";
import { startNoReplyApi, stopNoReplyApi } from "./core/no-reply-process.js";
import { startModeratorPresence, stopModeratorPresence } from "./moderator-presence.js";
import { startSideCar, stopSideCar } from "./core/sidecar-process.js";
import { registerBeforeModelResolveHook } from "./hooks/before-model-resolve.js";
import { registerAgentEndHook } from "./hooks/agent-end.js";
import { registerMessageReceivedHook } from "./hooks/message-received.js";
@@ -13,33 +12,44 @@ import { registerDirigentTools } from "./tools/register-tools.js";
import { registerSetChannelModeCommand } from "./commands/set-channel-mode-command.js";
import { registerAddGuildCommand } from "./commands/add-guild-command.js";
import { registerControlPage } from "./web/control-page.js";
import { sendModeratorMessage, sendAndDelete } from "./core/moderator-discord.js";
import { setSpeakerList } from "./turn-manager.js";
import { registerDirigentApi } from "./web/dirigent-api.js";
import { sendModeratorMessage, sendScheduleTrigger, getBotUserIdFromToken } from "./core/moderator-discord.js";
import { setSpeakerList, isCurrentSpeaker, isDormant, wakeFromDormant } from "./turn-manager.js";
import { fetchVisibleChannelBotAccountIds } from "./core/channel-members.js";
type PluginConfig = {
moderatorBotToken?: string;
noReplyProvider?: string;
noReplyModel?: string;
noReplyPort?: number;
scheduleIdentifier?: string;
identityFilePath?: string;
channelStoreFilePath?: string;
debugMode?: boolean;
noReplyProvider?: string;
noReplyModel?: string;
sideCarPort?: number;
};
function normalizeConfig(api: OpenClawPluginApi): Required<PluginConfig> {
const cfg = (api.pluginConfig ?? {}) as PluginConfig;
return {
moderatorBotToken: cfg.moderatorBotToken ?? "",
noReplyProvider: cfg.noReplyProvider ?? "dirigent",
noReplyModel: cfg.noReplyModel ?? "no-reply",
noReplyPort: Number(cfg.noReplyPort ?? 8787),
scheduleIdentifier: cfg.scheduleIdentifier ?? "➡️",
identityFilePath: cfg.identityFilePath ?? path.join(os.homedir(), ".openclaw", "dirigent-identity.json"),
channelStoreFilePath: cfg.channelStoreFilePath ?? path.join(os.homedir(), ".openclaw", "dirigent-channels.json"),
debugMode: cfg.debugMode ?? false,
noReplyProvider: cfg.noReplyProvider ?? "dirigent",
noReplyModel: cfg.noReplyModel ?? "no-reply",
sideCarPort: cfg.sideCarPort ?? 8787,
};
}
function getGatewayPort(api: OpenClawPluginApi): number {
try {
return ((api.config as Record<string, unknown>)?.gateway as Record<string, unknown>)?.port as number ?? 18789;
} catch {
return 18789;
}
}
/**
* Gateway lifecycle events (gateway_start / gateway_stop) are global — fired once
* when the gateway process starts/stops, not per agent session. We guard these on
@@ -76,6 +86,10 @@ export default {
const identityRegistry = new IdentityRegistry(config.identityFilePath);
const channelStore = new ChannelStore(config.channelStoreFilePath);
const moderatorBotToken = config.moderatorBotToken || undefined;
const moderatorBotUserId = moderatorBotToken ? getBotUserIdFromToken(moderatorBotToken) : undefined;
const moderatorServiceUrl = `http://127.0.0.1:${config.sideCarPort}/moderator`;
let paddedCellDetected = false;
function hasPaddedCell(): boolean {
@@ -94,24 +108,27 @@ export default {
if (!isGatewayLifecycleRegistered()) {
markGatewayLifecycleRegistered();
api.on("gateway_start", () => {
const live = normalizeConfig(api);
const gatewayPort = getGatewayPort(api);
startNoReplyApi(api.logger, pluginDir, live.noReplyPort);
// Start unified services (no-reply API + moderator bot)
startSideCar(
api.logger,
pluginDir,
config.sideCarPort,
moderatorBotToken,
undefined, // pluginApiToken — gateway handles auth for plugin routes
gatewayPort,
config.debugMode,
);
if (live.moderatorBotToken) {
startModeratorPresence(live.moderatorBotToken, api.logger);
api.logger.info("dirigent: moderator bot presence started");
} else {
api.logger.info("dirigent: moderatorBotToken not set — moderator features disabled");
}
if (!moderatorBotToken) {
api.logger.info("dirigent: moderatorBotToken not set — moderator features disabled");
}
tryAutoScanPaddedCell();
});
tryAutoScanPaddedCell();
api.on("gateway_stop", () => {
stopNoReplyApi(api.logger);
stopModeratorPresence();
stopSideCar(api.logger);
});
}
@@ -120,18 +137,20 @@ export default {
api,
channelStore,
identityRegistry,
moderatorBotToken: config.moderatorBotToken || undefined,
noReplyModel: config.noReplyModel,
noReplyProvider: config.noReplyProvider,
moderatorBotToken,
scheduleIdentifier: config.scheduleIdentifier,
debugMode: config.debugMode,
noReplyProvider: config.noReplyProvider,
noReplyModel: config.noReplyModel,
});
const interruptTailMatch = registerAgentEndHook({
api,
channelStore,
identityRegistry,
moderatorBotToken: config.moderatorBotToken || undefined,
moderatorBotToken,
scheduleIdentifier: config.scheduleIdentifier,
debugMode: config.debugMode,
onDiscussionDormant: async (channelId: string) => {
const live = normalizeConfig(api);
if (!live.moderatorBotToken) return;
@@ -148,13 +167,74 @@ export default {
},
});
// Speaker-list init still handled via message_received (needs OpenClaw API for channel member lookup)
registerMessageReceivedHook({
api,
channelStore,
identityRegistry,
moderatorBotToken: config.moderatorBotToken || undefined,
moderatorBotToken,
scheduleIdentifier: config.scheduleIdentifier,
interruptTailMatch,
debugMode: config.debugMode,
// When moderator service is active it handles wake/interrupt via HTTP callback;
// message_received only needs to run speaker-list initialization.
moderatorHandlesMessages: !!moderatorBotToken,
});
// ── Dirigent API (moderator service → plugin callbacks) ───────────────
registerDirigentApi({
api,
channelStore,
moderatorBotUserId,
scheduleIdentifier: config.scheduleIdentifier,
moderatorServiceUrl,
moderatorServiceToken: undefined,
debugMode: config.debugMode,
onNewMessage: async ({ channelId, senderId }) => {
const mode = channelStore.getMode(channelId);
// Modes where agents don't participate
if (mode === "none" || mode === "work" || mode === "report") return;
// Skip messages from the moderator bot itself (schedule triggers, etc.)
if (senderId === moderatorBotUserId) return;
// Concluded discussion: send "closed" reply via moderator service
if (mode === "discussion") {
const rec = channelStore.getRecord(channelId);
if (rec.discussion?.concluded && moderatorBotToken) {
await sendModeratorMessage(
moderatorBotToken,
channelId,
"This discussion is closed and no longer active.",
api.logger,
).catch(() => undefined);
return;
}
}
// Identify sender — is it the current speaker?
const senderEntry = identityRegistry.findByDiscordUserId(senderId);
const currentSpeakerIsThisSender = senderEntry
? isCurrentSpeaker(channelId, senderEntry.agentId)
: false;
if (!currentSpeakerIsThisSender) {
// Non-current-speaker: interrupt any ongoing tail-match poll
interruptTailMatch(channelId);
api.logger.info(`dirigent: moderator-callback interrupt tail-match channel=${channelId} senderId=${senderId}`);
// Wake from dormant if needed
if (isDormant(channelId) && moderatorBotToken) {
const first = wakeFromDormant(channelId);
if (first) {
const msg = `<@${first.discordUserId}>${config.scheduleIdentifier}`;
await sendScheduleTrigger(moderatorBotToken, channelId, msg, api.logger, config.debugMode);
api.logger.info(`dirigent: moderator-callback woke dormant channel=${channelId} first=${first.agentId}`);
}
}
}
},
});
// ── Tools ──────────────────────────────────────────────────────────────
@@ -162,9 +242,9 @@ export default {
api,
channelStore,
identityRegistry,
moderatorBotToken: config.moderatorBotToken || undefined,
moderatorBotToken,
scheduleIdentifier: config.scheduleIdentifier,
onDiscussionCreate: async ({ channelId, guildId, initiatorAgentId, callbackGuildId, callbackChannelId, discussionGuide, participants }) => {
onDiscussionCreate: async ({ channelId, initiatorAgentId, callbackGuildId, callbackChannelId, discussionGuide, participants }) => {
const live = normalizeConfig(api);
if (!live.moderatorBotToken) return;
@@ -184,11 +264,12 @@ export default {
if (speakers.length > 0) {
setSpeakerList(channelId, speakers);
const first = speakers[0];
await sendAndDelete(
await sendScheduleTrigger(
live.moderatorBotToken,
channelId,
`<@${first.discordUserId}>${live.scheduleIdentifier}`,
api.logger,
live.debugMode,
).catch(() => undefined);
}
},
@@ -203,7 +284,7 @@ export default {
api,
channelStore,
identityRegistry,
moderatorBotToken: config.moderatorBotToken || undefined,
moderatorBotToken,
openclawDir,
hasPaddedCell,
});

View File

@@ -1,257 +0,0 @@
/**
* Minimal Discord Gateway connection to keep the moderator bot "online".
* Uses Node.js built-in WebSocket (Node 22+).
*
* IMPORTANT: Only ONE instance should exist per bot token.
* Uses a singleton guard to prevent multiple connections.
*/
let ws: WebSocket | null = null;
let heartbeatInterval: ReturnType<typeof setInterval> | null = null;
let heartbeatAcked = true;
let lastSequence: number | null = null;
let sessionId: string | null = null;
let resumeUrl: string | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
let destroyed = false;
let started = false; // singleton guard
type Logger = {
info: (msg: string) => void;
warn: (msg: string) => void;
};
const GATEWAY_URL = "wss://gateway.discord.gg/?v=10&encoding=json";
const MAX_RECONNECT_DELAY_MS = 60_000;
let reconnectAttempts = 0;
function sendPayload(data: Record<string, unknown>) {
if (ws?.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
}
function startHeartbeat(intervalMs: number) {
stopHeartbeat();
heartbeatAcked = true;
// First heartbeat after jitter
const jitter = Math.floor(Math.random() * intervalMs);
const firstTimer = setTimeout(() => {
if (destroyed) return;
if (!heartbeatAcked) {
// Missed ACK — zombie connection, close and reconnect
ws?.close(4000, "missed heartbeat ack");
return;
}
heartbeatAcked = false;
sendPayload({ op: 1, d: lastSequence });
heartbeatInterval = setInterval(() => {
if (destroyed) return;
if (!heartbeatAcked) {
ws?.close(4000, "missed heartbeat ack");
return;
}
heartbeatAcked = false;
sendPayload({ op: 1, d: lastSequence });
}, intervalMs);
}, jitter);
// Store the first timer so we can clear it
heartbeatInterval = firstTimer as unknown as ReturnType<typeof setInterval>;
}
function stopHeartbeat() {
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
clearTimeout(heartbeatInterval as unknown as ReturnType<typeof setTimeout>);
heartbeatInterval = null;
}
}
function cleanup() {
stopHeartbeat();
if (ws) {
// Remove all handlers to avoid ghost callbacks
ws.onopen = null;
ws.onmessage = null;
ws.onclose = null;
ws.onerror = null;
try { ws.close(1000); } catch { /* ignore */ }
ws = null;
}
}
function connect(token: string, logger: Logger, isResume = false) {
if (destroyed) return;
// Clean up any existing connection first
cleanup();
const url = isResume && resumeUrl ? resumeUrl : GATEWAY_URL;
try {
ws = new WebSocket(url);
} catch (err) {
logger.warn(`dirigent: moderator ws constructor failed: ${String(err)}`);
scheduleReconnect(token, logger, false);
return;
}
const currentWs = ws; // capture for closure
ws.onopen = () => {
if (currentWs !== ws || destroyed) return; // stale
reconnectAttempts = 0; // reset on successful open
if (isResume && sessionId) {
sendPayload({
op: 6,
d: { token, session_id: sessionId, seq: lastSequence },
});
} else {
sendPayload({
op: 2,
d: {
token,
intents: 0,
properties: {
os: "linux",
browser: "dirigent",
device: "dirigent",
},
presence: {
status: "online",
activities: [{ name: "Moderating", type: 3 }],
},
},
});
}
};
ws.onmessage = (evt: MessageEvent) => {
if (currentWs !== ws || destroyed) return;
try {
const msg = JSON.parse(typeof evt.data === "string" ? evt.data : String(evt.data));
const { op, t, s, d } = msg;
if (s != null) lastSequence = s;
switch (op) {
case 10: // Hello
startHeartbeat(d.heartbeat_interval);
break;
case 11: // Heartbeat ACK
heartbeatAcked = true;
break;
case 1: // Heartbeat request
sendPayload({ op: 1, d: lastSequence });
break;
case 0: // Dispatch
if (t === "READY") {
sessionId = d.session_id;
resumeUrl = d.resume_gateway_url;
logger.info("dirigent: moderator bot connected and online");
}
if (t === "RESUMED") {
logger.info("dirigent: moderator bot resumed");
}
break;
case 7: // Reconnect request
logger.info("dirigent: moderator bot reconnect requested by Discord");
cleanup();
scheduleReconnect(token, logger, true);
break;
case 9: // Invalid Session
logger.warn(`dirigent: moderator bot invalid session, resumable=${d}`);
cleanup();
sessionId = d ? sessionId : null;
// Wait longer before re-identifying
setTimeout(() => {
if (!destroyed) connect(token, logger, !!d && !!sessionId);
}, 3000 + Math.random() * 2000);
break;
}
} catch {
// ignore parse errors
}
};
ws.onclose = (evt: CloseEvent) => {
if (currentWs !== ws) return; // stale ws
stopHeartbeat();
if (destroyed) return;
const code = evt.code;
// Non-recoverable codes — stop reconnecting
if (code === 4004) {
logger.warn("dirigent: moderator bot token invalid (4004), stopping");
started = false;
return;
}
if (code === 4010 || code === 4011 || code === 4013 || code === 4014) {
logger.warn(`dirigent: moderator bot fatal close (${code}), re-identifying`);
sessionId = null;
scheduleReconnect(token, logger, false);
return;
}
logger.info(`dirigent: moderator bot disconnected (code=${code}), will reconnect`);
const canResume = !!sessionId && code !== 4012;
scheduleReconnect(token, logger, canResume);
};
ws.onerror = () => {
// onclose will fire after this
};
}
function scheduleReconnect(token: string, logger: Logger, resume: boolean) {
if (destroyed) return;
if (reconnectTimer) clearTimeout(reconnectTimer);
// Exponential backoff with cap
reconnectAttempts++;
const baseDelay = Math.min(1000 * Math.pow(2, reconnectAttempts), MAX_RECONNECT_DELAY_MS);
const jitter = Math.random() * 1000;
const delay = baseDelay + jitter;
logger.info(`dirigent: moderator reconnect in ${Math.round(delay)}ms (attempt ${reconnectAttempts})`);
reconnectTimer = setTimeout(() => {
reconnectTimer = null;
connect(token, logger, resume);
}, delay);
}
/**
* Start the moderator bot's Discord Gateway connection.
* Singleton: calling multiple times with the same token is safe (no-op).
*/
export function startModeratorPresence(token: string, logger: Logger): void {
if (started) {
logger.info("dirigent: moderator presence already started, skipping");
return;
}
started = true;
destroyed = false;
reconnectAttempts = 0;
connect(token, logger);
}
/**
* Disconnect the moderator bot.
*/
export function stopModeratorPresence(): void {
destroyed = true;
started = false;
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
cleanup();
}

View File

@@ -8,28 +8,15 @@
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": { "type": "boolean", "default": true },
"discordOnly": { "type": "boolean", "default": true },
"listMode": { "type": "string", "enum": ["human-list", "agent-list"], "default": "human-list" },
"humanList": { "type": "array", "items": { "type": "string" }, "default": [] },
"agentList": { "type": "array", "items": { "type": "string" }, "default": [] },
"channelPoliciesFile": { "type": "string", "default": "~/.openclaw/dirigent-channel-policies.json" },
"bypassUserIds": { "type": "array", "items": { "type": "string" }, "default": [] },
"endSymbols": { "type": "array", "items": { "type": "string" }, "default": ["🔚"] },
"schedulingIdentifier": { "type": "string", "default": "➡️" },
"waitIdentifier": { "type": "string", "default": "👤" },
"noReplyProvider": { "type": "string" },
"noReplyModel": { "type": "string" },
"noReplyPort": { "type": "number", "default": 8787 },
"enableDiscordControlTool": { "type": "boolean", "default": true },
"enableDirigentPolicyTool": { "type": "boolean", "default": true },
"enableDebugLogs": { "type": "boolean", "default": false },
"debugLogChannelIds": { "type": "array", "items": { "type": "string" }, "default": [] },
"moderatorBotToken": { "type": "string" },
"multiMessageStartMarker": { "type": "string", "default": "" },
"multiMessageEndMarker": { "type": "string", "default": "↙️" },
"multiMessagePromptMarker": { "type": "string", "default": "⤵️" }
"scheduleIdentifier": { "type": "string", "default": "" },
"identityFilePath": { "type": "string" },
"channelStoreFilePath": { "type": "string" },
"debugMode": { "type": "boolean", "default": false },
"noReplyProvider": { "type": "string", "default": "dirigent" },
"noReplyModel": { "type": "string", "default": "no-reply" },
"sideCarPort": { "type": "number", "default": 8787 }
},
"required": ["noReplyProvider", "noReplyModel"]
"required": []
}
}

View File

@@ -33,14 +33,23 @@ function parseDiscordChannelIdFromSession(sessionKey: string): string | undefine
return m?.[1];
}
function textResult(text: string) {
return { content: [{ type: "text" as const, text }], details: undefined };
}
function errorResult(text: string) {
return { content: [{ type: "text" as const, text }], details: { error: true } };
}
export function registerDirigentTools(deps: ToolDeps): void {
const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, onDiscussionCreate } = deps;
// ───────────────────────────────────────────────
// dirigent-register
// ───────────────────────────────────────────────
api.registerTool({
api.registerTool((ctx) => ({
name: "dirigent-register",
label: "Dirigent Register",
description: "Register or update this agent's Discord user ID in Dirigent's identity registry.",
parameters: {
type: "object",
@@ -51,18 +60,18 @@ export function registerDirigentTools(deps: ToolDeps): void {
},
required: ["discordUserId"],
},
handler: async (params, ctx) => {
execute: async (_toolCallId: string, params: unknown) => {
const agentId = ctx?.agentId;
if (!agentId) return { content: [{ type: "text", text: "Cannot resolve agentId from session context" }], isError: true };
if (!agentId) return errorResult("Cannot resolve agentId from session context");
const p = params as { discordUserId: string; agentName?: string };
identityRegistry.upsert({
agentId,
discordUserId: p.discordUserId,
agentName: p.agentName ?? agentId,
});
return { content: [{ type: "text", text: `Registered: agentId=${agentId} discordUserId=${p.discordUserId}` }] };
return textResult(`Registered: agentId=${agentId} discordUserId=${p.discordUserId}`);
},
});
}));
// ───────────────────────────────────────────────
// Helper: create channel + set mode
@@ -72,7 +81,6 @@ export function registerDirigentTools(deps: ToolDeps): void {
name: string;
memberDiscordIds: string[];
mode: "chat" | "report" | "work";
callerCtx: { agentId?: string };
}): Promise<{ ok: boolean; channelId?: string; error?: string }> {
if (!moderatorBotToken) return { ok: false, error: "moderatorBotToken not configured" };
@@ -112,6 +120,7 @@ export function registerDirigentTools(deps: ToolDeps): void {
// ───────────────────────────────────────────────
api.registerTool({
name: "create-chat-channel",
label: "Create Chat Channel",
description: "Create a new private Discord channel in the specified guild with mode=chat.",
parameters: {
type: "object",
@@ -126,16 +135,15 @@ export function registerDirigentTools(deps: ToolDeps): void {
},
required: ["guildId", "name"],
},
handler: async (params, ctx) => {
execute: async (_toolCallId: string, params: unknown) => {
const p = params as { guildId: string; name: string; participants?: string[] };
const result = await createManagedChannel({
guildId: p.guildId, name: p.name,
memberDiscordIds: p.participants ?? [],
mode: "chat",
callerCtx: { agentId: ctx?.agentId },
});
if (!result.ok) return { content: [{ type: "text", text: `Failed: ${result.error}` }], isError: true };
return { content: [{ type: "text", text: `Created chat channel: ${result.channelId}` }] };
if (!result.ok) return errorResult(`Failed: ${result.error}`);
return textResult(`Created chat channel: ${result.channelId}`);
},
});
@@ -144,6 +152,7 @@ export function registerDirigentTools(deps: ToolDeps): void {
// ───────────────────────────────────────────────
api.registerTool({
name: "create-report-channel",
label: "Create Report Channel",
description: "Create a new private Discord channel with mode=report. Agents can post to it but are not woken by messages.",
parameters: {
type: "object",
@@ -155,24 +164,24 @@ export function registerDirigentTools(deps: ToolDeps): void {
},
required: ["guildId", "name"],
},
handler: async (params, ctx) => {
execute: async (_toolCallId: string, params: unknown) => {
const p = params as { guildId: string; name: string; members?: string[] };
const result = await createManagedChannel({
guildId: p.guildId, name: p.name,
memberDiscordIds: p.members ?? [],
mode: "report",
callerCtx: { agentId: ctx?.agentId },
});
if (!result.ok) return { content: [{ type: "text", text: `Failed: ${result.error}` }], isError: true };
return { content: [{ type: "text", text: `Created report channel: ${result.channelId}` }] };
if (!result.ok) return errorResult(`Failed: ${result.error}`);
return textResult(`Created report channel: ${result.channelId}`);
},
});
// ───────────────────────────────────────────────
// create-work-channel
// ───────────────────────────────────────────────
api.registerTool({
api.registerTool((ctx) => ({
name: "create-work-channel",
label: "Create Work Channel",
description: "Create a new private Discord workspace channel with mode=work (turn-manager disabled, mode locked).",
parameters: {
type: "object",
@@ -184,7 +193,7 @@ export function registerDirigentTools(deps: ToolDeps): void {
},
required: ["guildId", "name"],
},
handler: async (params, ctx) => {
execute: async (_toolCallId: string, params: unknown) => {
const p = params as { guildId: string; name: string; members?: string[] };
// Include calling agent's Discord ID if known
const callerDiscordId = ctx?.agentId ? identityRegistry.findByAgentId(ctx.agentId)?.discordUserId : undefined;
@@ -195,18 +204,18 @@ export function registerDirigentTools(deps: ToolDeps): void {
guildId: p.guildId, name: p.name,
memberDiscordIds: members,
mode: "work",
callerCtx: { agentId: ctx?.agentId },
});
if (!result.ok) return { content: [{ type: "text", text: `Failed: ${result.error}` }], isError: true };
return { content: [{ type: "text", text: `Created work channel: ${result.channelId}` }] };
if (!result.ok) return errorResult(`Failed: ${result.error}`);
return textResult(`Created work channel: ${result.channelId}`);
},
});
}));
// ───────────────────────────────────────────────
// create-discussion-channel
// ───────────────────────────────────────────────
api.registerTool({
api.registerTool((ctx) => ({
name: "create-discussion-channel",
label: "Create Discussion Channel",
description: "Create a structured discussion channel between agents. The calling agent becomes the initiator.",
parameters: {
type: "object",
@@ -220,7 +229,7 @@ export function registerDirigentTools(deps: ToolDeps): void {
},
required: ["callbackGuildId", "callbackChannelId", "name", "discussionGuide", "participants"],
},
handler: async (params, ctx) => {
execute: async (_toolCallId: string, params: unknown) => {
const p = params as {
callbackGuildId: string;
callbackChannelId: string;
@@ -230,13 +239,13 @@ export function registerDirigentTools(deps: ToolDeps): void {
};
const initiatorAgentId = ctx?.agentId;
if (!initiatorAgentId) {
return { content: [{ type: "text", text: "Cannot resolve initiator agentId from session" }], isError: true };
return errorResult("Cannot resolve initiator agentId from session");
}
if (!moderatorBotToken) {
return { content: [{ type: "text", text: "moderatorBotToken not configured" }], isError: true };
return errorResult("moderatorBotToken not configured");
}
if (!onDiscussionCreate) {
return { content: [{ type: "text", text: "Discussion service not available" }], isError: true };
return errorResult("Discussion service not available");
}
const botId = getBotUserIdFromToken(moderatorBotToken);
@@ -262,7 +271,7 @@ export function registerDirigentTools(deps: ToolDeps): void {
logger: api.logger,
});
} catch (err) {
return { content: [{ type: "text", text: `Failed to create channel: ${String(err)}` }], isError: true };
return errorResult(`Failed to create channel: ${String(err)}`);
}
try {
@@ -273,7 +282,7 @@ export function registerDirigentTools(deps: ToolDeps): void {
concluded: false,
});
} catch (err) {
return { content: [{ type: "text", text: `Failed to register channel: ${String(err)}` }], isError: true };
return errorResult(`Failed to register channel: ${String(err)}`);
}
await onDiscussionCreate({
@@ -286,15 +295,16 @@ export function registerDirigentTools(deps: ToolDeps): void {
participants: p.participants,
});
return { content: [{ type: "text", text: `Discussion channel created: ${channelId}` }] };
return textResult(`Discussion channel created: ${channelId}`);
},
});
}));
// ───────────────────────────────────────────────
// discussion-complete
// ───────────────────────────────────────────────
api.registerTool({
api.registerTool((ctx) => ({
name: "discussion-complete",
label: "Discussion Complete",
description: "Mark a discussion as complete, archive the channel, and post the summary path to the callback channel.",
parameters: {
type: "object",
@@ -305,31 +315,25 @@ export function registerDirigentTools(deps: ToolDeps): void {
},
required: ["discussionChannelId", "summary"],
},
handler: async (params, ctx) => {
execute: async (_toolCallId: string, params: unknown) => {
const p = params as { discussionChannelId: string; summary: string };
const callerAgentId = ctx?.agentId;
if (!callerAgentId) {
return { content: [{ type: "text", text: "Cannot resolve agentId from session" }], isError: true };
return errorResult("Cannot resolve agentId from session");
}
const rec = channelStore.getRecord(p.discussionChannelId);
if (rec.mode !== "discussion") {
return { content: [{ type: "text", text: `Channel ${p.discussionChannelId} is not a discussion channel` }], isError: true };
return errorResult(`Channel ${p.discussionChannelId} is not a discussion channel`);
}
if (!rec.discussion) {
return { content: [{ type: "text", text: "Discussion metadata not found" }], isError: true };
return errorResult("Discussion metadata not found");
}
if (rec.discussion.initiatorAgentId !== callerAgentId) {
return {
content: [{ type: "text", text: `Only the initiator (${rec.discussion.initiatorAgentId}) may call discussion-complete` }],
isError: true,
};
return errorResult(`Only the initiator (${rec.discussion.initiatorAgentId}) may call discussion-complete`);
}
if (!p.summary.includes("discussion-summary")) {
return {
content: [{ type: "text", text: "Summary path must be under {workspace}/discussion-summary/" }],
isError: true,
};
return errorResult("Summary path must be under {workspace}/discussion-summary/");
}
channelStore.concludeDiscussion(p.discussionChannelId);
@@ -343,7 +347,7 @@ export function registerDirigentTools(deps: ToolDeps): void {
).catch(() => undefined);
}
return { content: [{ type: "text", text: `Discussion ${p.discussionChannelId} concluded. Summary posted to ${rec.discussion.callbackChannelId}.` }] };
return textResult(`Discussion ${p.discussionChannelId} concluded. Summary posted to ${rec.discussion.callbackChannelId}.`);
},
});
}));
}

112
plugin/web/dirigent-api.ts Normal file
View File

@@ -0,0 +1,112 @@
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
import type { ChannelStore } from "../core/channel-store.js";
type Deps = {
api: OpenClawPluginApi;
channelStore: ChannelStore;
moderatorBotUserId: string | undefined;
scheduleIdentifier: string;
moderatorServiceUrl: string | undefined;
moderatorServiceToken: string | undefined;
debugMode: boolean;
onNewMessage: (event: {
channelId: string;
messageId: string;
senderId: string;
guildId?: string;
}) => Promise<void>;
};
function sendJson(res: import("node:http").ServerResponse, status: number, payload: unknown): void {
res.writeHead(status, { "Content-Type": "application/json; charset=utf-8" });
res.end(JSON.stringify(payload));
}
function readBody(req: import("node:http").IncomingMessage): Promise<Record<string, unknown>> {
return new Promise((resolve, reject) => {
let body = "";
req.on("data", (chunk: Buffer) => {
body += chunk.toString();
if (body.length > 1_000_000) {
req.destroy();
reject(new Error("body too large"));
}
});
req.on("end", () => {
try {
resolve(body ? (JSON.parse(body) as Record<string, unknown>) : {});
} catch {
reject(new Error("invalid_json"));
}
});
req.on("error", reject);
});
}
/**
* Register Dirigent plugin HTTP routes that the moderator service calls back into.
*
* Routes:
* POST /dirigent/api/moderator/message — inbound message notification from moderator service
* GET /dirigent/api/moderator/status — health/status check
*/
export function registerDirigentApi(deps: Deps): void {
const { api, moderatorServiceUrl, onNewMessage } = deps;
// ── POST /dirigent/api/moderator/message ─────────────────────────────────────
// Called by the moderator service on every Discord MESSAGE_CREATE event.
api.registerHttpRoute({
path: "/dirigent/api/moderator/message",
auth: "plugin",
match: "exact",
handler: async (req, res) => {
if (req.method !== "POST") {
res.writeHead(405);
res.end();
return;
}
let body: Record<string, unknown>;
try {
body = await readBody(req);
} catch (err) {
return sendJson(res, 400, { ok: false, error: String(err) });
}
const channelId = typeof body.channelId === "string" ? body.channelId : undefined;
const messageId = typeof body.messageId === "string" ? body.messageId : undefined;
const senderId = typeof body.senderId === "string" ? body.senderId : undefined;
const guildId = typeof body.guildId === "string" ? body.guildId : undefined;
if (!channelId || !senderId) {
return sendJson(res, 400, { ok: false, error: "channelId and senderId required" });
}
try {
await onNewMessage({
channelId,
messageId: messageId ?? "",
senderId,
guildId,
});
return sendJson(res, 200, { ok: true });
} catch (err) {
api.logger.warn(`dirigent: moderator/message handler error: ${String(err)}`);
return sendJson(res, 500, { ok: false, error: String(err) });
}
},
});
// ── GET /dirigent/api/moderator/status ───────────────────────────────────────
api.registerHttpRoute({
path: "/dirigent/api/moderator/status",
auth: "plugin",
match: "exact",
handler: (_req, res) => {
return sendJson(res, 200, {
ok: true,
moderatorServiceUrl: moderatorServiceUrl ?? null,
});
},
});
}