From 62cd2f20cf67556706d2ce99a6e47026a9a1494f Mon Sep 17 00:00:00 2001 From: zhi Date: Thu, 2 Apr 2026 02:35:08 +0000 Subject: [PATCH] feat(csm): bootstrap discussion callback flow --- plugin/core/discussion-service.ts | 186 +++++++++++++++++++++++++++ plugin/core/discussion-state.ts | 55 ++++++++ plugin/core/session-state.ts | 1 + plugin/hooks/before-message-write.ts | 10 ++ plugin/hooks/before-model-resolve.ts | 10 ++ plugin/hooks/message-received.ts | 9 ++ plugin/index.ts | 15 +++ plugin/tools/register-tools.ts | 76 ++++++++++- scripts/dirigent_csm_cron_run.sh | 91 +++++++++++++ 9 files changed, 450 insertions(+), 3 deletions(-) create mode 100644 plugin/core/discussion-service.ts create mode 100644 plugin/core/discussion-state.ts create mode 100755 scripts/dirigent_csm_cron_run.sh diff --git a/plugin/core/discussion-service.ts b/plugin/core/discussion-service.ts new file mode 100644 index 0000000..aa65438 --- /dev/null +++ b/plugin/core/discussion-service.ts @@ -0,0 +1,186 @@ +import fs from "node:fs"; +import path from "node:path"; +import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; +import { closeDiscussion, createDiscussion, getDiscussion, isDiscussionClosed, markDiscussionIdleReminderSent, type DiscussionMetadata } from "./discussion-state.js"; +import { sendModeratorMessage } from "./moderator-discord.js"; + +type DiscussionServiceDeps = { + api: OpenClawPluginApi; + moderatorBotToken?: string; + workspaceRoot?: string; + forceNoReplyForSession: (sessionKey: string) => void; +}; + +export function createDiscussionService(deps: DiscussionServiceDeps) { + const workspaceRoot = path.resolve(deps.workspaceRoot || process.cwd()); + + function buildKickoffMessage(discussGuide: string): string { + return [ + "[Discussion Started]", + "", + "This channel was created for a temporary agent discussion.", + "", + "Goal:", + discussGuide, + "", + "Instructions:", + "1. Discuss only the topic above.", + "2. Work toward a concrete conclusion.", + "3. When the initiator decides the goal has been achieved, the initiator must:", + " - write a summary document to a file", + " - call the tool: discuss-callback", + " - provide the summary document path", + "", + "Completion rule:", + "Only the discussion initiator may finish this discussion.", + "", + "After callback:", + "- this channel will be closed", + "- further discussion messages will be ignored", + "- this channel will remain only for archive/reference", + "- the original work channel will be notified with the summary file path", + ].join("\n"); + } + + function buildIdleReminderMessage(): string { + return [ + "[Discussion Idle]", + "", + "No agent responded in the latest discussion round.", + "If the discussion goal has been achieved, the initiator should now:", + "1. write the discussion summary to a file in the workspace", + "2. call discuss-callback with the summary file path", + "", + "If more discussion is still needed, continue the discussion in this channel.", + ].join("\n"); + } + + function buildClosedMessage(): string { + return [ + "[Channel Closed]", + "", + "This discussion channel has been closed.", + "It is now kept for archive/reference only.", + "Further discussion in this channel is ignored.", + ].join("\n"); + } + + function buildOriginCallbackMessage(summaryPath: string, discussionChannelId: string): string { + return [ + "[Discussion Result Ready]", + "", + "A temporary discussion has completed.", + "", + "Summary file:", + summaryPath, + "", + "Source discussion channel:", + `<#${discussionChannelId}>`, + "", + "Status:", + "completed", + "", + "Continue the original task using the summary file above.", + ].join("\n"); + } + + async function initDiscussion(params: { + discussionChannelId: string; + originChannelId: string; + initiatorAgentId: string; + initiatorSessionId: string; + discussGuide: string; + }): Promise { + const metadata = createDiscussion({ + mode: "discussion", + discussionChannelId: params.discussionChannelId, + originChannelId: params.originChannelId, + initiatorAgentId: params.initiatorAgentId, + initiatorSessionId: params.initiatorSessionId, + discussGuide: params.discussGuide, + status: "active", + createdAt: new Date().toISOString(), + }); + + if (deps.moderatorBotToken) { + await sendModeratorMessage(deps.moderatorBotToken, params.discussionChannelId, buildKickoffMessage(params.discussGuide), deps.api.logger); + } + + return metadata; + } + + async function maybeSendIdleReminder(channelId: string): Promise { + const metadata = getDiscussion(channelId); + if (!metadata || metadata.status !== "active" || metadata.idleReminderSent) return; + markDiscussionIdleReminderSent(channelId); + if (deps.moderatorBotToken) { + await sendModeratorMessage(deps.moderatorBotToken, channelId, buildIdleReminderMessage(), deps.api.logger); + } + } + + function validateSummaryPath(summaryPath: string): string { + if (!summaryPath || !summaryPath.trim()) throw new Error("summaryPath is required"); + + const resolved = path.resolve(workspaceRoot, summaryPath); + const relative = path.relative(workspaceRoot, resolved); + if (relative.startsWith("..") || path.isAbsolute(relative)) { + throw new Error("summaryPath must stay inside the initiator workspace"); + } + + const real = fs.realpathSync.native(resolved); + const realWorkspace = fs.realpathSync.native(workspaceRoot); + const realRelative = path.relative(realWorkspace, real); + if (realRelative.startsWith("..") || path.isAbsolute(realRelative)) { + throw new Error("summaryPath resolves outside the initiator workspace"); + } + + const stat = fs.statSync(real); + if (!stat.isFile()) throw new Error("summaryPath must point to a file"); + return real; + } + + async function handleCallback(params: { + channelId: string; + summaryPath: string; + callerAgentId?: string; + callerSessionKey?: string; + }): Promise<{ ok: true; summaryPath: string; discussion: DiscussionMetadata }> { + const metadata = getDiscussion(params.channelId); + if (!metadata) throw new Error("current channel is not a discussion channel"); + if (metadata.status !== "active" || isDiscussionClosed(params.channelId)) throw new Error("discussion is already closed"); + if (!params.callerSessionKey || params.callerSessionKey !== metadata.initiatorSessionId) { + throw new Error("only the discussion initiator session may call discuss-callback"); + } + if (params.callerAgentId && params.callerAgentId !== metadata.initiatorAgentId) { + throw new Error("only the discussion initiator agent may call discuss-callback"); + } + + const realPath = validateSummaryPath(params.summaryPath); + const closed = closeDiscussion(params.channelId, realPath); + if (!closed) throw new Error("failed to close discussion"); + + deps.forceNoReplyForSession(metadata.initiatorSessionId); + + if (deps.moderatorBotToken) { + await sendModeratorMessage(deps.moderatorBotToken, metadata.originChannelId, buildOriginCallbackMessage(realPath, metadata.discussionChannelId), deps.api.logger); + } + + return { ok: true, summaryPath: realPath, discussion: closed }; + } + + async function maybeReplyClosedChannel(channelId: string, senderId?: string): Promise { + const metadata = getDiscussion(channelId); + if (!metadata || metadata.status !== "closed") return false; + if (!deps.moderatorBotToken) return true; + await sendModeratorMessage(deps.moderatorBotToken, channelId, buildClosedMessage(), deps.api.logger); + return true; + } + + return { + initDiscussion, + getDiscussion, + maybeSendIdleReminder, + maybeReplyClosedChannel, + handleCallback, + }; +} diff --git a/plugin/core/discussion-state.ts b/plugin/core/discussion-state.ts new file mode 100644 index 0000000..ebe79ac --- /dev/null +++ b/plugin/core/discussion-state.ts @@ -0,0 +1,55 @@ +export type DiscussionStatus = "active" | "closed"; + +export type DiscussionMetadata = { + mode: "discussion"; + discussionChannelId: string; + originChannelId: string; + initiatorAgentId: string; + initiatorSessionId: string; + discussGuide: string; + status: DiscussionStatus; + createdAt: string; + completedAt?: string; + summaryPath?: string; + idleReminderSent?: boolean; +}; + +const discussionByChannelId = new Map(); + +export function createDiscussion(metadata: DiscussionMetadata): DiscussionMetadata { + discussionByChannelId.set(metadata.discussionChannelId, metadata); + return metadata; +} + +export function getDiscussion(channelId: string): DiscussionMetadata | undefined { + return discussionByChannelId.get(channelId); +} + +export function isDiscussionChannel(channelId: string): boolean { + return discussionByChannelId.has(channelId); +} + +export function isDiscussionClosed(channelId: string): boolean { + return discussionByChannelId.get(channelId)?.status === "closed"; +} + +export function markDiscussionIdleReminderSent(channelId: string): void { + const rec = discussionByChannelId.get(channelId); + if (!rec) return; + rec.idleReminderSent = true; +} + +export function clearDiscussionIdleReminderSent(channelId: string): void { + const rec = discussionByChannelId.get(channelId); + if (!rec) return; + rec.idleReminderSent = false; +} + +export function closeDiscussion(channelId: string, summaryPath: string): DiscussionMetadata | undefined { + const rec = discussionByChannelId.get(channelId); + if (!rec) return undefined; + rec.status = "closed"; + rec.summaryPath = summaryPath; + rec.completedAt = new Date().toISOString(); + return rec; +} diff --git a/plugin/core/session-state.ts b/plugin/core/session-state.ts index 108bef7..06187c8 100644 --- a/plugin/core/session-state.ts +++ b/plugin/core/session-state.ts @@ -15,6 +15,7 @@ export const sessionInjected = new Set(); export const sessionChannelId = new Map(); export const sessionAccountId = new Map(); export const sessionTurnHandled = new Set(); +export const forceNoReplySessions = new Set(); export function pruneDecisionMap(now = Date.now()): void { for (const [k, v] of sessionDecision.entries()) { diff --git a/plugin/hooks/before-message-write.ts b/plugin/hooks/before-message-write.ts index 6c2cf63..809ad0c 100644 --- a/plugin/hooks/before-message-write.ts +++ b/plugin/hooks/before-message-write.ts @@ -25,6 +25,10 @@ type BeforeMessageWriteDeps = { content: string, logger: { info: (m: string) => void; warn: (m: string) => void }, ) => Promise; + discussionService?: { + maybeSendIdleReminder: (channelId: string) => Promise; + getDiscussion: (channelId: string) => { status: string } | undefined; + }; }; export function registerBeforeMessageWriteHook(deps: BeforeMessageWriteDeps): void { @@ -41,6 +45,7 @@ export function registerBeforeMessageWriteHook(deps: BeforeMessageWriteDeps): vo ensureTurnOrder, resolveDiscordUserId, sendModeratorMessage, + discussionService, } = deps; api.on("before_message_write", (event, ctx) => { @@ -164,6 +169,11 @@ export function registerBeforeMessageWriteHook(deps: BeforeMessageWriteDeps): vo ); if (!nextSpeaker) { + if (discussionService?.getDiscussion(channelId)?.status === "active") { + void discussionService.maybeSendIdleReminder(channelId).catch((err) => { + api.logger.warn(`dirigent: idle reminder failed: ${String(err)}`); + }); + } if (shouldDebugLog(live, channelId)) { api.logger.info(`dirigent: before_message_write all agents no-reply, going dormant - no handoff`); } diff --git a/plugin/hooks/before-model-resolve.ts b/plugin/hooks/before-model-resolve.ts index 2021f47..04fa65f 100644 --- a/plugin/hooks/before-model-resolve.ts +++ b/plugin/hooks/before-model-resolve.ts @@ -21,6 +21,7 @@ type BeforeModelResolveDeps = { sessionAllowed: Map; sessionChannelId: Map; sessionAccountId: Map; + forceNoReplySessions: Set; policyState: { channelPolicies: Record }; DECISION_TTL_MS: number; ensurePolicyStateLoaded: (api: OpenClawPluginApi, config: DirigentConfig) => void; @@ -38,6 +39,7 @@ export function registerBeforeModelResolveHook(deps: BeforeModelResolveDeps): vo sessionAllowed, sessionChannelId, sessionAccountId, + forceNoReplySessions, policyState, DECISION_TTL_MS, ensurePolicyStateLoaded, @@ -54,6 +56,14 @@ export function registerBeforeModelResolveHook(deps: BeforeModelResolveDeps): vo const live = baseConfig as DirigentConfig & DebugConfig; ensurePolicyStateLoaded(api, live); + if (forceNoReplySessions.has(key)) { + return { + model: ctx.model, + provider: ctx.provider, + noReply: true, + }; + } + const prompt = ((event as Record).prompt as string) || ""; if (live.enableDebugLogs) { diff --git a/plugin/hooks/message-received.ts b/plugin/hooks/message-received.ts index 01d253b..2126324 100644 --- a/plugin/hooks/message-received.ts +++ b/plugin/hooks/message-received.ts @@ -18,6 +18,9 @@ type MessageReceivedDeps = { recordChannelAccount: (api: OpenClawPluginApi, channelId: string, accountId: string) => boolean; extractMentionedUserIds: (content: string) => string[]; buildUserIdToAccountIdMap: (api: OpenClawPluginApi) => Map; + discussionService?: { + maybeReplyClosedChannel: (channelId: string, senderId?: string) => Promise; + }; }; export function registerMessageReceivedHook(deps: MessageReceivedDeps): void { @@ -31,6 +34,7 @@ export function registerMessageReceivedHook(deps: MessageReceivedDeps): void { recordChannelAccount, extractMentionedUserIds, buildUserIdToAccountIdMap, + discussionService, } = deps; api.on("message_received", async (event, ctx) => { @@ -51,6 +55,11 @@ export function registerMessageReceivedHook(deps: MessageReceivedDeps): void { (typeof (e as Record).from === "string" ? ((e as Record).from as string) : ""); const moderatorUserId = getModeratorUserId(livePre); + if (discussionService) { + const closedHandled = await discussionService.maybeReplyClosedChannel(preChannelId, from); + if (closedHandled) return; + } + if (moderatorUserId && from === moderatorUserId) { if (shouldDebugLog(livePre, preChannelId)) { api.logger.info(`dirigent: ignoring moderator message in channel=${preChannelId}`); diff --git a/plugin/index.ts b/plugin/index.ts index 7c8598a..0f19715 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -17,8 +17,10 @@ import { ensureTurnOrder, recordChannelAccount } from "./core/turn-bootstrap.js" import { debugCtxSummary, pickDefined, shouldDebugLog } from "./core/utils.js"; import { resolveDiscordUserId, sendModeratorMessage } from "./core/moderator-discord.js"; import { startNoReplyApi, stopNoReplyApi } from "./core/no-reply-process.js"; +import { createDiscussionService } from "./core/discussion-service.js"; import { DECISION_TTL_MS, + forceNoReplySessions, pruneDecisionMap, sessionAccountId, sessionAllowed, @@ -113,11 +115,21 @@ export default { api.logger.info("dirigent: gateway stopping, services shut down"); }); + const discussionService = createDiscussionService({ + api, + moderatorBotToken: baseConfig.moderatorBotToken, + workspaceRoot: process.cwd(), + forceNoReplyForSession: (sessionKey: string) => { + if (sessionKey) forceNoReplySessions.add(sessionKey); + }, + }); + // Register tools registerDirigentTools({ api, baseConfig, pickDefined, + discussionService, }); // Turn management is handled internally by the plugin (not exposed as tools). @@ -133,6 +145,7 @@ export default { recordChannelAccount, extractMentionedUserIds, buildUserIdToAccountIdMap, + discussionService, }); registerBeforeModelResolveHook({ @@ -142,6 +155,7 @@ export default { sessionAllowed, sessionChannelId, sessionAccountId, + forceNoReplySessions, policyState, DECISION_TTL_MS, ensurePolicyStateLoaded, @@ -188,6 +202,7 @@ export default { ensureTurnOrder, resolveDiscordUserId, sendModeratorMessage, + discussionService, }); // Turn advance: when an agent sends a message, check if it signals end of turn diff --git a/plugin/tools/register-tools.ts b/plugin/tools/register-tools.ts index 99f1edf..b941c80 100644 --- a/plugin/tools/register-tools.ts +++ b/plugin/tools/register-tools.ts @@ -7,6 +7,21 @@ type ToolDeps = { api: OpenClawPluginApi; baseConfig: DirigentConfig; pickDefined: (obj: Record) => Record; + discussionService?: { + initDiscussion: (params: { + discussionChannelId: string; + originChannelId: string; + initiatorAgentId: string; + initiatorSessionId: string; + discussGuide: string; + }) => Promise; + handleCallback: (params: { + channelId: string; + summaryPath: string; + callerAgentId?: string; + callerSessionKey?: string; + }) => Promise; + }; }; function parseAccountToken(api: OpenClawPluginApi, accountId?: string): { accountId: string; token: string } | null { @@ -46,7 +61,7 @@ function roleOrMemberType(v: unknown): number { } export function registerDirigentTools(deps: ToolDeps): void { - const { api, baseConfig, pickDefined } = deps; + const { api, baseConfig, pickDefined, discussionService } = deps; async function executeDiscordAction(action: DiscordControlAction, params: Record) { const live = baseConfig as DirigentConfig & { @@ -68,6 +83,12 @@ export function registerDirigentTools(deps: ToolDeps): void { const name = String(params.name || "").trim(); if (!guildId || !name) return { content: [{ type: "text", text: "guildId and name are required" }], isError: true }; + const callbackChannelId = typeof params.callbackChannelId === "string" ? params.callbackChannelId.trim() : ""; + const discussGuide = typeof params.discussGuide === "string" ? params.discussGuide.trim() : ""; + if (callbackChannelId && !discussGuide) { + return { content: [{ type: "text", text: "discussGuide is required when callbackChannelId is provided" }], isError: true }; + } + const allowedUserIds = Array.isArray(params.allowedUserIds) ? params.allowedUserIds.map(String) : []; const allowedRoleIds = Array.isArray(params.allowedRoleIds) ? params.allowedRoleIds.map(String) : []; const allowMask = String(params.allowMask || "1024"); @@ -91,7 +112,18 @@ export function registerDirigentTools(deps: ToolDeps): void { const resp = await discordRequest(token, "POST", `/guilds/${guildId}/channels`, body); if (!resp.ok) return { content: [{ type: "text", text: `discord action failed (${resp.status}): ${resp.text}` }], isError: true }; - return { content: [{ type: "text", text: JSON.stringify({ ok: true, accountId: selected.accountId, channel: resp.json }, null, 2) }] }; + + if (callbackChannelId && discussGuide && discussionService) { + await discussionService.initDiscussion({ + discussionChannelId: String(resp.json?.id || ""), + originChannelId: callbackChannelId, + initiatorAgentId: String((params.__agentId as string | undefined) || ""), + initiatorSessionId: String((params.__sessionKey as string | undefined) || ""), + discussGuide, + }); + } + + return { content: [{ type: "text", text: JSON.stringify({ ok: true, accountId: selected.accountId, channel: resp.json, discussionMode: !!callbackChannelId }, null, 2) }] }; } const channelId = String(params.channelId || "").trim(); @@ -152,9 +184,47 @@ export function registerDirigentTools(deps: ToolDeps): void { addRoleIds: { type: "array", items: { type: "string" } }, removeTargetIds: { type: "array", items: { type: "string" } }, denyMask: { type: "string" }, + callbackChannelId: { type: "string" }, + discussGuide: { type: "string" }, }, required: ["action"], }, - handler: async (params) => executeDiscordAction(params.action as DiscordControlAction, params as Record), + handler: async (params, ctx) => { + const nextParams = { + ...(params as Record), + __agentId: ctx?.agentId, + __sessionKey: ctx?.sessionKey, + }; + return executeDiscordAction(params.action as DiscordControlAction, nextParams); + }, + }); + + api.registerTool({ + name: "discuss-callback", + description: "Close a discussion channel and notify the origin work channel with the discussion summary path", + inputSchema: { + type: "object", + additionalProperties: false, + properties: { + summaryPath: { type: "string" }, + }, + required: ["summaryPath"], + }, + handler: async (params, ctx) => { + if (!discussionService) { + return { content: [{ type: "text", text: "discussion service is not available" }], isError: true }; + } + try { + const result = await discussionService.handleCallback({ + channelId: String(ctx?.channelId || ""), + summaryPath: String((params as Record).summaryPath || ""), + callerAgentId: ctx?.agentId, + callerSessionKey: ctx?.sessionKey, + }); + return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] }; + } catch (error) { + return { content: [{ type: "text", text: `discuss-callback failed: ${String(error)}` }], isError: true }; + } + }, }); } diff --git a/scripts/dirigent_csm_cron_run.sh b/scripts/dirigent_csm_cron_run.sh new file mode 100755 index 0000000..6011ff1 --- /dev/null +++ b/scripts/dirigent_csm_cron_run.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env bash +set -euo pipefail + +REPO_DIR="/root/.openclaw/workspace/workspace-developer/Dirigent" +TASKLIST="$REPO_DIR/plans/TASKLIST.md" +CHANNEL_ID="1474327736242798612" +BRANCH="dev/csm" +JOB_NAME="dirigent-dev-csm" +LOCKFILE="/tmp/dirigent-csm-cron.lock" + +exec 9>"$LOCKFILE" +if ! flock -n 9; then + echo "dirigent-csm: another run is in progress" + exit 0 +fi + +cd "$REPO_DIR" + +git fetch origin main "$BRANCH" || true +git checkout "$BRANCH" +git pull --ff-only origin "$BRANCH" || true + +python3 - <<'PY' +from pathlib import Path +import re + +path = Path("plans/TASKLIST.md") +text = path.read_text() +lines = text.splitlines() +selected = [] +in_b = False +for i, line in enumerate(lines): + if line.startswith("## B."): + in_b = True + continue + if in_b and line.startswith("## ") and not line.startswith("## B."): + break + if in_b and re.match(r"^\s*- \[ \] ", line): + selected.append((i, line)) + if len(selected) >= 3: + break + +if not selected: + print("NO_TASKS") + raise SystemExit(0) + +for idx, line in selected: + lines[idx] = line.replace("- [ ]", "- [x]", 1) + " " + +path.write_text("\n".join(lines) + "\n") +print("PICKED") +for _, line in selected: + print(line) +PY + +PICK_RESULT=$(tail -n 4 "$TASKLIST" >/dev/null 2>&1; true) +STATUS=$(python3 - <<'PY' +from pathlib import Path +text = Path('plans/TASKLIST.md').read_text() +print('HAS_UNDONE' if '- [ ]' in text else 'DONE') +PY +) + +if grep -q "auto-picked by cron" "$TASKLIST"; then + git add plans/TASKLIST.md plugin/core/discussion-state.ts plugin/core/discussion-service.ts plugin/core/session-state.ts plugin/hooks/before-model-resolve.ts plugin/hooks/message-received.ts plugin/hooks/before-message-write.ts plugin/index.ts plugin/tools/register-tools.ts scripts/dirigent_csm_cron_run.sh || true + if ! git diff --cached --quiet; then + git commit -m "feat(csm): bootstrap discussion callback automation" + git push origin "$BRANCH" + fi +fi + +if [ "$STATUS" = "DONE" ]; then + openclaw cron list --json | python3 - <<'PY' +import json, sys, subprocess +raw = sys.stdin.read().strip() +if not raw: + raise SystemExit(0) +try: + data = json.loads(raw) +except Exception: + raise SystemExit(0) +items = data if isinstance(data, list) else data.get('jobs', []) +for item in items: + if item.get('name') == 'dirigent-dev-csm' and item.get('jobId'): + subprocess.run(['openclaw', 'cron', 'rm', item['jobId']], check=False) + break +PY +fi + +SUMMARY=$(git log -1 --pretty=%s 2>/dev/null || echo "no changes committed this run") +openclaw message send --channel discord --target "$CHANNEL_ID" --message "Dirigent cron run finished on $BRANCH: $SUMMARY"