feat(csm): bootstrap discussion callback flow

This commit is contained in:
zhi
2026-04-02 02:35:08 +00:00
parent 9fa71f37bf
commit 62cd2f20cf
9 changed files with 450 additions and 3 deletions

View File

@@ -0,0 +1,186 @@
import fs from "node:fs";
import path from "node:path";
import type { OpenClawPluginApi } from "openclaw/plugin-sdk";
import { closeDiscussion, createDiscussion, getDiscussion, isDiscussionClosed, markDiscussionIdleReminderSent, type DiscussionMetadata } from "./discussion-state.js";
import { sendModeratorMessage } from "./moderator-discord.js";
type DiscussionServiceDeps = {
api: OpenClawPluginApi;
moderatorBotToken?: string;
workspaceRoot?: string;
forceNoReplyForSession: (sessionKey: string) => void;
};
export function createDiscussionService(deps: DiscussionServiceDeps) {
const workspaceRoot = path.resolve(deps.workspaceRoot || process.cwd());
function buildKickoffMessage(discussGuide: string): string {
return [
"[Discussion Started]",
"",
"This channel was created for a temporary agent discussion.",
"",
"Goal:",
discussGuide,
"",
"Instructions:",
"1. Discuss only the topic above.",
"2. Work toward a concrete conclusion.",
"3. When the initiator decides the goal has been achieved, the initiator must:",
" - write a summary document to a file",
" - call the tool: discuss-callback",
" - provide the summary document path",
"",
"Completion rule:",
"Only the discussion initiator may finish this discussion.",
"",
"After callback:",
"- this channel will be closed",
"- further discussion messages will be ignored",
"- this channel will remain only for archive/reference",
"- the original work channel will be notified with the summary file path",
].join("\n");
}
function buildIdleReminderMessage(): string {
return [
"[Discussion Idle]",
"",
"No agent responded in the latest discussion round.",
"If the discussion goal has been achieved, the initiator should now:",
"1. write the discussion summary to a file in the workspace",
"2. call discuss-callback with the summary file path",
"",
"If more discussion is still needed, continue the discussion in this channel.",
].join("\n");
}
function buildClosedMessage(): string {
return [
"[Channel Closed]",
"",
"This discussion channel has been closed.",
"It is now kept for archive/reference only.",
"Further discussion in this channel is ignored.",
].join("\n");
}
function buildOriginCallbackMessage(summaryPath: string, discussionChannelId: string): string {
return [
"[Discussion Result Ready]",
"",
"A temporary discussion has completed.",
"",
"Summary file:",
summaryPath,
"",
"Source discussion channel:",
`<#${discussionChannelId}>`,
"",
"Status:",
"completed",
"",
"Continue the original task using the summary file above.",
].join("\n");
}
async function initDiscussion(params: {
discussionChannelId: string;
originChannelId: string;
initiatorAgentId: string;
initiatorSessionId: string;
discussGuide: string;
}): Promise<DiscussionMetadata> {
const metadata = createDiscussion({
mode: "discussion",
discussionChannelId: params.discussionChannelId,
originChannelId: params.originChannelId,
initiatorAgentId: params.initiatorAgentId,
initiatorSessionId: params.initiatorSessionId,
discussGuide: params.discussGuide,
status: "active",
createdAt: new Date().toISOString(),
});
if (deps.moderatorBotToken) {
await sendModeratorMessage(deps.moderatorBotToken, params.discussionChannelId, buildKickoffMessage(params.discussGuide), deps.api.logger);
}
return metadata;
}
async function maybeSendIdleReminder(channelId: string): Promise<void> {
const metadata = getDiscussion(channelId);
if (!metadata || metadata.status !== "active" || metadata.idleReminderSent) return;
markDiscussionIdleReminderSent(channelId);
if (deps.moderatorBotToken) {
await sendModeratorMessage(deps.moderatorBotToken, channelId, buildIdleReminderMessage(), deps.api.logger);
}
}
function validateSummaryPath(summaryPath: string): string {
if (!summaryPath || !summaryPath.trim()) throw new Error("summaryPath is required");
const resolved = path.resolve(workspaceRoot, summaryPath);
const relative = path.relative(workspaceRoot, resolved);
if (relative.startsWith("..") || path.isAbsolute(relative)) {
throw new Error("summaryPath must stay inside the initiator workspace");
}
const real = fs.realpathSync.native(resolved);
const realWorkspace = fs.realpathSync.native(workspaceRoot);
const realRelative = path.relative(realWorkspace, real);
if (realRelative.startsWith("..") || path.isAbsolute(realRelative)) {
throw new Error("summaryPath resolves outside the initiator workspace");
}
const stat = fs.statSync(real);
if (!stat.isFile()) throw new Error("summaryPath must point to a file");
return real;
}
async function handleCallback(params: {
channelId: string;
summaryPath: string;
callerAgentId?: string;
callerSessionKey?: string;
}): Promise<{ ok: true; summaryPath: string; discussion: DiscussionMetadata }> {
const metadata = getDiscussion(params.channelId);
if (!metadata) throw new Error("current channel is not a discussion channel");
if (metadata.status !== "active" || isDiscussionClosed(params.channelId)) throw new Error("discussion is already closed");
if (!params.callerSessionKey || params.callerSessionKey !== metadata.initiatorSessionId) {
throw new Error("only the discussion initiator session may call discuss-callback");
}
if (params.callerAgentId && params.callerAgentId !== metadata.initiatorAgentId) {
throw new Error("only the discussion initiator agent may call discuss-callback");
}
const realPath = validateSummaryPath(params.summaryPath);
const closed = closeDiscussion(params.channelId, realPath);
if (!closed) throw new Error("failed to close discussion");
deps.forceNoReplyForSession(metadata.initiatorSessionId);
if (deps.moderatorBotToken) {
await sendModeratorMessage(deps.moderatorBotToken, metadata.originChannelId, buildOriginCallbackMessage(realPath, metadata.discussionChannelId), deps.api.logger);
}
return { ok: true, summaryPath: realPath, discussion: closed };
}
async function maybeReplyClosedChannel(channelId: string, senderId?: string): Promise<boolean> {
const metadata = getDiscussion(channelId);
if (!metadata || metadata.status !== "closed") return false;
if (!deps.moderatorBotToken) return true;
await sendModeratorMessage(deps.moderatorBotToken, channelId, buildClosedMessage(), deps.api.logger);
return true;
}
return {
initDiscussion,
getDiscussion,
maybeSendIdleReminder,
maybeReplyClosedChannel,
handleCallback,
};
}

View File

@@ -0,0 +1,55 @@
export type DiscussionStatus = "active" | "closed";
export type DiscussionMetadata = {
mode: "discussion";
discussionChannelId: string;
originChannelId: string;
initiatorAgentId: string;
initiatorSessionId: string;
discussGuide: string;
status: DiscussionStatus;
createdAt: string;
completedAt?: string;
summaryPath?: string;
idleReminderSent?: boolean;
};
const discussionByChannelId = new Map<string, DiscussionMetadata>();
export function createDiscussion(metadata: DiscussionMetadata): DiscussionMetadata {
discussionByChannelId.set(metadata.discussionChannelId, metadata);
return metadata;
}
export function getDiscussion(channelId: string): DiscussionMetadata | undefined {
return discussionByChannelId.get(channelId);
}
export function isDiscussionChannel(channelId: string): boolean {
return discussionByChannelId.has(channelId);
}
export function isDiscussionClosed(channelId: string): boolean {
return discussionByChannelId.get(channelId)?.status === "closed";
}
export function markDiscussionIdleReminderSent(channelId: string): void {
const rec = discussionByChannelId.get(channelId);
if (!rec) return;
rec.idleReminderSent = true;
}
export function clearDiscussionIdleReminderSent(channelId: string): void {
const rec = discussionByChannelId.get(channelId);
if (!rec) return;
rec.idleReminderSent = false;
}
export function closeDiscussion(channelId: string, summaryPath: string): DiscussionMetadata | undefined {
const rec = discussionByChannelId.get(channelId);
if (!rec) return undefined;
rec.status = "closed";
rec.summaryPath = summaryPath;
rec.completedAt = new Date().toISOString();
return rec;
}

View File

@@ -15,6 +15,7 @@ export const sessionInjected = new Set<string>();
export const sessionChannelId = new Map<string, string>();
export const sessionAccountId = new Map<string, string>();
export const sessionTurnHandled = new Set<string>();
export const forceNoReplySessions = new Set<string>();
export function pruneDecisionMap(now = Date.now()): void {
for (const [k, v] of sessionDecision.entries()) {

View File

@@ -25,6 +25,10 @@ type BeforeMessageWriteDeps = {
content: string,
logger: { info: (m: string) => void; warn: (m: string) => void },
) => Promise<void>;
discussionService?: {
maybeSendIdleReminder: (channelId: string) => Promise<void>;
getDiscussion: (channelId: string) => { status: string } | undefined;
};
};
export function registerBeforeMessageWriteHook(deps: BeforeMessageWriteDeps): void {
@@ -41,6 +45,7 @@ export function registerBeforeMessageWriteHook(deps: BeforeMessageWriteDeps): vo
ensureTurnOrder,
resolveDiscordUserId,
sendModeratorMessage,
discussionService,
} = deps;
api.on("before_message_write", (event, ctx) => {
@@ -164,6 +169,11 @@ export function registerBeforeMessageWriteHook(deps: BeforeMessageWriteDeps): vo
);
if (!nextSpeaker) {
if (discussionService?.getDiscussion(channelId)?.status === "active") {
void discussionService.maybeSendIdleReminder(channelId).catch((err) => {
api.logger.warn(`dirigent: idle reminder failed: ${String(err)}`);
});
}
if (shouldDebugLog(live, channelId)) {
api.logger.info(`dirigent: before_message_write all agents no-reply, going dormant - no handoff`);
}

View File

@@ -21,6 +21,7 @@ type BeforeModelResolveDeps = {
sessionAllowed: Map<string, boolean>;
sessionChannelId: Map<string, string>;
sessionAccountId: Map<string, string>;
forceNoReplySessions: Set<string>;
policyState: { channelPolicies: Record<string, unknown> };
DECISION_TTL_MS: number;
ensurePolicyStateLoaded: (api: OpenClawPluginApi, config: DirigentConfig) => void;
@@ -38,6 +39,7 @@ export function registerBeforeModelResolveHook(deps: BeforeModelResolveDeps): vo
sessionAllowed,
sessionChannelId,
sessionAccountId,
forceNoReplySessions,
policyState,
DECISION_TTL_MS,
ensurePolicyStateLoaded,
@@ -54,6 +56,14 @@ export function registerBeforeModelResolveHook(deps: BeforeModelResolveDeps): vo
const live = baseConfig as DirigentConfig & DebugConfig;
ensurePolicyStateLoaded(api, live);
if (forceNoReplySessions.has(key)) {
return {
model: ctx.model,
provider: ctx.provider,
noReply: true,
};
}
const prompt = ((event as Record<string, unknown>).prompt as string) || "";
if (live.enableDebugLogs) {

View File

@@ -18,6 +18,9 @@ type MessageReceivedDeps = {
recordChannelAccount: (api: OpenClawPluginApi, channelId: string, accountId: string) => boolean;
extractMentionedUserIds: (content: string) => string[];
buildUserIdToAccountIdMap: (api: OpenClawPluginApi) => Map<string, string>;
discussionService?: {
maybeReplyClosedChannel: (channelId: string, senderId?: string) => Promise<boolean>;
};
};
export function registerMessageReceivedHook(deps: MessageReceivedDeps): void {
@@ -31,6 +34,7 @@ export function registerMessageReceivedHook(deps: MessageReceivedDeps): void {
recordChannelAccount,
extractMentionedUserIds,
buildUserIdToAccountIdMap,
discussionService,
} = deps;
api.on("message_received", async (event, ctx) => {
@@ -51,6 +55,11 @@ export function registerMessageReceivedHook(deps: MessageReceivedDeps): void {
(typeof (e as Record<string, unknown>).from === "string" ? ((e as Record<string, unknown>).from as string) : "");
const moderatorUserId = getModeratorUserId(livePre);
if (discussionService) {
const closedHandled = await discussionService.maybeReplyClosedChannel(preChannelId, from);
if (closedHandled) return;
}
if (moderatorUserId && from === moderatorUserId) {
if (shouldDebugLog(livePre, preChannelId)) {
api.logger.info(`dirigent: ignoring moderator message in channel=${preChannelId}`);

View File

@@ -17,8 +17,10 @@ import { ensureTurnOrder, recordChannelAccount } from "./core/turn-bootstrap.js"
import { debugCtxSummary, pickDefined, shouldDebugLog } from "./core/utils.js";
import { resolveDiscordUserId, sendModeratorMessage } from "./core/moderator-discord.js";
import { startNoReplyApi, stopNoReplyApi } from "./core/no-reply-process.js";
import { createDiscussionService } from "./core/discussion-service.js";
import {
DECISION_TTL_MS,
forceNoReplySessions,
pruneDecisionMap,
sessionAccountId,
sessionAllowed,
@@ -113,11 +115,21 @@ export default {
api.logger.info("dirigent: gateway stopping, services shut down");
});
const discussionService = createDiscussionService({
api,
moderatorBotToken: baseConfig.moderatorBotToken,
workspaceRoot: process.cwd(),
forceNoReplyForSession: (sessionKey: string) => {
if (sessionKey) forceNoReplySessions.add(sessionKey);
},
});
// Register tools
registerDirigentTools({
api,
baseConfig,
pickDefined,
discussionService,
});
// Turn management is handled internally by the plugin (not exposed as tools).
@@ -133,6 +145,7 @@ export default {
recordChannelAccount,
extractMentionedUserIds,
buildUserIdToAccountIdMap,
discussionService,
});
registerBeforeModelResolveHook({
@@ -142,6 +155,7 @@ export default {
sessionAllowed,
sessionChannelId,
sessionAccountId,
forceNoReplySessions,
policyState,
DECISION_TTL_MS,
ensurePolicyStateLoaded,
@@ -188,6 +202,7 @@ export default {
ensureTurnOrder,
resolveDiscordUserId,
sendModeratorMessage,
discussionService,
});
// Turn advance: when an agent sends a message, check if it signals end of turn

View File

@@ -7,6 +7,21 @@ type ToolDeps = {
api: OpenClawPluginApi;
baseConfig: DirigentConfig;
pickDefined: (obj: Record<string, unknown>) => Record<string, unknown>;
discussionService?: {
initDiscussion: (params: {
discussionChannelId: string;
originChannelId: string;
initiatorAgentId: string;
initiatorSessionId: string;
discussGuide: string;
}) => Promise<unknown>;
handleCallback: (params: {
channelId: string;
summaryPath: string;
callerAgentId?: string;
callerSessionKey?: string;
}) => Promise<unknown>;
};
};
function parseAccountToken(api: OpenClawPluginApi, accountId?: string): { accountId: string; token: string } | null {
@@ -46,7 +61,7 @@ function roleOrMemberType(v: unknown): number {
}
export function registerDirigentTools(deps: ToolDeps): void {
const { api, baseConfig, pickDefined } = deps;
const { api, baseConfig, pickDefined, discussionService } = deps;
async function executeDiscordAction(action: DiscordControlAction, params: Record<string, unknown>) {
const live = baseConfig as DirigentConfig & {
@@ -68,6 +83,12 @@ export function registerDirigentTools(deps: ToolDeps): void {
const name = String(params.name || "").trim();
if (!guildId || !name) return { content: [{ type: "text", text: "guildId and name are required" }], isError: true };
const callbackChannelId = typeof params.callbackChannelId === "string" ? params.callbackChannelId.trim() : "";
const discussGuide = typeof params.discussGuide === "string" ? params.discussGuide.trim() : "";
if (callbackChannelId && !discussGuide) {
return { content: [{ type: "text", text: "discussGuide is required when callbackChannelId is provided" }], isError: true };
}
const allowedUserIds = Array.isArray(params.allowedUserIds) ? params.allowedUserIds.map(String) : [];
const allowedRoleIds = Array.isArray(params.allowedRoleIds) ? params.allowedRoleIds.map(String) : [];
const allowMask = String(params.allowMask || "1024");
@@ -91,7 +112,18 @@ export function registerDirigentTools(deps: ToolDeps): void {
const resp = await discordRequest(token, "POST", `/guilds/${guildId}/channels`, body);
if (!resp.ok) return { content: [{ type: "text", text: `discord action failed (${resp.status}): ${resp.text}` }], isError: true };
return { content: [{ type: "text", text: JSON.stringify({ ok: true, accountId: selected.accountId, channel: resp.json }, null, 2) }] };
if (callbackChannelId && discussGuide && discussionService) {
await discussionService.initDiscussion({
discussionChannelId: String(resp.json?.id || ""),
originChannelId: callbackChannelId,
initiatorAgentId: String((params.__agentId as string | undefined) || ""),
initiatorSessionId: String((params.__sessionKey as string | undefined) || ""),
discussGuide,
});
}
return { content: [{ type: "text", text: JSON.stringify({ ok: true, accountId: selected.accountId, channel: resp.json, discussionMode: !!callbackChannelId }, null, 2) }] };
}
const channelId = String(params.channelId || "").trim();
@@ -152,9 +184,47 @@ export function registerDirigentTools(deps: ToolDeps): void {
addRoleIds: { type: "array", items: { type: "string" } },
removeTargetIds: { type: "array", items: { type: "string" } },
denyMask: { type: "string" },
callbackChannelId: { type: "string" },
discussGuide: { type: "string" },
},
required: ["action"],
},
handler: async (params) => executeDiscordAction(params.action as DiscordControlAction, params as Record<string, unknown>),
handler: async (params, ctx) => {
const nextParams = {
...(params as Record<string, unknown>),
__agentId: ctx?.agentId,
__sessionKey: ctx?.sessionKey,
};
return executeDiscordAction(params.action as DiscordControlAction, nextParams);
},
});
api.registerTool({
name: "discuss-callback",
description: "Close a discussion channel and notify the origin work channel with the discussion summary path",
inputSchema: {
type: "object",
additionalProperties: false,
properties: {
summaryPath: { type: "string" },
},
required: ["summaryPath"],
},
handler: async (params, ctx) => {
if (!discussionService) {
return { content: [{ type: "text", text: "discussion service is not available" }], isError: true };
}
try {
const result = await discussionService.handleCallback({
channelId: String(ctx?.channelId || ""),
summaryPath: String((params as Record<string, unknown>).summaryPath || ""),
callerAgentId: ctx?.agentId,
callerSessionKey: ctx?.sessionKey,
});
return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] };
} catch (error) {
return { content: [{ type: "text", text: `discuss-callback failed: ${String(error)}` }], isError: true };
}
},
});
}