From 59d5b26aff7cd5659c665991d3fc243dd8e4e324 Mon Sep 17 00:00:00 2001 From: hzhang Date: Fri, 10 Apr 2026 20:15:03 +0100 Subject: [PATCH] feat: wire rule registry and client-authenticated callback into server runtime - Add ruleRegistry and onClientAuthenticated options to YonexusServerRuntime - Dispatch rewritten rule messages (rule::sender::content) to rule registry - Guard onClientAuthenticated behind promoteToAuthenticated return value - Fix transport message handler: use tempConn directly when ws is in temp state, preventing stale _connections entry from causing promoteToAuthenticated to fail - Close competing temp connections with same identifier on promotion - Expose __yonexusServer on globalThis for cross-plugin communication - Remove auto-failure on admin notification miss; pairing stays pending Co-Authored-By: Claude Sonnet 4.6 --- package.json | 3 + plugin/core/runtime.ts | 45 ++++------ plugin/core/transport.ts | 40 ++++++--- plugin/index.ts | 165 +++++++++++++++++++++++++++++++++--- plugin/openclaw.plugin.json | 25 ++++-- scripts/install.mjs | 1 + 6 files changed, 218 insertions(+), 61 deletions(-) diff --git a/package.json b/package.json index 4d13a33..ef8d99a 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,9 @@ "description": "Yonexus.Server OpenClaw plugin scaffold", "type": "module", "main": "dist/plugin/index.js", + "openclaw": { + "extensions": ["./dist/Yonexus.Server/plugin/index.js"] + }, "files": [ "dist", "plugin", diff --git a/plugin/core/runtime.ts b/plugin/core/runtime.ts index 68bf972..8f0bd58 100644 --- a/plugin/core/runtime.ts +++ b/plugin/core/runtime.ts @@ -47,12 +47,15 @@ import { type DiscordNotificationService } from "../notifications/discord.js"; import { safeErrorMessage } from "./logging.js"; +import type { ServerRuleRegistry } from "./rules.js"; export interface YonexusServerRuntimeOptions { config: YonexusServerConfig; store: YonexusServerStore; transport: ServerTransport; notificationService?: DiscordNotificationService; + ruleRegistry?: ServerRuleRegistry; + onClientAuthenticated?: (identifier: string) => void; now?: () => number; sweepIntervalMs?: number; } @@ -447,7 +450,7 @@ export class YonexusServerRuntime { ); record.recentHandshakeAttempts.push(now); - if (record.recentHandshakeAttempts.length > AUTH_MAX_ATTEMPTS_PER_WINDOW) { + if (record.recentHandshakeAttempts.length >= AUTH_MAX_ATTEMPTS_PER_WINDOW) { await this.triggerRePairRequired(connection, record, envelope.requestId, "rate_limited"); return; } @@ -543,7 +546,10 @@ export class YonexusServerRuntime { session.lastActivityAt = now; session.publicKey = publicKey; } - this.options.transport.promoteToAuthenticated(identifier, connection.ws); + const promoted = this.options.transport.promoteToAuthenticated(identifier, connection.ws); + if (promoted) { + this.options.onClientAuthenticated?.(identifier); + } this.options.transport.sendToConnection( { ...connection, identifier }, encodeBuiltin( @@ -613,6 +619,11 @@ export class YonexusServerRuntime { this.pairingService.markNotificationFailed(record); } + // Persist immediately so the pairing code is readable from disk (e.g. via CLI) + if (!reusePending) { + await this.persist(); + } + this.options.transport.sendToConnection( connection, encodeBuiltin( @@ -620,7 +631,7 @@ export class YonexusServerRuntime { { identifier: record.identifier, expiresAt: request.expiresAt, - ttlSeconds: this.pairingService.getRemainingTtl(record), + ttlSeconds: request.ttlSeconds, adminNotification: notified ? "sent" : "failed", codeDelivery: "out_of_band" }, @@ -628,22 +639,8 @@ export class YonexusServerRuntime { ) ) ); - - if (!notified) { - this.options.transport.sendToConnection( - connection, - encodeBuiltin( - buildPairFailed( - { - identifier: record.identifier, - reason: "admin_notification_failed" - }, - { requestId, timestamp: this.now() } - ) - ) - ); - this.pairingService.clearPairingState(record); - } + // Pairing remains pending regardless of notification status. + // The admin can retrieve the pairing code via the server CLI command. } private async handleHeartbeat( @@ -932,16 +929,8 @@ export class YonexusServerRuntime { const parsed = parseRuleMessage(raw); const rewritten = `${parsed.ruleIdentifier}::${senderIdentifier}::${parsed.content}`; - // TODO: Dispatch to registered rules via rule registry - // For now, just log the rewritten message - // this.ruleRegistry.dispatch(rewritten); - - // Update last activity session.lastActivityAt = this.now(); - - // Future: dispatch to rule registry - // eslint-disable-next-line @typescript-eslint/no-unused-vars - void rewritten; + this.options.ruleRegistry?.dispatch(rewritten); } catch (error) { // Malformed rule message this.options.transport.sendToConnection( diff --git a/plugin/core/transport.ts b/plugin/core/transport.ts index e5409b3..a7d01bd 100644 --- a/plugin/core/transport.ts +++ b/plugin/core/transport.ts @@ -186,6 +186,16 @@ export class YonexusServerTransport implements ServerTransport { this._connections.delete(identifier); } + // Also close any OTHER temp connections that claimed the same identifier. + // This handles the case where a second hello came in with the same identifier + // while the first was still in the temp/pairing phase. + for (const [otherWs, otherTemp] of this.tempConnections.entries()) { + if (otherWs !== ws && otherTemp.assignedIdentifier === identifier) { + otherWs.close(1008, "Connection replaced by new authenticated session"); + this.tempConnections.delete(otherWs); + } + } + // Remove from temp connections this.tempConnections.delete(ws); @@ -229,22 +239,24 @@ export class YonexusServerTransport implements ServerTransport { ws.on("message", (data: RawData) => { const message = data.toString("utf8"); - // Try to get identifier from temp connections first, then authenticated connections - let identifier: string | null = null; - const tempData = this.tempConnections.get(ws); - if (tempData) { - identifier = tempData.assignedIdentifier; - } - if (!identifier) { - for (const [id, conn] of this._connections) { - if (conn.ws === ws) { - identifier = id; - break; - } - } + // If this ws is still in temp state, use tempConn directly. + // Never fall through to _connections — it may hold a stale entry for the + // same identifier from a previously-authenticated session that hasn't + // finished closing yet, which would cause promoteToAuthenticated to receive + // the wrong WebSocket and silently fail. + if (this.tempConnections.has(ws)) { + this.options.onMessage(tempConn, message); + return; } - const connection = identifier ? this._connections.get(identifier) ?? tempConn : tempConn; + // ws has been promoted — find it in authenticated connections + let connection: ClientConnection = tempConn; + for (const [, conn] of this._connections) { + if (conn.ws === ws) { + connection = conn; + break; + } + } this.options.onMessage(connection, message); }); diff --git a/plugin/index.ts b/plugin/index.ts index eb2d894..33b02b8 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -30,30 +30,173 @@ export { type YonexusServerStore } from "./core/store.js"; +import path from "node:path"; +import fs from "node:fs"; +import { validateYonexusServerConfig } from "./core/config.js"; +import { createYonexusServerStore } from "./core/store.js"; +import { createServerTransport } from "./core/transport.js"; +import { createYonexusServerRuntime } from "./core/runtime.js"; +import { createServerRuleRegistry } from "./core/rules.js"; +import { encodeRuleMessage } from "../../Yonexus.Protocol/src/index.js"; +import type { ServerPersistenceData } from "./core/persistence.js"; + export interface YonexusServerPluginManifest { readonly name: "Yonexus.Server"; readonly version: string; readonly description: string; } -export interface YonexusServerPluginRuntime { - readonly hooks: readonly []; - readonly commands: readonly []; - readonly tools: readonly []; -} - const manifest: YonexusServerPluginManifest = { name: "Yonexus.Server", version: "0.1.0", description: "Yonexus central hub plugin for cross-instance OpenClaw communication" }; -export function createYonexusServerPlugin(): YonexusServerPluginRuntime { - return { - hooks: [], - commands: [], - tools: [] +let _serverStarted = false; + +export function createYonexusServerPlugin(api: { + rootDir: string; + pluginConfig: unknown; + registrationMode?: string; // "full" (gateway) | "cli-metadata" | "setup-only" | "setup-runtime" + // eslint-disable-next-line @typescript-eslint/no-explicit-any + registerCli?: (registrar: (ctx: { program: any }) => void, opts?: { commands?: string[] }) => void; +}): void { + const stateFilePath = path.join(api.rootDir, "state.json"); + + // Register CLI regardless of whether the gateway is already running. + // The CLI process is a separate invocation that reads from the persisted state file. + api.registerCli?.(({ program }) => { + const group = program + .command("yonexus-server") + .description("Yonexus.Server management"); + + group + .command("pair-code ") + .description("Show the pending pairing code for a device awaiting confirmation") + .action((identifier: string) => { + let raw: ServerPersistenceData; + try { + raw = JSON.parse(fs.readFileSync(stateFilePath, "utf8")) as ServerPersistenceData; + } catch { + console.error("Error: could not read server state. Is the gateway running?"); + process.exit(1); + } + + const client = raw.clients?.find((c) => c.identifier === identifier); + if (!client) { + console.error(`Error: identifier "${identifier}" not found in server registry.`); + process.exit(1); + } + if (client.pairingStatus !== "pending" || !client.pairingCode) { + const status = client.pairingStatus; + console.error(`Error: no pending pairing for "${identifier}" (status: ${status}).`); + process.exit(1); + } + if (client.pairingExpiresAt && Math.floor(Date.now() / 1000) > client.pairingExpiresAt) { + console.error(`Error: pairing for "${identifier}" has expired.`); + process.exit(1); + } + + const expiresIn = client.pairingExpiresAt + ? Math.max(0, client.pairingExpiresAt - Math.floor(Date.now() / 1000)) + : 0; + const mm = String(Math.floor(expiresIn / 60)).padStart(2, "0"); + const ss = String(expiresIn % 60).padStart(2, "0"); + + console.log(`Identifier : ${client.identifier}`); + console.log(`Pairing code : ${client.pairingCode}`); + console.log(`Expires in : ${mm}m ${ss}s`); + }); + + group + .command("list-pending") + .description("List all identifiers with a pending pairing code") + .action(() => { + let raw: ServerPersistenceData; + try { + raw = JSON.parse(fs.readFileSync(stateFilePath, "utf8")) as ServerPersistenceData; + } catch { + console.error("Error: could not read server state. Is the gateway running?"); + process.exit(1); + } + + const now = Math.floor(Date.now() / 1000); + const pending = (raw.clients ?? []).filter( + (c) => c.pairingStatus === "pending" && c.pairingCode && (!c.pairingExpiresAt || now <= c.pairingExpiresAt) + ); + + if (pending.length === 0) { + console.log("No pending pairings."); + return; + } + for (const c of pending) { + const expiresIn = c.pairingExpiresAt ? Math.max(0, c.pairingExpiresAt - now) : 0; + const mm = String(Math.floor(expiresIn / 60)).padStart(2, "0"); + const ss = String(expiresIn % 60).padStart(2, "0"); + console.log(` ${c.identifier} (expires in ${mm}m ${ss}s)`); + } + }); + }, { commands: ["yonexus-server"] }); + + if (_serverStarted) return; + _serverStarted = true; + + const config = validateYonexusServerConfig(api.pluginConfig); + const store = createYonexusServerStore(stateFilePath); + + const ruleRegistry = createServerRuleRegistry(); + const onClientAuthenticatedCallbacks: Array<(identifier: string) => void> = []; + + let runtimeRef: ReturnType | null = null; + const transport = createServerTransport({ + config, + onMessage: (conn, msg) => { + runtimeRef?.handleMessage(conn, msg).catch((err: unknown) => { + console.error("[yonexus-server] message handler error:", err); + }); + }, + onDisconnect: (identifier) => { + if (identifier && runtimeRef) { + runtimeRef.handleDisconnect(identifier); + } + } + }); + + // Expose registry and helpers for other plugins loaded in the same process + (globalThis as Record)["__yonexusServer"] = { + ruleRegistry, + sendRule: (identifier: string, ruleId: string, content: string): boolean => + transport.send(identifier, encodeRuleMessage(ruleId, content)), + onClientAuthenticated: onClientAuthenticatedCallbacks }; + + const runtime = createYonexusServerRuntime({ + config, + store, + transport, + ruleRegistry, + onClientAuthenticated: (identifier) => { + for (const cb of onClientAuthenticatedCallbacks) cb(identifier); + } + }); + runtimeRef = runtime; + + const shutdown = (): void => { + runtime.stop().catch((err: unknown) => { + console.error("[yonexus-server] shutdown error:", err); + }); + }; + process.once("SIGTERM", shutdown); + process.once("SIGINT", shutdown); + + runtime.start().catch((err: unknown) => { + // EADDRINUSE means the gateway is already running (e.g. this is a CLI invocation). + // Any other error is a real problem worth logging. + const code = (err as NodeJS.ErrnoException | undefined)?.code; + if (code !== "EADDRINUSE") { + console.error("[yonexus-server] failed to start:", err); + } + }); } export default createYonexusServerPlugin; diff --git a/plugin/openclaw.plugin.json b/plugin/openclaw.plugin.json index def1da4..3cef357 100644 --- a/plugin/openclaw.plugin.json +++ b/plugin/openclaw.plugin.json @@ -1,15 +1,24 @@ { + "id": "yonexus-server", "name": "Yonexus.Server", "version": "0.1.0", "description": "Yonexus central hub plugin for cross-instance OpenClaw communication", - "entry": "dist/plugin/index.js", + "entry": "./dist/Yonexus.Server/plugin/index.js", "permissions": [], - "config": { - "followerIdentifiers": [], - "notifyBotToken": "", - "adminUserId": "", - "listenHost": "0.0.0.0", - "listenPort": 8787, - "publicWsUrl": "" + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": { + "followerIdentifiers": { + "type": "array", + "items": { "type": "string" } + }, + "notifyBotToken": { "type": "string" }, + "adminUserId": { "type": "string" }, + "listenHost": { "type": "string" }, + "listenPort": { "type": "number" }, + "publicWsUrl": { "type": "string" } + }, + "required": ["followerIdentifiers", "notifyBotToken", "adminUserId", "listenPort"] } } diff --git a/scripts/install.mjs b/scripts/install.mjs index ff3f7dd..9fe3a18 100644 --- a/scripts/install.mjs +++ b/scripts/install.mjs @@ -29,6 +29,7 @@ if (mode === "install") { fs.rmSync(targetDir, { recursive: true, force: true }); fs.cpSync(sourceDist, path.join(targetDir, "dist"), { recursive: true }); fs.copyFileSync(path.join(repoRoot, "plugin", "openclaw.plugin.json"), path.join(targetDir, "openclaw.plugin.json")); + fs.copyFileSync(path.join(repoRoot, "package.json"), path.join(targetDir, "package.json")); console.log(`Installed ${pluginName} to ${targetDir}`); process.exit(0); }