feat: add server liveness sweep and rule registry
This commit is contained in:
89
plugin/core/rules.ts
Normal file
89
plugin/core/rules.ts
Normal file
@@ -0,0 +1,89 @@
|
||||
import {
|
||||
BUILTIN_RULE,
|
||||
CodecError,
|
||||
parseRewrittenRuleMessage
|
||||
} from "../../../Yonexus.Protocol/src/index.js";
|
||||
|
||||
export type ServerRuleProcessor = (message: string) => unknown;
|
||||
|
||||
export class ServerRuleRegistryError extends Error {
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.name = "ServerRuleRegistryError";
|
||||
}
|
||||
}
|
||||
|
||||
export interface ServerRuleRegistry {
|
||||
readonly size: number;
|
||||
registerRule(rule: string, processor: ServerRuleProcessor): void;
|
||||
hasRule(rule: string): boolean;
|
||||
dispatch(raw: string): boolean;
|
||||
getRules(): readonly string[];
|
||||
}
|
||||
|
||||
export class YonexusServerRuleRegistry implements ServerRuleRegistry {
|
||||
private readonly rules = new Map<string, ServerRuleProcessor>();
|
||||
|
||||
get size(): number {
|
||||
return this.rules.size;
|
||||
}
|
||||
|
||||
registerRule(rule: string, processor: ServerRuleProcessor): void {
|
||||
const normalizedRule = this.normalizeRule(rule);
|
||||
if (this.rules.has(normalizedRule)) {
|
||||
throw new ServerRuleRegistryError(
|
||||
`Rule '${normalizedRule}' is already registered`
|
||||
);
|
||||
}
|
||||
|
||||
this.rules.set(normalizedRule, processor);
|
||||
}
|
||||
|
||||
hasRule(rule: string): boolean {
|
||||
return this.rules.has(rule.trim());
|
||||
}
|
||||
|
||||
dispatch(raw: string): boolean {
|
||||
const parsed = parseRewrittenRuleMessage(raw);
|
||||
const processor = this.rules.get(parsed.ruleIdentifier);
|
||||
if (!processor) {
|
||||
return false;
|
||||
}
|
||||
|
||||
processor(raw);
|
||||
return true;
|
||||
}
|
||||
|
||||
getRules(): readonly string[] {
|
||||
return [...this.rules.keys()];
|
||||
}
|
||||
|
||||
private normalizeRule(rule: string): string {
|
||||
const normalizedRule = rule.trim();
|
||||
if (!normalizedRule) {
|
||||
throw new ServerRuleRegistryError("Rule identifier must be a non-empty string");
|
||||
}
|
||||
|
||||
if (normalizedRule === BUILTIN_RULE) {
|
||||
throw new ServerRuleRegistryError(
|
||||
`Rule identifier '${BUILTIN_RULE}' is reserved`
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
parseRewrittenRuleMessage(`${normalizedRule}::sender::probe`);
|
||||
} catch (error) {
|
||||
if (error instanceof CodecError) {
|
||||
throw new ServerRuleRegistryError(error.message);
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
return normalizedRule;
|
||||
}
|
||||
}
|
||||
|
||||
export function createServerRuleRegistry(): ServerRuleRegistry {
|
||||
return new YonexusServerRuleRegistry();
|
||||
}
|
||||
@@ -8,8 +8,10 @@ import {
|
||||
YONEXUS_PROTOCOL_VERSION,
|
||||
buildAuthFailed,
|
||||
buildAuthSuccess,
|
||||
buildDisconnectNotice,
|
||||
buildError,
|
||||
buildHeartbeatAck,
|
||||
buildStatusUpdate,
|
||||
buildHelloAck,
|
||||
buildPairFailed,
|
||||
buildPairRequest,
|
||||
@@ -47,6 +49,7 @@ export interface YonexusServerRuntimeOptions {
|
||||
store: YonexusServerStore;
|
||||
transport: ServerTransport;
|
||||
now?: () => number;
|
||||
sweepIntervalMs?: number;
|
||||
}
|
||||
|
||||
export interface ServerLifecycleState {
|
||||
@@ -60,6 +63,8 @@ export class YonexusServerRuntime {
|
||||
private readonly registry: ServerRegistry;
|
||||
private readonly pairingService: PairingService;
|
||||
private readonly notificationService: DiscordNotificationService;
|
||||
private readonly sweepIntervalMs: number;
|
||||
private sweepTimer: NodeJS.Timeout | null = null;
|
||||
private started = false;
|
||||
|
||||
constructor(options: YonexusServerRuntimeOptions) {
|
||||
@@ -69,6 +74,7 @@ export class YonexusServerRuntime {
|
||||
clients: new Map(),
|
||||
sessions: new Map()
|
||||
};
|
||||
this.sweepIntervalMs = options.sweepIntervalMs ?? 30_000;
|
||||
this.pairingService = createPairingService({ now: this.now });
|
||||
this.notificationService = createDiscordNotificationService({
|
||||
botToken: options.config.notifyBotToken,
|
||||
@@ -100,6 +106,7 @@ export class YonexusServerRuntime {
|
||||
}
|
||||
|
||||
await this.options.transport.start();
|
||||
this.startSweepTimer();
|
||||
this.started = true;
|
||||
}
|
||||
|
||||
@@ -108,6 +115,7 @@ export class YonexusServerRuntime {
|
||||
return;
|
||||
}
|
||||
|
||||
this.stopSweepTimer();
|
||||
await this.persist();
|
||||
this.registry.sessions.clear();
|
||||
await this.options.transport.stop();
|
||||
@@ -669,6 +677,97 @@ export class YonexusServerRuntime {
|
||||
await this.persist();
|
||||
}
|
||||
|
||||
private startSweepTimer(): void {
|
||||
this.stopSweepTimer();
|
||||
this.sweepTimer = setInterval(() => {
|
||||
void this.runLivenessSweep();
|
||||
}, this.sweepIntervalMs);
|
||||
}
|
||||
|
||||
private stopSweepTimer(): void {
|
||||
if (!this.sweepTimer) {
|
||||
return;
|
||||
}
|
||||
|
||||
clearInterval(this.sweepTimer);
|
||||
this.sweepTimer = null;
|
||||
}
|
||||
|
||||
private async runLivenessSweep(): Promise<void> {
|
||||
const now = this.now();
|
||||
let hasChanges = false;
|
||||
|
||||
for (const record of this.registry.clients.values()) {
|
||||
const nextStatus = this.getLivenessStatus(record, now);
|
||||
if (!nextStatus || nextStatus === record.status) {
|
||||
continue;
|
||||
}
|
||||
|
||||
record.status = nextStatus;
|
||||
record.updatedAt = now;
|
||||
hasChanges = true;
|
||||
|
||||
if (nextStatus === "unstable") {
|
||||
this.options.transport.send(
|
||||
record.identifier,
|
||||
encodeBuiltin(
|
||||
buildStatusUpdate(
|
||||
{
|
||||
identifier: record.identifier,
|
||||
status: "unstable",
|
||||
reason: "heartbeat_timeout_7m"
|
||||
},
|
||||
{ timestamp: now }
|
||||
)
|
||||
)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nextStatus === "offline") {
|
||||
this.options.transport.send(
|
||||
record.identifier,
|
||||
encodeBuiltin(
|
||||
buildDisconnectNotice(
|
||||
{
|
||||
identifier: record.identifier,
|
||||
reason: "heartbeat_timeout_11m"
|
||||
},
|
||||
{ timestamp: now }
|
||||
)
|
||||
)
|
||||
);
|
||||
this.options.transport.closeConnection(record.identifier, 1001, "Heartbeat timeout");
|
||||
this.registry.sessions.delete(record.identifier);
|
||||
}
|
||||
}
|
||||
|
||||
if (hasChanges) {
|
||||
await this.persist();
|
||||
}
|
||||
}
|
||||
|
||||
private getLivenessStatus(
|
||||
record: ClientRecord,
|
||||
now: number
|
||||
): "online" | "unstable" | "offline" | null {
|
||||
const session = this.registry.sessions.get(record.identifier);
|
||||
if (!session || !session.isAuthenticated || !record.lastHeartbeatAt) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const silenceSeconds = now - record.lastHeartbeatAt;
|
||||
if (silenceSeconds >= 11 * 60) {
|
||||
return "offline";
|
||||
}
|
||||
|
||||
if (silenceSeconds >= 7 * 60) {
|
||||
return "unstable";
|
||||
}
|
||||
|
||||
return "online";
|
||||
}
|
||||
|
||||
private async triggerRePairRequired(
|
||||
connection: ClientConnection,
|
||||
record: ClientRecord,
|
||||
|
||||
@@ -73,6 +73,13 @@ export {
|
||||
type YonexusServerRuntimeOptions,
|
||||
type ServerLifecycleState
|
||||
} from "./core/runtime.js";
|
||||
export {
|
||||
createServerRuleRegistry,
|
||||
YonexusServerRuleRegistry,
|
||||
ServerRuleRegistryError,
|
||||
type ServerRuleRegistry,
|
||||
type ServerRuleProcessor
|
||||
} from "./core/rules.js";
|
||||
|
||||
export {
|
||||
createPairingService,
|
||||
|
||||
Reference in New Issue
Block a user