From f7c7531385955fb957964705e22e85f5d55c6dba Mon Sep 17 00:00:00 2001 From: nav Date: Wed, 8 Apr 2026 21:13:16 +0000 Subject: [PATCH] Add server runtime and hello handshake --- package-lock.json | 53 ++++++++++ package.json | 3 + plugin/core/config.ts | 14 +-- plugin/core/runtime.ts | 218 +++++++++++++++++++++++++++++++++++++++ plugin/core/transport.ts | 33 +++--- plugin/index.ts | 6 ++ 6 files changed, 308 insertions(+), 19 deletions(-) create mode 100644 package-lock.json create mode 100644 plugin/core/runtime.ts diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..c3552f5 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,53 @@ +{ + "name": "yonexus-server", + "version": "0.1.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "yonexus-server", + "version": "0.1.0", + "dependencies": { + "ws": "^8.18.0" + }, + "devDependencies": { + "typescript": "^5.6.3" + } + }, + "node_modules/typescript": { + "version": "5.9.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", + "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", + "dev": true, + "license": "Apache-2.0", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + }, + "node_modules/ws": { + "version": "8.20.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", + "integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + } + } +} diff --git a/package.json b/package.json index 20dc0cc..10f9115 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,9 @@ "clean": "rm -rf dist", "check": "tsc -p tsconfig.json --noEmit" }, + "dependencies": { + "ws": "^8.18.0" + }, "devDependencies": { "typescript": "^5.6.3" } diff --git a/plugin/core/config.ts b/plugin/core/config.ts index 7f352ca..0bb78cc 100644 --- a/plugin/core/config.ts +++ b/plugin/core/config.ts @@ -48,10 +48,10 @@ function isValidWsUrl(value: string): boolean { } export function validateYonexusServerConfig(raw: unknown): YonexusServerConfig { - const source = raw as Record | null; + const source = (raw && typeof raw === "object" ? raw : {}) as Record; const issues: string[] = []; - const rawIdentifiers = source?.followerIdentifiers; + const rawIdentifiers = source.followerIdentifiers; const followerIdentifiers = Array.isArray(rawIdentifiers) ? rawIdentifiers .filter((value): value is string => typeof value === "string") @@ -67,23 +67,23 @@ export function validateYonexusServerConfig(raw: unknown): YonexusServerConfig { issues.push("followerIdentifiers must not contain duplicates"); } - const notifyBotToken = source?.notifyBotToken; + const notifyBotToken = source.notifyBotToken; if (!isNonEmptyString(notifyBotToken)) { issues.push("notifyBotToken is required"); } - const adminUserId = source?.adminUserId; + const adminUserId = source.adminUserId; if (!isNonEmptyString(adminUserId)) { issues.push("adminUserId is required"); } - const listenPort = source?.listenPort; + const listenPort = source.listenPort; if (!isValidPort(listenPort)) { issues.push("listenPort must be an integer between 1 and 65535"); } - const listenHost = normalizeOptionalString(source?.listenHost) ?? "0.0.0.0"; - const publicWsUrl = normalizeOptionalString(source?.publicWsUrl); + const listenHost = normalizeOptionalString(source.listenHost) ?? "0.0.0.0"; + const publicWsUrl = normalizeOptionalString(source.publicWsUrl); if (publicWsUrl !== undefined && !isValidWsUrl(publicWsUrl)) { issues.push("publicWsUrl must be a valid ws:// or wss:// URL when provided"); diff --git a/plugin/core/runtime.ts b/plugin/core/runtime.ts new file mode 100644 index 0000000..c6f212c --- /dev/null +++ b/plugin/core/runtime.ts @@ -0,0 +1,218 @@ +import { + type BuiltinEnvelope, + type HelloPayload, + YONEXUS_PROTOCOL_VERSION, + buildError, + buildHelloAck, + decodeBuiltin, + encodeBuiltin, + isBuiltinMessage +} from "../../../Yonexus.Protocol/src/index.js"; +import type { YonexusServerConfig } from "./config.js"; +import { + canAuthenticate, + createClientRecord, + hasPendingPairing, + isPairingExpired, + type ClientRecord, + type ServerRegistry +} from "./persistence.js"; +import type { YonexusServerStore } from "./store.js"; +import { type ClientConnection, type ServerTransport } from "./transport.js"; + +export interface YonexusServerRuntimeOptions { + config: YonexusServerConfig; + store: YonexusServerStore; + transport: ServerTransport; + now?: () => number; +} + +export interface ServerLifecycleState { + readonly isStarted: boolean; + readonly registry: ServerRegistry; +} + +export class YonexusServerRuntime { + private readonly options: YonexusServerRuntimeOptions; + private readonly now: () => number; + private readonly registry: ServerRegistry; + private started = false; + constructor(options: YonexusServerRuntimeOptions) { + this.options = options; + this.now = options.now ?? (() => Math.floor(Date.now() / 1000)); + this.registry = { + clients: new Map(), + sessions: new Map() + }; + } + + get state(): ServerLifecycleState { + return { + isStarted: this.started, + registry: this.registry + }; + } + + async start(): Promise { + if (this.started) { + return; + } + + const persisted = await this.options.store.load(); + for (const record of persisted.clients.values()) { + this.registry.clients.set(record.identifier, record); + } + + for (const identifier of this.options.config.followerIdentifiers) { + if (!this.registry.clients.has(identifier)) { + this.registry.clients.set(identifier, createClientRecord(identifier)); + } + } + + await this.options.transport.start(); + this.started = true; + } + + async stop(): Promise { + if (!this.started) { + return; + } + + await this.persist(); + this.registry.sessions.clear(); + await this.options.transport.stop(); + this.started = false; + } + + handleDisconnect(identifier: string | null): void { + if (!identifier) { + return; + } + + const existing = this.registry.sessions.get(identifier); + if (!existing) { + return; + } + + const record = this.registry.clients.get(identifier); + if (record) { + record.status = "offline"; + record.updatedAt = this.now(); + } + + this.registry.sessions.delete(identifier); + } + + async handleMessage(connection: ClientConnection, raw: string): Promise { + if (!isBuiltinMessage(raw)) { + return; + } + + const envelope = decodeBuiltin(raw); + + if (envelope.type === "hello") { + await this.handleHello(connection, envelope as BuiltinEnvelope<"hello", HelloPayload>); + } + } + + private async handleHello( + connection: ClientConnection, + envelope: BuiltinEnvelope<"hello", HelloPayload> + ): Promise { + const payload = envelope.payload; + if (!payload) { + this.options.transport.sendToConnection( + connection, + encodeBuiltin(buildError({ code: "MALFORMED_MESSAGE", message: "hello payload is required" }, { timestamp: this.now() })) + ); + return; + } + + const helloIdentifier = payload.identifier?.trim(); + if (!helloIdentifier || !this.options.config.followerIdentifiers.includes(helloIdentifier)) { + this.options.transport.sendToConnection( + connection, + encodeBuiltin(buildError({ code: "IDENTIFIER_NOT_ALLOWED", message: "identifier is not allowed" }, { timestamp: this.now() })) + ); + return; + } + + if (payload.protocolVersion !== YONEXUS_PROTOCOL_VERSION) { + this.options.transport.sendToConnection( + connection, + encodeBuiltin( + buildError( + { + code: "UNSUPPORTED_PROTOCOL_VERSION", + message: `Unsupported protocol version: ${payload.protocolVersion}` + }, + { requestId: envelope.requestId, timestamp: this.now() } + ) + ) + ); + connection.ws.close(1002, "Unsupported protocol version"); + return; + } + + const record = this.ensureClientRecord(helloIdentifier); + record.updatedAt = this.now(); + + this.options.transport.registerConnection(helloIdentifier, connection.ws); + this.registry.sessions.set(helloIdentifier, { + identifier: helloIdentifier, + socket: connection.ws, + isAuthenticated: false, + connectedAt: connection.connectedAt, + lastActivityAt: this.now() + }); + + const nextAction = this.determineNextAction(record); + this.options.transport.sendToConnection( + { ...connection, identifier: helloIdentifier }, + encodeBuiltin( + buildHelloAck( + { + identifier: helloIdentifier, + nextAction + }, + { requestId: envelope.requestId, timestamp: this.now() } + ) + ) + ); + + await this.persist(); + } + + private determineNextAction(record: ClientRecord): "pair_required" | "auth_required" | "waiting_pair_confirm" { + if (hasPendingPairing(record) && !isPairingExpired(record, this.now())) { + return "waiting_pair_confirm"; + } + + if (canAuthenticate(record)) { + return "auth_required"; + } + + return "pair_required"; + } + + private ensureClientRecord(identifier: string): ClientRecord { + const existing = this.registry.clients.get(identifier); + if (existing) { + return existing; + } + + const created = createClientRecord(identifier); + this.registry.clients.set(identifier, created); + return created; + } + + private async persist(): Promise { + await this.options.store.save(this.registry.clients.values()); + } +} + +export function createYonexusServerRuntime( + options: YonexusServerRuntimeOptions +): YonexusServerRuntime { + return new YonexusServerRuntime(options); +} diff --git a/plugin/core/transport.ts b/plugin/core/transport.ts index 2f14b05..2743e24 100644 --- a/plugin/core/transport.ts +++ b/plugin/core/transport.ts @@ -1,4 +1,3 @@ -: import { WebSocketServer, WebSocket, RawData } from "ws"; import type { YonexusServerConfig } from "./config.js"; import type { ClientRecord } from "./persistence.js"; @@ -16,11 +15,14 @@ export interface ServerTransport { start(): Promise; stop(): Promise; send(identifier: string, message: string): boolean; + sendToConnection(connection: ClientConnection, message: string): boolean; broadcast(message: string): void; closeConnection(identifier: string, code?: number, reason?: string): boolean; + registerConnection(identifier: string, ws: WebSocket): boolean; + markAuthenticated(identifier: string): boolean; } -export type MessageHandler = (identifier: string | null, message: string) => void; +export type MessageHandler = (connection: ClientConnection, message: string) => void; export type ConnectionHandler = (identifier: string | null, ws: WebSocket) => void; export type DisconnectionHandler = (identifier: string | null, code: number, reason: Buffer) => void; @@ -107,12 +109,17 @@ export class YonexusServerTransport implements ServerTransport { send(identifier: string, message: string): boolean { const conn = this._connections.get(identifier); - if (!conn || !conn.isAuthenticated) { + if (!conn) { return false; } - - if (conn.ws.readyState === WebSocket.OPEN) { - conn.ws.send(message); + + return this.sendToConnection(conn, message); + } + + sendToConnection(connection: ClientConnection, message: string): boolean { + const { ws } = connection; + if (ws.readyState === WebSocket.OPEN) { + ws.send(message); return true; } return false; @@ -120,8 +127,8 @@ export class YonexusServerTransport implements ServerTransport { broadcast(message: string): void { for (const conn of this._connections.values()) { - if (conn.isAuthenticated && conn.ws.readyState === WebSocket.OPEN) { - conn.ws.send(message); + if (conn.isAuthenticated) { + this.sendToConnection(conn, message); } } } @@ -131,7 +138,7 @@ export class YonexusServerTransport implements ServerTransport { if (!conn) { return false; } - + conn.ws.close(code, reason); this._connections.delete(identifier); return true; @@ -188,12 +195,14 @@ export class YonexusServerTransport implements ServerTransport { break; } } - this.options.onMessage(identifier, message); + + const connection = identifier ? this._connections.get(identifier) ?? tempConn : tempConn; + this.options.onMessage(connection, message); }); ws.on("close", (code: number, reason: Buffer) => { this.tempConnections.delete(ws); - + // Find and remove from registered connections for (const [id, conn] of this._connections) { if (conn.ws === ws) { @@ -204,7 +213,7 @@ export class YonexusServerTransport implements ServerTransport { return; } } - + if (this.options.onDisconnect) { this.options.onDisconnect(null, code, reason); } diff --git a/plugin/index.ts b/plugin/index.ts index 2970f34..95c77be 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -67,5 +67,11 @@ export { type ConnectionHandler, type DisconnectionHandler } from "./core/transport.js"; +export { + createYonexusServerRuntime, + YonexusServerRuntime, + type YonexusServerRuntimeOptions, + type ServerLifecycleState +} from "./core/runtime.js"; export { manifest };