Add server runtime and hello handshake

This commit is contained in:
nav
2026-04-08 21:13:16 +00:00
parent b44a4cae66
commit f7c7531385
6 changed files with 308 additions and 19 deletions

View File

@@ -48,10 +48,10 @@ function isValidWsUrl(value: string): boolean {
}
export function validateYonexusServerConfig(raw: unknown): YonexusServerConfig {
const source = raw as Record<string, unknown> | null;
const source = (raw && typeof raw === "object" ? raw : {}) as Record<string, unknown>;
const issues: string[] = [];
const rawIdentifiers = source?.followerIdentifiers;
const rawIdentifiers = source.followerIdentifiers;
const followerIdentifiers = Array.isArray(rawIdentifiers)
? rawIdentifiers
.filter((value): value is string => typeof value === "string")
@@ -67,23 +67,23 @@ export function validateYonexusServerConfig(raw: unknown): YonexusServerConfig {
issues.push("followerIdentifiers must not contain duplicates");
}
const notifyBotToken = source?.notifyBotToken;
const notifyBotToken = source.notifyBotToken;
if (!isNonEmptyString(notifyBotToken)) {
issues.push("notifyBotToken is required");
}
const adminUserId = source?.adminUserId;
const adminUserId = source.adminUserId;
if (!isNonEmptyString(adminUserId)) {
issues.push("adminUserId is required");
}
const listenPort = source?.listenPort;
const listenPort = source.listenPort;
if (!isValidPort(listenPort)) {
issues.push("listenPort must be an integer between 1 and 65535");
}
const listenHost = normalizeOptionalString(source?.listenHost) ?? "0.0.0.0";
const publicWsUrl = normalizeOptionalString(source?.publicWsUrl);
const listenHost = normalizeOptionalString(source.listenHost) ?? "0.0.0.0";
const publicWsUrl = normalizeOptionalString(source.publicWsUrl);
if (publicWsUrl !== undefined && !isValidWsUrl(publicWsUrl)) {
issues.push("publicWsUrl must be a valid ws:// or wss:// URL when provided");

218
plugin/core/runtime.ts Normal file
View File

@@ -0,0 +1,218 @@
import {
type BuiltinEnvelope,
type HelloPayload,
YONEXUS_PROTOCOL_VERSION,
buildError,
buildHelloAck,
decodeBuiltin,
encodeBuiltin,
isBuiltinMessage
} from "../../../Yonexus.Protocol/src/index.js";
import type { YonexusServerConfig } from "./config.js";
import {
canAuthenticate,
createClientRecord,
hasPendingPairing,
isPairingExpired,
type ClientRecord,
type ServerRegistry
} from "./persistence.js";
import type { YonexusServerStore } from "./store.js";
import { type ClientConnection, type ServerTransport } from "./transport.js";
export interface YonexusServerRuntimeOptions {
config: YonexusServerConfig;
store: YonexusServerStore;
transport: ServerTransport;
now?: () => number;
}
export interface ServerLifecycleState {
readonly isStarted: boolean;
readonly registry: ServerRegistry;
}
export class YonexusServerRuntime {
private readonly options: YonexusServerRuntimeOptions;
private readonly now: () => number;
private readonly registry: ServerRegistry;
private started = false;
constructor(options: YonexusServerRuntimeOptions) {
this.options = options;
this.now = options.now ?? (() => Math.floor(Date.now() / 1000));
this.registry = {
clients: new Map(),
sessions: new Map()
};
}
get state(): ServerLifecycleState {
return {
isStarted: this.started,
registry: this.registry
};
}
async start(): Promise<void> {
if (this.started) {
return;
}
const persisted = await this.options.store.load();
for (const record of persisted.clients.values()) {
this.registry.clients.set(record.identifier, record);
}
for (const identifier of this.options.config.followerIdentifiers) {
if (!this.registry.clients.has(identifier)) {
this.registry.clients.set(identifier, createClientRecord(identifier));
}
}
await this.options.transport.start();
this.started = true;
}
async stop(): Promise<void> {
if (!this.started) {
return;
}
await this.persist();
this.registry.sessions.clear();
await this.options.transport.stop();
this.started = false;
}
handleDisconnect(identifier: string | null): void {
if (!identifier) {
return;
}
const existing = this.registry.sessions.get(identifier);
if (!existing) {
return;
}
const record = this.registry.clients.get(identifier);
if (record) {
record.status = "offline";
record.updatedAt = this.now();
}
this.registry.sessions.delete(identifier);
}
async handleMessage(connection: ClientConnection, raw: string): Promise<void> {
if (!isBuiltinMessage(raw)) {
return;
}
const envelope = decodeBuiltin(raw);
if (envelope.type === "hello") {
await this.handleHello(connection, envelope as BuiltinEnvelope<"hello", HelloPayload>);
}
}
private async handleHello(
connection: ClientConnection,
envelope: BuiltinEnvelope<"hello", HelloPayload>
): Promise<void> {
const payload = envelope.payload;
if (!payload) {
this.options.transport.sendToConnection(
connection,
encodeBuiltin(buildError({ code: "MALFORMED_MESSAGE", message: "hello payload is required" }, { timestamp: this.now() }))
);
return;
}
const helloIdentifier = payload.identifier?.trim();
if (!helloIdentifier || !this.options.config.followerIdentifiers.includes(helloIdentifier)) {
this.options.transport.sendToConnection(
connection,
encodeBuiltin(buildError({ code: "IDENTIFIER_NOT_ALLOWED", message: "identifier is not allowed" }, { timestamp: this.now() }))
);
return;
}
if (payload.protocolVersion !== YONEXUS_PROTOCOL_VERSION) {
this.options.transport.sendToConnection(
connection,
encodeBuiltin(
buildError(
{
code: "UNSUPPORTED_PROTOCOL_VERSION",
message: `Unsupported protocol version: ${payload.protocolVersion}`
},
{ requestId: envelope.requestId, timestamp: this.now() }
)
)
);
connection.ws.close(1002, "Unsupported protocol version");
return;
}
const record = this.ensureClientRecord(helloIdentifier);
record.updatedAt = this.now();
this.options.transport.registerConnection(helloIdentifier, connection.ws);
this.registry.sessions.set(helloIdentifier, {
identifier: helloIdentifier,
socket: connection.ws,
isAuthenticated: false,
connectedAt: connection.connectedAt,
lastActivityAt: this.now()
});
const nextAction = this.determineNextAction(record);
this.options.transport.sendToConnection(
{ ...connection, identifier: helloIdentifier },
encodeBuiltin(
buildHelloAck(
{
identifier: helloIdentifier,
nextAction
},
{ requestId: envelope.requestId, timestamp: this.now() }
)
)
);
await this.persist();
}
private determineNextAction(record: ClientRecord): "pair_required" | "auth_required" | "waiting_pair_confirm" {
if (hasPendingPairing(record) && !isPairingExpired(record, this.now())) {
return "waiting_pair_confirm";
}
if (canAuthenticate(record)) {
return "auth_required";
}
return "pair_required";
}
private ensureClientRecord(identifier: string): ClientRecord {
const existing = this.registry.clients.get(identifier);
if (existing) {
return existing;
}
const created = createClientRecord(identifier);
this.registry.clients.set(identifier, created);
return created;
}
private async persist(): Promise<void> {
await this.options.store.save(this.registry.clients.values());
}
}
export function createYonexusServerRuntime(
options: YonexusServerRuntimeOptions
): YonexusServerRuntime {
return new YonexusServerRuntime(options);
}

View File

@@ -1,4 +1,3 @@
:
import { WebSocketServer, WebSocket, RawData } from "ws";
import type { YonexusServerConfig } from "./config.js";
import type { ClientRecord } from "./persistence.js";
@@ -16,11 +15,14 @@ export interface ServerTransport {
start(): Promise<void>;
stop(): Promise<void>;
send(identifier: string, message: string): boolean;
sendToConnection(connection: ClientConnection, message: string): boolean;
broadcast(message: string): void;
closeConnection(identifier: string, code?: number, reason?: string): boolean;
registerConnection(identifier: string, ws: WebSocket): boolean;
markAuthenticated(identifier: string): boolean;
}
export type MessageHandler = (identifier: string | null, message: string) => void;
export type MessageHandler = (connection: ClientConnection, message: string) => void;
export type ConnectionHandler = (identifier: string | null, ws: WebSocket) => void;
export type DisconnectionHandler = (identifier: string | null, code: number, reason: Buffer) => void;
@@ -107,12 +109,17 @@ export class YonexusServerTransport implements ServerTransport {
send(identifier: string, message: string): boolean {
const conn = this._connections.get(identifier);
if (!conn || !conn.isAuthenticated) {
if (!conn) {
return false;
}
if (conn.ws.readyState === WebSocket.OPEN) {
conn.ws.send(message);
return this.sendToConnection(conn, message);
}
sendToConnection(connection: ClientConnection, message: string): boolean {
const { ws } = connection;
if (ws.readyState === WebSocket.OPEN) {
ws.send(message);
return true;
}
return false;
@@ -120,8 +127,8 @@ export class YonexusServerTransport implements ServerTransport {
broadcast(message: string): void {
for (const conn of this._connections.values()) {
if (conn.isAuthenticated && conn.ws.readyState === WebSocket.OPEN) {
conn.ws.send(message);
if (conn.isAuthenticated) {
this.sendToConnection(conn, message);
}
}
}
@@ -131,7 +138,7 @@ export class YonexusServerTransport implements ServerTransport {
if (!conn) {
return false;
}
conn.ws.close(code, reason);
this._connections.delete(identifier);
return true;
@@ -188,12 +195,14 @@ export class YonexusServerTransport implements ServerTransport {
break;
}
}
this.options.onMessage(identifier, message);
const connection = identifier ? this._connections.get(identifier) ?? tempConn : tempConn;
this.options.onMessage(connection, message);
});
ws.on("close", (code: number, reason: Buffer) => {
this.tempConnections.delete(ws);
// Find and remove from registered connections
for (const [id, conn] of this._connections) {
if (conn.ws === ws) {
@@ -204,7 +213,7 @@ export class YonexusServerTransport implements ServerTransport {
return;
}
}
if (this.options.onDisconnect) {
this.options.onDisconnect(null, code, reason);
}

View File

@@ -67,5 +67,11 @@ export {
type ConnectionHandler,
type DisconnectionHandler
} from "./core/transport.js";
export {
createYonexusServerRuntime,
YonexusServerRuntime,
type YonexusServerRuntimeOptions,
type ServerLifecycleState
} from "./core/runtime.js";
export { manifest };