Files
Yonexus.Server/plugin/core/transport.ts
hzhang 59d5b26aff feat: wire rule registry and client-authenticated callback into server runtime
- Add ruleRegistry and onClientAuthenticated options to YonexusServerRuntime
- Dispatch rewritten rule messages (rule::sender::content) to rule registry
- Guard onClientAuthenticated behind promoteToAuthenticated return value
- Fix transport message handler: use tempConn directly when ws is in temp state,
  preventing stale _connections entry from causing promoteToAuthenticated to fail
- Close competing temp connections with same identifier on promotion
- Expose __yonexusServer on globalThis for cross-plugin communication
- Remove auto-failure on admin notification miss; pairing stays pending

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 20:15:03 +01:00

296 lines
8.8 KiB
TypeScript

import { WebSocketServer, WebSocket, RawData } from "ws";
import type { YonexusServerConfig } from "./config.js";
import { safeErrorMessage } from "./logging.js";
/**
* A client socket as seen by message handlers and as stored in the
* transport's registry of authenticated connections.
*/
export interface ClientConnection {
/** Client identifier; null for a socket that has not been promoted to authenticated. */
readonly identifier: string | null;
/** The underlying WebSocket for this client. */
readonly ws: WebSocket;
/** Time the socket connected, in whole seconds since the Unix epoch. */
readonly connectedAt: number;
/** True once the socket has been promoted to authenticated; false before that. */
isAuthenticated: boolean;
}
/**
* Bookkeeping record for a socket that has connected but is not yet authenticated.
* A socket keeps its temp record until it is promoted to authenticated,
* explicitly removed, or closed.
*/
interface TempConnection {
/** The underlying WebSocket; also used as the key of the temp-connection map. */
readonly ws: WebSocket;
/** Time the socket connected, in whole seconds since the Unix epoch. */
readonly connectedAt: number;
/** The identifier claimed during hello, or null if none has been assigned yet. */
assignedIdentifier: string | null;
}
/**
* Contract for the server-side transport: lifecycle control, outbound
* messaging, and management of temp vs. authenticated connections.
*/
export interface ServerTransport {
/** Whether the transport is currently started. */
readonly isRunning: boolean;
/** Read-only view of the authenticated connections, keyed by client identifier. */
readonly connections: ReadonlyMap<string, ClientConnection>;
/** Start the transport; the promise settles once startup has succeeded or failed. */
start(): Promise<void>;
/** Stop the transport, closing the connections it tracks. */
stop(): Promise<void>;
/**
* Send a message to the authenticated connection registered for `identifier`.
* Returns false when no such connection exists or the message was not sent.
*/
send(identifier: string, message: string): boolean;
/**
* Send a message over a specific connection.
* Returns true if the message was handed to the socket, false otherwise.
*/
sendToConnection(connection: ClientConnection, message: string): boolean;
/** Send a message to every authenticated connection. */
broadcast(message: string): void;
/**
* Close and unregister the authenticated connection for `identifier`.
* Returns false when no connection is registered for that identifier.
*/
closeConnection(identifier: string, code?: number, reason?: string): boolean;
/**
* Promote a temp connection to authenticated status.
* This implements the single-identifier-single-active-connection policy:
* - If another authenticated connection exists for this identifier, it is closed
* - Other temp connections that claimed the same identifier are closed as well
* - The connection is moved from temp to authenticated registry
* Returns false when `ws` is not a tracked temp connection.
*/
promoteToAuthenticated(identifier: string, ws: WebSocket): boolean;
/**
* Remove a temp connection without promoting it.
* Called when authentication fails or connection closes before auth.
*/
removeTempConnection(ws: WebSocket): void;
/**
* Assign an identifier to a temp connection during hello processing.
* This does NOT register the connection as authenticated yet.
*/
assignIdentifierToTemp(ws: WebSocket, identifier: string): void;
}
/**
* Called for every inbound message with the connection it arrived on and the
* message decoded as a UTF-8 string. For a socket that is still unauthenticated
* the connection has a null identifier and `isAuthenticated` false.
*/
export type MessageHandler = (connection: ClientConnection, message: string) => void;
/**
* Called when a new socket connects. The transport in this file always passes
* a null identifier here, because a freshly connected socket is not yet authenticated.
*/
export type ConnectionHandler = (identifier: string | null, ws: WebSocket) => void;
/**
* Called when a socket closes, with the close code and reason. `identifier` is
* the client identifier if the socket was still in the authenticated registry
* at close time, and null otherwise.
*/
export type DisconnectionHandler = (identifier: string | null, code: number, reason: Buffer) => void;
/** Construction options for the server transport. */
export interface ServerTransportOptions {
/** Server configuration; the transport reads `listenHost` and `listenPort` from it. */
config: YonexusServerConfig;
/** Required handler invoked for every inbound message. */
onMessage: MessageHandler;
/** Optional handler invoked when a new socket connects. */
onConnect?: ConnectionHandler;
/** Optional handler invoked when a socket closes. */
onDisconnect?: DisconnectionHandler;
}
/**
 * WebSocket implementation of {@link ServerTransport}.
 *
 * Every socket starts in `tempConnections` (keyed by socket) and is moved into
 * `_connections` (keyed by identifier) by `promoteToAuthenticated`. At most one
 * authenticated socket is kept per identifier.
 */
export class YonexusServerTransport implements ServerTransport {
  /** Underlying server; set by start(), cleared by stop() or by a failed startup. */
  private wss: WebSocketServer | null = null;
  /** Authenticated connections, keyed by client identifier. */
  private _connections = new Map<string, ClientConnection>();
  /** Sockets that have connected but are not yet authenticated, keyed by socket. */
  private tempConnections = new Map<WebSocket, TempConnection>();
  private options: ServerTransportOptions;
  private _isRunning = false;

  constructor(options: ServerTransportOptions) {
    this.options = options;
  }

  /** True from the server's "listening" event until stop() has completed. */
  get isRunning(): boolean {
    return this._isRunning;
  }

  /** Read-only view of the authenticated connection registry. */
  get connections(): ReadonlyMap<string, ClientConnection> {
    return this._connections;
  }

  /**
   * Start listening on the configured host and port.
   *
   * @returns A promise that resolves on the server's "listening" event and
   *   rejects with the server error if one is emitted before that.
   * @throws Error if the transport is already running.
   */
  async start(): Promise<void> {
    if (this._isRunning) {
      throw new Error("Server transport is already running");
    }
    const { listenHost, listenPort } = this.options.config;
    return new Promise((resolve, reject) => {
      const server = new WebSocketServer({
        host: listenHost,
        port: listenPort
      });
      this.wss = server;
      server.on("error", (error) => {
        if (this._isRunning) {
          // Startup already succeeded, so there is no promise left to reject;
          // surface the error instead of dropping it silently.
          this.logError("WebSocket server error", error);
          return;
        }
        // Startup failed: do not keep a reference to a server that never listened.
        if (this.wss === server) {
          this.wss = null;
        }
        reject(error);
      });
      server.on("listening", () => {
        this._isRunning = true;
        resolve();
      });
      server.on("connection", (ws, req) => {
        this.handleConnection(ws, req);
      });
    });
  }

  /**
   * Close every tracked socket and shut the server down.
   * Does nothing when the transport is not running.
   *
   * NOTE: both registries are cleared here before the sockets' close events are
   * handled, so (assuming those events fire after this method has run) the
   * resulting onDisconnect calls carry a null identifier.
   *
   * @returns A promise that resolves when the server's close callback runs.
   */
  async stop(): Promise<void> {
    const server = this.wss;
    if (!this._isRunning || !server) {
      return;
    }
    // Close all authenticated connections
    for (const conn of this._connections.values()) {
      conn.ws.close(1000, "Server shutting down");
    }
    this._connections.clear();
    // Close all temp connections
    for (const temp of this.tempConnections.values()) {
      temp.ws.close(1000, "Server shutting down");
    }
    this.tempConnections.clear();
    return new Promise((resolve) => {
      server.close(() => {
        this._isRunning = false;
        this.wss = null;
        resolve();
      });
    });
  }

  /**
   * Send a message to the authenticated connection registered for `identifier`.
   *
   * @returns false when no connection is registered or its socket is not open.
   */
  send(identifier: string, message: string): boolean {
    const conn = this._connections.get(identifier);
    if (!conn) {
      return false;
    }
    return this.sendToConnection(conn, message);
  }

  /**
   * Send a message over the given connection's socket.
   *
   * @returns true if the socket was open and the message was handed to it.
   */
  sendToConnection(connection: ClientConnection, message: string): boolean {
    const { ws } = connection;
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(message);
      return true;
    }
    return false;
  }

  /** Send a message to every registered connection flagged as authenticated. */
  broadcast(message: string): void {
    for (const conn of this._connections.values()) {
      if (conn.isAuthenticated) {
        this.sendToConnection(conn, message);
      }
    }
  }

  /**
   * Close the authenticated connection for `identifier` and drop it from the registry.
   *
   * NOTE: the registry entry is removed immediately, so (assuming the socket's
   * close event fires after this method returns) onDisconnect is later called
   * with a null identifier for this socket.
   *
   * @param code Close code sent to the peer (default 1000).
   * @param reason Close reason sent to the peer.
   * @returns false when no connection is registered for `identifier`.
   */
  closeConnection(identifier: string, code = 1000, reason = "Connection closed"): boolean {
    const conn = this._connections.get(identifier);
    if (!conn) {
      return false;
    }
    conn.ws.close(code, reason);
    this._connections.delete(identifier);
    return true;
  }

  /**
   * Move `ws` from the temp registry to the authenticated registry under
   * `identifier`, enforcing one active connection per identifier.
   *
   * @returns false when `ws` is not a tracked temp connection; true otherwise.
   */
  promoteToAuthenticated(identifier: string, ws: WebSocket): boolean {
    // Only sockets that are still tracked as temp connections can be promoted.
    const tempConn = this.tempConnections.get(ws);
    if (!tempConn) {
      return false;
    }
    // Single-identifier-single-active-connection policy: an already
    // authenticated connection for this identifier is replaced.
    const existingConn = this._connections.get(identifier);
    if (existingConn) {
      existingConn.ws.close(1008, "Connection replaced by new authenticated session");
      this._connections.delete(identifier);
    }
    // Also close any OTHER temp connections that claimed the same identifier.
    // This handles the case where a second hello came in with the same identifier
    // while the first was still in the temp/pairing phase.
    for (const [otherWs, otherTemp] of this.tempConnections.entries()) {
      if (otherWs !== ws && otherTemp.assignedIdentifier === identifier) {
        otherWs.close(1008, "Connection replaced by new authenticated session");
        this.tempConnections.delete(otherWs);
      }
    }
    // Remove from temp connections
    this.tempConnections.delete(ws);
    // Register the new authenticated connection, keeping the original connect time.
    const conn: ClientConnection = {
      identifier,
      ws,
      connectedAt: tempConn.connectedAt,
      isAuthenticated: true
    };
    this._connections.set(identifier, conn);
    return true;
  }

  /** Forget a temp connection without promoting it. The socket itself is not closed. */
  removeTempConnection(ws: WebSocket): void {
    this.tempConnections.delete(ws);
  }

  /**
   * Record the identifier a temp connection claimed during hello.
   * Does nothing if `ws` is not a tracked temp connection.
   */
  assignIdentifierToTemp(ws: WebSocket, identifier: string): void {
    const tempConn = this.tempConnections.get(ws);
    if (tempConn) {
      tempConn.assignedIdentifier = identifier;
    }
  }

  /**
   * Find the authenticated registry entry whose socket is `ws`.
   *
   * @returns The `[identifier, connection]` pair, or undefined if `ws` is not registered.
   */
  private findAuthenticatedEntry(ws: WebSocket): [string, ClientConnection] | undefined {
    for (const entry of this._connections) {
      if (entry[1].ws === ws) {
        return entry;
      }
    }
    return undefined;
  }

  /** Log an error with the transport's prefix, passing it through safeErrorMessage. */
  private logError(context: string, error: Error): void {
    console.error(`[Yonexus.Server] ${context}:`, safeErrorMessage(error));
  }

  /**
   * Track a newly accepted socket as a temp connection and wire up its
   * message, close and error handlers.
   */
  private handleConnection(ws: WebSocket, _req: import("http").IncomingMessage): void {
    // One timestamp for both records, so they always agree on the connect time.
    const connectedAt = Math.floor(Date.now() / 1000);
    // Store as temp connection until authenticated
    this.tempConnections.set(ws, {
      ws,
      connectedAt,
      assignedIdentifier: null
    });
    // Unauthenticated view of this socket handed to onMessage before promotion.
    const tempConn: ClientConnection = {
      identifier: null,
      ws,
      connectedAt,
      isAuthenticated: false
    };

    ws.on("message", (data: RawData) => {
      const message = data.toString("utf8");
      // If this ws is still in temp state, use tempConn directly.
      // Never fall through to _connections — it may hold a stale entry for the
      // same identifier from a previously-authenticated session that hasn't
      // finished closing yet, which would cause promoteToAuthenticated to receive
      // the wrong WebSocket and silently fail.
      if (this.tempConnections.has(ws)) {
        this.options.onMessage(tempConn, message);
        return;
      }
      // ws has been promoted — find it in authenticated connections. If it is
      // no longer registered, fall back to the unauthenticated view.
      const entry = this.findAuthenticatedEntry(ws);
      this.options.onMessage(entry ? entry[1] : tempConn, message);
    });

    ws.on("close", (code: number, reason: Buffer) => {
      this.tempConnections.delete(ws);
      // Unregister the socket if it is still the authenticated connection for
      // some identifier; otherwise the disconnect is reported with null.
      const entry = this.findAuthenticatedEntry(ws);
      let identifier: string | null = null;
      if (entry) {
        identifier = entry[0];
        this._connections.delete(identifier);
      }
      if (this.options.onDisconnect) {
        this.options.onDisconnect(identifier, code, reason);
      }
    });

    ws.on("error", (error: Error) => {
      // Log error but let close handler clean up
      this.logError("WebSocket error", error);
    });

    if (this.options.onConnect) {
      this.options.onConnect(null, ws);
    }
  }
}
/**
 * Build a server transport from the given options.
 *
 * @param options Configuration and event handlers for the transport.
 * @returns A new YonexusServerTransport, exposed through the ServerTransport interface.
 */
export function createServerTransport(options: ServerTransportOptions): ServerTransport {
  const transport: ServerTransport = new YonexusServerTransport(options);
  return transport;
}