dev/2026-04-08 #1
226
plugin/core/transport.ts
Normal file
226
plugin/core/transport.ts
Normal file
@@ -0,0 +1,226 @@
|
||||
:
|
||||
import { WebSocketServer, WebSocket, RawData } from "ws";
|
||||
import type { YonexusServerConfig } from "./config.js";
|
||||
import type { ClientRecord } from "./persistence.js";
|
||||
|
||||
export interface ClientConnection {
|
||||
readonly identifier: string | null;
|
||||
readonly ws: WebSocket;
|
||||
readonly connectedAt: number;
|
||||
isAuthenticated: boolean;
|
||||
}
|
||||
|
||||
export interface ServerTransport {
|
||||
readonly isRunning: boolean;
|
||||
readonly connections: ReadonlyMap<string, ClientConnection>;
|
||||
start(): Promise<void>;
|
||||
stop(): Promise<void>;
|
||||
send(identifier: string, message: string): boolean;
|
||||
broadcast(message: string): void;
|
||||
closeConnection(identifier: string, code?: number, reason?: string): boolean;
|
||||
}
|
||||
|
||||
export type MessageHandler = (identifier: string | null, message: string) => void;
|
||||
export type ConnectionHandler = (identifier: string | null, ws: WebSocket) => void;
|
||||
export type DisconnectionHandler = (identifier: string | null, code: number, reason: Buffer) => void;
|
||||
|
||||
export interface ServerTransportOptions {
|
||||
config: YonexusServerConfig;
|
||||
onMessage: MessageHandler;
|
||||
onConnect?: ConnectionHandler;
|
||||
onDisconnect?: DisconnectionHandler;
|
||||
}
|
||||
|
||||
export class YonexusServerTransport implements ServerTransport {
|
||||
private wss: WebSocketServer | null = null;
|
||||
private _connections = new Map<string, ClientConnection>();
|
||||
private tempConnections = new Set<WebSocket>();
|
||||
private options: ServerTransportOptions;
|
||||
private _isRunning = false;
|
||||
|
||||
constructor(options: ServerTransportOptions) {
|
||||
this.options = options;
|
||||
}
|
||||
|
||||
get isRunning(): boolean {
|
||||
return this._isRunning;
|
||||
}
|
||||
|
||||
get connections(): ReadonlyMap<string, ClientConnection> {
|
||||
return this._connections;
|
||||
}
|
||||
|
||||
async start(): Promise<void> {
|
||||
if (this._isRunning) {
|
||||
throw new Error("Server transport is already running");
|
||||
}
|
||||
|
||||
const { listenHost, listenPort } = this.options.config;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.wss = new WebSocketServer({
|
||||
host: listenHost,
|
||||
port: listenPort
|
||||
});
|
||||
|
||||
this.wss.on("error", (error) => {
|
||||
if (!this._isRunning) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
|
||||
this.wss.on("listening", () => {
|
||||
this._isRunning = true;
|
||||
resolve();
|
||||
});
|
||||
|
||||
this.wss.on("connection", (ws, req) => {
|
||||
this.handleConnection(ws, req);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
if (!this._isRunning || !this.wss) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Close all connections
|
||||
for (const conn of this._connections.values()) {
|
||||
conn.ws.close(1000, "Server shutting down");
|
||||
}
|
||||
this._connections.clear();
|
||||
|
||||
for (const ws of this.tempConnections) {
|
||||
ws.close(1000, "Server shutting down");
|
||||
}
|
||||
this.tempConnections.clear();
|
||||
|
||||
return new Promise((resolve) => {
|
||||
this.wss!.close(() => {
|
||||
this._isRunning = false;
|
||||
this.wss = null;
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
send(identifier: string, message: string): boolean {
|
||||
const conn = this._connections.get(identifier);
|
||||
if (!conn || !conn.isAuthenticated) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (conn.ws.readyState === WebSocket.OPEN) {
|
||||
conn.ws.send(message);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
broadcast(message: string): void {
|
||||
for (const conn of this._connections.values()) {
|
||||
if (conn.isAuthenticated && conn.ws.readyState === WebSocket.OPEN) {
|
||||
conn.ws.send(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
closeConnection(identifier: string, code = 1000, reason = "Connection closed"): boolean {
|
||||
const conn = this._connections.get(identifier);
|
||||
if (!conn) {
|
||||
return false;
|
||||
}
|
||||
|
||||
conn.ws.close(code, reason);
|
||||
this._connections.delete(identifier);
|
||||
return true;
|
||||
}
|
||||
|
||||
registerConnection(identifier: string, ws: WebSocket): boolean {
|
||||
// Check if already connected
|
||||
if (this._connections.has(identifier)) {
|
||||
// Replace old connection
|
||||
const oldConn = this._connections.get(identifier)!;
|
||||
oldConn.ws.close(1008, "New connection established");
|
||||
}
|
||||
|
||||
// Remove from temp connections if present
|
||||
this.tempConnections.delete(ws);
|
||||
|
||||
const conn: ClientConnection = {
|
||||
identifier,
|
||||
ws,
|
||||
connectedAt: Math.floor(Date.now() / 1000),
|
||||
isAuthenticated: false
|
||||
};
|
||||
|
||||
this._connections.set(identifier, conn);
|
||||
return true;
|
||||
}
|
||||
|
||||
markAuthenticated(identifier: string): boolean {
|
||||
const conn = this._connections.get(identifier);
|
||||
if (!conn) {
|
||||
return false;
|
||||
}
|
||||
conn.isAuthenticated = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
private handleConnection(ws: WebSocket, _req: import("http").IncomingMessage): void {
|
||||
this.tempConnections.add(ws);
|
||||
|
||||
const tempConn: ClientConnection = {
|
||||
identifier: null,
|
||||
ws,
|
||||
connectedAt: Math.floor(Date.now() / 1000),
|
||||
isAuthenticated: false
|
||||
};
|
||||
|
||||
ws.on("message", (data: RawData) => {
|
||||
const message = data.toString("utf8");
|
||||
// Try to get identifier from registered connections
|
||||
let identifier: string | null = null;
|
||||
for (const [id, conn] of this._connections) {
|
||||
if (conn.ws === ws) {
|
||||
identifier = id;
|
||||
break;
|
||||
}
|
||||
}
|
||||
this.options.onMessage(identifier, message);
|
||||
});
|
||||
|
||||
ws.on("close", (code: number, reason: Buffer) => {
|
||||
this.tempConnections.delete(ws);
|
||||
|
||||
// Find and remove from registered connections
|
||||
for (const [id, conn] of this._connections) {
|
||||
if (conn.ws === ws) {
|
||||
this._connections.delete(id);
|
||||
if (this.options.onDisconnect) {
|
||||
this.options.onDisconnect(id, code, reason);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.options.onDisconnect) {
|
||||
this.options.onDisconnect(null, code, reason);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on("error", (error: Error) => {
|
||||
// Log error but let close handler clean up
|
||||
console.error("WebSocket error:", error.message);
|
||||
});
|
||||
|
||||
if (this.options.onConnect) {
|
||||
this.options.onConnect(null, ws);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function createServerTransport(options: ServerTransportOptions): ServerTransport {
|
||||
return new YonexusServerTransport(options);
|
||||
}
|
||||
@@ -57,4 +57,15 @@ export function createYonexusServerPlugin(): YonexusServerPluginRuntime {
|
||||
}
|
||||
|
||||
export default createYonexusServerPlugin;
|
||||
export {
|
||||
createServerTransport,
|
||||
YonexusServerTransport,
|
||||
type ServerTransport,
|
||||
type ServerTransportOptions,
|
||||
type ClientConnection,
|
||||
type MessageHandler,
|
||||
type ConnectionHandler,
|
||||
type DisconnectionHandler
|
||||
} from "./core/transport.js";
|
||||
|
||||
export { manifest };
|
||||
|
||||
Reference in New Issue
Block a user