import { WebSocketServer, WebSocket, RawData } from "ws";
import type { YonexusServerConfig } from "./config.js";
import { safeErrorMessage } from "./logging.js";

/** State tracked by the transport for a single client socket. */
export interface ClientConnection {
  /** Identifier the socket is registered under, or `null` while it is unregistered. */
  readonly identifier: string | null;
  /** The underlying WebSocket. */
  readonly ws: WebSocket;
  /** Whole seconds since the Unix epoch at which this record was created. */
  readonly connectedAt: number;
  /** Set to `true` by `markAuthenticated`; only authenticated connections receive broadcasts. */
  isAuthenticated: boolean;
}

/** Public contract of the WebSocket server transport. */
export interface ServerTransport {
  /** `true` between a successful `start()` and the completion of `stop()`. */
  readonly isRunning: boolean;
  /** Registered connections keyed by identifier. */
  readonly connections: ReadonlyMap<string, ClientConnection>;
  start(): Promise<void>;
  stop(): Promise<void>;
  send(identifier: string, message: string): boolean;
  sendToConnection(connection: ClientConnection, message: string): boolean;
  broadcast(message: string): void;
  closeConnection(identifier: string, code?: number, reason?: string): boolean;
  registerConnection(identifier: string, ws: WebSocket): boolean;
  markAuthenticated(identifier: string): boolean;
}

/** Invoked for every inbound message with the connection record it arrived on. */
export type MessageHandler = (connection: ClientConnection, message: string) => void;

/** Invoked when a socket is accepted; the identifier is `null` at that point. */
export type ConnectionHandler = (identifier: string | null, ws: WebSocket) => void;

/** Invoked when a socket closes; the identifier is `null` if the socket was not registered at that time. */
export type DisconnectionHandler = (identifier: string | null, code: number, reason: Buffer) => void;

/** Construction options for {@link YonexusServerTransport}. */
export interface ServerTransportOptions {
  config: YonexusServerConfig;
  onMessage: MessageHandler;
  onConnect?: ConnectionHandler;
  onDisconnect?: DisconnectionHandler;
}

/**
 * WebSocket server transport.
 *
 * Accepted sockets start out as "temporary" (unregistered) connections. Once
 * the caller has learned who the peer is, it calls `registerConnection` to
 * bind the socket to an identifier, and `markAuthenticated` to make it
 * eligible for broadcasts.
 */
export class YonexusServerTransport implements ServerTransport {
  private wss: WebSocketServer | null = null;
  /** Registered connections keyed by identifier. */
  private _connections = new Map<string, ClientConnection>();
  /**
   * Reverse index of `_connections` (socket -> identifier). Kept in lockstep
   * with `_connections` so per-message and per-close lookups do not have to
   * scan every registered connection.
   */
  private identifiersBySocket = new Map<WebSocket, string>();
  /** Sockets that have been accepted but not yet registered. */
  private tempConnections = new Set<WebSocket>();
  private options: ServerTransportOptions;
  private _isRunning = false;

  constructor(options: ServerTransportOptions) {
    this.options = options;
  }

  get isRunning(): boolean {
    return this._isRunning;
  }

  get connections(): ReadonlyMap<string, ClientConnection> {
    return this._connections;
  }

  /**
   * Starts listening on `config.listenHost` / `config.listenPort`.
   *
   * @returns A promise that resolves once the server is listening and rejects
   *   if the server reports an error before that.
   * @throws Error if the transport is already running.
   */
  async start(): Promise<void> {
    if (this._isRunning) {
      throw new Error("Server transport is already running");
    }

    const { listenHost, listenPort } = this.options.config;

    return new Promise<void>((resolve, reject) => {
      const wss = new WebSocketServer({ host: listenHost, port: listenPort });
      this.wss = wss;

      wss.on("error", (error) => {
        if (!this._isRunning) {
          // Startup failed: do not keep a reference to the dead server.
          if (this.wss === wss) {
            this.wss = null;
          }
          reject(error);
          return;
        }
        // Errors after startup cannot reject anything; surface them in the log
        // instead of dropping them silently.
        console.error("[Yonexus.Server] WebSocket server error:", safeErrorMessage(error));
      });

      wss.on("listening", () => {
        this._isRunning = true;
        resolve();
      });

      wss.on("connection", (ws, req) => {
        this.handleConnection(ws, req);
      });
    });
  }

  /**
   * Closes every registered and temporary socket with code 1000 and shuts the
   * server down. Does nothing if the transport is not running.
   *
   * All bookkeeping is cleared before the sockets finish closing, so their
   * close events are reported to `onDisconnect` with a `null` identifier.
   */
  async stop(): Promise<void> {
    const wss = this.wss;
    if (!this._isRunning || !wss) {
      return;
    }

    for (const conn of this._connections.values()) {
      conn.ws.close(1000, "Server shutting down");
    }
    this._connections.clear();
    this.identifiersBySocket.clear();

    for (const ws of this.tempConnections) {
      ws.close(1000, "Server shutting down");
    }
    this.tempConnections.clear();

    return new Promise<void>((resolve) => {
      wss.close(() => {
        this._isRunning = false;
        this.wss = null;
        resolve();
      });
    });
  }

  /**
   * Sends a message to the connection registered under `identifier`.
   *
   * @returns `false` if no such connection exists or its socket is not open.
   */
  send(identifier: string, message: string): boolean {
    const conn = this._connections.get(identifier);
    if (!conn) {
      return false;
    }
    return this.sendToConnection(conn, message);
  }

  /**
   * Sends a message on the given connection's socket.
   *
   * @returns `true` if the socket was open and the message was handed to it,
   *   `false` otherwise.
   */
  sendToConnection(connection: ClientConnection, message: string): boolean {
    const { ws } = connection;
    if (ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    ws.send(message);
    return true;
  }

  /** Sends a message to every registered connection that is marked authenticated. */
  broadcast(message: string): void {
    for (const conn of this._connections.values()) {
      if (conn.isAuthenticated) {
        this.sendToConnection(conn, message);
      }
    }
  }

  /**
   * Closes and unregisters the connection registered under `identifier`.
   *
   * The entry is removed immediately, so the socket's subsequent close event
   * is presumably reported to `onDisconnect` with a `null` identifier.
   *
   * @returns `false` if no connection is registered under `identifier`.
   */
  closeConnection(identifier: string, code = 1000, reason = "Connection closed"): boolean {
    const conn = this._connections.get(identifier);
    if (!conn) {
      return false;
    }

    conn.ws.close(code, reason);
    this._connections.delete(identifier);
    this.identifiersBySocket.delete(conn.ws);
    return true;
  }

  /**
   * Binds a socket to an identifier. The new record starts unauthenticated.
   *
   * If a different socket is already registered under `identifier`, it is
   * closed with code 1008 and replaced. If `ws` itself was previously
   * registered under another identifier, that stale entry is removed.
   *
   * @returns Always `true`.
   */
  registerConnection(identifier: string, ws: WebSocket): boolean {
    const previous = this._connections.get(identifier);
    // Only close the old socket when it really is a different one; closing
    // `ws` here would tear down the very connection being registered.
    if (previous && previous.ws !== ws) {
      this.identifiersBySocket.delete(previous.ws);
      previous.ws.close(1008, "New connection established");
    }

    // A socket maps to at most one identifier: drop any older binding so the
    // map does not accumulate entries that are never cleaned up on close.
    const priorIdentifier = this.identifiersBySocket.get(ws);
    if (priorIdentifier !== undefined && priorIdentifier !== identifier) {
      this._connections.delete(priorIdentifier);
    }

    this.tempConnections.delete(ws);

    const conn: ClientConnection = {
      identifier,
      ws,
      connectedAt: Math.floor(Date.now() / 1000),
      isAuthenticated: false
    };

    this._connections.set(identifier, conn);
    this.identifiersBySocket.set(ws, identifier);
    return true;
  }

  /**
   * Marks the connection registered under `identifier` as authenticated.
   *
   * @returns `false` if no connection is registered under `identifier`.
   */
  markAuthenticated(identifier: string): boolean {
    const conn = this._connections.get(identifier);
    if (!conn) {
      return false;
    }
    conn.isAuthenticated = true;
    return true;
  }

  /** Decodes a raw `ws` payload as UTF-8 text, whichever shape it arrives in. */
  private decodeMessage(data: RawData): string {
    if (Buffer.isBuffer(data)) {
      return data.toString("utf8");
    }
    if (Array.isArray(data)) {
      // Concatenate before decoding so a multi-byte character split across
      // fragments is decoded correctly.
      return Buffer.concat(data).toString("utf8");
    }
    return Buffer.from(data).toString("utf8");
  }

  /**
   * Wires up a newly accepted socket: tracks it as temporary, attaches the
   * message/close/error listeners and notifies `onConnect`.
   */
  private handleConnection(ws: WebSocket, _req: import("http").IncomingMessage): void {
    this.tempConnections.add(ws);

    // Record handed to onMessage for as long as the socket is unregistered.
    const tempConn: ClientConnection = {
      identifier: null,
      ws,
      connectedAt: Math.floor(Date.now() / 1000),
      isAuthenticated: false
    };

    ws.on("message", (data: RawData) => {
      const message = this.decodeMessage(data);

      // Compare against undefined rather than truthiness so that an empty
      // string identifier is still treated as registered.
      const identifier = this.identifiersBySocket.get(ws);
      const connection =
        identifier !== undefined ? this._connections.get(identifier) ?? tempConn : tempConn;

      this.options.onMessage(connection, message);
    });

    ws.on("close", (code: number, reason: Buffer) => {
      this.tempConnections.delete(ws);

      const identifier = this.identifiersBySocket.get(ws);
      if (identifier !== undefined) {
        this.identifiersBySocket.delete(ws);
        this._connections.delete(identifier);
        if (this.options.onDisconnect) {
          this.options.onDisconnect(identifier, code, reason);
        }
        return;
      }

      // Socket was never registered, or its registration was already removed
      // (closed via closeConnection/stop, or replaced by a newer socket).
      if (this.options.onDisconnect) {
        this.options.onDisconnect(null, code, reason);
      }
    });

    ws.on("error", (error: Error) => {
      // Log error but let close handler clean up
      console.error("[Yonexus.Server] WebSocket error:", safeErrorMessage(error));
    });

    if (this.options.onConnect) {
      this.options.onConnect(null, ws);
    }
  }
}

/** Creates a server transport behind the {@link ServerTransport} interface. */
export function createServerTransport(options: ServerTransportOptions): ServerTransport {
  return new YonexusServerTransport(options);
}