Files
Yonexus.Server/plugin/core/transport.ts

236 lines
6.3 KiB
TypeScript

import { WebSocketServer, WebSocket, RawData } from "ws";
import type { YonexusServerConfig } from "./config.js";
import type { ClientRecord } from "./persistence.js";
export interface ClientConnection {
readonly identifier: string | null;
readonly ws: WebSocket;
readonly connectedAt: number;
isAuthenticated: boolean;
}
export interface ServerTransport {
readonly isRunning: boolean;
readonly connections: ReadonlyMap<string, ClientConnection>;
start(): Promise<void>;
stop(): Promise<void>;
send(identifier: string, message: string): boolean;
sendToConnection(connection: ClientConnection, message: string): boolean;
broadcast(message: string): void;
closeConnection(identifier: string, code?: number, reason?: string): boolean;
registerConnection(identifier: string, ws: WebSocket): boolean;
markAuthenticated(identifier: string): boolean;
}
export type MessageHandler = (connection: ClientConnection, message: string) => void;
export type ConnectionHandler = (identifier: string | null, ws: WebSocket) => void;
export type DisconnectionHandler = (identifier: string | null, code: number, reason: Buffer) => void;
export interface ServerTransportOptions {
config: YonexusServerConfig;
onMessage: MessageHandler;
onConnect?: ConnectionHandler;
onDisconnect?: DisconnectionHandler;
}
export class YonexusServerTransport implements ServerTransport {
private wss: WebSocketServer | null = null;
private _connections = new Map<string, ClientConnection>();
private tempConnections = new Set<WebSocket>();
private options: ServerTransportOptions;
private _isRunning = false;
constructor(options: ServerTransportOptions) {
this.options = options;
}
get isRunning(): boolean {
return this._isRunning;
}
get connections(): ReadonlyMap<string, ClientConnection> {
return this._connections;
}
async start(): Promise<void> {
if (this._isRunning) {
throw new Error("Server transport is already running");
}
const { listenHost, listenPort } = this.options.config;
return new Promise((resolve, reject) => {
this.wss = new WebSocketServer({
host: listenHost,
port: listenPort
});
this.wss.on("error", (error) => {
if (!this._isRunning) {
reject(error);
}
});
this.wss.on("listening", () => {
this._isRunning = true;
resolve();
});
this.wss.on("connection", (ws, req) => {
this.handleConnection(ws, req);
});
});
}
async stop(): Promise<void> {
if (!this._isRunning || !this.wss) {
return;
}
// Close all connections
for (const conn of this._connections.values()) {
conn.ws.close(1000, "Server shutting down");
}
this._connections.clear();
for (const ws of this.tempConnections) {
ws.close(1000, "Server shutting down");
}
this.tempConnections.clear();
return new Promise((resolve) => {
this.wss!.close(() => {
this._isRunning = false;
this.wss = null;
resolve();
});
});
}
send(identifier: string, message: string): boolean {
const conn = this._connections.get(identifier);
if (!conn) {
return false;
}
return this.sendToConnection(conn, message);
}
sendToConnection(connection: ClientConnection, message: string): boolean {
const { ws } = connection;
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
return true;
}
return false;
}
broadcast(message: string): void {
for (const conn of this._connections.values()) {
if (conn.isAuthenticated) {
this.sendToConnection(conn, message);
}
}
}
closeConnection(identifier: string, code = 1000, reason = "Connection closed"): boolean {
const conn = this._connections.get(identifier);
if (!conn) {
return false;
}
conn.ws.close(code, reason);
this._connections.delete(identifier);
return true;
}
registerConnection(identifier: string, ws: WebSocket): boolean {
// Check if already connected
if (this._connections.has(identifier)) {
// Replace old connection
const oldConn = this._connections.get(identifier)!;
oldConn.ws.close(1008, "New connection established");
}
// Remove from temp connections if present
this.tempConnections.delete(ws);
const conn: ClientConnection = {
identifier,
ws,
connectedAt: Math.floor(Date.now() / 1000),
isAuthenticated: false
};
this._connections.set(identifier, conn);
return true;
}
markAuthenticated(identifier: string): boolean {
const conn = this._connections.get(identifier);
if (!conn) {
return false;
}
conn.isAuthenticated = true;
return true;
}
private handleConnection(ws: WebSocket, _req: import("http").IncomingMessage): void {
this.tempConnections.add(ws);
const tempConn: ClientConnection = {
identifier: null,
ws,
connectedAt: Math.floor(Date.now() / 1000),
isAuthenticated: false
};
ws.on("message", (data: RawData) => {
const message = data.toString("utf8");
// Try to get identifier from registered connections
let identifier: string | null = null;
for (const [id, conn] of this._connections) {
if (conn.ws === ws) {
identifier = id;
break;
}
}
const connection = identifier ? this._connections.get(identifier) ?? tempConn : tempConn;
this.options.onMessage(connection, message);
});
ws.on("close", (code: number, reason: Buffer) => {
this.tempConnections.delete(ws);
// Find and remove from registered connections
for (const [id, conn] of this._connections) {
if (conn.ws === ws) {
this._connections.delete(id);
if (this.options.onDisconnect) {
this.options.onDisconnect(id, code, reason);
}
return;
}
}
if (this.options.onDisconnect) {
this.options.onDisconnect(null, code, reason);
}
});
ws.on("error", (error: Error) => {
// Log error but let close handler clean up
console.error("WebSocket error:", error.message);
});
if (this.options.onConnect) {
this.options.onConnect(null, ws);
}
}
}
export function createServerTransport(options: ServerTransportOptions): ServerTransport {
return new YonexusServerTransport(options);
}