From b44a4cae66255edf96f1e5c90ae67422b5262ec0 Mon Sep 17 00:00:00 2001 From: nav Date: Wed, 8 Apr 2026 21:05:03 +0000 Subject: [PATCH] Add server WebSocket transport --- plugin/core/transport.ts | 226 +++++++++++++++++++++++++++++++++++++++ plugin/index.ts | 11 ++ 2 files changed, 237 insertions(+) create mode 100644 plugin/core/transport.ts diff --git a/plugin/core/transport.ts b/plugin/core/transport.ts new file mode 100644 index 0000000..2f14b05 --- /dev/null +++ b/plugin/core/transport.ts @@ -0,0 +1,226 @@ +: +import { WebSocketServer, WebSocket, RawData } from "ws"; +import type { YonexusServerConfig } from "./config.js"; +import type { ClientRecord } from "./persistence.js"; + +export interface ClientConnection { + readonly identifier: string | null; + readonly ws: WebSocket; + readonly connectedAt: number; + isAuthenticated: boolean; +} + +export interface ServerTransport { + readonly isRunning: boolean; + readonly connections: ReadonlyMap; + start(): Promise; + stop(): Promise; + send(identifier: string, message: string): boolean; + broadcast(message: string): void; + closeConnection(identifier: string, code?: number, reason?: string): boolean; +} + +export type MessageHandler = (identifier: string | null, message: string) => void; +export type ConnectionHandler = (identifier: string | null, ws: WebSocket) => void; +export type DisconnectionHandler = (identifier: string | null, code: number, reason: Buffer) => void; + +export interface ServerTransportOptions { + config: YonexusServerConfig; + onMessage: MessageHandler; + onConnect?: ConnectionHandler; + onDisconnect?: DisconnectionHandler; +} + +export class YonexusServerTransport implements ServerTransport { + private wss: WebSocketServer | null = null; + private _connections = new Map(); + private tempConnections = new Set(); + private options: ServerTransportOptions; + private _isRunning = false; + + constructor(options: ServerTransportOptions) { + this.options = options; + } + + get isRunning(): boolean { + return this._isRunning; + } + + get connections(): ReadonlyMap { + return this._connections; + } + + async start(): Promise { + if (this._isRunning) { + throw new Error("Server transport is already running"); + } + + const { listenHost, listenPort } = this.options.config; + + return new Promise((resolve, reject) => { + this.wss = new WebSocketServer({ + host: listenHost, + port: listenPort + }); + + this.wss.on("error", (error) => { + if (!this._isRunning) { + reject(error); + } + }); + + this.wss.on("listening", () => { + this._isRunning = true; + resolve(); + }); + + this.wss.on("connection", (ws, req) => { + this.handleConnection(ws, req); + }); + }); + } + + async stop(): Promise { + if (!this._isRunning || !this.wss) { + return; + } + + // Close all connections + for (const conn of this._connections.values()) { + conn.ws.close(1000, "Server shutting down"); + } + this._connections.clear(); + + for (const ws of this.tempConnections) { + ws.close(1000, "Server shutting down"); + } + this.tempConnections.clear(); + + return new Promise((resolve) => { + this.wss!.close(() => { + this._isRunning = false; + this.wss = null; + resolve(); + }); + }); + } + + send(identifier: string, message: string): boolean { + const conn = this._connections.get(identifier); + if (!conn || !conn.isAuthenticated) { + return false; + } + + if (conn.ws.readyState === WebSocket.OPEN) { + conn.ws.send(message); + return true; + } + return false; + } + + broadcast(message: string): void { + for (const conn of this._connections.values()) { + if (conn.isAuthenticated && conn.ws.readyState === WebSocket.OPEN) { + conn.ws.send(message); + } + } + } + + closeConnection(identifier: string, code = 1000, reason = "Connection closed"): boolean { + const conn = this._connections.get(identifier); + if (!conn) { + return false; + } + + conn.ws.close(code, reason); + this._connections.delete(identifier); + return true; + } + + registerConnection(identifier: string, ws: WebSocket): boolean { + // Check if already connected + if (this._connections.has(identifier)) { + // Replace old connection + const oldConn = this._connections.get(identifier)!; + oldConn.ws.close(1008, "New connection established"); + } + + // Remove from temp connections if present + this.tempConnections.delete(ws); + + const conn: ClientConnection = { + identifier, + ws, + connectedAt: Math.floor(Date.now() / 1000), + isAuthenticated: false + }; + + this._connections.set(identifier, conn); + return true; + } + + markAuthenticated(identifier: string): boolean { + const conn = this._connections.get(identifier); + if (!conn) { + return false; + } + conn.isAuthenticated = true; + return true; + } + + private handleConnection(ws: WebSocket, _req: import("http").IncomingMessage): void { + this.tempConnections.add(ws); + + const tempConn: ClientConnection = { + identifier: null, + ws, + connectedAt: Math.floor(Date.now() / 1000), + isAuthenticated: false + }; + + ws.on("message", (data: RawData) => { + const message = data.toString("utf8"); + // Try to get identifier from registered connections + let identifier: string | null = null; + for (const [id, conn] of this._connections) { + if (conn.ws === ws) { + identifier = id; + break; + } + } + this.options.onMessage(identifier, message); + }); + + ws.on("close", (code: number, reason: Buffer) => { + this.tempConnections.delete(ws); + + // Find and remove from registered connections + for (const [id, conn] of this._connections) { + if (conn.ws === ws) { + this._connections.delete(id); + if (this.options.onDisconnect) { + this.options.onDisconnect(id, code, reason); + } + return; + } + } + + if (this.options.onDisconnect) { + this.options.onDisconnect(null, code, reason); + } + }); + + ws.on("error", (error: Error) => { + // Log error but let close handler clean up + console.error("WebSocket error:", error.message); + }); + + if (this.options.onConnect) { + this.options.onConnect(null, ws); + } + } +} + +export function createServerTransport(options: ServerTransportOptions): ServerTransport { + return new YonexusServerTransport(options); +} diff --git a/plugin/index.ts b/plugin/index.ts index 36e1261..2970f34 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -57,4 +57,15 @@ export function createYonexusServerPlugin(): YonexusServerPluginRuntime { } export default createYonexusServerPlugin; +export { + createServerTransport, + YonexusServerTransport, + type ServerTransport, + type ServerTransportOptions, + type ClientConnection, + type MessageHandler, + type ConnectionHandler, + type DisconnectionHandler +} from "./core/transport.js"; + export { manifest };