import {
  AUTH_ATTEMPT_WINDOW_SECONDS,
  AUTH_MAX_ATTEMPTS_PER_WINDOW,
  AUTH_RECENT_NONCE_WINDOW_SIZE,
  type BuiltinEnvelope,
  type HelloPayload,
  type PairConfirmPayload,
  YONEXUS_PROTOCOL_VERSION,
  buildAuthFailed,
  buildAuthSuccess,
  buildDisconnectNotice,
  buildError,
  buildHeartbeatAck,
  buildStatusUpdate,
  buildHelloAck,
  buildPairFailed,
  buildPairRequest,
  buildPairSuccess,
  buildRePairRequired,
  CodecError,
  decodeBuiltin,
  encodeBuiltin,
  encodeRuleMessage,
  extractAuthRequestSigningInput,
  isBuiltinMessage,
  isTimestampFresh,
  isValidAuthNonce,
  parseRuleMessage,
  type AuthRequestPayload,
  type HeartbeatPayload
} from "../../../Yonexus.Protocol/src/index.js";
import type { YonexusServerConfig } from "./config.js";
import {
  canAuthenticate,
  createClientRecord,
  hasPendingPairing,
  isPairingExpired,
  type ClientRecord,
  type ServerRegistry
} from "./persistence.js";
import { verifySignature } from "../../../Yonexus.Client/plugin/crypto/keypair.js";
import type { YonexusServerStore } from "./store.js";
import { type ClientConnection, type ServerTransport } from "./transport.js";
import { createPairingService, type PairingService } from "../services/pairing.js";
import {
  createDiscordNotificationService,
  type DiscordNotificationService
} from "../notifications/discord.js";
import { safeErrorMessage } from "./logging.js";
import type { ServerRuleRegistry } from "./rules.js";

/**
 * Dependencies and tunables accepted by {@link YonexusServerRuntime}.
 */
export interface YonexusServerRuntimeOptions {
  /** Server configuration (allowed follower identifiers, notification credentials). */
  config: YonexusServerConfig;
  /** Store used to load client records on start and save them after changes. */
  store: YonexusServerStore;
  /** Transport used to send messages to, and close, client connections. */
  transport: ServerTransport;
  /** Pairing notifier; when omitted a Discord notifier is built from `config`. */
  notificationService?: DiscordNotificationService;
  /** Receives rewritten rule messages coming from authenticated clients. */
  ruleRegistry?: ServerRuleRegistry;
  /** Called after the transport reports a connection was promoted to authenticated. */
  onClientAuthenticated?: (identifier: string) => void;
  /** Clock override; the default returns whole seconds since the Unix epoch. */
  now?: () => number;
  /** Interval between liveness sweeps in milliseconds; defaults to 30_000. */
  sweepIntervalMs?: number;
}

/**
 * Snapshot of the runtime's lifecycle flag together with its registry.
 */
export interface ServerLifecycleState {
  readonly isStarted: boolean;
  readonly registry: ServerRegistry;
}

/**
 * Server-side protocol runtime: keeps the client/session registry, handles the
 * builtin messages (`hello`, `pair_confirm`, `auth_request`, `heartbeat`),
 * forwards rule messages from authenticated clients to the rule registry, and
 * periodically sweeps clients for heartbeat timeouts.
 */
export class YonexusServerRuntime {
  private readonly options: YonexusServerRuntimeOptions;
  private readonly now: () => number;
  private readonly registry: ServerRegistry;
  private readonly pairingService: PairingService;
  private readonly notificationService: DiscordNotificationService;
  private readonly sweepIntervalMs: number;
  private sweepTimer: NodeJS.Timeout | null = null;
  private started = false;

  constructor(options: YonexusServerRuntimeOptions) {
    this.options = options;
    this.now = options.now ?? (() => Math.floor(Date.now() / 1000));
    this.registry = { clients: new Map(), sessions: new Map() };
    this.sweepIntervalMs = options.sweepIntervalMs ?? 30_000;
    this.pairingService = createPairingService({ now: this.now });
    this.notificationService =
      options.notificationService ??
      createDiscordNotificationService({
        botToken: options.config.notifyBotToken,
        adminUserId: options.config.adminUserId
      });
  }

  /** Current lifecycle flag plus the live (not copied) registry. */
  get state(): ServerLifecycleState {
    return { isStarted: this.started, registry: this.registry };
  }

  /**
   * Loads persisted client records, makes sure every configured follower
   * identifier has a record, starts the transport and the liveness sweep.
   * Does nothing when the runtime is already started.
   */
  async start(): Promise<void> {
    if (this.started) {
      return;
    }

    const persisted = await this.options.store.load();
    for (const record of persisted.clients.values()) {
      this.registry.clients.set(record.identifier, record);
    }

    for (const identifier of this.options.config.followerIdentifiers) {
      if (!this.registry.clients.has(identifier)) {
        this.registry.clients.set(identifier, createClientRecord(identifier));
      }
    }

    await this.options.transport.start();
    this.startSweepTimer();
    this.started = true;
  }

  /**
   * Stops the liveness sweep, persists the client records, drops all sessions
   * and stops the transport. Does nothing when the runtime is not started.
   */
  async stop(): Promise<void> {
    if (!this.started) {
      return;
    }

    this.stopSweepTimer();
    await this.persist();
    this.registry.sessions.clear();
    await this.options.transport.stop();
    this.started = false;
  }

  /**
   * Marks the client offline and removes its session. The change is kept in
   * memory only; this method does not persist.
   *
   * @param identifier - Identifier of the disconnected client, or `null` when
   *   the connection never announced one (in which case nothing happens).
   */
  handleDisconnect(identifier: string | null): void {
    if (!identifier) {
      return;
    }

    const existing = this.registry.sessions.get(identifier);
    if (!existing) {
      return;
    }

    const record = this.registry.clients.get(identifier);
    if (record) {
      record.status = "offline";
      record.updatedAt = this.now();
    }

    this.registry.sessions.delete(identifier);
  }

  /**
   * Entry point for every raw message received on a connection. Non-builtin
   * messages are treated as rule messages; builtin messages are decoded and
   * routed by type. Undecodable or unsupported builtins get a
   * `MALFORMED_MESSAGE` error reply.
   *
   * @param connection - Connection the message arrived on
   * @param raw - Raw message text
   */
  async handleMessage(connection: ClientConnection, raw: string): Promise<void> {
    if (!isBuiltinMessage(raw)) {
      // Handle rule message - rewrite and dispatch
      await this.handleRuleMessage(connection, raw);
      return;
    }

    let envelope: BuiltinEnvelope;
    try {
      envelope = decodeBuiltin(raw);
    } catch (error) {
      const message = error instanceof CodecError ? error.message : "Invalid builtin message";
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "MALFORMED_MESSAGE", message },
            { timestamp: this.now() }
          )
        )
      );
      return;
    }

    if (envelope.type === "hello") {
      await this.handleHello(connection, envelope as BuiltinEnvelope<"hello", HelloPayload>);
      return;
    }

    if (envelope.type === "pair_confirm") {
      await this.handlePairConfirm(
        connection,
        envelope as BuiltinEnvelope<"pair_confirm", PairConfirmPayload>
      );
      return;
    }

    if (envelope.type === "auth_request") {
      await this.handleAuthRequest(
        connection,
        envelope as BuiltinEnvelope<"auth_request", AuthRequestPayload>
      );
      return;
    }

    if (envelope.type === "heartbeat") {
      await this.handleHeartbeat(
        connection,
        envelope as BuiltinEnvelope<"heartbeat", HeartbeatPayload>
      );
      return;
    }

    this.options.transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildError(
          {
            code: "MALFORMED_MESSAGE",
            message: `Unsupported builtin type: ${String(envelope.type)}`
          },
          { requestId: envelope.requestId, timestamp: this.now() }
        )
      )
    );
  }

  /**
   * Handles `hello`: validates the identifier and protocol version, registers
   * an unauthenticated session for the connection, replies with `hello_ack`
   * carrying the next action, and starts (or resumes) pairing when required.
   */
  private async handleHello(
    connection: ClientConnection,
    envelope: BuiltinEnvelope<"hello", HelloPayload>
  ): Promise<void> {
    const payload = envelope.payload;
    if (!payload) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "MALFORMED_MESSAGE", message: "hello payload is required" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const helloIdentifier = payload.identifier?.trim();
    if (!helloIdentifier || !this.options.config.followerIdentifiers.includes(helloIdentifier)) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "IDENTIFIER_NOT_ALLOWED", message: "identifier is not allowed" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    if (payload.protocolVersion !== YONEXUS_PROTOCOL_VERSION) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            {
              code: "UNSUPPORTED_PROTOCOL_VERSION",
              message: `Unsupported protocol version: ${payload.protocolVersion}`
            },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      connection.ws.close(1002, "Unsupported protocol version");
      return;
    }

    const record = this.ensureClientRecord(helloIdentifier);
    record.updatedAt = this.now();

    this.options.transport.assignIdentifierToTemp(connection.ws, helloIdentifier);
    // A hello always (re)creates the session as unauthenticated, replacing any
    // previous session stored under the same identifier.
    this.registry.sessions.set(helloIdentifier, {
      identifier: helloIdentifier,
      socket: connection.ws,
      isAuthenticated: false,
      connectedAt: connection.connectedAt,
      lastActivityAt: this.now(),
      publicKey: payload.publicKey?.trim() || undefined
    });

    const nextAction = this.determineNextAction(record);
    this.options.transport.sendToConnection(
      { ...connection, identifier: helloIdentifier },
      encodeBuiltin(
        buildHelloAck(
          { identifier: helloIdentifier, nextAction },
          { requestId: envelope.requestId, timestamp: this.now() }
        )
      )
    );

    if (nextAction === "pair_required" || nextAction === "waiting_pair_confirm") {
      await this.beginPairing({
        record,
        connection: { ...connection, identifier: helloIdentifier },
        requestId: envelope.requestId,
        reusePending: nextAction === "waiting_pair_confirm"
      });
    }

    await this.persist();
  }

  /**
   * Handles `pair_confirm`: checks the submitted pairing code through the
   * pairing service and replies with `pair_success` (including the secret) or
   * `pair_failed`.
   */
  private async handlePairConfirm(
    connection: ClientConnection,
    envelope: BuiltinEnvelope<"pair_confirm", PairConfirmPayload>
  ): Promise<void> {
    const payload = envelope.payload;
    if (!payload) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "MALFORMED_MESSAGE", message: "pair_confirm payload is required" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const identifier = payload.identifier?.trim();
    if (!identifier || !this.options.config.followerIdentifiers.includes(identifier)) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildPairFailed(
            { identifier: identifier || "unknown", reason: "identifier_not_allowed" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const record = this.ensureClientRecord(identifier);
    const submittedCode = payload.pairingCode?.trim();
    if (!submittedCode) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "MALFORMED_MESSAGE", message: "pairingCode is required" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const result = this.pairingService.confirmPairing(record, submittedCode);
    if (!result.success || !result.secret || !result.pairedAt) {
      // "not_pending" is reported to the client as "internal_error"; a missing
      // reason falls back to "internal_error" as well.
      const reason =
        result.reason === "not_pending" ? "internal_error" : (result.reason ?? "internal_error");

      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildPairFailed(
            { identifier, reason },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );

      // Persist even on failure: confirmPairing is given the record and may
      // have updated it.
      await this.persist();
      return;
    }

    if (connection.identifier !== identifier) {
      this.options.transport.assignIdentifierToTemp(connection.ws, identifier);
    }

    // Prefer the public key announced in this session's hello, if any.
    const session = this.registry.sessions.get(identifier);
    record.publicKey = session?.publicKey ?? record.publicKey;
    record.updatedAt = this.now();

    this.options.transport.sendToConnection(
      { ...connection, identifier },
      encodeBuiltin(
        buildPairSuccess(
          { identifier, secret: result.secret, pairedAt: result.pairedAt },
          { requestId: envelope.requestId, timestamp: this.now() }
        )
      )
    );

    await this.persist();
  }

  /**
   * Handles `auth_request`: enforces the attempt rate limit, validates the
   * nonce, timestamp freshness, nonce uniqueness, public key and signature, and
   * on success marks the session authenticated and the client online.
   *
   * Hitting the rate limit or reusing a recent nonce revokes the pairing via
   * {@link triggerRePairRequired}.
   */
  private async handleAuthRequest(
    connection: ClientConnection,
    envelope: BuiltinEnvelope<"auth_request", AuthRequestPayload>
  ): Promise<void> {
    const payload = envelope.payload;
    if (!payload) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "MALFORMED_MESSAGE", message: "auth_request payload is required" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const identifier = payload.identifier?.trim();
    if (!identifier || !this.options.config.followerIdentifiers.includes(identifier)) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildAuthFailed(
            { identifier: identifier || "unknown", reason: "unknown_identifier" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const record = this.ensureClientRecord(identifier);
    const session = this.registry.sessions.get(identifier);
    if (!session || !canAuthenticate(record) || !record.secret || !record.publicKey) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildAuthFailed(
            { identifier, reason: "not_paired" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const now = this.now();

    // Keep only attempts inside the window, then count this one too.
    record.recentHandshakeAttempts = record.recentHandshakeAttempts.filter(
      (timestamp) => now - timestamp < AUTH_ATTEMPT_WINDOW_SECONDS
    );
    record.recentHandshakeAttempts.push(now);

    // NOTE(review): with `>=` the attempt that brings the count up to
    // AUTH_MAX_ATTEMPTS_PER_WINDOW already triggers re-pairing - confirm this
    // is the intended limit.
    if (record.recentHandshakeAttempts.length >= AUTH_MAX_ATTEMPTS_PER_WINDOW) {
      await this.triggerRePairRequired(connection, record, envelope.requestId, "rate_limited");
      return;
    }

    if (!isValidAuthNonce(payload.nonce)) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildAuthFailed(
            { identifier, reason: "invalid_signature" },
            { requestId: envelope.requestId, timestamp: now }
          )
        )
      );
      return;
    }

    const freshness = isTimestampFresh(payload.proofTimestamp, now);
    if (!freshness.ok) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildAuthFailed(
            { identifier, reason: freshness.reason },
            { requestId: envelope.requestId, timestamp: now }
          )
        )
      );
      return;
    }

    const hasNonceCollision = record.recentNonces.some((entry) => entry.nonce === payload.nonce);
    if (hasNonceCollision) {
      await this.triggerRePairRequired(connection, record, envelope.requestId, "nonce_collision");
      return;
    }

    // Whatever key the client presents must equal the key stored on the record.
    const publicKey = payload.publicKey?.trim() || session.publicKey || record.publicKey;
    if (!publicKey || publicKey !== record.publicKey) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildAuthFailed(
            { identifier, reason: "invalid_signature" },
            { requestId: envelope.requestId, timestamp: now }
          )
        )
      );
      return;
    }

    const isValidSignature = await verifySignature(
      publicKey,
      extractAuthRequestSigningInput(payload, record.secret),
      payload.signature
    );
    if (!isValidSignature) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildAuthFailed(
            { identifier, reason: "invalid_signature" },
            { requestId: envelope.requestId, timestamp: now }
          )
        )
      );
      return;
    }

    // Remember the nonce, keeping only the most recent window of entries.
    record.recentNonces = [...record.recentNonces, { nonce: payload.nonce, timestamp: now }].slice(
      -AUTH_RECENT_NONCE_WINDOW_SIZE
    );
    record.lastAuthenticatedAt = now;
    record.lastHeartbeatAt = now;
    record.status = "online";
    record.updatedAt = now;

    session.isAuthenticated = true;
    session.lastActivityAt = now;
    session.publicKey = publicKey;

    const promoted = this.options.transport.promoteToAuthenticated(identifier, connection.ws);
    if (promoted) {
      this.options.onClientAuthenticated?.(identifier);
    }

    this.options.transport.sendToConnection(
      { ...connection, identifier },
      encodeBuiltin(
        buildAuthSuccess(
          { identifier, authenticatedAt: now, status: "online" },
          { requestId: envelope.requestId, timestamp: now }
        )
      )
    );

    await this.persist();
  }

  /**
   * Decides what a client must do after `hello`: wait for a still-valid pending
   * pairing, authenticate, or start a new pairing.
   */
  private determineNextAction(
    record: ClientRecord
  ): "pair_required" | "auth_required" | "waiting_pair_confirm" {
    if (hasPendingPairing(record) && !isPairingExpired(record, this.now())) {
      return "waiting_pair_confirm";
    }

    if (canAuthenticate(record)) {
      return "auth_required";
    }

    return "pair_required";
  }

  /** Returns the registry record for `identifier`, creating and storing one if absent. */
  private ensureClientRecord(identifier: string): ClientRecord {
    const existing = this.registry.clients.get(identifier);
    if (existing) {
      return existing;
    }

    const created = createClientRecord(identifier);
    this.registry.clients.set(identifier, created);
    return created;
  }

  /**
   * Creates a pairing request (or reuses the pending, unexpired one when
   * `reusePending` is set), notifies the admin for new requests, and sends
   * `pair_request` to the client.
   */
  private async beginPairing(options: {
    record: ClientRecord;
    connection: ClientConnection;
    requestId?: string;
    reusePending?: boolean;
  }): Promise<void> {
    const { record, connection, requestId, reusePending = false } = options;

    const request =
      reusePending && hasPendingPairing(record) && !isPairingExpired(record, this.now())
        ? {
            identifier: record.identifier,
            pairingCode: record.pairingCode ?? "",
            expiresAt: record.pairingExpiresAt ?? this.now(),
            ttlSeconds: this.pairingService.getRemainingTtl(record),
            createdAt: record.updatedAt
          }
        : this.pairingService.createPairingRequest(record);

    // When reusing a pending pairing no new notification is sent; the
    // previously recorded notification status is reported instead.
    const notified = reusePending
      ? record.pairingNotifyStatus === "sent"
      : await this.notificationService.sendPairingNotification(request);

    if (notified) {
      this.pairingService.markNotificationSent(record);
    } else {
      this.pairingService.markNotificationFailed(record);
    }

    // Persist immediately so the pairing code is readable from disk (e.g. via CLI)
    if (!reusePending) {
      await this.persist();
    }

    this.options.transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildPairRequest(
          {
            identifier: record.identifier,
            expiresAt: request.expiresAt,
            ttlSeconds: request.ttlSeconds,
            adminNotification: notified ? "sent" : "failed",
            codeDelivery: "out_of_band"
          },
          { requestId, timestamp: this.now() }
        )
      )
    );

    // Pairing remains pending regardless of notification status.
    // The admin can retrieve the pairing code via the server CLI command.
  }

  /**
   * Handles `heartbeat`: requires an authenticated session for the identifier,
   * refreshes the liveness timestamps and replies with `heartbeat_ack`.
   */
  private async handleHeartbeat(
    connection: ClientConnection,
    envelope: BuiltinEnvelope<"heartbeat", HeartbeatPayload>
  ): Promise<void> {
    const payload = envelope.payload;
    if (!payload) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "MALFORMED_MESSAGE", message: "heartbeat payload is required" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const identifier = payload.identifier?.trim();
    if (!identifier || !this.options.config.followerIdentifiers.includes(identifier)) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "IDENTIFIER_NOT_ALLOWED", message: "identifier is not allowed" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    // NOTE(review): the identifier comes from the payload and the session is
    // not compared with `connection.ws` - confirm that is intended.
    const session = this.registry.sessions.get(identifier);
    if (!session || !session.isAuthenticated) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            { code: "AUTH_FAILED", message: "heartbeat requires authentication" },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const record = this.ensureClientRecord(identifier);
    const now = this.now();
    record.lastHeartbeatAt = now;
    record.status = "online";
    record.updatedAt = now;
    session.lastActivityAt = now;

    this.options.transport.sendToConnection(
      { ...connection, identifier },
      encodeBuiltin(
        buildHeartbeatAck(
          { identifier, status: record.status },
          { requestId: envelope.requestId, timestamp: now }
        )
      )
    );

    await this.persist();
  }

  /** (Re)starts the periodic liveness sweep, replacing any running timer. */
  private startSweepTimer(): void {
    this.stopSweepTimer();
    this.sweepTimer = setInterval(() => {
      // Fire-and-forget: a failing sweep (e.g. a rejected store.save) must not
      // become an unhandled promise rejection.
      void this.runLivenessSweep().catch((error: unknown) => {
        console.error(`Yonexus server liveness sweep failed: ${safeErrorMessage(error)}`);
      });
    }, this.sweepIntervalMs);
  }

  /** Cancels the liveness sweep timer if one is running. */
  private stopSweepTimer(): void {
    if (!this.sweepTimer) {
      return;
    }

    clearInterval(this.sweepTimer);
    this.sweepTimer = null;
  }

  /**
   * Recomputes the liveness status of every client record. Clients that turn
   * "unstable" get a status update; clients that turn "offline" get a
   * disconnect notice, have their connection closed and their session removed.
   * Persists only when at least one status changed.
   */
  private async runLivenessSweep(): Promise<void> {
    const now = this.now();
    let hasChanges = false;

    for (const record of this.registry.clients.values()) {
      const nextStatus = this.getLivenessStatus(record, now);
      if (!nextStatus || nextStatus === record.status) {
        continue;
      }

      record.status = nextStatus;
      record.updatedAt = now;
      hasChanges = true;

      if (nextStatus === "unstable") {
        this.options.transport.send(
          record.identifier,
          encodeBuiltin(
            buildStatusUpdate(
              {
                identifier: record.identifier,
                status: "unstable",
                reason: "heartbeat_timeout_7m"
              },
              { timestamp: now }
            )
          )
        );
        continue;
      }

      if (nextStatus === "offline") {
        this.options.transport.send(
          record.identifier,
          encodeBuiltin(
            buildDisconnectNotice(
              { identifier: record.identifier, reason: "heartbeat_timeout_11m" },
              { timestamp: now }
            )
          )
        );
        this.options.transport.closeConnection(record.identifier, 1001, "Heartbeat timeout");
        this.registry.sessions.delete(record.identifier);
      }
    }

    if (hasChanges) {
      await this.persist();
    }
  }

  /**
   * Derives a liveness status from the time since the last heartbeat:
   * 11 minutes or more is "offline", 7 minutes or more is "unstable", otherwise
   * "online".
   *
   * @returns The status, or `null` when the client has no authenticated
   *   session or no recorded heartbeat (such records are left untouched).
   */
  private getLivenessStatus(
    record: ClientRecord,
    now: number
  ): "online" | "unstable" | "offline" | null {
    const session = this.registry.sessions.get(record.identifier);
    if (!session || !session.isAuthenticated || !record.lastHeartbeatAt) {
      return null;
    }

    const silenceSeconds = now - record.lastHeartbeatAt;
    if (silenceSeconds >= 11 * 60) {
      return "offline";
    }

    if (silenceSeconds >= 7 * 60) {
      return "unstable";
    }

    return "online";
  }

  /**
   * Revokes the client's pairing (clears secret, pairing fields, recent nonces
   * and attempts), marks it offline, tells the client to re-pair and persists.
   */
  private async triggerRePairRequired(
    connection: ClientConnection,
    record: ClientRecord,
    requestId: string | undefined,
    reason: "nonce_collision" | "rate_limited"
  ): Promise<void> {
    record.secret = undefined;
    record.pairingStatus = "revoked";
    record.pairingCode = undefined;
    record.pairingExpiresAt = undefined;
    record.pairingNotifyStatus = undefined;
    record.recentNonces = [];
    record.recentHandshakeAttempts = [];
    record.status = "offline";
    record.updatedAt = this.now();

    this.options.transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildRePairRequired(
          { identifier: record.identifier, reason },
          { requestId, timestamp: this.now() }
        )
      )
    );

    await this.persist();
  }

  /** Saves all client records through the configured store. */
  private async persist(): Promise<void> {
    await this.options.store.save(this.registry.clients.values());
  }

  /**
   * Send a rule message to a specific client.
   *
   * @param identifier - The target client identifier
   * @param message - The complete rule message with identifier and content
   * @returns True if message was sent, false if client not connected/authenticated
   */
  sendMessageToClient(identifier: string, message: string): boolean {
    const session = this.registry.sessions.get(identifier);
    if (!session || !session.isAuthenticated) {
      return false;
    }

    // Validate the message is a properly formatted rule message
    try {
      // Quick check: must not be a builtin message and must have :: delimiter
      if (message.startsWith("builtin::")) {
        return false;
      }
      const delimiterIndex = message.indexOf("::");
      if (delimiterIndex === -1) {
        return false;
      }
      parseRuleMessage(message);
    } catch {
      return false;
    }

    return this.options.transport.send(identifier, message);
  }

  /**
   * Send a rule message to a specific client using separate rule identifier and content.
   *
   * @param identifier - The target client identifier
   * @param ruleIdentifier - The rule identifier
   * @param content - The message content
   * @returns True if message was sent, false if client not connected/authenticated or invalid format
   */
  sendRuleMessageToClient(identifier: string, ruleIdentifier: string, content: string): boolean {
    const session = this.registry.sessions.get(identifier);
    if (!session || !session.isAuthenticated) {
      return false;
    }

    try {
      const encoded = encodeRuleMessage(ruleIdentifier, content);
      return this.options.transport.send(identifier, encoded);
    } catch {
      return false;
    }
  }

  /**
   * Handle incoming rule message from a client.
   * Rewrites the message to include sender identifier before dispatch.
   *
   * @param connection - The client connection
   * @param raw - The raw rule message
   */
  private async handleRuleMessage(connection: ClientConnection, raw: string): Promise<void> {
    // Get sender identifier from connection or session
    let senderIdentifier = connection.identifier;
    if (!senderIdentifier) {
      // Try to find identifier from WebSocket
      for (const [id, session] of this.registry.sessions.entries()) {
        if (session.socket === connection.ws) {
          senderIdentifier = id;
          break;
        }
      }
    }

    if (!senderIdentifier) {
      // Cannot determine sender - close connection
      connection.ws.close(1008, "Cannot identify sender");
      return;
    }

    const session = this.registry.sessions.get(senderIdentifier);
    if (!session || !session.isAuthenticated) {
      // Only accept rule messages from authenticated clients
      connection.ws.close(1008, "Not authenticated");
      return;
    }

    try {
      const parsed = parseRuleMessage(raw);
      const rewritten = `${parsed.ruleIdentifier}::${senderIdentifier}::${parsed.content}`;

      session.lastActivityAt = this.now();
      this.options.ruleRegistry?.dispatch(rewritten);
    } catch (error) {
      // Malformed rule message
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            {
              code: "MALFORMED_MESSAGE",
              message: safeErrorMessage(error) || "Invalid rule message format"
            },
            { timestamp: this.now() }
          )
        )
      );
    }
  }
}

/**
 * Creates a {@link YonexusServerRuntime} from the given options.
 */
export function createYonexusServerRuntime(
  options: YonexusServerRuntimeOptions
): YonexusServerRuntime {
  return new YonexusServerRuntime(options);
}