import {
  type BuiltinEnvelope,
  type HelloPayload,
  YONEXUS_PROTOCOL_VERSION,
  buildError,
  buildHelloAck,
  decodeBuiltin,
  encodeBuiltin,
  isBuiltinMessage
} from "../../../Yonexus.Protocol/src/index.js";
import type { YonexusServerConfig } from "./config.js";
import {
  canAuthenticate,
  createClientRecord,
  hasPendingPairing,
  isPairingExpired,
  type ClientRecord,
  type ServerRegistry
} from "./persistence.js";
import type { YonexusServerStore } from "./store.js";
import { type ClientConnection, type ServerTransport } from "./transport.js";
import { createPairingService, type PairingService } from "../services/pairing.js";
import {
  createDiscordNotificationService,
  type DiscordNotificationService
} from "../notifications/discord.js";

/**
 * Dependencies required to construct a {@link YonexusServerRuntime}.
 */
export interface YonexusServerRuntimeOptions {
  /** Server configuration (follower allow-list, notification credentials, ...). */
  config: YonexusServerConfig;
  /** Store used to load client records on start and to save them afterwards. */
  store: YonexusServerStore;
  /** Transport used to start/stop listening and to send frames to connections. */
  transport: ServerTransport;
  /**
   * Clock used for every timestamp the runtime produces.
   * Defaults to the current Unix time in whole seconds.
   */
  now?: () => number;
}

/**
 * Snapshot of the runtime's lifecycle flag together with its registry.
 */
export interface ServerLifecycleState {
  readonly isStarted: boolean;
  readonly registry: ServerRegistry;
}

/**
 * Server-side runtime that owns the client/session registry and answers the
 * builtin `hello` handshake.
 */
export class YonexusServerRuntime {
  private readonly options: YonexusServerRuntimeOptions;
  private readonly now: () => number;
  private readonly registry: ServerRegistry;
  private readonly pairingService: PairingService;
  private readonly notificationService: DiscordNotificationService;
  private started = false;

  /**
   * Builds the runtime with an empty registry and wires the pairing and
   * notification services. No store or transport call happens until
   * {@link YonexusServerRuntime.start} is invoked.
   *
   * @param options - Collaborators and configuration for this runtime.
   */
  constructor(options: YonexusServerRuntimeOptions) {
    this.options = options;
    this.now = options.now ?? (() => Math.floor(Date.now() / 1000));
    this.registry = {
      clients: new Map(),
      sessions: new Map()
    };
    this.pairingService = createPairingService({ now: this.now });
    this.notificationService = createDiscordNotificationService({
      botToken: options.config.notifyBotToken,
      adminUserId: options.config.adminUserId
    });
  }

  /**
   * Current lifecycle state. The returned `registry` is the runtime's own
   * registry object, not a copy.
   */
  get state(): ServerLifecycleState {
    return {
      isStarted: this.started,
      registry: this.registry
    };
  }

  /**
   * Loads persisted client records, makes sure every configured follower
   * identifier has a record, then starts the transport.
   * Calling it while already started is a no-op.
   */
  async start(): Promise<void> {
    if (this.started) {
      return;
    }

    const persisted = await this.options.store.load();
    for (const record of persisted.clients.values()) {
      this.registry.clients.set(record.identifier, record);
    }

    // Seed a fresh record for configured followers the store did not know about.
    for (const identifier of this.options.config.followerIdentifiers) {
      if (!this.registry.clients.has(identifier)) {
        this.registry.clients.set(identifier, createClientRecord(identifier));
      }
    }

    await this.options.transport.start();
    this.started = true;
  }

  /**
   * Persists the client records, drops all sessions and stops the transport.
   * Calling it while not started is a no-op.
   */
  async stop(): Promise<void> {
    if (!this.started) {
      return;
    }

    await this.persist();
    this.registry.sessions.clear();
    await this.options.transport.stop();
    this.started = false;
  }

  /**
   * Marks the client behind `identifier` as offline and removes its session.
   * Does nothing when `identifier` is null/empty or has no session.
   *
   * @param identifier - Identifier of the disconnected client, if known.
   */
  handleDisconnect(identifier: string | null): void {
    if (!identifier) {
      return;
    }

    const existing = this.registry.sessions.get(identifier);
    if (!existing) {
      return;
    }

    const record = this.registry.clients.get(identifier);
    if (record) {
      record.status = "offline";
      record.updatedAt = this.now();
    }

    this.registry.sessions.delete(identifier);
  }

  /**
   * Entry point for a raw frame received on `connection`.
   * Frames that are not builtin messages are ignored; of the builtin
   * messages only `hello` is handled here.
   *
   * @param connection - Connection the frame arrived on.
   * @param raw - Raw frame text.
   */
  async handleMessage(connection: ClientConnection, raw: string): Promise<void> {
    if (!isBuiltinMessage(raw)) {
      return;
    }

    const envelope = decodeBuiltin(raw);
    if (envelope.type === "hello") {
      await this.handleHello(connection, envelope as BuiltinEnvelope<"hello", HelloPayload>);
    }
  }

  /**
   * Validates a `hello` envelope and, when it is acceptable, registers the
   * connection, opens an unauthenticated session and replies with a
   * `hello_ack` carrying the next action the client must take.
   *
   * Rejections, in order: missing payload, identifier not in the configured
   * follower list, protocol version mismatch (which also closes the socket).
   * Every error reply echoes the request id of the offending envelope.
   */
  private async handleHello(
    connection: ClientConnection,
    envelope: BuiltinEnvelope<"hello", HelloPayload>
  ): Promise<void> {
    const payload = envelope.payload;
    if (!payload) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            {
              code: "MALFORMED_MESSAGE",
              message: "hello payload is required"
            },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    const helloIdentifier = payload.identifier?.trim();
    if (
      !helloIdentifier ||
      !this.options.config.followerIdentifiers.includes(helloIdentifier)
    ) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            {
              code: "IDENTIFIER_NOT_ALLOWED",
              message: "identifier is not allowed"
            },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      return;
    }

    if (payload.protocolVersion !== YONEXUS_PROTOCOL_VERSION) {
      this.options.transport.sendToConnection(
        connection,
        encodeBuiltin(
          buildError(
            {
              code: "UNSUPPORTED_PROTOCOL_VERSION",
              message: `Unsupported protocol version: ${payload.protocolVersion}`
            },
            { requestId: envelope.requestId, timestamp: this.now() }
          )
        )
      );
      // 1002 is the WebSocket "protocol error" close code.
      connection.ws.close(1002, "Unsupported protocol version");
      return;
    }

    const record = this.ensureClientRecord(helloIdentifier);
    record.updatedAt = this.now();

    this.options.transport.registerConnection(helloIdentifier, connection.ws);
    this.registry.sessions.set(helloIdentifier, {
      identifier: helloIdentifier,
      socket: connection.ws,
      isAuthenticated: false,
      connectedAt: connection.connectedAt,
      lastActivityAt: this.now()
    });

    const nextAction = this.determineNextAction(record);
    this.options.transport.sendToConnection(
      { ...connection, identifier: helloIdentifier },
      encodeBuiltin(
        buildHelloAck(
          {
            identifier: helloIdentifier,
            nextAction
          },
          { requestId: envelope.requestId, timestamp: this.now() }
        )
      )
    );

    // The ack is sent first; pairing (and its notification) follows it.
    if (nextAction === "pair_required") {
      await this.beginPairing(record);
    }

    await this.persist();
  }

  /**
   * Decides what the client has to do after `hello`:
   * an unexpired pending pairing wins, then existing credentials, and
   * otherwise a new pairing is required.
   */
  private determineNextAction(
    record: ClientRecord
  ): "pair_required" | "auth_required" | "waiting_pair_confirm" {
    if (hasPendingPairing(record) && !isPairingExpired(record, this.now())) {
      return "waiting_pair_confirm";
    }

    if (canAuthenticate(record)) {
      return "auth_required";
    }

    return "pair_required";
  }

  /**
   * Returns the registry record for `identifier`, creating and registering a
   * new one when none exists yet.
   */
  private ensureClientRecord(identifier: string): ClientRecord {
    const existing = this.registry.clients.get(identifier);
    if (existing) {
      return existing;
    }

    const created = createClientRecord(identifier);
    this.registry.clients.set(identifier, created);
    return created;
  }

  /**
   * Creates a pairing request for `record` and sends the pairing
   * notification, recording on the record whether the notification was sent.
   * Skipped when the record already has an unexpired pending pairing.
   */
  private async beginPairing(record: ClientRecord): Promise<void> {
    if (hasPendingPairing(record) && !isPairingExpired(record, this.now())) {
      return;
    }

    const request = this.pairingService.createPairingRequest(record);
    const notified = await this.notificationService.sendPairingNotification(request);
    if (notified) {
      this.pairingService.markNotificationSent(record);
    } else {
      this.pairingService.markNotificationFailed(record);
    }
  }

  /** Saves every client record currently in the registry to the store. */
  private async persist(): Promise<void> {
    await this.options.store.save(this.registry.clients.values());
  }
}

/**
 * Factory wrapper around the {@link YonexusServerRuntime} constructor.
 *
 * @param options - Collaborators and configuration for the runtime.
 * @returns A new, not yet started runtime.
 */
export function createYonexusServerRuntime(
  options: YonexusServerRuntimeOptions
): YonexusServerRuntime {
  return new YonexusServerRuntime(options);
}