diff --git a/Makefile b/Makefile index 8bc67c5..868cacc 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: check check-rules test-api up down smoke render-config package-plugin +.PHONY: check check-rules check-files smoke install check: cd plugin && npm run check @@ -6,21 +6,11 @@ check: check-rules: node scripts/validate-rules.mjs -test-api: - node scripts/test-no-reply-api.mjs - -up: - ./scripts/dev-up.sh - -down: - ./scripts/dev-down.sh +check-files: + node scripts/check-plugin-files.mjs smoke: ./scripts/smoke-no-reply-api.sh -render-config: - node scripts/render-openclaw-config.mjs - -package-plugin: - node scripts/package-plugin.mjs - +install: + node scripts/install.mjs --install diff --git a/README.md b/README.md index a6a5ae4..3e7eb49 100644 --- a/README.md +++ b/README.md @@ -1,144 +1,117 @@ # Dirigent -Rule-based no-reply gate + turn manager for OpenClaw (Discord). +Turn-management and moderation plugin for OpenClaw (Discord). > Formerly known as WhisperGate. Renamed to Dirigent in v0.2.0. ## What it does -Dirigent adds deterministic logic **before model selection** and **turn-based speaking** for multi-agent Discord channels: +Dirigent adds deterministic routing and **turn-based speaking** for multi-agent Discord channels: -- **Rule gate (before_model_resolve)** - 1. Non-Discord → skip - 2. Sender in bypass list / human list → skip - 3. Message ends with configured end symbol → skip - 4. Otherwise → route to no-reply model/provider +- **Rule gate (`before_model_resolve`)** + - Non-speaker agents → routed to no-reply model (silent turn) + - Dormant channels → all agents suppressed until a human message wakes them + - Configurable per-channel mode: `chat`, `work`, `discussion`, `report`, `none` -- **End-symbol enforcement** - - Injects instruction: `Your response MUST end with 🔚…` - - In group chats, also injects: "If not relevant, reply NO_REPLY" +- **Turn management** + - Only the current speaker responds; others are silenced + - Turn advances when the current speaker ends with a configured symbol or returns `NO_REPLY` + - Full round of silence → channel enters **dormant** state + - Human message → wakes dormant channel, triggers first speaker -- **Scheduling identifier (moderator handoff)** - - Configurable identifier (default: `➡️`) used by the moderator bot - - Handoff format: `<@TARGET_USER_ID>➡️` (non-semantic, just a scheduling signal) - - Agent receives instruction explaining the identifier is meaningless — check chat history and decide +- **Moderator bot sidecar** + - Dedicated Discord Gateway connection (separate bot token) for real-time message push + - Sends schedule-trigger messages (`<@USER_ID>➡️`) to signal speaker turns + - Notifies the plugin via HTTP callback on new messages (wake/interrupt) -- **Turn-based speaking (multi-bot)** - - Only the current speaker is allowed to respond - - Others are forced to no-reply - - Turn advances on **end-symbol** or **NO_REPLY** - - If all bots NO_REPLY, channel becomes **dormant** until a new human message +- **Discussion mode** + - Agents can initiate a structured discussion via `create-discussion-channel` tool + - Initiator calls `discussion-complete` to conclude; summary is posted to the callback channel -- **Agent identity injection** - - Injects agent name, Discord accountId, and Discord userId into group chat prompts - -- **Human @mention override** - - When a `humanList` user @mentions agents, temporarily overrides turn order - - Only mentioned agents cycle; original order restores when cycle completes - -- **Per-channel policy runtime** - - Policies stored in a standalone JSON file - - Update at runtime via `dirigent_policy_set` / `dirigent_policy_delete` tools - -- **Discord control actions (optional)** - - Private channel create/update + member list - - Via `dirigent_channel_create`, `dirigent_channel_update`, `dirigent_member_list` tools +- **Channel management tools** + - `create-chat-channel`, `create-work-channel`, `create-report-channel` — create typed channels + - `create-discussion-channel`, `discussion-complete` — discussion lifecycle + - `dirigent-register` — register an agent identity --- ## Repo layout -- `plugin/` — OpenClaw plugin (gate + turn manager + moderator presence) -- `no-reply-api/` — OpenAI-compatible API that always returns `NO_REPLY` -- Discord admin actions are now handled in-plugin via direct Discord REST API calls (no sidecar service) -- `docs/` — rollout, integration, run-mode notes, turn-wakeup analysis -- `scripts/` — smoke/dev/helper checks -- `Makefile` — common dev commands (`make check`, `make check-rules`, `make test-api`, `make smoke-discord-control`, `make up`) -- `CHANGELOG.md` — milestone summary +``` +plugin/ OpenClaw plugin (hooks, tools, commands, web UI) + core/ Channel store, identity registry, moderator REST helpers + hooks/ before_model_resolve, agent_end, message_received + tools/ Agent-facing tools + commands/ Slash commands + web/ Control page + Dirigent API (HTTP routes) +services/ Sidecar process — spawned automatically by the plugin + main.mjs Unified entry point, routes /no-reply/* and /moderator/* + no-reply-api/ OpenAI-compatible server that always returns NO_REPLY + moderator/ Discord Gateway client + HTTP control endpoints +scripts/ Dev helpers +docs/ Architecture and integration notes +``` --- -## Quick start (no Docker) +## Installation ```bash -cd no-reply-api -node server.mjs +node scripts/install.mjs --install ``` -Then render config snippet: +This copies `plugin/` and `services/` into the OpenClaw plugin directory and registers skills. + +--- + +## Sidecar + +The sidecar (`services/main.mjs`) is spawned automatically when openclaw-gateway starts. It exposes: + +| Path prefix | Description | +|---|---| +| `/no-reply/*` | No-reply model API (`/v1/chat/completions`, `/v1/responses`) | +| `/moderator/*` | Moderator bot control (`/send`, `/create-channel`, `/me`, …) | +| `/health` | Combined health check | + +Port is configured via `sideCarPort` (default `8787`). + +Smoke-test after gateway start: ```bash -node scripts/render-openclaw-config.mjs -``` - -See `docs/RUN_MODES.md` for Docker mode. -Discord extension capabilities: `docs/DISCORD_CONTROL.md`. - ---- - -## Runtime tools & commands - -### Tools (6 individual tools) - -**Discord control:** -- `dirigent_discord_channel_create` — Create private channel -- `dirigent_discord_channel_update` — Update channel permissions -- `dirigent_discord_member_list` — List guild members - -**Policy management:** -- `dirigent_policy_get` — Get all policies -- `dirigent_policy_set` — Set/update channel policy -- `dirigent_policy_delete` — Delete channel policy - -> Turn management is internal to the plugin (not exposed as tools). - -> See `FEAT.md` for full feature documentation. - -### Slash command (Discord) - -``` -/dirigent status -/dirigent turn-status -/dirigent turn-advance -/dirigent turn-reset -/dirigent turn-shuffling -/dirigent turn-shuffling on -/dirigent turn-shuffling off +make smoke +# or: +./scripts/smoke-no-reply-api.sh ``` --- -## Config highlights +## Plugin config -Common options (see `docs/INTEGRATION.md`): +Key options (in `openclaw.json` under `plugins.entries.dirigent.config`): -- `listMode`: `human-list` or `agent-list` -- `humanList`, `agentList` -- `endSymbols` -- `schedulingIdentifier` (default `➡️`) -- `waitIdentifier` (default `👤`) — agent ends with this to pause all agents until human replies -- `channelPoliciesFile` (per-channel overrides) -- `moderatorBotToken` (handoff messages) -- `multiMessageStartMarker` (default `↗️`) -- `multiMessageEndMarker` (default `↙️`) -- `multiMessagePromptMarker` (default `⤵️`) -- `enableDebugLogs`, `debugLogChannelIds` - -Shuffle mode does not currently have a global config key. It is a per-channel runtime toggle, defaults to off, and is controlled with `/dirigent turn-shuffling ...`. +| Key | Default | Description | +|---|---|---| +| `moderatorBotToken` | — | Discord bot token for the moderator/sidecar bot | +| `scheduleIdentifier` | `➡️` | Symbol appended to schedule-trigger mentions | +| `listMode` | `human-list` | `human-list` or `agent-list` | +| `humanList` | `[]` | Discord user IDs treated as humans (bypass turn gate) | +| `agentList` | `[]` | Discord user IDs treated as agents (when `listMode=agent-list`) | +| `noReplyProvider` | `dirigent` | Provider ID for the no-reply model | +| `noReplyModel` | `no-reply` | Model ID for the no-reply model | +| `sideCarPort` | `8787` | Port the sidecar listens on | +| `debugMode` | `false` | Enable verbose debug logging | +| `debugLogChannelIds` | `[]` | Channel IDs that receive debug log messages | +| `channelPoliciesFile` | `~/.openclaw/dirigent-channel-policies.json` | Per-channel policy overrides | --- -## Development plan (incremental commits) +## Dev commands -- [x] Task 1: project docs + structure -- [x] Task 2: no-reply API MVP -- [x] Task 3: plugin MVP with rule chain -- [x] Task 4: sample config + quick verification scripts -- [x] Task 5: plugin rule extraction + hardening -- [x] Task 6: containerization + compose -- [x] Task 7: plugin usage notes -- [x] Task 8: sender normalization + TTL + one-shot decision -- [x] Task 9: auth-aware no-reply API -- [x] Task 10: smoke test helpers -- [x] Task 11: plugin structure checker -- [x] Task 12: rollout checklist +```bash +make check # TypeScript check (plugin/) +make check-rules # Validate rule-case fixtures +make check-files # Verify required files exist +make smoke # Smoke-test no-reply endpoint (sidecar must be running) +make install # Install plugin + sidecar into OpenClaw +``` diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index e550422..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,11 +0,0 @@ -services: - dirigent-no-reply-api: - build: - context: ./no-reply-api - container_name: dirigent-no-reply-api - ports: - - "8787:8787" - environment: - - PORT=8787 - - NO_REPLY_MODEL=dirigent-no-reply-v1 - restart: unless-stopped diff --git a/no-reply-api/.dockerignore b/no-reply-api/.dockerignore deleted file mode 100644 index 93f1361..0000000 --- a/no-reply-api/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -node_modules -npm-debug.log diff --git a/no-reply-api/Dockerfile b/no-reply-api/Dockerfile deleted file mode 100644 index b5c7e50..0000000 --- a/no-reply-api/Dockerfile +++ /dev/null @@ -1,7 +0,0 @@ -FROM node:22-alpine -WORKDIR /app -COPY package.json ./ -COPY server.mjs ./ -EXPOSE 8787 -ENV PORT=8787 -CMD ["node", "server.mjs"] diff --git a/no-reply-api/package-lock.json b/no-reply-api/package-lock.json deleted file mode 100644 index 841c161..0000000 --- a/no-reply-api/package-lock.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "name": "dirigent-no-reply-api", - "version": "0.1.0", - "lockfileVersion": 3, - "requires": true, - "packages": { - "": { - "name": "dirigent-no-reply-api", - "version": "0.1.0" - } - } -} diff --git a/no-reply-api/package.json b/no-reply-api/package.json deleted file mode 100644 index 212323f..0000000 --- a/no-reply-api/package.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "name": "dirigent-no-reply-api", - "version": "0.1.0", - "private": true, - "type": "module", - "scripts": { - "start": "node server.mjs" - } -} diff --git a/no-reply-api/server.mjs b/no-reply-api/server.mjs deleted file mode 100644 index da9dd70..0000000 --- a/no-reply-api/server.mjs +++ /dev/null @@ -1,112 +0,0 @@ -import http from "node:http"; - -const port = Number(process.env.PORT || 8787); -const modelName = process.env.NO_REPLY_MODEL || "no-reply"; -const authToken = process.env.AUTH_TOKEN || ""; - -function sendJson(res, status, payload) { - res.writeHead(status, { "Content-Type": "application/json; charset=utf-8" }); - res.end(JSON.stringify(payload)); -} - -function isAuthorized(req) { - if (!authToken) return true; - const header = req.headers.authorization || ""; - return header === `Bearer ${authToken}`; -} - -function noReplyChatCompletion(reqBody) { - return { - id: `chatcmpl_dirigent_${Date.now()}`, - object: "chat.completion", - created: Math.floor(Date.now() / 1000), - model: reqBody?.model || modelName, - choices: [ - { - index: 0, - message: { role: "assistant", content: "NO_REPLY" }, - finish_reason: "stop" - } - ], - usage: { prompt_tokens: 0, completion_tokens: 1, total_tokens: 1 } - }; -} - -function noReplyResponses(reqBody) { - return { - id: `resp_dirigent_${Date.now()}`, - object: "response", - created_at: Math.floor(Date.now() / 1000), - model: reqBody?.model || modelName, - output: [ - { - type: "message", - role: "assistant", - content: [{ type: "output_text", text: "NO_REPLY" }] - } - ], - usage: { input_tokens: 0, output_tokens: 1, total_tokens: 1 } - }; -} - -function listModels() { - return { - object: "list", - data: [ - { - id: modelName, - object: "model", - created: Math.floor(Date.now() / 1000), - owned_by: "dirigent" - } - ] - }; -} - -const server = http.createServer((req, res) => { - if (req.method === "GET" && req.url === "/health") { - return sendJson(res, 200, { ok: true, service: "dirigent-no-reply-api", model: modelName }); - } - - if (req.method === "GET" && req.url === "/v1/models") { - if (!isAuthorized(req)) return sendJson(res, 401, { error: "unauthorized" }); - return sendJson(res, 200, listModels()); - } - - if (req.method !== "POST") { - return sendJson(res, 404, { error: "not_found" }); - } - - if (!isAuthorized(req)) { - return sendJson(res, 401, { error: "unauthorized" }); - } - - let body = ""; - req.on("data", (chunk) => { - body += chunk; - if (body.length > 1_000_000) req.destroy(); - }); - - req.on("end", () => { - let parsed = {}; - try { - parsed = body ? JSON.parse(body) : {}; - } catch { - return sendJson(res, 400, { error: "invalid_json" }); - } - - if (req.url === "/v1/chat/completions") { - return sendJson(res, 200, noReplyChatCompletion(parsed)); - } - - if (req.url === "/v1/responses") { - return sendJson(res, 200, noReplyResponses(parsed)); - } - - return sendJson(res, 404, { error: "not_found" }); - }); -}); - -server.listen(port, () => { - console.log(`[dirigent-no-reply-api] listening on :${port}`); -}); diff --git a/package.json b/package.json index aec4f9e..982e0f8 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "files": [ "dist/", "plugin/", - "no-reply-api/", + "services/", "docs/", "scripts/install.mjs", @@ -17,7 +17,7 @@ "TASKLIST.md" ], "scripts": { - "prepare": "mkdir -p dist/dirigent && cp -r plugin/* dist/dirigent/", + "prepare": "mkdir -p dist/dirigent && cp -r plugin/* dist/dirigent/ && cp -r services dist/dirigent/services", "test": "node --test --experimental-strip-types test/**/*.test.ts", "postinstall": "node scripts/install.mjs --install", "uninstall": "node scripts/install.mjs --uninstall", diff --git a/plugin/core/no-reply-process.ts b/plugin/core/no-reply-process.ts deleted file mode 100644 index ce43217..0000000 --- a/plugin/core/no-reply-process.ts +++ /dev/null @@ -1,51 +0,0 @@ -import fs from "node:fs"; -import path from "node:path"; -import { spawn, type ChildProcess } from "node:child_process"; - -let noReplyProcess: ChildProcess | null = null; - -export function startNoReplyApi( - logger: { info: (m: string) => void; warn: (m: string) => void }, - pluginDir: string, - port = 8787, -): void { - logger.info(`dirigent: startNoReplyApi called, pluginDir=${pluginDir}`); - - if (noReplyProcess) { - logger.info("dirigent: no-reply API already running, skipping"); - return; - } - - const serverPath = path.resolve(pluginDir, "no-reply-api", "server.mjs"); - logger.info(`dirigent: resolved serverPath=${serverPath}`); - - if (!fs.existsSync(serverPath)) { - logger.warn(`dirigent: no-reply API server not found at ${serverPath}, skipping`); - return; - } - - logger.info("dirigent: no-reply API server found, spawning process..."); - - noReplyProcess = spawn(process.execPath, [serverPath], { - env: { ...process.env, PORT: String(port) }, - stdio: ["ignore", "pipe", "pipe"], - detached: false, - }); - - noReplyProcess.stdout?.on("data", (d: Buffer) => logger.info(`dirigent: no-reply-api: ${d.toString().trim()}`)); - noReplyProcess.stderr?.on("data", (d: Buffer) => logger.warn(`dirigent: no-reply-api: ${d.toString().trim()}`)); - - noReplyProcess.on("exit", (code, signal) => { - logger.info(`dirigent: no-reply API exited (code=${code}, signal=${signal})`); - noReplyProcess = null; - }); - - logger.info(`dirigent: no-reply API started (pid=${noReplyProcess.pid}, port=${port})`); -} - -export function stopNoReplyApi(logger: { info: (m: string) => void }): void { - if (!noReplyProcess) return; - logger.info("dirigent: stopping no-reply API"); - noReplyProcess.kill("SIGTERM"); - noReplyProcess = null; -} diff --git a/plugin/core/sidecar-process.ts b/plugin/core/sidecar-process.ts new file mode 100644 index 0000000..4d1820e --- /dev/null +++ b/plugin/core/sidecar-process.ts @@ -0,0 +1,119 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { spawn, type ChildProcess } from "node:child_process"; + +let noReplyProcess: ChildProcess | null = null; + +const LOCK_FILE = path.join(os.tmpdir(), "dirigent-sidecar.lock"); + +function readLock(): { pid: number } | null { + try { + const raw = fs.readFileSync(LOCK_FILE, "utf8").trim(); + return { pid: Number(raw) }; + } catch { + return null; + } +} + +function writeLock(pid: number): void { + try { fs.writeFileSync(LOCK_FILE, String(pid)); } catch { /* ignore */ } +} + +function clearLock(): void { + try { fs.unlinkSync(LOCK_FILE); } catch { /* ignore */ } +} + +function isLockHeld(): boolean { + const lock = readLock(); + if (!lock) return false; + try { + process.kill(lock.pid, 0); + return true; + } catch { + return false; + } +} + +export function startSideCar( + logger: { info: (m: string) => void; warn: (m: string) => void }, + pluginDir: string, + port = 8787, + moderatorToken?: string, + pluginApiToken?: string, + gatewayPort?: number, + debugMode?: boolean, +): void { + logger.info(`dirigent: startSideCar called, pluginDir=${pluginDir}`); + + if (noReplyProcess) { + logger.info("dirigent: no-reply API already running (local ref), skipping"); + return; + } + + if (isLockHeld()) { + logger.info("dirigent: no-reply API already running (lock file), skipping"); + return; + } + + // services/main.mjs lives alongside the plugin directory in the distribution + const serverPath = path.resolve(pluginDir, "services", "main.mjs"); + logger.info(`dirigent: resolved serverPath=${serverPath}`); + + if (!fs.existsSync(serverPath)) { + logger.warn(`dirigent: services/main.mjs not found at ${serverPath}, skipping`); + return; + } + + logger.info("dirigent: services/main.mjs found, spawning process..."); + + // Build plugin API URL from gateway port, or use a default + const pluginApiUrl = gatewayPort + ? `http://127.0.0.1:${gatewayPort}` + : "http://127.0.0.1:18789"; + + const env: NodeJS.ProcessEnv = { + ...process.env, + SERVICES_PORT: String(port), + PLUGIN_API_URL: pluginApiUrl, + }; + + if (moderatorToken) { + env.MODERATOR_TOKEN = moderatorToken; + } + if (pluginApiToken) { + env.PLUGIN_API_TOKEN = pluginApiToken; + } + if (debugMode !== undefined) { + env.DEBUG_MODE = debugMode ? "true" : "false"; + } + + noReplyProcess = spawn(process.execPath, [serverPath], { + env, + stdio: ["ignore", "pipe", "pipe"], + detached: false, + }); + + if (noReplyProcess.pid) { + writeLock(noReplyProcess.pid); + } + + noReplyProcess.stdout?.on("data", (d: Buffer) => logger.info(`dirigent: services: ${d.toString().trim()}`)); + noReplyProcess.stderr?.on("data", (d: Buffer) => logger.warn(`dirigent: services: ${d.toString().trim()}`)); + + noReplyProcess.on("exit", (code, signal) => { + logger.info(`dirigent: services exited (code=${code}, signal=${signal})`); + clearLock(); + noReplyProcess = null; + }); + + logger.info(`dirigent: services started (pid=${noReplyProcess.pid}, port=${port})`); +} + +export function stopSideCar(logger: { info: (m: string) => void }): void { + if (!noReplyProcess) return; + logger.info("dirigent: stopping sidecar"); + noReplyProcess.kill("SIGTERM"); + noReplyProcess = null; + clearLock(); +} diff --git a/plugin/hooks/agent-end.ts b/plugin/hooks/agent-end.ts index dddd57f..3c01724 100644 --- a/plugin/hooks/agent-end.ts +++ b/plugin/hooks/agent-end.ts @@ -176,8 +176,6 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn { return; } - clearTurnPending(channelId, agentId); - api.logger.info( `dirigent: agent_end channel=${channelId} agentId=${agentId} empty=${empty} text=${finalText.slice(0, 80)}`, ); @@ -186,6 +184,10 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn { // Real turn: wait for Discord delivery before triggering next speaker. // Anchor was set in before_model_resolve just before the LLM call, so any // message from the agent after the anchor must be from this turn. + // NOTE: clearTurnPending is intentionally deferred until after pollForTailMatch + // returns. While waiting, isTurnPending remains true so that any re-trigger of + // this agent is correctly treated as a self-wakeup (suppressed), preventing it + // from starting a second real turn during the tail-match window. const identity = identityRegistry.findByAgentId(agentId); if (identity && moderatorBotToken) { const anchorId = getAnchor(channelId, agentId) ?? "0"; @@ -202,6 +204,7 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn { if (isDormant(channelId)) { // Channel is dormant: a new external message woke it — restart from first speaker api.logger.info(`dirigent: tail-match interrupted (dormant) channel=${channelId} — waking`); + clearTurnPending(channelId, agentId); const first = wakeFromDormant(channelId); if (first) await triggerNextSpeaker(channelId, first); return; @@ -243,6 +246,12 @@ export function registerAgentEndHook(deps: AgentEndDeps): InterruptFn { advancingChannels.delete(channelId); } + // Clear turn pending AFTER advanceSpeaker completes. This ensures isTurnPending + // remains true during the async rebuildFn window at cycle boundaries, preventing + // re-triggers from starting a second real turn while currentIndex is still at the + // outgoing speaker's position. + clearTurnPending(channelId, agentId); + if (enteredDormant) { api.logger.info(`dirigent: channel=${channelId} entered dormant`); if (mode === "discussion") { diff --git a/plugin/hooks/message-received.ts b/plugin/hooks/message-received.ts index 35540b7..cc7e898 100644 --- a/plugin/hooks/message-received.ts +++ b/plugin/hooks/message-received.ts @@ -3,7 +3,7 @@ import type { ChannelStore } from "../core/channel-store.js"; import type { IdentityRegistry } from "../core/identity-registry.js"; import { parseDiscordChannelId } from "./before-model-resolve.js"; import { isDormant, wakeFromDormant, isCurrentSpeaker, hasSpeakers, setSpeakerList, getInitializingChannels, type SpeakerEntry } from "../turn-manager.js"; -import { sendAndDelete, sendModeratorMessage, userIdFromBotToken } from "../core/moderator-discord.js"; +import { sendScheduleTrigger, userIdFromBotToken } from "../core/moderator-discord.js"; import { fetchVisibleChannelBotAccountIds } from "../core/channel-members.js"; import type { InterruptFn } from "./agent-end.js"; @@ -14,24 +14,17 @@ type Deps = { moderatorBotToken: string | undefined; scheduleIdentifier: string; interruptTailMatch: InterruptFn; + debugMode: boolean; + /** + * When true, the moderator service handles wake-from-dormant and + * interrupt-tail-match via HTTP callback. This hook only runs speaker-list + * initialization in that case. + */ + moderatorHandlesMessages?: boolean; }; -/** - * Process-level dedup for concluded-discussion auto-replies. - * Multiple agent VM contexts all fire message_received for the same incoming message; - * only the first should send the "This discussion is closed" reply. - * Keyed on channelId:messageId; evicted after 500 entries. - */ -const _CONCLUDED_REPLY_DEDUP_KEY = "_dirigentConcludedReplyDedup"; -if (!(globalThis as Record)[_CONCLUDED_REPLY_DEDUP_KEY]) { - (globalThis as Record)[_CONCLUDED_REPLY_DEDUP_KEY] = new Set(); -} -const concludedReplyDedup: Set = (globalThis as Record)[_CONCLUDED_REPLY_DEDUP_KEY] as Set; - export function registerMessageReceivedHook(deps: Deps): void { - const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, interruptTailMatch } = deps; - // Derive the moderator bot's own Discord user ID so we can skip self-messages - // from waking dormant channels (idle reminders must not re-trigger the cycle). + const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, interruptTailMatch, debugMode, moderatorHandlesMessages } = deps; const moderatorBotUserId = moderatorBotToken ? userIdFromBotToken(moderatorBotToken) : undefined; api.on("message_received", async (event, ctx) => { @@ -42,23 +35,19 @@ export function registerMessageReceivedHook(deps: Deps): void { // Extract Discord channel ID from ctx or event metadata let channelId: string | undefined; - // ctx.channelId may be bare "1234567890" or "channel:1234567890" if (typeof c.channelId === "string") { const bare = c.channelId.match(/^(\d+)$/)?.[1] ?? c.channelId.match(/:(\d+)$/)?.[1]; if (bare) channelId = bare; } - // fallback: sessionKey (per-session api instances) if (!channelId && typeof c.sessionKey === "string") { channelId = parseDiscordChannelId(c.sessionKey); } - // fallback: metadata.to / originatingTo = "channel:1234567890" if (!channelId) { const metadata = e.metadata as Record | undefined; const to = String(metadata?.to ?? metadata?.originatingTo ?? ""); const toMatch = to.match(/:(\d+)$/); if (toMatch) channelId = toMatch[1]; } - // fallback: conversation_info if (!channelId) { const metadata = e.metadata as Record | undefined; const convInfo = metadata?.conversation_info as Record | undefined; @@ -69,45 +58,12 @@ export function registerMessageReceivedHook(deps: Deps): void { const mode = channelStore.getMode(channelId); - // dead: suppress routing entirely (OpenClaw handles no-route automatically, - // but we handle archived auto-reply here) if (mode === "report") return; - - // archived: auto-reply via moderator (deduped — only one agent instance should reply) - if (mode === "discussion") { - const rec = channelStore.getRecord(channelId); - if (rec.discussion?.concluded && moderatorBotToken) { - const metadata = e.metadata as Record | undefined; - const convInfo = metadata?.conversation_info as Record | undefined; - const incomingMsgId = String( - convInfo?.message_id ?? - metadata?.message_id ?? - metadata?.messageId ?? - e.id ?? "", - ); - const dedupKey = `${channelId}:${incomingMsgId}`; - if (!concludedReplyDedup.has(dedupKey)) { - concludedReplyDedup.add(dedupKey); - if (concludedReplyDedup.size > 500) { - const oldest = concludedReplyDedup.values().next().value; - if (oldest) concludedReplyDedup.delete(oldest); - } - await sendModeratorMessage( - moderatorBotToken, channelId, - "This discussion is closed and no longer active.", - api.logger, - ).catch(() => undefined); - } - return; - } - } - if (mode === "none" || mode === "work") return; - // chat / discussion (active): initialize speaker list on first message if needed + // ── Speaker-list initialization (always runs, even with moderator service) ── const initializingChannels = getInitializingChannels(); if (!hasSpeakers(channelId) && moderatorBotToken) { - // Guard against concurrent initialization from multiple VM contexts if (initializingChannels.has(channelId)) { api.logger.info(`dirigent: message_received init in progress, skipping channel=${channelId}`); return; @@ -126,7 +82,7 @@ export function registerMessageReceivedHook(deps: Deps): void { setSpeakerList(channelId, speakers); const first = speakers[0]; api.logger.info(`dirigent: initialized speaker list channel=${channelId} speakers=${speakers.map(s => s.agentId).join(",")}`); - await sendAndDelete(moderatorBotToken, channelId, `<@${first.discordUserId}>${scheduleIdentifier}`, api.logger); + await sendScheduleTrigger(moderatorBotToken, channelId, `<@${first.discordUserId}>${scheduleIdentifier}`, api.logger, debugMode); return; } } finally { @@ -134,8 +90,8 @@ export function registerMessageReceivedHook(deps: Deps): void { } } - // chat / discussion (active): check if this is an external message - // that should interrupt an in-progress tail-match or wake dormant + // ── Wake / interrupt (skipped when moderator service handles it via HTTP callback) ── + if (moderatorHandlesMessages) return; const senderId = String( (e.metadata as Record)?.senderId ?? @@ -143,7 +99,6 @@ export function registerMessageReceivedHook(deps: Deps): void { e.from ?? "", ); - // Identify the sender: is it the current speaker's Discord account? const currentSpeakerIsThisSender = (() => { if (!senderId) return false; const entry = identityRegistry.findByDiscordUserId(senderId); @@ -152,17 +107,16 @@ export function registerMessageReceivedHook(deps: Deps): void { })(); if (!currentSpeakerIsThisSender) { - // Non-current-speaker posted — interrupt any tail-match in progress - interruptTailMatch(channelId); - api.logger.info(`dirigent: message_received interrupt tail-match channel=${channelId} senderId=${senderId}`); + if (senderId !== moderatorBotUserId) { + interruptTailMatch(channelId); + api.logger.info(`dirigent: message_received interrupt tail-match channel=${channelId} senderId=${senderId}`); + } - // Wake from dormant if needed — but ignore the moderator bot's own messages - // (e.g. idle reminder) to prevent it from immediately re-waking the channel. if (isDormant(channelId) && moderatorBotToken && senderId !== moderatorBotUserId) { const first = wakeFromDormant(channelId); if (first) { const msg = `<@${first.discordUserId}>${scheduleIdentifier}`; - await sendAndDelete(moderatorBotToken, channelId, msg, api.logger); + await sendScheduleTrigger(moderatorBotToken, channelId, msg, api.logger, debugMode); api.logger.info(`dirigent: woke dormant channel=${channelId} first speaker=${first.agentId}`); } } diff --git a/plugin/index.ts b/plugin/index.ts index d8a8295..a22483b 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -4,8 +4,7 @@ import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import { IdentityRegistry } from "./core/identity-registry.js"; import { ChannelStore } from "./core/channel-store.js"; import { scanPaddedCell } from "./core/padded-cell.js"; -import { startNoReplyApi, stopNoReplyApi } from "./core/no-reply-process.js"; -import { startModeratorPresence, stopModeratorPresence } from "./moderator-presence.js"; +import { startSideCar, stopSideCar } from "./core/sidecar-process.js"; import { registerBeforeModelResolveHook } from "./hooks/before-model-resolve.js"; import { registerAgentEndHook } from "./hooks/agent-end.js"; import { registerMessageReceivedHook } from "./hooks/message-received.js"; @@ -13,33 +12,44 @@ import { registerDirigentTools } from "./tools/register-tools.js"; import { registerSetChannelModeCommand } from "./commands/set-channel-mode-command.js"; import { registerAddGuildCommand } from "./commands/add-guild-command.js"; import { registerControlPage } from "./web/control-page.js"; -import { sendModeratorMessage, sendAndDelete } from "./core/moderator-discord.js"; -import { setSpeakerList } from "./turn-manager.js"; +import { registerDirigentApi } from "./web/dirigent-api.js"; +import { sendModeratorMessage, sendScheduleTrigger, getBotUserIdFromToken } from "./core/moderator-discord.js"; +import { setSpeakerList, isCurrentSpeaker, isDormant, wakeFromDormant } from "./turn-manager.js"; import { fetchVisibleChannelBotAccountIds } from "./core/channel-members.js"; type PluginConfig = { moderatorBotToken?: string; - noReplyProvider?: string; - noReplyModel?: string; - noReplyPort?: number; scheduleIdentifier?: string; identityFilePath?: string; channelStoreFilePath?: string; + debugMode?: boolean; + noReplyProvider?: string; + noReplyModel?: string; + sideCarPort?: number; }; function normalizeConfig(api: OpenClawPluginApi): Required { const cfg = (api.pluginConfig ?? {}) as PluginConfig; return { moderatorBotToken: cfg.moderatorBotToken ?? "", - noReplyProvider: cfg.noReplyProvider ?? "dirigent", - noReplyModel: cfg.noReplyModel ?? "no-reply", - noReplyPort: Number(cfg.noReplyPort ?? 8787), scheduleIdentifier: cfg.scheduleIdentifier ?? "➡️", identityFilePath: cfg.identityFilePath ?? path.join(os.homedir(), ".openclaw", "dirigent-identity.json"), channelStoreFilePath: cfg.channelStoreFilePath ?? path.join(os.homedir(), ".openclaw", "dirigent-channels.json"), + debugMode: cfg.debugMode ?? false, + noReplyProvider: cfg.noReplyProvider ?? "dirigent", + noReplyModel: cfg.noReplyModel ?? "no-reply", + sideCarPort: cfg.sideCarPort ?? 8787, }; } +function getGatewayPort(api: OpenClawPluginApi): number { + try { + return ((api.config as Record)?.gateway as Record)?.port as number ?? 18789; + } catch { + return 18789; + } +} + /** * Gateway lifecycle events (gateway_start / gateway_stop) are global — fired once * when the gateway process starts/stops, not per agent session. We guard these on @@ -76,6 +86,10 @@ export default { const identityRegistry = new IdentityRegistry(config.identityFilePath); const channelStore = new ChannelStore(config.channelStoreFilePath); + const moderatorBotToken = config.moderatorBotToken || undefined; + const moderatorBotUserId = moderatorBotToken ? getBotUserIdFromToken(moderatorBotToken) : undefined; + const moderatorServiceUrl = `http://127.0.0.1:${config.sideCarPort}/moderator`; + let paddedCellDetected = false; function hasPaddedCell(): boolean { @@ -94,24 +108,27 @@ export default { if (!isGatewayLifecycleRegistered()) { markGatewayLifecycleRegistered(); - api.on("gateway_start", () => { - const live = normalizeConfig(api); + const gatewayPort = getGatewayPort(api); - startNoReplyApi(api.logger, pluginDir, live.noReplyPort); + // Start unified services (no-reply API + moderator bot) + startSideCar( + api.logger, + pluginDir, + config.sideCarPort, + moderatorBotToken, + undefined, // pluginApiToken — gateway handles auth for plugin routes + gatewayPort, + config.debugMode, + ); - if (live.moderatorBotToken) { - startModeratorPresence(live.moderatorBotToken, api.logger); - api.logger.info("dirigent: moderator bot presence started"); - } else { - api.logger.info("dirigent: moderatorBotToken not set — moderator features disabled"); - } + if (!moderatorBotToken) { + api.logger.info("dirigent: moderatorBotToken not set — moderator features disabled"); + } - tryAutoScanPaddedCell(); - }); + tryAutoScanPaddedCell(); api.on("gateway_stop", () => { - stopNoReplyApi(api.logger); - stopModeratorPresence(); + stopSideCar(api.logger); }); } @@ -120,18 +137,20 @@ export default { api, channelStore, identityRegistry, - moderatorBotToken: config.moderatorBotToken || undefined, - noReplyModel: config.noReplyModel, - noReplyProvider: config.noReplyProvider, + moderatorBotToken, scheduleIdentifier: config.scheduleIdentifier, + debugMode: config.debugMode, + noReplyProvider: config.noReplyProvider, + noReplyModel: config.noReplyModel, }); const interruptTailMatch = registerAgentEndHook({ api, channelStore, identityRegistry, - moderatorBotToken: config.moderatorBotToken || undefined, + moderatorBotToken, scheduleIdentifier: config.scheduleIdentifier, + debugMode: config.debugMode, onDiscussionDormant: async (channelId: string) => { const live = normalizeConfig(api); if (!live.moderatorBotToken) return; @@ -148,13 +167,74 @@ export default { }, }); + // Speaker-list init still handled via message_received (needs OpenClaw API for channel member lookup) registerMessageReceivedHook({ api, channelStore, identityRegistry, - moderatorBotToken: config.moderatorBotToken || undefined, + moderatorBotToken, scheduleIdentifier: config.scheduleIdentifier, interruptTailMatch, + debugMode: config.debugMode, + // When moderator service is active it handles wake/interrupt via HTTP callback; + // message_received only needs to run speaker-list initialization. + moderatorHandlesMessages: !!moderatorBotToken, + }); + + // ── Dirigent API (moderator service → plugin callbacks) ─────────────── + registerDirigentApi({ + api, + channelStore, + moderatorBotUserId, + scheduleIdentifier: config.scheduleIdentifier, + moderatorServiceUrl, + moderatorServiceToken: undefined, + debugMode: config.debugMode, + onNewMessage: async ({ channelId, senderId }) => { + const mode = channelStore.getMode(channelId); + + // Modes where agents don't participate + if (mode === "none" || mode === "work" || mode === "report") return; + + // Skip messages from the moderator bot itself (schedule triggers, etc.) + if (senderId === moderatorBotUserId) return; + + // Concluded discussion: send "closed" reply via moderator service + if (mode === "discussion") { + const rec = channelStore.getRecord(channelId); + if (rec.discussion?.concluded && moderatorBotToken) { + await sendModeratorMessage( + moderatorBotToken, + channelId, + "This discussion is closed and no longer active.", + api.logger, + ).catch(() => undefined); + return; + } + } + + // Identify sender — is it the current speaker? + const senderEntry = identityRegistry.findByDiscordUserId(senderId); + const currentSpeakerIsThisSender = senderEntry + ? isCurrentSpeaker(channelId, senderEntry.agentId) + : false; + + if (!currentSpeakerIsThisSender) { + // Non-current-speaker: interrupt any ongoing tail-match poll + interruptTailMatch(channelId); + api.logger.info(`dirigent: moderator-callback interrupt tail-match channel=${channelId} senderId=${senderId}`); + + // Wake from dormant if needed + if (isDormant(channelId) && moderatorBotToken) { + const first = wakeFromDormant(channelId); + if (first) { + const msg = `<@${first.discordUserId}>${config.scheduleIdentifier}`; + await sendScheduleTrigger(moderatorBotToken, channelId, msg, api.logger, config.debugMode); + api.logger.info(`dirigent: moderator-callback woke dormant channel=${channelId} first=${first.agentId}`); + } + } + } + }, }); // ── Tools ────────────────────────────────────────────────────────────── @@ -162,9 +242,9 @@ export default { api, channelStore, identityRegistry, - moderatorBotToken: config.moderatorBotToken || undefined, + moderatorBotToken, scheduleIdentifier: config.scheduleIdentifier, - onDiscussionCreate: async ({ channelId, guildId, initiatorAgentId, callbackGuildId, callbackChannelId, discussionGuide, participants }) => { + onDiscussionCreate: async ({ channelId, initiatorAgentId, callbackGuildId, callbackChannelId, discussionGuide, participants }) => { const live = normalizeConfig(api); if (!live.moderatorBotToken) return; @@ -184,11 +264,12 @@ export default { if (speakers.length > 0) { setSpeakerList(channelId, speakers); const first = speakers[0]; - await sendAndDelete( + await sendScheduleTrigger( live.moderatorBotToken, channelId, `<@${first.discordUserId}>${live.scheduleIdentifier}`, api.logger, + live.debugMode, ).catch(() => undefined); } }, @@ -203,7 +284,7 @@ export default { api, channelStore, identityRegistry, - moderatorBotToken: config.moderatorBotToken || undefined, + moderatorBotToken, openclawDir, hasPaddedCell, }); diff --git a/plugin/moderator-presence.ts b/plugin/moderator-presence.ts deleted file mode 100644 index 0f914a9..0000000 --- a/plugin/moderator-presence.ts +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Minimal Discord Gateway connection to keep the moderator bot "online". - * Uses Node.js built-in WebSocket (Node 22+). - * - * IMPORTANT: Only ONE instance should exist per bot token. - * Uses a singleton guard to prevent multiple connections. - */ - -let ws: WebSocket | null = null; -let heartbeatInterval: ReturnType | null = null; -let heartbeatAcked = true; -let lastSequence: number | null = null; -let sessionId: string | null = null; -let resumeUrl: string | null = null; -let reconnectTimer: ReturnType | null = null; -let destroyed = false; -let started = false; // singleton guard - -type Logger = { - info: (msg: string) => void; - warn: (msg: string) => void; -}; - -const GATEWAY_URL = "wss://gateway.discord.gg/?v=10&encoding=json"; -const MAX_RECONNECT_DELAY_MS = 60_000; -let reconnectAttempts = 0; - -function sendPayload(data: Record) { - if (ws?.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify(data)); - } -} - -function startHeartbeat(intervalMs: number) { - stopHeartbeat(); - heartbeatAcked = true; - - // First heartbeat after jitter - const jitter = Math.floor(Math.random() * intervalMs); - const firstTimer = setTimeout(() => { - if (destroyed) return; - if (!heartbeatAcked) { - // Missed ACK — zombie connection, close and reconnect - ws?.close(4000, "missed heartbeat ack"); - return; - } - heartbeatAcked = false; - sendPayload({ op: 1, d: lastSequence }); - - heartbeatInterval = setInterval(() => { - if (destroyed) return; - if (!heartbeatAcked) { - ws?.close(4000, "missed heartbeat ack"); - return; - } - heartbeatAcked = false; - sendPayload({ op: 1, d: lastSequence }); - }, intervalMs); - }, jitter); - - // Store the first timer so we can clear it - heartbeatInterval = firstTimer as unknown as ReturnType; -} - -function stopHeartbeat() { - if (heartbeatInterval) { - clearInterval(heartbeatInterval); - clearTimeout(heartbeatInterval as unknown as ReturnType); - heartbeatInterval = null; - } -} - -function cleanup() { - stopHeartbeat(); - if (ws) { - // Remove all handlers to avoid ghost callbacks - ws.onopen = null; - ws.onmessage = null; - ws.onclose = null; - ws.onerror = null; - try { ws.close(1000); } catch { /* ignore */ } - ws = null; - } -} - -function connect(token: string, logger: Logger, isResume = false) { - if (destroyed) return; - - // Clean up any existing connection first - cleanup(); - - const url = isResume && resumeUrl ? resumeUrl : GATEWAY_URL; - - try { - ws = new WebSocket(url); - } catch (err) { - logger.warn(`dirigent: moderator ws constructor failed: ${String(err)}`); - scheduleReconnect(token, logger, false); - return; - } - - const currentWs = ws; // capture for closure - - ws.onopen = () => { - if (currentWs !== ws || destroyed) return; // stale - - reconnectAttempts = 0; // reset on successful open - - if (isResume && sessionId) { - sendPayload({ - op: 6, - d: { token, session_id: sessionId, seq: lastSequence }, - }); - } else { - sendPayload({ - op: 2, - d: { - token, - intents: 0, - properties: { - os: "linux", - browser: "dirigent", - device: "dirigent", - }, - presence: { - status: "online", - activities: [{ name: "Moderating", type: 3 }], - }, - }, - }); - } - }; - - ws.onmessage = (evt: MessageEvent) => { - if (currentWs !== ws || destroyed) return; - - try { - const msg = JSON.parse(typeof evt.data === "string" ? evt.data : String(evt.data)); - const { op, t, s, d } = msg; - - if (s != null) lastSequence = s; - - switch (op) { - case 10: // Hello - startHeartbeat(d.heartbeat_interval); - break; - case 11: // Heartbeat ACK - heartbeatAcked = true; - break; - case 1: // Heartbeat request - sendPayload({ op: 1, d: lastSequence }); - break; - case 0: // Dispatch - if (t === "READY") { - sessionId = d.session_id; - resumeUrl = d.resume_gateway_url; - logger.info("dirigent: moderator bot connected and online"); - } - if (t === "RESUMED") { - logger.info("dirigent: moderator bot resumed"); - } - break; - case 7: // Reconnect request - logger.info("dirigent: moderator bot reconnect requested by Discord"); - cleanup(); - scheduleReconnect(token, logger, true); - break; - case 9: // Invalid Session - logger.warn(`dirigent: moderator bot invalid session, resumable=${d}`); - cleanup(); - sessionId = d ? sessionId : null; - // Wait longer before re-identifying - setTimeout(() => { - if (!destroyed) connect(token, logger, !!d && !!sessionId); - }, 3000 + Math.random() * 2000); - break; - } - } catch { - // ignore parse errors - } - }; - - ws.onclose = (evt: CloseEvent) => { - if (currentWs !== ws) return; // stale ws - stopHeartbeat(); - if (destroyed) return; - - const code = evt.code; - - // Non-recoverable codes — stop reconnecting - if (code === 4004) { - logger.warn("dirigent: moderator bot token invalid (4004), stopping"); - started = false; - return; - } - if (code === 4010 || code === 4011 || code === 4013 || code === 4014) { - logger.warn(`dirigent: moderator bot fatal close (${code}), re-identifying`); - sessionId = null; - scheduleReconnect(token, logger, false); - return; - } - - logger.info(`dirigent: moderator bot disconnected (code=${code}), will reconnect`); - const canResume = !!sessionId && code !== 4012; - scheduleReconnect(token, logger, canResume); - }; - - ws.onerror = () => { - // onclose will fire after this - }; -} - -function scheduleReconnect(token: string, logger: Logger, resume: boolean) { - if (destroyed) return; - if (reconnectTimer) clearTimeout(reconnectTimer); - - // Exponential backoff with cap - reconnectAttempts++; - const baseDelay = Math.min(1000 * Math.pow(2, reconnectAttempts), MAX_RECONNECT_DELAY_MS); - const jitter = Math.random() * 1000; - const delay = baseDelay + jitter; - - logger.info(`dirigent: moderator reconnect in ${Math.round(delay)}ms (attempt ${reconnectAttempts})`); - - reconnectTimer = setTimeout(() => { - reconnectTimer = null; - connect(token, logger, resume); - }, delay); -} - -/** - * Start the moderator bot's Discord Gateway connection. - * Singleton: calling multiple times with the same token is safe (no-op). - */ -export function startModeratorPresence(token: string, logger: Logger): void { - if (started) { - logger.info("dirigent: moderator presence already started, skipping"); - return; - } - started = true; - destroyed = false; - reconnectAttempts = 0; - connect(token, logger); -} - -/** - * Disconnect the moderator bot. - */ -export function stopModeratorPresence(): void { - destroyed = true; - started = false; - if (reconnectTimer) { - clearTimeout(reconnectTimer); - reconnectTimer = null; - } - cleanup(); -} diff --git a/plugin/openclaw.plugin.json b/plugin/openclaw.plugin.json index f42b380..cab417c 100644 --- a/plugin/openclaw.plugin.json +++ b/plugin/openclaw.plugin.json @@ -8,28 +8,15 @@ "type": "object", "additionalProperties": false, "properties": { - "enabled": { "type": "boolean", "default": true }, - "discordOnly": { "type": "boolean", "default": true }, - "listMode": { "type": "string", "enum": ["human-list", "agent-list"], "default": "human-list" }, - "humanList": { "type": "array", "items": { "type": "string" }, "default": [] }, - "agentList": { "type": "array", "items": { "type": "string" }, "default": [] }, - "channelPoliciesFile": { "type": "string", "default": "~/.openclaw/dirigent-channel-policies.json" }, - "bypassUserIds": { "type": "array", "items": { "type": "string" }, "default": [] }, - "endSymbols": { "type": "array", "items": { "type": "string" }, "default": ["🔚"] }, - "schedulingIdentifier": { "type": "string", "default": "➡️" }, - "waitIdentifier": { "type": "string", "default": "👤" }, - "noReplyProvider": { "type": "string" }, - "noReplyModel": { "type": "string" }, - "noReplyPort": { "type": "number", "default": 8787 }, - "enableDiscordControlTool": { "type": "boolean", "default": true }, - "enableDirigentPolicyTool": { "type": "boolean", "default": true }, - "enableDebugLogs": { "type": "boolean", "default": false }, - "debugLogChannelIds": { "type": "array", "items": { "type": "string" }, "default": [] }, "moderatorBotToken": { "type": "string" }, - "multiMessageStartMarker": { "type": "string", "default": "↗️" }, - "multiMessageEndMarker": { "type": "string", "default": "↙️" }, - "multiMessagePromptMarker": { "type": "string", "default": "⤵️" } + "scheduleIdentifier": { "type": "string", "default": "➡️" }, + "identityFilePath": { "type": "string" }, + "channelStoreFilePath": { "type": "string" }, + "debugMode": { "type": "boolean", "default": false }, + "noReplyProvider": { "type": "string", "default": "dirigent" }, + "noReplyModel": { "type": "string", "default": "no-reply" }, + "sideCarPort": { "type": "number", "default": 8787 } }, - "required": ["noReplyProvider", "noReplyModel"] + "required": [] } } diff --git a/plugin/tools/register-tools.ts b/plugin/tools/register-tools.ts index 3dc1843..b982da1 100644 --- a/plugin/tools/register-tools.ts +++ b/plugin/tools/register-tools.ts @@ -33,14 +33,23 @@ function parseDiscordChannelIdFromSession(sessionKey: string): string | undefine return m?.[1]; } +function textResult(text: string) { + return { content: [{ type: "text" as const, text }], details: undefined }; +} + +function errorResult(text: string) { + return { content: [{ type: "text" as const, text }], details: { error: true } }; +} + export function registerDirigentTools(deps: ToolDeps): void { const { api, channelStore, identityRegistry, moderatorBotToken, scheduleIdentifier, onDiscussionCreate } = deps; // ─────────────────────────────────────────────── // dirigent-register // ─────────────────────────────────────────────── - api.registerTool({ + api.registerTool((ctx) => ({ name: "dirigent-register", + label: "Dirigent Register", description: "Register or update this agent's Discord user ID in Dirigent's identity registry.", parameters: { type: "object", @@ -51,18 +60,18 @@ export function registerDirigentTools(deps: ToolDeps): void { }, required: ["discordUserId"], }, - handler: async (params, ctx) => { + execute: async (_toolCallId: string, params: unknown) => { const agentId = ctx?.agentId; - if (!agentId) return { content: [{ type: "text", text: "Cannot resolve agentId from session context" }], isError: true }; + if (!agentId) return errorResult("Cannot resolve agentId from session context"); const p = params as { discordUserId: string; agentName?: string }; identityRegistry.upsert({ agentId, discordUserId: p.discordUserId, agentName: p.agentName ?? agentId, }); - return { content: [{ type: "text", text: `Registered: agentId=${agentId} discordUserId=${p.discordUserId}` }] }; + return textResult(`Registered: agentId=${agentId} discordUserId=${p.discordUserId}`); }, - }); + })); // ─────────────────────────────────────────────── // Helper: create channel + set mode @@ -72,7 +81,6 @@ export function registerDirigentTools(deps: ToolDeps): void { name: string; memberDiscordIds: string[]; mode: "chat" | "report" | "work"; - callerCtx: { agentId?: string }; }): Promise<{ ok: boolean; channelId?: string; error?: string }> { if (!moderatorBotToken) return { ok: false, error: "moderatorBotToken not configured" }; @@ -112,6 +120,7 @@ export function registerDirigentTools(deps: ToolDeps): void { // ─────────────────────────────────────────────── api.registerTool({ name: "create-chat-channel", + label: "Create Chat Channel", description: "Create a new private Discord channel in the specified guild with mode=chat.", parameters: { type: "object", @@ -126,16 +135,15 @@ export function registerDirigentTools(deps: ToolDeps): void { }, required: ["guildId", "name"], }, - handler: async (params, ctx) => { + execute: async (_toolCallId: string, params: unknown) => { const p = params as { guildId: string; name: string; participants?: string[] }; const result = await createManagedChannel({ guildId: p.guildId, name: p.name, memberDiscordIds: p.participants ?? [], mode: "chat", - callerCtx: { agentId: ctx?.agentId }, }); - if (!result.ok) return { content: [{ type: "text", text: `Failed: ${result.error}` }], isError: true }; - return { content: [{ type: "text", text: `Created chat channel: ${result.channelId}` }] }; + if (!result.ok) return errorResult(`Failed: ${result.error}`); + return textResult(`Created chat channel: ${result.channelId}`); }, }); @@ -144,6 +152,7 @@ export function registerDirigentTools(deps: ToolDeps): void { // ─────────────────────────────────────────────── api.registerTool({ name: "create-report-channel", + label: "Create Report Channel", description: "Create a new private Discord channel with mode=report. Agents can post to it but are not woken by messages.", parameters: { type: "object", @@ -155,24 +164,24 @@ export function registerDirigentTools(deps: ToolDeps): void { }, required: ["guildId", "name"], }, - handler: async (params, ctx) => { + execute: async (_toolCallId: string, params: unknown) => { const p = params as { guildId: string; name: string; members?: string[] }; const result = await createManagedChannel({ guildId: p.guildId, name: p.name, memberDiscordIds: p.members ?? [], mode: "report", - callerCtx: { agentId: ctx?.agentId }, }); - if (!result.ok) return { content: [{ type: "text", text: `Failed: ${result.error}` }], isError: true }; - return { content: [{ type: "text", text: `Created report channel: ${result.channelId}` }] }; + if (!result.ok) return errorResult(`Failed: ${result.error}`); + return textResult(`Created report channel: ${result.channelId}`); }, }); // ─────────────────────────────────────────────── // create-work-channel // ─────────────────────────────────────────────── - api.registerTool({ + api.registerTool((ctx) => ({ name: "create-work-channel", + label: "Create Work Channel", description: "Create a new private Discord workspace channel with mode=work (turn-manager disabled, mode locked).", parameters: { type: "object", @@ -184,7 +193,7 @@ export function registerDirigentTools(deps: ToolDeps): void { }, required: ["guildId", "name"], }, - handler: async (params, ctx) => { + execute: async (_toolCallId: string, params: unknown) => { const p = params as { guildId: string; name: string; members?: string[] }; // Include calling agent's Discord ID if known const callerDiscordId = ctx?.agentId ? identityRegistry.findByAgentId(ctx.agentId)?.discordUserId : undefined; @@ -195,18 +204,18 @@ export function registerDirigentTools(deps: ToolDeps): void { guildId: p.guildId, name: p.name, memberDiscordIds: members, mode: "work", - callerCtx: { agentId: ctx?.agentId }, }); - if (!result.ok) return { content: [{ type: "text", text: `Failed: ${result.error}` }], isError: true }; - return { content: [{ type: "text", text: `Created work channel: ${result.channelId}` }] }; + if (!result.ok) return errorResult(`Failed: ${result.error}`); + return textResult(`Created work channel: ${result.channelId}`); }, - }); + })); // ─────────────────────────────────────────────── // create-discussion-channel // ─────────────────────────────────────────────── - api.registerTool({ + api.registerTool((ctx) => ({ name: "create-discussion-channel", + label: "Create Discussion Channel", description: "Create a structured discussion channel between agents. The calling agent becomes the initiator.", parameters: { type: "object", @@ -220,7 +229,7 @@ export function registerDirigentTools(deps: ToolDeps): void { }, required: ["callbackGuildId", "callbackChannelId", "name", "discussionGuide", "participants"], }, - handler: async (params, ctx) => { + execute: async (_toolCallId: string, params: unknown) => { const p = params as { callbackGuildId: string; callbackChannelId: string; @@ -230,13 +239,13 @@ export function registerDirigentTools(deps: ToolDeps): void { }; const initiatorAgentId = ctx?.agentId; if (!initiatorAgentId) { - return { content: [{ type: "text", text: "Cannot resolve initiator agentId from session" }], isError: true }; + return errorResult("Cannot resolve initiator agentId from session"); } if (!moderatorBotToken) { - return { content: [{ type: "text", text: "moderatorBotToken not configured" }], isError: true }; + return errorResult("moderatorBotToken not configured"); } if (!onDiscussionCreate) { - return { content: [{ type: "text", text: "Discussion service not available" }], isError: true }; + return errorResult("Discussion service not available"); } const botId = getBotUserIdFromToken(moderatorBotToken); @@ -262,7 +271,7 @@ export function registerDirigentTools(deps: ToolDeps): void { logger: api.logger, }); } catch (err) { - return { content: [{ type: "text", text: `Failed to create channel: ${String(err)}` }], isError: true }; + return errorResult(`Failed to create channel: ${String(err)}`); } try { @@ -273,7 +282,7 @@ export function registerDirigentTools(deps: ToolDeps): void { concluded: false, }); } catch (err) { - return { content: [{ type: "text", text: `Failed to register channel: ${String(err)}` }], isError: true }; + return errorResult(`Failed to register channel: ${String(err)}`); } await onDiscussionCreate({ @@ -286,15 +295,16 @@ export function registerDirigentTools(deps: ToolDeps): void { participants: p.participants, }); - return { content: [{ type: "text", text: `Discussion channel created: ${channelId}` }] }; + return textResult(`Discussion channel created: ${channelId}`); }, - }); + })); // ─────────────────────────────────────────────── // discussion-complete // ─────────────────────────────────────────────── - api.registerTool({ + api.registerTool((ctx) => ({ name: "discussion-complete", + label: "Discussion Complete", description: "Mark a discussion as complete, archive the channel, and post the summary path to the callback channel.", parameters: { type: "object", @@ -305,31 +315,25 @@ export function registerDirigentTools(deps: ToolDeps): void { }, required: ["discussionChannelId", "summary"], }, - handler: async (params, ctx) => { + execute: async (_toolCallId: string, params: unknown) => { const p = params as { discussionChannelId: string; summary: string }; const callerAgentId = ctx?.agentId; if (!callerAgentId) { - return { content: [{ type: "text", text: "Cannot resolve agentId from session" }], isError: true }; + return errorResult("Cannot resolve agentId from session"); } const rec = channelStore.getRecord(p.discussionChannelId); if (rec.mode !== "discussion") { - return { content: [{ type: "text", text: `Channel ${p.discussionChannelId} is not a discussion channel` }], isError: true }; + return errorResult(`Channel ${p.discussionChannelId} is not a discussion channel`); } if (!rec.discussion) { - return { content: [{ type: "text", text: "Discussion metadata not found" }], isError: true }; + return errorResult("Discussion metadata not found"); } if (rec.discussion.initiatorAgentId !== callerAgentId) { - return { - content: [{ type: "text", text: `Only the initiator (${rec.discussion.initiatorAgentId}) may call discussion-complete` }], - isError: true, - }; + return errorResult(`Only the initiator (${rec.discussion.initiatorAgentId}) may call discussion-complete`); } if (!p.summary.includes("discussion-summary")) { - return { - content: [{ type: "text", text: "Summary path must be under {workspace}/discussion-summary/" }], - isError: true, - }; + return errorResult("Summary path must be under {workspace}/discussion-summary/"); } channelStore.concludeDiscussion(p.discussionChannelId); @@ -343,7 +347,7 @@ export function registerDirigentTools(deps: ToolDeps): void { ).catch(() => undefined); } - return { content: [{ type: "text", text: `Discussion ${p.discussionChannelId} concluded. Summary posted to ${rec.discussion.callbackChannelId}.` }] }; + return textResult(`Discussion ${p.discussionChannelId} concluded. Summary posted to ${rec.discussion.callbackChannelId}.`); }, - }); + })); } diff --git a/plugin/web/dirigent-api.ts b/plugin/web/dirigent-api.ts new file mode 100644 index 0000000..61ba949 --- /dev/null +++ b/plugin/web/dirigent-api.ts @@ -0,0 +1,112 @@ +import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import type { ChannelStore } from "../core/channel-store.js"; + +type Deps = { + api: OpenClawPluginApi; + channelStore: ChannelStore; + moderatorBotUserId: string | undefined; + scheduleIdentifier: string; + moderatorServiceUrl: string | undefined; + moderatorServiceToken: string | undefined; + debugMode: boolean; + onNewMessage: (event: { + channelId: string; + messageId: string; + senderId: string; + guildId?: string; + }) => Promise; +}; + +function sendJson(res: import("node:http").ServerResponse, status: number, payload: unknown): void { + res.writeHead(status, { "Content-Type": "application/json; charset=utf-8" }); + res.end(JSON.stringify(payload)); +} + +function readBody(req: import("node:http").IncomingMessage): Promise> { + return new Promise((resolve, reject) => { + let body = ""; + req.on("data", (chunk: Buffer) => { + body += chunk.toString(); + if (body.length > 1_000_000) { + req.destroy(); + reject(new Error("body too large")); + } + }); + req.on("end", () => { + try { + resolve(body ? (JSON.parse(body) as Record) : {}); + } catch { + reject(new Error("invalid_json")); + } + }); + req.on("error", reject); + }); +} + +/** + * Register Dirigent plugin HTTP routes that the moderator service calls back into. + * + * Routes: + * POST /dirigent/api/moderator/message — inbound message notification from moderator service + * GET /dirigent/api/moderator/status — health/status check + */ +export function registerDirigentApi(deps: Deps): void { + const { api, moderatorServiceUrl, onNewMessage } = deps; + + // ── POST /dirigent/api/moderator/message ───────────────────────────────────── + // Called by the moderator service on every Discord MESSAGE_CREATE event. + api.registerHttpRoute({ + path: "/dirigent/api/moderator/message", + auth: "plugin", + match: "exact", + handler: async (req, res) => { + if (req.method !== "POST") { + res.writeHead(405); + res.end(); + return; + } + + let body: Record; + try { + body = await readBody(req); + } catch (err) { + return sendJson(res, 400, { ok: false, error: String(err) }); + } + + const channelId = typeof body.channelId === "string" ? body.channelId : undefined; + const messageId = typeof body.messageId === "string" ? body.messageId : undefined; + const senderId = typeof body.senderId === "string" ? body.senderId : undefined; + const guildId = typeof body.guildId === "string" ? body.guildId : undefined; + + if (!channelId || !senderId) { + return sendJson(res, 400, { ok: false, error: "channelId and senderId required" }); + } + + try { + await onNewMessage({ + channelId, + messageId: messageId ?? "", + senderId, + guildId, + }); + return sendJson(res, 200, { ok: true }); + } catch (err) { + api.logger.warn(`dirigent: moderator/message handler error: ${String(err)}`); + return sendJson(res, 500, { ok: false, error: String(err) }); + } + }, + }); + + // ── GET /dirigent/api/moderator/status ─────────────────────────────────────── + api.registerHttpRoute({ + path: "/dirigent/api/moderator/status", + auth: "plugin", + match: "exact", + handler: (_req, res) => { + return sendJson(res, 200, { + ok: true, + moderatorServiceUrl: moderatorServiceUrl ?? null, + }); + }, + }); +} diff --git a/scripts/check-plugin-files.mjs b/scripts/check-plugin-files.mjs index 9f48f57..6aaac6b 100644 --- a/scripts/check-plugin-files.mjs +++ b/scripts/check-plugin-files.mjs @@ -1,20 +1,29 @@ import fs from 'node:fs'; import path from 'node:path'; -const root = path.resolve(process.cwd(), '..'); -const pluginDir = path.join(root, 'plugin'); -const required = ['index.ts', 'rules.ts', 'openclaw.plugin.json', 'README.md', 'package.json']; +const root = path.resolve(import.meta.dirname, '..'); + +const checks = [ + // Core plugin files + path.join(root, 'plugin', 'index.ts'), + path.join(root, 'plugin', 'turn-manager.ts'), + path.join(root, 'plugin', 'openclaw.plugin.json'), + path.join(root, 'plugin', 'package.json'), + // Sidecar + path.join(root, 'services', 'main.mjs'), + path.join(root, 'services', 'no-reply-api', 'server.mjs'), + path.join(root, 'services', 'moderator', 'index.mjs'), +]; let ok = true; -for (const f of required) { - const p = path.join(pluginDir, f); +for (const p of checks) { if (!fs.existsSync(p)) { ok = false; console.error(`missing: ${p}`); } } -const manifestPath = path.join(pluginDir, 'openclaw.plugin.json'); +const manifestPath = path.join(root, 'plugin', 'openclaw.plugin.json'); if (fs.existsSync(manifestPath)) { const m = JSON.parse(fs.readFileSync(manifestPath, 'utf8')); for (const k of ['id', 'entry', 'configSchema']) { diff --git a/scripts/dev-down.sh b/scripts/dev-down.sh deleted file mode 100755 index d5f449f..0000000 --- a/scripts/dev-down.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail -ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)" -cd "$ROOT_DIR" - -docker compose down diff --git a/scripts/dev-up.sh b/scripts/dev-up.sh deleted file mode 100755 index e876085..0000000 --- a/scripts/dev-up.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)" -cd "$ROOT_DIR" - -echo "[dirigent] building/starting no-reply API container" -docker compose up -d --build dirigent-no-reply-api - -echo "[dirigent] health check" -curl -sS http://127.0.0.1:8787/health - -echo "[dirigent] done" diff --git a/scripts/install.mjs b/scripts/install.mjs index 76e9340..857702b 100755 --- a/scripts/install.mjs +++ b/scripts/install.mjs @@ -120,7 +120,7 @@ const PLUGIN_SKILLS_DIR = path.join(REPO_ROOT, "skills"); const NO_REPLY_PROVIDER_ID = process.env.NO_REPLY_PROVIDER_ID || "dirigent"; const NO_REPLY_MODEL_ID = process.env.NO_REPLY_MODEL_ID || "no-reply"; const NO_REPLY_PORT = Number(process.env.NO_REPLY_PORT || argNoReplyPort); -const NO_REPLY_BASE_URL = process.env.NO_REPLY_BASE_URL || `http://127.0.0.1:${NO_REPLY_PORT}/v1`; +const NO_REPLY_BASE_URL = process.env.NO_REPLY_BASE_URL || `http://127.0.0.1:${NO_REPLY_PORT}/no-reply/v1`; const NO_REPLY_API_KEY = process.env.NO_REPLY_API_KEY || "wg-local-test-token"; function runOpenclaw(args, allowFail = false) { @@ -143,10 +143,11 @@ if (mode === "install") { step(1, 7, "build dist assets"); const pluginSrc = path.resolve(REPO_ROOT, "plugin"); - const noReplySrc = path.resolve(REPO_ROOT, "no-reply-api"); + const sidecarSrc = path.resolve(REPO_ROOT, "services"); const distPlugin = path.resolve(REPO_ROOT, "dist", "dirigent"); + fs.rmSync(distPlugin, { recursive: true, force: true }); syncDirRecursive(pluginSrc, distPlugin); - syncDirRecursive(noReplySrc, path.join(distPlugin, "no-reply-api")); + syncDirRecursive(sidecarSrc, path.join(distPlugin, "services")); ok("dist assets built"); step(2, 7, `install plugin files -> ${PLUGIN_INSTALL_DIR}`); @@ -187,17 +188,10 @@ if (mode === "install") { } setIfMissing("plugins.entries.dirigent.enabled", true); const cp = "plugins.entries.dirigent.config"; - setIfMissing(`${cp}.enabled`, true); - setIfMissing(`${cp}.discordOnly`, true); - setIfMissing(`${cp}.listMode`, "human-list"); - setIfMissing(`${cp}.humanList`, []); - setIfMissing(`${cp}.agentList`, []); - setIfMissing(`${cp}.channelPoliciesFile`, path.join(OPENCLAW_DIR, "dirigent-channel-policies.json")); - setIfMissing(`${cp}.endSymbols`, ["🔚"]); - setIfMissing(`${cp}.schedulingIdentifier`, "➡️"); + setIfMissing(`${cp}.scheduleIdentifier`, "➡️"); setIfMissing(`${cp}.noReplyProvider`, NO_REPLY_PROVIDER_ID); setIfMissing(`${cp}.noReplyModel`, NO_REPLY_MODEL_ID); - setIfMissing(`${cp}.noReplyPort`, NO_REPLY_PORT); + setIfMissing(`${cp}.sideCarPort`, NO_REPLY_PORT); // moderatorBotToken: intentionally not touched — set manually via: // openclaw config set plugins.entries.dirigent.config.moderatorBotToken "" ok("plugin configured"); diff --git a/scripts/package-plugin.mjs b/scripts/package-plugin.mjs deleted file mode 100644 index 7132cb6..0000000 --- a/scripts/package-plugin.mjs +++ /dev/null @@ -1,15 +0,0 @@ -import fs from "node:fs"; -import path from "node:path"; - -const root = process.cwd(); -const pluginDir = path.join(root, "plugin"); -const outDir = path.join(root, "dist", "dirigent"); - -fs.rmSync(outDir, { recursive: true, force: true }); -fs.mkdirSync(outDir, { recursive: true }); - -for (const f of ["index.ts", "rules.ts", "turn-manager.ts", "moderator-presence.ts", "openclaw.plugin.json", "README.md", "package.json"]) { - fs.copyFileSync(path.join(pluginDir, f), path.join(outDir, f)); -} - -console.log(`packaged plugin to ${outDir}`); diff --git a/scripts/smoke-no-reply-api.sh b/scripts/smoke-no-reply-api.sh index 2a2cddb..345003a 100755 --- a/scripts/smoke-no-reply-api.sh +++ b/scripts/smoke-no-reply-api.sh @@ -1,32 +1,31 @@ #!/usr/bin/env bash set -euo pipefail -BASE_URL="${BASE_URL:-http://127.0.0.1:8787}" -AUTH_TOKEN="${AUTH_TOKEN:-}" - -AUTH_HEADER=() -if [[ -n "$AUTH_TOKEN" ]]; then - AUTH_HEADER=(-H "Authorization: Bearer ${AUTH_TOKEN}") -fi +# Smoke-tests the no-reply API endpoint exposed by the sidecar service. +# The sidecar must already be running (it starts automatically with openclaw-gateway). +# Default base URL matches the sidecar's no-reply prefix. +BASE_URL="${BASE_URL:-http://127.0.0.1:8787/no-reply}" echo "[1] health" -curl -sS "${BASE_URL}/health" | sed -n '1,3p' +curl -fsS "${BASE_URL}/health" +echo "" echo "[2] models" -curl -sS "${BASE_URL}/v1/models" "${AUTH_HEADER[@]}" | sed -n '1,8p' +curl -fsS "${BASE_URL}/v1/models" | head -c 200 +echo "" echo "[3] chat/completions" -curl -sS -X POST "${BASE_URL}/v1/chat/completions" \ +curl -fsS -X POST "${BASE_URL}/v1/chat/completions" \ -H 'Content-Type: application/json' \ - "${AUTH_HEADER[@]}" \ - -d '{"model":"dirigent-no-reply-v1","messages":[{"role":"user","content":"hello"}]}' \ - | sed -n '1,20p' + -d '{"model":"no-reply","messages":[{"role":"user","content":"hello"}]}' \ + | head -c 300 +echo "" echo "[4] responses" -curl -sS -X POST "${BASE_URL}/v1/responses" \ +curl -fsS -X POST "${BASE_URL}/v1/responses" \ -H 'Content-Type: application/json' \ - "${AUTH_HEADER[@]}" \ - -d '{"model":"dirigent-no-reply-v1","input":"hello"}' \ - | sed -n '1,20p' + -d '{"model":"no-reply","input":"hello"}' \ + | head -c 300 +echo "" echo "smoke ok" diff --git a/scripts/test-no-reply-api.mjs b/scripts/test-no-reply-api.mjs deleted file mode 100644 index 971bf1f..0000000 --- a/scripts/test-no-reply-api.mjs +++ /dev/null @@ -1,82 +0,0 @@ -import { spawn } from "node:child_process"; - -const BASE = "http://127.0.0.1:18787"; - -function sleep(ms) { - return new Promise((r) => setTimeout(r, ms)); -} - -async function waitForHealth(retries = 30) { - for (let i = 0; i < retries; i++) { - try { - const r = await fetch(`${BASE}/health`); - if (r.ok) return true; - } catch {} - await sleep(200); - } - return false; -} - -function assert(cond, msg) { - if (!cond) throw new Error(msg); -} - -async function run() { - const token = "test-token"; - const child = spawn("node", ["no-reply-api/server.mjs"], { - cwd: process.cwd(), - env: { ...process.env, PORT: "18787", AUTH_TOKEN: token, NO_REPLY_MODEL: "wg-test-model" }, - stdio: ["ignore", "pipe", "pipe"], - }); - - child.stdout.on("data", () => {}); - child.stderr.on("data", () => {}); - - try { - const ok = await waitForHealth(); - assert(ok, "health check failed"); - - const unauth = await fetch(`${BASE}/v1/models`); - assert(unauth.status === 401, `expected 401, got ${unauth.status}`); - - const models = await fetch(`${BASE}/v1/models`, { - headers: { Authorization: `Bearer ${token}` }, - }); - assert(models.ok, "authorized /v1/models failed"); - const modelsJson = await models.json(); - assert(modelsJson?.data?.[0]?.id === "wg-test-model", "model id mismatch"); - - const cc = await fetch(`${BASE}/v1/chat/completions`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${token}`, - }, - body: JSON.stringify({ model: "wg-test-model", messages: [{ role: "user", content: "hi" }] }), - }); - assert(cc.ok, "chat completions failed"); - const ccJson = await cc.json(); - assert(ccJson?.choices?.[0]?.message?.content === "NO_REPLY", "chat completion not NO_REPLY"); - - const rsp = await fetch(`${BASE}/v1/responses`, { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${token}`, - }, - body: JSON.stringify({ model: "wg-test-model", input: "hi" }), - }); - assert(rsp.ok, "responses failed"); - const rspJson = await rsp.json(); - assert(rspJson?.output?.[0]?.content?.[0]?.text === "NO_REPLY", "responses not NO_REPLY"); - - console.log("test-no-reply-api: ok"); - } finally { - child.kill("SIGTERM"); - } -} - -run().catch((err) => { - console.error(`test-no-reply-api: fail: ${err.message}`); - process.exit(1); -}); diff --git a/services/main.mjs b/services/main.mjs new file mode 100644 index 0000000..47f7270 --- /dev/null +++ b/services/main.mjs @@ -0,0 +1,111 @@ +/** + * Unified entry point for Dirigent services. + * + * Routes: + * /no-reply/* → no-reply API (strips /no-reply prefix) + * /moderator/* → moderator bot service (strips /moderator prefix) + * otherwise → 404 + * + * Env vars: + * SERVICES_PORT (default 8787) + * MODERATOR_TOKEN Discord bot token (required for moderator) + * PLUGIN_API_URL (default http://127.0.0.1:18789) + * PLUGIN_API_TOKEN auth token for plugin API calls + * SCHEDULE_IDENTIFIER (default ➡️) + * DEBUG_MODE (default false) + */ + +import http from "node:http"; +import { createNoReplyHandler } from "./no-reply-api/server.mjs"; +import { createModeratorService } from "./moderator/index.mjs"; + +const PORT = Number(process.env.SERVICES_PORT || 8787); +const MODERATOR_TOKEN = process.env.MODERATOR_TOKEN || ""; +const PLUGIN_API_URL = process.env.PLUGIN_API_URL || "http://127.0.0.1:18789"; +const PLUGIN_API_TOKEN = process.env.PLUGIN_API_TOKEN || ""; +const SCHEDULE_IDENTIFIER = process.env.SCHEDULE_IDENTIFIER || "➡️"; +const DEBUG_MODE = process.env.DEBUG_MODE === "true" || process.env.DEBUG_MODE === "1"; + +function sendJson(res, status, payload) { + res.writeHead(status, { "Content-Type": "application/json; charset=utf-8" }); + res.end(JSON.stringify(payload)); +} + +// ── Initialize services ──────────────────────────────────────────────────────── + +const noReplyHandler = createNoReplyHandler(); + +let moderatorService = null; +if (MODERATOR_TOKEN) { + console.log("[dirigent-services] moderator bot enabled"); + moderatorService = createModeratorService({ + token: MODERATOR_TOKEN, + pluginApiUrl: PLUGIN_API_URL, + pluginApiToken: PLUGIN_API_TOKEN, + scheduleIdentifier: SCHEDULE_IDENTIFIER, + debugMode: DEBUG_MODE, + }); +} else { + console.log("[dirigent-services] MODERATOR_TOKEN not set — moderator disabled"); +} + +// ── HTTP server ──────────────────────────────────────────────────────────────── + +const server = http.createServer((req, res) => { + const url = req.url ?? "/"; + + if (url === "/health") { + return sendJson(res, 200, { + ok: true, + services: { + noReply: true, + moderator: !!moderatorService, + }, + }); + } + + if (url.startsWith("/no-reply")) { + req.url = url.slice("/no-reply".length) || "/"; + return noReplyHandler(req, res); + } + + if (url.startsWith("/moderator")) { + if (!moderatorService) { + return sendJson(res, 503, { error: "moderator service not configured" }); + } + req.url = url.slice("/moderator".length) || "/"; + return moderatorService.httpHandler(req, res); + } + + return sendJson(res, 404, { error: "not_found" }); +}); + +server.listen(PORT, "127.0.0.1", () => { + console.log(`[dirigent-services] listening on 127.0.0.1:${PORT}`); + console.log(`[dirigent-services] /no-reply → no-reply API`); + if (moderatorService) { + console.log(`[dirigent-services] /moderator → moderator bot`); + console.log(`[dirigent-services] plugin API: ${PLUGIN_API_URL}`); + } + if (DEBUG_MODE) { + console.log(`[dirigent-services] debug mode ON`); + } +}); + +// ── Graceful shutdown ────────────────────────────────────────────────────────── + +function shutdown(signal) { + console.log(`[dirigent-services] received ${signal}, shutting down`); + if (moderatorService) { + moderatorService.stop(); + } + server.close(() => { + console.log("[dirigent-services] server closed"); + process.exit(0); + }); + // Force-exit after 5s + setTimeout(() => process.exit(1), 5000).unref(); +} + +process.on("SIGTERM", () => shutdown("SIGTERM")); +process.on("SIGINT", () => shutdown("SIGINT")); diff --git a/services/moderator/index.mjs b/services/moderator/index.mjs new file mode 100644 index 0000000..1f869fb --- /dev/null +++ b/services/moderator/index.mjs @@ -0,0 +1,514 @@ +/** + * Moderator bot service. + * + * Exports createModeratorService(config) returning { httpHandler(req, res), stop() }. + * + * Responsibilities: + * - Discord Gateway WS with intents GUILD_MESSAGES (512) | MESSAGE_CONTENT (32768) + * - On MESSAGE_CREATE dispatch: notify plugin API + * - HTTP sub-handler for /health, /me, /send, /delete-message, /create-channel, /guilds, /channels/:guildId + */ + +import { URL as NodeURL } from "node:url"; + +const DISCORD_API = "https://discord.com/api/v10"; +const GATEWAY_URL = "wss://gateway.discord.gg/?v=10&encoding=json"; +const MAX_RECONNECT_DELAY_MS = 60_000; +const INTENTS = 512 | 32768; // GUILD_MESSAGES | MESSAGE_CONTENT + +// ── Helpers ──────────────────────────────────────────────────────────────────── + +function sendJson(res, status, payload) { + res.writeHead(status, { "Content-Type": "application/json; charset=utf-8" }); + res.end(JSON.stringify(payload)); +} + +function readBody(req) { + return new Promise((resolve, reject) => { + let body = ""; + req.on("data", (chunk) => { + body += chunk; + if (body.length > 1_000_000) { + req.destroy(); + reject(new Error("body too large")); + } + }); + req.on("end", () => { + try { + resolve(body ? JSON.parse(body) : {}); + } catch { + reject(new Error("invalid_json")); + } + }); + req.on("error", reject); + }); +} + +function getBotUserIdFromToken(token) { + try { + const segment = token.split(".")[0]; + const padded = segment + "=".repeat((4 - (segment.length % 4)) % 4); + return Buffer.from(padded, "base64").toString("utf8"); + } catch { + return undefined; + } +} + +// ── Discord REST helpers ─────────────────────────────────────────────────────── + +async function discordGet(token, path) { + const r = await fetch(`${DISCORD_API}${path}`, { + headers: { Authorization: `Bot ${token}` }, + }); + if (!r.ok) { + const text = await r.text().catch(() => ""); + throw new Error(`Discord GET ${path} failed (${r.status}): ${text}`); + } + return r.json(); +} + +async function discordPost(token, path, body) { + const r = await fetch(`${DISCORD_API}${path}`, { + method: "POST", + headers: { + Authorization: `Bot ${token}`, + "Content-Type": "application/json", + }, + body: JSON.stringify(body), + }); + return { ok: r.ok, status: r.status, data: await r.json().catch(() => null) }; +} + +async function discordDelete(token, path) { + const r = await fetch(`${DISCORD_API}${path}`, { + method: "DELETE", + headers: { Authorization: `Bot ${token}` }, + }); + return { ok: r.ok, status: r.status }; +} + +// ── Gateway connection ───────────────────────────────────────────────────────── + +function createGatewayConnection(token, onMessage, log) { + let ws = null; + let heartbeatTimer = null; + let heartbeatAcked = true; + let lastSequence = null; + let sessionId = null; + let resumeUrl = null; + let reconnectTimer = null; + let reconnectAttempts = 0; + let destroyed = false; + + function sendPayload(data) { + if (ws?.readyState === 1 /* OPEN */) { + ws.send(JSON.stringify(data)); + } + } + + function stopHeartbeat() { + if (heartbeatTimer) { + clearInterval(heartbeatTimer); + clearTimeout(heartbeatTimer); + heartbeatTimer = null; + } + } + + function startHeartbeat(intervalMs) { + stopHeartbeat(); + heartbeatAcked = true; + + const jitter = Math.floor(Math.random() * intervalMs); + const firstTimer = setTimeout(() => { + if (destroyed) return; + if (!heartbeatAcked) { + ws?.close(4000, "missed heartbeat ack"); + return; + } + heartbeatAcked = false; + sendPayload({ op: 1, d: lastSequence }); + + heartbeatTimer = setInterval(() => { + if (destroyed) return; + if (!heartbeatAcked) { + ws?.close(4000, "missed heartbeat ack"); + return; + } + heartbeatAcked = false; + sendPayload({ op: 1, d: lastSequence }); + }, intervalMs); + }, jitter); + + heartbeatTimer = firstTimer; + } + + function cleanup() { + stopHeartbeat(); + if (ws) { + ws.onopen = null; + ws.onmessage = null; + ws.onclose = null; + ws.onerror = null; + try { ws.close(1000); } catch { /* ignore */ } + ws = null; + } + } + + function scheduleReconnect(resume) { + if (destroyed) return; + if (reconnectTimer) clearTimeout(reconnectTimer); + + reconnectAttempts++; + const baseDelay = Math.min(1000 * Math.pow(2, reconnectAttempts), MAX_RECONNECT_DELAY_MS); + const delay = baseDelay + Math.random() * 1000; + + log.info(`dirigent-moderator: reconnect in ${Math.round(delay)}ms (attempt ${reconnectAttempts})`); + + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + connect(resume); + }, delay); + } + + function connect(isResume = false) { + if (destroyed) return; + + cleanup(); + + const url = isResume && resumeUrl ? resumeUrl : GATEWAY_URL; + + try { + ws = new WebSocket(url); + } catch (err) { + log.warn(`dirigent-moderator: ws constructor failed: ${String(err)}`); + scheduleReconnect(false); + return; + } + + const currentWs = ws; + + ws.onopen = () => { + if (currentWs !== ws || destroyed) return; + reconnectAttempts = 0; + + if (isResume && sessionId) { + sendPayload({ + op: 6, + d: { token, session_id: sessionId, seq: lastSequence }, + }); + } else { + sendPayload({ + op: 2, + d: { + token, + intents: INTENTS, + properties: { + os: "linux", + browser: "dirigent", + device: "dirigent", + }, + }, + }); + } + }; + + ws.onmessage = (evt) => { + if (currentWs !== ws || destroyed) return; + + try { + const msg = JSON.parse(typeof evt.data === "string" ? evt.data : String(evt.data)); + const { op, t, s, d } = msg; + + if (s != null) lastSequence = s; + + switch (op) { + case 10: // Hello + startHeartbeat(d.heartbeat_interval); + break; + case 11: // Heartbeat ACK + heartbeatAcked = true; + break; + case 1: // Heartbeat request + sendPayload({ op: 1, d: lastSequence }); + break; + case 0: // Dispatch + if (t === "READY") { + sessionId = d.session_id; + resumeUrl = d.resume_gateway_url; + log.info("dirigent-moderator: connected and ready"); + } else if (t === "RESUMED") { + log.info("dirigent-moderator: session resumed"); + } else if (t === "MESSAGE_CREATE") { + onMessage(d); + } + break; + case 7: // Reconnect + log.info("dirigent-moderator: reconnect requested by Discord"); + cleanup(); + scheduleReconnect(true); + break; + case 9: // Invalid Session + log.warn(`dirigent-moderator: invalid session, resumable=${d}`); + cleanup(); + sessionId = d ? sessionId : null; + setTimeout(() => { + if (!destroyed) connect(!!d && !!sessionId); + }, 3000 + Math.random() * 2000); + break; + } + } catch { + // ignore parse errors + } + }; + + ws.onclose = (evt) => { + if (currentWs !== ws) return; + stopHeartbeat(); + if (destroyed) return; + + const code = evt.code; + + if (code === 4004) { + log.warn("dirigent-moderator: token invalid (4004), stopping"); + return; + } + if (code === 4010 || code === 4011 || code === 4013 || code === 4014) { + log.warn(`dirigent-moderator: fatal close (${code}), re-identifying`); + sessionId = null; + scheduleReconnect(false); + return; + } + + log.info(`dirigent-moderator: disconnected (code=${code}), will reconnect`); + const canResume = !!sessionId && code !== 4012; + scheduleReconnect(canResume); + }; + + ws.onerror = () => { + // onclose will fire after this + }; + } + + // Start initial connection + connect(false); + + return { + stop() { + destroyed = true; + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + cleanup(); + }, + }; +} + +// ── HTTP route handler ───────────────────────────────────────────────────────── + +function createHttpHandler(token, botUserId, log) { + return async function httpHandler(req, res) { + const url = req.url ?? "/"; + + // GET /health + if (req.method === "GET" && url === "/health") { + return sendJson(res, 200, { ok: true, botId: botUserId }); + } + + // GET /me + if (req.method === "GET" && url === "/me") { + try { + const data = await discordGet(token, "/users/@me"); + return sendJson(res, 200, { id: data.id, username: data.username }); + } catch (err) { + return sendJson(res, 500, { ok: false, error: String(err) }); + } + } + + // GET /guilds + if (req.method === "GET" && url === "/guilds") { + try { + const guilds = await discordGet(token, "/users/@me/guilds"); + const ADMIN = 8n; + const adminGuilds = guilds + .filter((g) => (BigInt(g.permissions ?? "0") & ADMIN) === ADMIN) + .map((g) => ({ id: g.id, name: g.name })); + return sendJson(res, 200, { guilds: adminGuilds }); + } catch (err) { + return sendJson(res, 500, { ok: false, error: String(err) }); + } + } + + // GET /channels/:guildId + const channelsMatch = url.match(/^\/channels\/(\d+)$/); + if (req.method === "GET" && channelsMatch) { + const guildId = channelsMatch[1]; + try { + const channels = await discordGet(token, `/guilds/${guildId}/channels`); + return sendJson(res, 200, { + channels: channels + .filter((c) => c.type === 0) + .map((c) => ({ id: c.id, name: c.name, type: c.type })), + }); + } catch (err) { + return sendJson(res, 500, { ok: false, error: String(err) }); + } + } + + // POST /send + if (req.method === "POST" && url === "/send") { + let body; + try { + body = await readBody(req); + } catch (err) { + return sendJson(res, 400, { ok: false, error: String(err) }); + } + const { channelId, content } = body; + if (!channelId || !content) { + return sendJson(res, 400, { ok: false, error: "channelId and content required" }); + } + try { + const result = await discordPost(token, `/channels/${channelId}/messages`, { content }); + if (!result.ok) { + return sendJson(res, result.status, { ok: false, error: `Discord API error ${result.status}` }); + } + return sendJson(res, 200, { ok: true, messageId: result.data?.id }); + } catch (err) { + return sendJson(res, 500, { ok: false, error: String(err) }); + } + } + + // POST /delete-message + if (req.method === "POST" && url === "/delete-message") { + let body; + try { + body = await readBody(req); + } catch (err) { + return sendJson(res, 400, { ok: false, error: String(err) }); + } + const { channelId, messageId } = body; + if (!channelId || !messageId) { + return sendJson(res, 400, { ok: false, error: "channelId and messageId required" }); + } + try { + const result = await discordDelete(token, `/channels/${channelId}/messages/${messageId}`); + return sendJson(res, 200, { ok: result.ok }); + } catch (err) { + return sendJson(res, 500, { ok: false, error: String(err) }); + } + } + + // POST /create-channel + if (req.method === "POST" && url === "/create-channel") { + let body; + try { + body = await readBody(req); + } catch (err) { + return sendJson(res, 400, { ok: false, error: String(err) }); + } + const { guildId, name, permissionOverwrites = [] } = body; + if (!guildId || !name) { + return sendJson(res, 400, { ok: false, error: "guildId and name required" }); + } + try { + const result = await discordPost(token, `/guilds/${guildId}/channels`, { + name, + type: 0, + permission_overwrites: permissionOverwrites, + }); + if (!result.ok) { + return sendJson(res, result.status, { ok: false, error: `Discord API error ${result.status}` }); + } + return sendJson(res, 200, { ok: true, channelId: result.data?.id }); + } catch (err) { + return sendJson(res, 500, { ok: false, error: String(err) }); + } + } + + return sendJson(res, 404, { error: "not_found" }); + }; +} + +// ── Plugin notification ──────────────────────────────────────────────────────── + +function createNotifyPlugin(pluginApiUrl, pluginApiToken, log) { + return function notifyPlugin(message) { + const body = JSON.stringify({ + channelId: message.channel_id, + messageId: message.id, + senderId: message.author?.id, + guildId: message.guild_id, + content: message.content, + }); + + const headers = { + "Content-Type": "application/json", + }; + if (pluginApiToken) { + headers["Authorization"] = `Bearer ${pluginApiToken}`; + } + + fetch(`${pluginApiUrl}/dirigent/api/moderator/message`, { + method: "POST", + headers, + body, + }).catch((err) => { + log.warn(`dirigent-moderator: notify plugin failed: ${String(err)}`); + }); + }; +} + +// ── Public API ───────────────────────────────────────────────────────────────── + +/** + * Create the moderator service. + * + * @param {object} config + * @param {string} config.token - Discord bot token + * @param {string} config.pluginApiUrl - e.g. "http://127.0.0.1:18789" + * @param {string} [config.pluginApiToken] - bearer token for plugin API + * @param {string} [config.scheduleIdentifier] - e.g. "➡️" + * @param {boolean} [config.debugMode] + * @returns {{ httpHandler: Function, stop: Function }} + */ +export function createModeratorService(config) { + const { token, pluginApiUrl, pluginApiToken = "", scheduleIdentifier = "➡️", debugMode = false } = config; + + const log = { + info: (msg) => console.log(`[dirigent-moderator] ${msg}`), + warn: (msg) => console.warn(`[dirigent-moderator] WARN ${msg}`), + }; + + if (debugMode) { + log.info(`debug mode enabled, scheduleIdentifier=${scheduleIdentifier}`); + } + + // Decode bot user ID from token + const botUserId = getBotUserIdFromToken(token); + log.info(`bot user id decoded: ${botUserId ?? "(unknown)"}`); + + // Plugin notify callback (fire-and-forget) + const notifyPlugin = createNotifyPlugin(pluginApiUrl, pluginApiToken, log); + + // Gateway connection + const gateway = createGatewayConnection( + token, + (message) => { + // Skip bot's own messages + if (message.author?.id === botUserId) return; + notifyPlugin(message); + }, + log, + ); + + // HTTP handler (caller strips /moderator prefix) + const httpHandler = createHttpHandler(token, botUserId, log); + + return { + httpHandler, + stop() { + log.info("stopping moderator service"); + gateway.stop(); + }, + }; +} diff --git a/services/no-reply-api/server.mjs b/services/no-reply-api/server.mjs new file mode 100644 index 0000000..5508f3a --- /dev/null +++ b/services/no-reply-api/server.mjs @@ -0,0 +1,131 @@ +import http from "node:http"; + +const modelName = process.env.NO_REPLY_MODEL || "no-reply"; +const authToken = process.env.AUTH_TOKEN || ""; + +function sendJson(res, status, payload) { + res.writeHead(status, { "Content-Type": "application/json; charset=utf-8" }); + res.end(JSON.stringify(payload)); +} + +function isAuthorized(req) { + if (!authToken) return true; + const header = req.headers.authorization || ""; + return header === `Bearer ${authToken}`; +} + +function noReplyChatCompletion(reqBody) { + return { + id: `chatcmpl_dirigent_${Date.now()}`, + object: "chat.completion", + created: Math.floor(Date.now() / 1000), + model: reqBody?.model || modelName, + choices: [ + { + index: 0, + message: { role: "assistant", content: "NO_REPLY" }, + finish_reason: "stop", + }, + ], + usage: { prompt_tokens: 0, completion_tokens: 1, total_tokens: 1 }, + }; +} + +function noReplyResponses(reqBody) { + return { + id: `resp_dirigent_${Date.now()}`, + object: "response", + created_at: Math.floor(Date.now() / 1000), + model: reqBody?.model || modelName, + output: [ + { + type: "message", + role: "assistant", + content: [{ type: "output_text", text: "NO_REPLY" }], + }, + ], + usage: { input_tokens: 0, output_tokens: 1, total_tokens: 1 }, + }; +} + +function listModels() { + return { + object: "list", + data: [ + { + id: modelName, + object: "model", + created: Math.floor(Date.now() / 1000), + owned_by: "dirigent", + }, + ], + }; +} + +/** + * Returns a Node.js HTTP request handler for the no-reply API. + * When used as a sub-service inside main.mjs, the caller strips + * the "/no-reply" prefix from req.url before calling this handler. + */ +export function createNoReplyHandler() { + return function noReplyHandler(req, res) { + const url = req.url ?? "/"; + + if (req.method === "GET" && url === "/health") { + return sendJson(res, 200, { + ok: true, + service: "dirigent-no-reply-api", + model: modelName, + }); + } + + if (req.method === "GET" && url === "/v1/models") { + if (!isAuthorized(req)) return sendJson(res, 401, { error: "unauthorized" }); + return sendJson(res, 200, listModels()); + } + + if (req.method !== "POST") { + return sendJson(res, 404, { error: "not_found" }); + } + + if (!isAuthorized(req)) { + return sendJson(res, 401, { error: "unauthorized" }); + } + + let body = ""; + req.on("data", (chunk) => { + body += chunk; + if (body.length > 1_000_000) req.destroy(); + }); + + req.on("end", () => { + let parsed = {}; + try { + parsed = body ? JSON.parse(body) : {}; + } catch { + return sendJson(res, 400, { error: "invalid_json" }); + } + + if (url === "/v1/chat/completions") { + return sendJson(res, 200, noReplyChatCompletion(parsed)); + } + + if (url === "/v1/responses") { + return sendJson(res, 200, noReplyResponses(parsed)); + } + + return sendJson(res, 404, { error: "not_found" }); + }); + }; +} + +// Standalone mode: run HTTP server if this file is the entry point +const isMain = process.argv[1] && process.argv[1].endsWith("server.mjs"); +if (isMain) { + const port = Number(process.env.PORT || 8787); + const handler = createNoReplyHandler(); + const server = http.createServer(handler); + server.listen(port, () => { + console.log(`[dirigent-no-reply-api] listening on :${port}`); + }); +}