import { WebSocketServer, WebSocket, RawData } from "ws";
import type { YonexusServerConfig } from "./config.js";
import { safeErrorMessage } from "./logging.js";

/**
 * A client socket as seen by message handlers.
 *
 * Before authentication the transport hands out a placeholder with
 * `identifier: null` and `isAuthenticated: false`; after promotion the
 * registry entry carries the authenticated identifier.
 */
export interface ClientConnection {
  readonly identifier: string | null;
  readonly ws: WebSocket;
  /** Unix timestamp in whole seconds at which the socket was accepted. */
  readonly connectedAt: number;
  isAuthenticated: boolean;
}

/**
 * Temporary connection tracking before authentication.
 * Connections remain in this state until successfully authenticated.
 */
interface TempConnection {
  readonly ws: WebSocket;
  /** Unix timestamp in whole seconds at which the socket was accepted. */
  readonly connectedAt: number;
  /** The identifier claimed during hello, if any */
  assignedIdentifier: string | null;
}

export interface ServerTransport {
  readonly isRunning: boolean;
  /** Authenticated connections keyed by identifier. */
  readonly connections: ReadonlyMap<string, ClientConnection>;
  start(): Promise<void>;
  stop(): Promise<void>;
  send(identifier: string, message: string): boolean;
  sendToConnection(connection: ClientConnection, message: string): boolean;
  broadcast(message: string): void;
  closeConnection(identifier: string, code?: number, reason?: string): boolean;

  /**
   * Promote a temp connection to authenticated status.
   * This implements the single-identifier-single-active-connection policy:
   * - If another authenticated connection exists for this identifier, it is closed
   * - The connection is moved from temp to authenticated registry
   */
  promoteToAuthenticated(identifier: string, ws: WebSocket): boolean;

  /**
   * Remove a temp connection without promoting it.
   * Called when authentication fails or connection closes before auth.
   */
  removeTempConnection(ws: WebSocket): void;

  /**
   * Assign an identifier to a temp connection during hello processing.
   * This does NOT register the connection as authenticated yet.
   */
  assignIdentifierToTemp(ws: WebSocket, identifier: string): void;
}

export type MessageHandler = (connection: ClientConnection, message: string) => void;
export type ConnectionHandler = (identifier: string | null, ws: WebSocket) => void;
export type DisconnectionHandler = (
  identifier: string | null,
  code: number,
  reason: Buffer
) => void;

export interface ServerTransportOptions {
  config: YonexusServerConfig;
  onMessage: MessageHandler;
  onConnect?: ConnectionHandler;
  onDisconnect?: DisconnectionHandler;
}

/**
 * Decode a `ws` message payload as UTF-8 text.
 *
 * `RawData` is a single Buffer, an ArrayBuffer, or a list of Buffer fragments;
 * calling `toString("utf8")` directly only produces the payload text for the
 * first of those, so each shape is handled explicitly.
 */
function rawDataToUtf8(data: RawData): string {
  if (Array.isArray(data)) {
    return Buffer.concat(data).toString("utf8");
  }
  if (Buffer.isBuffer(data)) {
    return data.toString("utf8");
  }
  return Buffer.from(data).toString("utf8");
}

/**
 * WebSocket server transport with a two-stage connection registry:
 * sockets start in `tempConnections` and move into `_connections`
 * (keyed by identifier) once `promoteToAuthenticated` is called.
 */
export class YonexusServerTransport implements ServerTransport {
  private wss: WebSocketServer | null = null;
  /** Authenticated connections, keyed by identifier. */
  private _connections = new Map<string, ClientConnection>();
  /** Sockets that have connected but have not been promoted yet. */
  private tempConnections = new Map<WebSocket, TempConnection>();
  private readonly options: ServerTransportOptions;
  private _isRunning = false;

  constructor(options: ServerTransportOptions) {
    this.options = options;
  }

  get isRunning(): boolean {
    return this._isRunning;
  }

  get connections(): ReadonlyMap<string, ClientConnection> {
    return this._connections;
  }

  /**
   * Start listening on `config.listenHost:config.listenPort`.
   *
   * @returns a promise that resolves on the server's "listening" event and
   *   rejects if the server emits "error" before it starts listening.
   * @throws Error if the transport is already running.
   */
  async start(): Promise<void> {
    if (this._isRunning) {
      throw new Error("Server transport is already running");
    }

    const { listenHost, listenPort } = this.options.config;

    return new Promise<void>((resolve, reject) => {
      const wss = new WebSocketServer({ host: listenHost, port: listenPort });
      this.wss = wss;

      wss.on("error", (error) => {
        if (!this._isRunning) {
          // Startup failed (or the server was already stopped): do not keep a
          // reference to a server that is not listening.
          if (this.wss === wss) {
            this.wss = null;
          }
          reject(error);
          return;
        }
        // Errors after startup cannot reject the start() promise any more;
        // log them instead of dropping them silently.
        console.error("[Yonexus.Server] WebSocket server error:", safeErrorMessage(error));
      });

      wss.on("listening", () => {
        this._isRunning = true;
        resolve();
      });

      wss.on("connection", (ws, req) => {
        this.handleConnection(ws, req);
      });
    });
  }

  /**
   * Close every tracked socket with code 1000 and shut the server down.
   * Does nothing if the transport is not running.
   *
   * Both registries are cleared before the sockets' "close" events fire, so
   * `onDisconnect` is invoked with a `null` identifier for these sockets.
   */
  async stop(): Promise<void> {
    const wss = this.wss;
    if (!this._isRunning || !wss) {
      return;
    }

    // Close all authenticated connections
    for (const conn of this._connections.values()) {
      conn.ws.close(1000, "Server shutting down");
    }
    this._connections.clear();

    // Close all temp connections
    for (const temp of this.tempConnections.values()) {
      temp.ws.close(1000, "Server shutting down");
    }
    this.tempConnections.clear();

    return new Promise<void>((resolve) => {
      wss.close(() => {
        this._isRunning = false;
        this.wss = null;
        resolve();
      });
    });
  }

  /**
   * Send a message to the authenticated connection registered for `identifier`.
   *
   * @returns `false` if no such connection exists or its socket is not open.
   */
  send(identifier: string, message: string): boolean {
    const conn = this._connections.get(identifier);
    if (!conn) {
      return false;
    }
    return this.sendToConnection(conn, message);
  }

  /**
   * Send a message on the given connection's socket.
   *
   * @returns `true` if the socket was open and the message was handed to it,
   *   `false` otherwise.
   */
  sendToConnection(connection: ClientConnection, message: string): boolean {
    const { ws } = connection;
    if (ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    ws.send(message);
    return true;
  }

  /** Send a message to every connection in the authenticated registry. */
  broadcast(message: string): void {
    for (const conn of this._connections.values()) {
      if (conn.isAuthenticated) {
        this.sendToConnection(conn, message);
      }
    }
  }

  /**
   * Close and unregister the authenticated connection for `identifier`.
   *
   * The entry is removed before the socket's "close" event fires, so
   * `onDisconnect` is invoked with a `null` identifier for it.
   *
   * @returns `false` if no connection is registered for `identifier`.
   */
  closeConnection(identifier: string, code = 1000, reason = "Connection closed"): boolean {
    const conn = this._connections.get(identifier);
    if (!conn) {
      return false;
    }

    conn.ws.close(code, reason);
    this._connections.delete(identifier);
    return true;
  }

  /**
   * Move `ws` from the temp registry into the authenticated registry under
   * `identifier`, closing (code 1008) and replacing any connection that is
   * already registered for that identifier.
   *
   * @returns `false` if `ws` is not a tracked temp connection.
   */
  promoteToAuthenticated(identifier: string, ws: WebSocket): boolean {
    // Verify the connection exists in temp connections
    const tempConn = this.tempConnections.get(ws);
    if (!tempConn) {
      return false;
    }

    // Single-identifier-single-active-connection policy: the previous session
    // for this identifier is closed and dropped before the new one is stored.
    const existingConn = this._connections.get(identifier);
    if (existingConn) {
      existingConn.ws.close(1008, "Connection replaced by new authenticated session");
      this._connections.delete(identifier);
    }

    // Remove from temp connections
    this.tempConnections.delete(ws);

    // Register the new authenticated connection, keeping the original accept time
    const conn: ClientConnection = {
      identifier,
      ws,
      connectedAt: tempConn.connectedAt,
      isAuthenticated: true
    };
    this._connections.set(identifier, conn);

    return true;
  }

  /** Drop `ws` from the temp registry without promoting it. */
  removeTempConnection(ws: WebSocket): void {
    this.tempConnections.delete(ws);
  }

  /**
   * Record the identifier claimed by a temp connection. No-op if `ws` is not
   * a tracked temp connection. The claim does not affect how incoming
   * messages are attributed; only promotion does.
   */
  assignIdentifierToTemp(ws: WebSocket, identifier: string): void {
    const tempConn = this.tempConnections.get(ws);
    if (tempConn) {
      tempConn.assignedIdentifier = identifier;
    }
  }

  /**
   * Find the authenticated registry entry whose socket is `ws`.
   *
   * @returns the `[identifier, connection]` pair, or `undefined` if `ws` is
   *   not registered as authenticated.
   */
  private findAuthenticatedEntry(ws: WebSocket): [string, ClientConnection] | undefined {
    for (const entry of this._connections) {
      if (entry[1].ws === ws) {
        return entry;
      }
    }
    return undefined;
  }

  /**
   * Register a newly accepted socket as a temp connection and wire up its
   * "message", "close" and "error" handlers, then notify `onConnect`.
   */
  private handleConnection(ws: WebSocket, _req: import("http").IncomingMessage): void {
    // One timestamp for both records so they cannot disagree.
    const connectedAt = Math.floor(Date.now() / 1000);

    // Store as temp connection until authenticated
    this.tempConnections.set(ws, {
      ws,
      connectedAt,
      assignedIdentifier: null
    });

    // Placeholder handed to onMessage while this socket is not authenticated.
    const unauthenticated: ClientConnection = {
      identifier: null,
      ws,
      connectedAt,
      isAuthenticated: false
    };

    ws.on("message", (data: RawData) => {
      // Attribute the message by socket identity only. Looking the sender up
      // by the identifier it merely claimed would hand the handler another
      // client's authenticated connection when that identifier is already
      // registered on a different socket.
      const entry = this.findAuthenticatedEntry(ws);
      const connection = entry ? entry[1] : unauthenticated;
      this.options.onMessage(connection, rawDataToUtf8(data));
    });

    ws.on("close", (code: number, reason: Buffer) => {
      this.tempConnections.delete(ws);

      // Remove from the authenticated registry if this socket is still there;
      // otherwise the disconnect is reported with a null identifier.
      const entry = this.findAuthenticatedEntry(ws);
      if (entry) {
        this._connections.delete(entry[0]);
      }
      this.options.onDisconnect?.(entry ? entry[0] : null, code, reason);
    });

    ws.on("error", (error: Error) => {
      // Log error but let close handler clean up
      console.error("[Yonexus.Server] WebSocket error:", safeErrorMessage(error));
    });

    this.options.onConnect?.(null, ws);
  }
}

/** Create a {@link ServerTransport} backed by {@link YonexusServerTransport}. */
export function createServerTransport(options: ServerTransportOptions): ServerTransport {
  return new YonexusServerTransport(options);
}