From 075fcb7974207e6c463225859728ac3ad78c9e19 Mon Sep 17 00:00:00 2001 From: nav Date: Wed, 8 Apr 2026 22:39:49 +0000 Subject: [PATCH] feat: add server liveness sweep and rule registry --- plugin/core/rules.ts | 89 +++++++++++++++++++++++++++++++++++++ plugin/core/runtime.ts | 99 ++++++++++++++++++++++++++++++++++++++++++ plugin/index.ts | 7 +++ 3 files changed, 195 insertions(+) create mode 100644 plugin/core/rules.ts diff --git a/plugin/core/rules.ts b/plugin/core/rules.ts new file mode 100644 index 0000000..abb60ee --- /dev/null +++ b/plugin/core/rules.ts @@ -0,0 +1,89 @@ +import { + BUILTIN_RULE, + CodecError, + parseRewrittenRuleMessage +} from "../../../Yonexus.Protocol/src/index.js"; + +export type ServerRuleProcessor = (message: string) => unknown; + +export class ServerRuleRegistryError extends Error { + constructor(message: string) { + super(message); + this.name = "ServerRuleRegistryError"; + } +} + +export interface ServerRuleRegistry { + readonly size: number; + registerRule(rule: string, processor: ServerRuleProcessor): void; + hasRule(rule: string): boolean; + dispatch(raw: string): boolean; + getRules(): readonly string[]; +} + +export class YonexusServerRuleRegistry implements ServerRuleRegistry { + private readonly rules = new Map(); + + get size(): number { + return this.rules.size; + } + + registerRule(rule: string, processor: ServerRuleProcessor): void { + const normalizedRule = this.normalizeRule(rule); + if (this.rules.has(normalizedRule)) { + throw new ServerRuleRegistryError( + `Rule '${normalizedRule}' is already registered` + ); + } + + this.rules.set(normalizedRule, processor); + } + + hasRule(rule: string): boolean { + return this.rules.has(rule.trim()); + } + + dispatch(raw: string): boolean { + const parsed = parseRewrittenRuleMessage(raw); + const processor = this.rules.get(parsed.ruleIdentifier); + if (!processor) { + return false; + } + + processor(raw); + return true; + } + + getRules(): readonly string[] { + return [...this.rules.keys()]; + } + + private normalizeRule(rule: string): string { + const normalizedRule = rule.trim(); + if (!normalizedRule) { + throw new ServerRuleRegistryError("Rule identifier must be a non-empty string"); + } + + if (normalizedRule === BUILTIN_RULE) { + throw new ServerRuleRegistryError( + `Rule identifier '${BUILTIN_RULE}' is reserved` + ); + } + + try { + parseRewrittenRuleMessage(`${normalizedRule}::sender::probe`); + } catch (error) { + if (error instanceof CodecError) { + throw new ServerRuleRegistryError(error.message); + } + + throw error; + } + + return normalizedRule; + } +} + +export function createServerRuleRegistry(): ServerRuleRegistry { + return new YonexusServerRuleRegistry(); +} diff --git a/plugin/core/runtime.ts b/plugin/core/runtime.ts index 04a4c12..b5bac3c 100644 --- a/plugin/core/runtime.ts +++ b/plugin/core/runtime.ts @@ -8,8 +8,10 @@ import { YONEXUS_PROTOCOL_VERSION, buildAuthFailed, buildAuthSuccess, + buildDisconnectNotice, buildError, buildHeartbeatAck, + buildStatusUpdate, buildHelloAck, buildPairFailed, buildPairRequest, @@ -47,6 +49,7 @@ export interface YonexusServerRuntimeOptions { store: YonexusServerStore; transport: ServerTransport; now?: () => number; + sweepIntervalMs?: number; } export interface ServerLifecycleState { @@ -60,6 +63,8 @@ export class YonexusServerRuntime { private readonly registry: ServerRegistry; private readonly pairingService: PairingService; private readonly notificationService: DiscordNotificationService; + private readonly sweepIntervalMs: number; + private sweepTimer: NodeJS.Timeout | null = null; private started = false; constructor(options: YonexusServerRuntimeOptions) { @@ -69,6 +74,7 @@ export class YonexusServerRuntime { clients: new Map(), sessions: new Map() }; + this.sweepIntervalMs = options.sweepIntervalMs ?? 30_000; this.pairingService = createPairingService({ now: this.now }); this.notificationService = createDiscordNotificationService({ botToken: options.config.notifyBotToken, @@ -100,6 +106,7 @@ export class YonexusServerRuntime { } await this.options.transport.start(); + this.startSweepTimer(); this.started = true; } @@ -108,6 +115,7 @@ export class YonexusServerRuntime { return; } + this.stopSweepTimer(); await this.persist(); this.registry.sessions.clear(); await this.options.transport.stop(); @@ -669,6 +677,97 @@ export class YonexusServerRuntime { await this.persist(); } + private startSweepTimer(): void { + this.stopSweepTimer(); + this.sweepTimer = setInterval(() => { + void this.runLivenessSweep(); + }, this.sweepIntervalMs); + } + + private stopSweepTimer(): void { + if (!this.sweepTimer) { + return; + } + + clearInterval(this.sweepTimer); + this.sweepTimer = null; + } + + private async runLivenessSweep(): Promise { + const now = this.now(); + let hasChanges = false; + + for (const record of this.registry.clients.values()) { + const nextStatus = this.getLivenessStatus(record, now); + if (!nextStatus || nextStatus === record.status) { + continue; + } + + record.status = nextStatus; + record.updatedAt = now; + hasChanges = true; + + if (nextStatus === "unstable") { + this.options.transport.send( + record.identifier, + encodeBuiltin( + buildStatusUpdate( + { + identifier: record.identifier, + status: "unstable", + reason: "heartbeat_timeout_7m" + }, + { timestamp: now } + ) + ) + ); + continue; + } + + if (nextStatus === "offline") { + this.options.transport.send( + record.identifier, + encodeBuiltin( + buildDisconnectNotice( + { + identifier: record.identifier, + reason: "heartbeat_timeout_11m" + }, + { timestamp: now } + ) + ) + ); + this.options.transport.closeConnection(record.identifier, 1001, "Heartbeat timeout"); + this.registry.sessions.delete(record.identifier); + } + } + + if (hasChanges) { + await this.persist(); + } + } + + private getLivenessStatus( + record: ClientRecord, + now: number + ): "online" | "unstable" | "offline" | null { + const session = this.registry.sessions.get(record.identifier); + if (!session || !session.isAuthenticated || !record.lastHeartbeatAt) { + return null; + } + + const silenceSeconds = now - record.lastHeartbeatAt; + if (silenceSeconds >= 11 * 60) { + return "offline"; + } + + if (silenceSeconds >= 7 * 60) { + return "unstable"; + } + + return "online"; + } + private async triggerRePairRequired( connection: ClientConnection, record: ClientRecord, diff --git a/plugin/index.ts b/plugin/index.ts index 94840cb..eb2d894 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -73,6 +73,13 @@ export { type YonexusServerRuntimeOptions, type ServerLifecycleState } from "./core/runtime.js"; +export { + createServerRuleRegistry, + YonexusServerRuleRegistry, + ServerRuleRegistryError, + type ServerRuleRegistry, + type ServerRuleProcessor +} from "./core/rules.js"; export { createPairingService,