// Listing metadata: 965 lines, 27 KiB, TypeScript
import {
|
|
AUTH_ATTEMPT_WINDOW_SECONDS,
|
|
AUTH_MAX_ATTEMPTS_PER_WINDOW,
|
|
AUTH_RECENT_NONCE_WINDOW_SIZE,
|
|
type BuiltinEnvelope,
|
|
type HelloPayload,
|
|
type PairConfirmPayload,
|
|
YONEXUS_PROTOCOL_VERSION,
|
|
buildAuthFailed,
|
|
buildAuthSuccess,
|
|
buildDisconnectNotice,
|
|
buildError,
|
|
buildHeartbeatAck,
|
|
buildStatusUpdate,
|
|
buildHelloAck,
|
|
buildPairFailed,
|
|
buildPairRequest,
|
|
buildPairSuccess,
|
|
buildRePairRequired,
|
|
CodecError,
|
|
decodeBuiltin,
|
|
encodeBuiltin,
|
|
encodeRuleMessage,
|
|
extractAuthRequestSigningInput,
|
|
isBuiltinMessage,
|
|
isTimestampFresh,
|
|
isValidAuthNonce,
|
|
parseRuleMessage,
|
|
type AuthRequestPayload,
|
|
type HeartbeatPayload
|
|
} from "../../../Yonexus.Protocol/src/index.js";
|
|
import type { YonexusServerConfig } from "./config.js";
|
|
import {
|
|
canAuthenticate,
|
|
createClientRecord,
|
|
hasPendingPairing,
|
|
isPairingExpired,
|
|
type ClientRecord,
|
|
type ServerRegistry
|
|
} from "./persistence.js";
|
|
import { verifySignature } from "../../../Yonexus.Client/plugin/crypto/keypair.js";
|
|
import type { YonexusServerStore } from "./store.js";
|
|
import { type ClientConnection, type ServerTransport } from "./transport.js";
|
|
import { createPairingService, type PairingService } from "../services/pairing.js";
|
|
import {
|
|
createDiscordNotificationService,
|
|
type DiscordNotificationService
|
|
} from "../notifications/discord.js";
|
|
import { safeErrorMessage } from "./logging.js";
|
|
|
|
/**
 * Dependencies and tunables supplied when constructing a `YonexusServerRuntime`.
 */
export interface YonexusServerRuntimeOptions {
  /** Server configuration (follower allow-list, notification credentials). */
  config: YonexusServerConfig;

  /** Persistence backend: loaded once on start, saved after state changes. */
  store: YonexusServerStore;

  /** Transport used to start/stop the server and to send frames to clients. */
  transport: ServerTransport;

  /** Clock override; when omitted the runtime uses Unix time in whole seconds. */
  now?: () => number;

  /** Delay between liveness sweeps in milliseconds; defaults to 30_000 when omitted. */
  sweepIntervalMs?: number;
}
|
|
|
|
/**
 * Read-only view of the runtime's lifecycle, as exposed by the `state` getter.
 */
export interface ServerLifecycleState {
  /** Whether the runtime is currently started. */
  readonly isStarted: boolean;

  /** The client/session registry held by the runtime. */
  readonly registry: ServerRegistry;
}
|
|
|
|
export class YonexusServerRuntime {
|
|
private readonly options: YonexusServerRuntimeOptions;
|
|
private readonly now: () => number;
|
|
private readonly registry: ServerRegistry;
|
|
private readonly pairingService: PairingService;
|
|
private readonly notificationService: DiscordNotificationService;
|
|
private readonly sweepIntervalMs: number;
|
|
private sweepTimer: NodeJS.Timeout | null = null;
|
|
private started = false;
|
|
|
|
/**
 * Wires the runtime from its options: resolves the clock and sweep-interval
 * defaults, creates an empty registry, and builds the pairing and Discord
 * notification services. Nothing is loaded or started here.
 */
constructor(options: YonexusServerRuntimeOptions) {
  const { config, now, sweepIntervalMs } = options;

  this.options = options;
  // Default clock: Unix time truncated to whole seconds.
  this.now = now ?? (() => Math.floor(Date.now() / 1000));
  this.sweepIntervalMs = sweepIntervalMs ?? 30_000;
  this.registry = { clients: new Map(), sessions: new Map() };

  // The pairing service shares the runtime clock.
  this.pairingService = createPairingService({ now: this.now });
  this.notificationService = createDiscordNotificationService({
    botToken: config.notifyBotToken,
    adminUserId: config.adminUserId
  });
}
|
|
|
|
/** Current lifecycle snapshot: the started flag plus the live (uncopied) registry. */
get state(): ServerLifecycleState {
  const { started: isStarted, registry } = this;
  return { isStarted, registry };
}
|
|
|
|
/**
 * Starts the runtime; a no-op when it is already started.
 *
 * Loads persisted client records into the registry, makes sure every
 * configured follower identifier has a record, starts the transport, arms
 * the liveness sweep timer and only then marks the runtime as started.
 */
async start(): Promise<void> {
  if (this.started) {
    return;
  }

  const { store, config, transport } = this.options;
  const { clients } = this.registry;

  const persisted = await store.load();
  for (const record of persisted.clients.values()) {
    clients.set(record.identifier, record);
  }

  // Seed a blank record for any allowed follower that has no persisted record.
  for (const identifier of config.followerIdentifiers) {
    if (clients.has(identifier)) {
      continue;
    }
    clients.set(identifier, createClientRecord(identifier));
  }

  await transport.start();
  this.startSweepTimer();
  this.started = true;
}
|
|
|
|
/**
 * Stops the runtime; a no-op when it is not started.
 *
 * Order: stop the sweep timer, persist client records, drop all in-memory
 * sessions, stop the transport, then clear the started flag.
 *
 * Teardown runs in a `finally` so that a failing `persist()` can no longer
 * leave the transport listening with the sweep timer already stopped. The
 * persist error still propagates to the caller, unless `transport.stop()`
 * itself throws, in which case that error wins and the runtime stays marked
 * as started, as before.
 */
async stop(): Promise<void> {
  if (!this.started) {
    return;
  }

  this.stopSweepTimer();

  try {
    await this.persist();
  } finally {
    this.registry.sessions.clear();
    await this.options.transport.stop();
    this.started = false;
  }
}
|
|
|
|
/**
 * Reacts to a closed connection.
 *
 * Does nothing when the identifier is null/empty or has no session. Otherwise
 * marks the matching client record (if any) as "offline", stamps `updatedAt`,
 * and removes the session. The change is not persisted here.
 *
 * @param identifier - Identifier bound to the closed connection, or null.
 */
handleDisconnect(identifier: string | null): void {
  const { sessions, clients } = this.registry;

  const session = identifier ? sessions.get(identifier) : undefined;
  if (!identifier || !session) {
    return;
  }

  const record = clients.get(identifier);
  if (record) {
    record.status = "offline";
    record.updatedAt = this.now();
  }

  sessions.delete(identifier);
}
|
|
|
|
/**
 * Entry point for every raw frame received from a connection.
 *
 * Non-builtin frames go to the rule-message handler. Builtin frames are
 * decoded and dispatched by `type`; a frame that fails to decode, or whose
 * type has no handler, is answered with a MALFORMED_MESSAGE error.
 *
 * @param connection - The connection the frame arrived on.
 * @param raw - The undecoded frame text.
 */
async handleMessage(connection: ClientConnection, raw: string): Promise<void> {
  // Anything that is not a builtin frame is treated as a rule message.
  if (!isBuiltinMessage(raw)) {
    await this.handleRuleMessage(connection, raw);
    return;
  }

  let envelope: BuiltinEnvelope;
  try {
    envelope = decodeBuiltin(raw);
  } catch (error) {
    // Only codec errors carry a message that is echoed back to the client.
    const message = error instanceof CodecError ? error.message : "Invalid builtin message";
    const frame = encodeBuiltin(
      buildError({ code: "MALFORMED_MESSAGE", message }, { timestamp: this.now() })
    );
    this.options.transport.sendToConnection(connection, frame);
    return;
  }

  switch (envelope.type) {
    case "hello":
      await this.handleHello(connection, envelope as BuiltinEnvelope<"hello", HelloPayload>);
      return;

    case "pair_confirm":
      await this.handlePairConfirm(
        connection,
        envelope as BuiltinEnvelope<"pair_confirm", PairConfirmPayload>
      );
      return;

    case "auth_request":
      await this.handleAuthRequest(
        connection,
        envelope as BuiltinEnvelope<"auth_request", AuthRequestPayload>
      );
      return;

    case "heartbeat":
      await this.handleHeartbeat(
        connection,
        envelope as BuiltinEnvelope<"heartbeat", HeartbeatPayload>
      );
      return;

    default: {
      // Decoded fine, but the server has no handler for this builtin type.
      const frame = encodeBuiltin(
        buildError(
          {
            code: "MALFORMED_MESSAGE",
            message: `Unsupported builtin type: ${String(envelope.type)}`
          },
          { requestId: envelope.requestId, timestamp: this.now() }
        )
      );
      this.options.transport.sendToConnection(connection, frame);
    }
  }
}
|
|
|
|
/**
 * Handles a `hello` frame: validates it, binds the connection to the
 * announced identifier, replies with `hello_ack`, and starts pairing if needed.
 *
 * Rejections:
 * - missing payload: MALFORMED_MESSAGE
 * - identifier empty or not in `config.followerIdentifiers`: IDENTIFIER_NOT_ALLOWED
 * - protocol version mismatch: UNSUPPORTED_PROTOCOL_VERSION, then the socket
 *   is closed with code 1002
 *
 * Every error reply carries the request's `requestId` so the client can
 * correlate it. Previously the first two rejections omitted it, unlike the
 * other handlers in this class.
 *
 * On success a fresh unauthenticated session replaces any existing session
 * for the identifier, and client records are persisted at the end.
 */
private async handleHello(
  connection: ClientConnection,
  envelope: BuiltinEnvelope<"hello", HelloPayload>
): Promise<void> {
  const { transport, config } = this.options;
  const { payload, requestId } = envelope;

  if (!payload) {
    transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildError(
          { code: "MALFORMED_MESSAGE", message: "hello payload is required" },
          { requestId, timestamp: this.now() }
        )
      )
    );
    return;
  }

  const helloIdentifier = payload.identifier?.trim();
  if (!helloIdentifier || !config.followerIdentifiers.includes(helloIdentifier)) {
    transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildError(
          { code: "IDENTIFIER_NOT_ALLOWED", message: "identifier is not allowed" },
          { requestId, timestamp: this.now() }
        )
      )
    );
    return;
  }

  if (payload.protocolVersion !== YONEXUS_PROTOCOL_VERSION) {
    transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildError(
          {
            code: "UNSUPPORTED_PROTOCOL_VERSION",
            message: `Unsupported protocol version: ${payload.protocolVersion}`
          },
          { requestId, timestamp: this.now() }
        )
      )
    );
    connection.ws.close(1002, "Unsupported protocol version");
    return;
  }

  const record = this.ensureClientRecord(helloIdentifier);
  record.updatedAt = this.now();

  // Bind the socket to the identifier and (re)create an unauthenticated session.
  transport.assignIdentifierToTemp(connection.ws, helloIdentifier);
  this.registry.sessions.set(helloIdentifier, {
    identifier: helloIdentifier,
    socket: connection.ws,
    isAuthenticated: false,
    connectedAt: connection.connectedAt,
    lastActivityAt: this.now(),
    publicKey: payload.publicKey?.trim() || undefined
  });

  // From here on, replies go out on the connection tagged with its identifier.
  const identifiedConnection = { ...connection, identifier: helloIdentifier };

  const nextAction = this.determineNextAction(record);
  transport.sendToConnection(
    identifiedConnection,
    encodeBuiltin(
      buildHelloAck(
        { identifier: helloIdentifier, nextAction },
        { requestId, timestamp: this.now() }
      )
    )
  );

  if (nextAction === "pair_required" || nextAction === "waiting_pair_confirm") {
    await this.beginPairing({
      record,
      connection: identifiedConnection,
      requestId,
      // An unexpired pending pairing is re-announced rather than regenerated.
      reusePending: nextAction === "waiting_pair_confirm"
    });
  }

  await this.persist();
}
|
|
|
|
/**
 * Handles a `pair_confirm` frame carrying the out-of-band pairing code.
 *
 * Rejections:
 * - missing payload or empty `pairingCode`: MALFORMED_MESSAGE error
 * - identifier empty or not allowed: `pair_failed` with "identifier_not_allowed"
 * - pairing service refuses the code: `pair_failed` with the service's reason
 *   ("not_pending" and a missing reason are both reported as "internal_error");
 *   client records are persisted in this case too
 *
 * On success the session's public key (if any) is copied to the record, the
 * client receives `pair_success` with the secret, and records are persisted.
 */
private async handlePairConfirm(
  connection: ClientConnection,
  envelope: BuiltinEnvelope<"pair_confirm", PairConfirmPayload>
): Promise<void> {
  const { transport, config } = this.options;
  const { payload, requestId } = envelope;

  if (!payload) {
    const frame = encodeBuiltin(
      buildError(
        { code: "MALFORMED_MESSAGE", message: "pair_confirm payload is required" },
        { requestId, timestamp: this.now() }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  const identifier = payload.identifier?.trim();
  const isAllowed = Boolean(identifier) && config.followerIdentifiers.includes(identifier as string);
  if (!identifier || !isAllowed) {
    const frame = encodeBuiltin(
      buildPairFailed(
        { identifier: identifier || "unknown", reason: "identifier_not_allowed" },
        { requestId, timestamp: this.now() }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  const record = this.ensureClientRecord(identifier);

  const submittedCode = payload.pairingCode?.trim();
  if (!submittedCode) {
    const frame = encodeBuiltin(
      buildError(
        { code: "MALFORMED_MESSAGE", message: "pairingCode is required" },
        { requestId, timestamp: this.now() }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  const result = this.pairingService.confirmPairing(record, submittedCode);
  const confirmed = result.success && result.secret && result.pairedAt;
  if (!confirmed || !result.secret || !result.pairedAt) {
    // "not_pending" is reported to the client as "internal_error".
    const reason =
      result.reason === "not_pending" ? "internal_error" : (result.reason ?? "internal_error");
    const frame = encodeBuiltin(
      buildPairFailed({ identifier, reason }, { requestId, timestamp: this.now() })
    );
    transport.sendToConnection(connection, frame);

    await this.persist();
    return;
  }

  // Bind the socket to the identifier if this connection is not already tagged with it.
  if (connection.identifier !== identifier) {
    transport.assignIdentifierToTemp(connection.ws, identifier);
  }

  // Prefer the key announced in the session's hello; otherwise keep the stored one.
  const announcedKey = this.registry.sessions.get(identifier)?.publicKey;
  record.publicKey = announcedKey ?? record.publicKey;
  record.updatedAt = this.now();

  const successFrame = encodeBuiltin(
    buildPairSuccess(
      { identifier, secret: result.secret, pairedAt: result.pairedAt },
      { requestId, timestamp: this.now() }
    )
  );
  transport.sendToConnection({ ...connection, identifier }, successFrame);

  await this.persist();
}
|
|
|
|
/**
 * Handles an `auth_request` frame (signed proof from a paired client).
 *
 * Checks run in this order, each ending the request on failure:
 *  1. payload present                        -> MALFORMED_MESSAGE error
 *  2. identifier allowed                     -> auth_failed "unknown_identifier"
 *  3. session exists and record is paired    -> auth_failed "not_paired"
 *  4. attempts within the rate window        -> re-pair required "rate_limited"
 *  5. nonce well-formed                      -> auth_failed "invalid_signature"
 *  6. proof timestamp fresh                  -> auth_failed with the freshness reason
 *  7. nonce not recently seen                -> re-pair required "nonce_collision"
 *  8. public key matches the paired key      -> auth_failed "invalid_signature"
 *  9. signature verifies                     -> auth_failed "invalid_signature"
 *
 * On success the nonce is remembered (bounded window), the record goes
 * "online", the session is marked authenticated, the transport promotes the
 * connection, `auth_success` is sent, and records are persisted.
 */
private async handleAuthRequest(
  connection: ClientConnection,
  envelope: BuiltinEnvelope<"auth_request", AuthRequestPayload>
): Promise<void> {
  const { transport, config } = this.options;
  const { payload, requestId } = envelope;

  if (!payload) {
    const frame = encodeBuiltin(
      buildError(
        { code: "MALFORMED_MESSAGE", message: "auth_request payload is required" },
        { requestId, timestamp: this.now() }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  const identifier = payload.identifier?.trim();
  if (!identifier || !config.followerIdentifiers.includes(identifier)) {
    const frame = encodeBuiltin(
      buildAuthFailed(
        { identifier: identifier || "unknown", reason: "unknown_identifier" },
        { requestId, timestamp: this.now() }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  const record = this.ensureClientRecord(identifier);
  const session = this.registry.sessions.get(identifier);
  if (!session || !canAuthenticate(record) || !record.secret || !record.publicKey) {
    const frame = encodeBuiltin(
      buildAuthFailed(
        { identifier, reason: "not_paired" },
        { requestId, timestamp: this.now() }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  // A single timestamp is used for the rest of this request.
  const now = this.now();

  // Sliding-window rate limit: drop attempts outside the window, then count this one.
  record.recentHandshakeAttempts = record.recentHandshakeAttempts.filter(
    (attemptedAt) => now - attemptedAt < AUTH_ATTEMPT_WINDOW_SECONDS
  );
  record.recentHandshakeAttempts.push(now);
  if (record.recentHandshakeAttempts.length > AUTH_MAX_ATTEMPTS_PER_WINDOW) {
    await this.triggerRePairRequired(connection, record, requestId, "rate_limited");
    return;
  }

  if (!isValidAuthNonce(payload.nonce)) {
    const frame = encodeBuiltin(
      buildAuthFailed(
        { identifier, reason: "invalid_signature" },
        { requestId, timestamp: now }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  const freshness = isTimestampFresh(payload.proofTimestamp, now);
  if (!freshness.ok) {
    const frame = encodeBuiltin(
      buildAuthFailed(
        { identifier, reason: freshness.reason },
        { requestId, timestamp: now }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  // A reused nonce revokes the pairing rather than merely failing the attempt.
  const nonceSeenBefore = record.recentNonces.some((entry) => entry.nonce === payload.nonce);
  if (nonceSeenBefore) {
    await this.triggerRePairRequired(connection, record, requestId, "nonce_collision");
    return;
  }

  // Key precedence: request payload, then the session's hello key, then the paired key.
  // Whatever is chosen must equal the key stored at pairing time.
  const publicKey = payload.publicKey?.trim() || session.publicKey || record.publicKey;
  if (!publicKey || publicKey !== record.publicKey) {
    const frame = encodeBuiltin(
      buildAuthFailed(
        { identifier, reason: "invalid_signature" },
        { requestId, timestamp: now }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  const signingInput = extractAuthRequestSigningInput(payload, record.secret);
  const signatureOk = await verifySignature(publicKey, signingInput, payload.signature);
  if (!signatureOk) {
    const frame = encodeBuiltin(
      buildAuthFailed(
        { identifier, reason: "invalid_signature" },
        { requestId, timestamp: now }
      )
    );
    transport.sendToConnection(connection, frame);
    return;
  }

  // Success: remember the nonce, keeping only the most recent window.
  record.recentNonces = record.recentNonces
    .concat({ nonce: payload.nonce, timestamp: now })
    .slice(-AUTH_RECENT_NONCE_WINDOW_SIZE);
  record.lastAuthenticatedAt = now;
  record.lastHeartbeatAt = now;
  record.status = "online";
  record.updatedAt = now;

  // `session` was checked for existence above, so no second guard is needed.
  session.isAuthenticated = true;
  session.lastActivityAt = now;
  session.publicKey = publicKey;

  transport.promoteToAuthenticated(identifier, connection.ws);

  const successFrame = encodeBuiltin(
    buildAuthSuccess(
      { identifier, authenticatedAt: now, status: "online" },
      { requestId, timestamp: now }
    )
  );
  transport.sendToConnection({ ...connection, identifier }, successFrame);

  await this.persist();
}
|
|
|
|
/**
 * Decides what a client that just said hello must do next.
 *
 * An unexpired pending pairing wins over everything else; otherwise a record
 * that can authenticate is asked to authenticate, and any other record must pair.
 */
private determineNextAction(record: ClientRecord): "pair_required" | "auth_required" | "waiting_pair_confirm" {
  const pairingInFlight = hasPendingPairing(record) && !isPairingExpired(record, this.now());
  if (pairingInFlight) {
    return "waiting_pair_confirm";
  }

  return canAuthenticate(record) ? "auth_required" : "pair_required";
}
|
|
|
|
/**
 * Returns the registry's record for `identifier`, creating and registering a
 * fresh one when none exists yet.
 */
private ensureClientRecord(identifier: string): ClientRecord {
  const { clients } = this.registry;

  let record = clients.get(identifier);
  if (!record) {
    record = createClientRecord(identifier);
    clients.set(identifier, record);
  }

  return record;
}
|
|
|
|
/**
 * Starts (or re-announces) a pairing for a client and tells the client about it.
 *
 * With `reusePending` set and an unexpired pending pairing on the record, the
 * existing code and expiry are reused and no new admin notification is sent;
 * the stored notification status decides whether it counts as delivered.
 * Otherwise a new pairing request is created and the admin is notified.
 *
 * The client always receives a `pair_request` (the code itself travels out of
 * band). If the admin notification is not delivered, a `pair_failed` with
 * "admin_notification_failed" follows and the pairing state is cleared.
 *
 * This method does not persist; the caller does.
 */
private async beginPairing(options: {
  record: ClientRecord;
  connection: ClientConnection;
  requestId?: string;
  reusePending?: boolean;
}): Promise<void> {
  const { record, connection, requestId, reusePending = false } = options;
  const { transport } = this.options;
  const pairing = this.pairingService;

  const canReuse =
    reusePending && hasPendingPairing(record) && !isPairingExpired(record, this.now());

  const request = canReuse
    ? {
        identifier: record.identifier,
        pairingCode: record.pairingCode ?? "",
        expiresAt: record.pairingExpiresAt ?? this.now(),
        ttlSeconds: pairing.getRemainingTtl(record),
        createdAt: record.updatedAt
      }
    : pairing.createPairingRequest(record);

  // On reuse, trust the stored delivery status instead of notifying the admin again.
  const notified = reusePending
    ? record.pairingNotifyStatus === "sent"
    : await this.notificationService.sendPairingNotification(request);

  if (notified) {
    pairing.markNotificationSent(record);
  } else {
    pairing.markNotificationFailed(record);
  }

  const requestFrame = encodeBuiltin(
    buildPairRequest(
      {
        identifier: record.identifier,
        expiresAt: request.expiresAt,
        ttlSeconds: pairing.getRemainingTtl(record),
        adminNotification: notified ? "sent" : "failed",
        codeDelivery: "out_of_band"
      },
      { requestId, timestamp: this.now() }
    )
  );
  transport.sendToConnection(connection, requestFrame);

  if (notified) {
    return;
  }

  // Undelivered notification: report the failure and clear the pairing state.
  const failureFrame = encodeBuiltin(
    buildPairFailed(
      { identifier: record.identifier, reason: "admin_notification_failed" },
      { requestId, timestamp: this.now() }
    )
  );
  transport.sendToConnection(connection, failureFrame);
  pairing.clearPairingState(record);
}
|
|
|
|
/**
 * Handles a `heartbeat` frame from an authenticated client.
 *
 * Rejections:
 * - missing payload: MALFORMED_MESSAGE
 * - identifier empty or not allowed: IDENTIFIER_NOT_ALLOWED
 * - no authenticated session for the identifier, or the session belongs to a
 *   different socket than the one that sent this frame: AUTH_FAILED
 *
 * The socket check is new: the identifier comes from the client-supplied
 * payload, so without it any connection could keep another client's session
 * "online" by sending heartbeats in its name. The session's socket is the one
 * recorded at hello time, the same binding the rule-message handler relies on.
 *
 * On success the record and session timestamps are refreshed, the record is
 * marked "online", a `heartbeat_ack` is sent, and records are persisted.
 */
private async handleHeartbeat(
  connection: ClientConnection,
  envelope: BuiltinEnvelope<"heartbeat", HeartbeatPayload>
): Promise<void> {
  const { transport, config } = this.options;
  const { payload, requestId } = envelope;

  if (!payload) {
    transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildError(
          { code: "MALFORMED_MESSAGE", message: "heartbeat payload is required" },
          { requestId, timestamp: this.now() }
        )
      )
    );
    return;
  }

  const identifier = payload.identifier?.trim();
  if (!identifier || !config.followerIdentifiers.includes(identifier)) {
    transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildError(
          { code: "IDENTIFIER_NOT_ALLOWED", message: "identifier is not allowed" },
          { requestId, timestamp: this.now() }
        )
      )
    );
    return;
  }

  const session = this.registry.sessions.get(identifier);
  // The heartbeat must arrive on the socket that owns the authenticated session.
  if (!session || !session.isAuthenticated || session.socket !== connection.ws) {
    transport.sendToConnection(
      connection,
      encodeBuiltin(
        buildError(
          { code: "AUTH_FAILED", message: "heartbeat requires authentication" },
          { requestId, timestamp: this.now() }
        )
      )
    );
    return;
  }

  const record = this.ensureClientRecord(identifier);
  const now = this.now();
  record.lastHeartbeatAt = now;
  record.status = "online";
  record.updatedAt = now;
  session.lastActivityAt = now;

  transport.sendToConnection(
    { ...connection, identifier },
    encodeBuiltin(
      buildHeartbeatAck(
        { identifier, status: record.status },
        { requestId, timestamp: now }
      )
    )
  );

  await this.persist();
}
|
|
|
|
/**
 * (Re)arms the periodic liveness sweep, replacing any timer already running.
 *
 * The sweep is asynchronous and may reject (it persists client records).
 * The rejection is caught and logged so that one failed sweep does not
 * surface as an unhandled promise rejection; the next tick retries.
 */
private startSweepTimer(): void {
  this.stopSweepTimer();
  this.sweepTimer = setInterval(() => {
    void this.runLivenessSweep().catch((error: unknown) => {
      console.error(`Yonexus liveness sweep failed: ${safeErrorMessage(error)}`);
    });
  }, this.sweepIntervalMs);
}
|
|
|
|
/** Cancels the liveness sweep timer if one is armed; otherwise does nothing. */
private stopSweepTimer(): void {
  const timer = this.sweepTimer;
  if (timer) {
    clearInterval(timer);
    this.sweepTimer = null;
  }
}
|
|
|
|
/**
 * One pass of the liveness sweep over every client record.
 *
 * For each record whose computed liveness differs from its stored status, the
 * status and `updatedAt` are updated and, depending on the new status:
 * - "unstable": the client is sent a status update ("heartbeat_timeout_7m")
 * - "offline":  the client is sent a disconnect notice
 *               ("heartbeat_timeout_11m"), its connection is closed with code
 *               1001 and its session is removed
 * Records are persisted once at the end, only if anything changed.
 */
private async runLivenessSweep(): Promise<void> {
  const { transport } = this.options;
  const now = this.now();
  let dirty = false;

  for (const record of this.registry.clients.values()) {
    const nextStatus = this.getLivenessStatus(record, now);
    // A null result means liveness is not evaluated for this record.
    if (!nextStatus || nextStatus === record.status) {
      continue;
    }

    record.status = nextStatus;
    record.updatedAt = now;
    dirty = true;

    const { identifier } = record;

    switch (nextStatus) {
      case "unstable":
        transport.send(
          identifier,
          encodeBuiltin(
            buildStatusUpdate(
              { identifier, status: "unstable", reason: "heartbeat_timeout_7m" },
              { timestamp: now }
            )
          )
        );
        break;

      case "offline":
        transport.send(
          identifier,
          encodeBuiltin(
            buildDisconnectNotice(
              { identifier, reason: "heartbeat_timeout_11m" },
              { timestamp: now }
            )
          )
        );
        transport.closeConnection(identifier, 1001, "Heartbeat timeout");
        this.registry.sessions.delete(identifier);
        break;

      default:
        // Back to "online": status already updated, nothing to send.
        break;
    }
  }

  if (dirty) {
    await this.persist();
  }
}
|
|
|
|
/**
 * Classifies a client by how long it has been silent.
 *
 * Returns null when the client has no authenticated session or has never
 * sent a heartbeat. Otherwise: 11 minutes or more of silence is "offline",
 * 7 minutes or more is "unstable", anything less is "online".
 *
 * @param record - The client record to classify.
 * @param now - Current time in the same unit as `record.lastHeartbeatAt`.
 */
private getLivenessStatus(
  record: ClientRecord,
  now: number
): "online" | "unstable" | "offline" | null {
  const session = this.registry.sessions.get(record.identifier);
  if (!session?.isAuthenticated || !record.lastHeartbeatAt) {
    return null;
  }

  const unstableAfterSeconds = 7 * 60;
  const offlineAfterSeconds = 11 * 60;
  const silentFor = now - record.lastHeartbeatAt;

  if (silentFor >= offlineAfterSeconds) {
    return "offline";
  }

  return silentFor >= unstableAfterSeconds ? "unstable" : "online";
}
|
|
|
|
/**
 * Revokes a client's pairing and tells it to pair again.
 *
 * Clears the shared secret, any pending pairing code/expiry/notification
 * status, the nonce history and the handshake-attempt history; marks the
 * pairing "revoked" and the client "offline".
 *
 * Any existing session for the client is also marked unauthenticated.
 * Previously it stayed authenticated after revocation, so the runtime kept
 * accepting its heartbeats and rule messages and kept sending to it.
 *
 * Sends `re_pair_required` on the given connection and persists records.
 *
 * @param connection - Connection that triggered the revocation; receives the notice.
 * @param record - Client record to revoke.
 * @param requestId - Request id of the triggering frame, echoed in the notice.
 * @param reason - Why the pairing was revoked.
 */
private async triggerRePairRequired(
  connection: ClientConnection,
  record: ClientRecord,
  requestId: string | undefined,
  reason: "nonce_collision" | "rate_limited"
): Promise<void> {
  record.secret = undefined;
  record.pairingStatus = "revoked";
  record.pairingCode = undefined;
  record.pairingExpiresAt = undefined;
  record.pairingNotifyStatus = undefined;
  record.recentNonces = [];
  record.recentHandshakeAttempts = [];
  record.status = "offline";
  record.updatedAt = this.now();

  // A revoked pairing must not leave behind a session the runtime still trusts.
  const session = this.registry.sessions.get(record.identifier);
  if (session) {
    session.isAuthenticated = false;
  }

  this.options.transport.sendToConnection(
    connection,
    encodeBuiltin(
      buildRePairRequired(
        { identifier: record.identifier, reason },
        { requestId, timestamp: this.now() }
      )
    )
  );

  await this.persist();
}
|
|
|
|
/** Saves every client record currently in the registry through the store. */
private async persist(): Promise<void> {
  const { store } = this.options;
  await store.save(this.registry.clients.values());
}
|
|
|
|
/**
 * Sends an already-encoded rule message to one client.
 *
 * The message is rejected (returns false) when the client has no
 * authenticated session, when it starts with `builtin::`, when it has no
 * `::` delimiter, or when `parseRuleMessage` throws on it.
 *
 * @param identifier - The target client identifier
 * @param message - The complete rule message with identifier and content
 * @returns The transport's send result, or false if the message was rejected
 */
sendMessageToClient(identifier: string, message: string): boolean {
  if (!this.registry.sessions.get(identifier)?.isAuthenticated) {
    return false;
  }

  try {
    // Cheap shape checks first; the parser then validates the full format.
    const looksLikeRuleMessage = !message.startsWith("builtin::") && message.includes("::");
    if (!looksLikeRuleMessage) {
      return false;
    }
    parseRuleMessage(message);
  } catch {
    return false;
  }

  return this.options.transport.send(identifier, message);
}
|
|
|
|
/**
 * Encodes a rule identifier and content into a rule message and sends it to
 * one client.
 *
 * @param identifier - The target client identifier
 * @param ruleIdentifier - The rule identifier
 * @param content - The message content
 * @returns The transport's send result; false if the client has no
 *          authenticated session or if encoding/sending throws
 */
sendRuleMessageToClient(identifier: string, ruleIdentifier: string, content: string): boolean {
  if (!this.registry.sessions.get(identifier)?.isAuthenticated) {
    return false;
  }

  try {
    return this.options.transport.send(identifier, encodeRuleMessage(ruleIdentifier, content));
  } catch {
    return false;
  }
}
|
|
|
|
/**
 * Handles an incoming rule message from a client.
 *
 * The sender is taken from the connection's identifier or, failing that,
 * from the session whose socket is this connection's socket. The connection
 * is closed with code 1008 when the sender cannot be determined or is not
 * authenticated. A message that fails to parse is answered with a
 * MALFORMED_MESSAGE error.
 *
 * The message is rewritten to `rule::sender::content`, but dispatch is not
 * implemented yet; the only effect today is refreshing the session's
 * last-activity time.
 *
 * @param connection - The client connection
 * @param raw - The raw rule message
 */
private async handleRuleMessage(connection: ClientConnection, raw: string): Promise<void> {
  const { sessions } = this.registry;

  let senderIdentifier = connection.identifier;
  if (!senderIdentifier) {
    // Untagged connection: fall back to the session that owns this socket.
    const owner = [...sessions.entries()].find(([, candidate]) => candidate.socket === connection.ws);
    if (owner) {
      senderIdentifier = owner[0];
    }
  }

  if (!senderIdentifier) {
    // Cannot determine sender - close connection
    connection.ws.close(1008, "Cannot identify sender");
    return;
  }

  const session = sessions.get(senderIdentifier);
  if (!session?.isAuthenticated) {
    // Only accept rule messages from authenticated clients
    connection.ws.close(1008, "Not authenticated");
    return;
  }

  try {
    const { ruleIdentifier, content } = parseRuleMessage(raw);
    const rewritten = `${ruleIdentifier}::${senderIdentifier}::${content}`;

    // TODO: Dispatch to registered rules via rule registry
    // this.ruleRegistry.dispatch(rewritten);

    // Update last activity
    session.lastActivityAt = this.now();

    // Placeholder until dispatch exists; keeps `rewritten` from being flagged as unused.
    // eslint-disable-next-line @typescript-eslint/no-unused-vars
    void rewritten;
  } catch (error) {
    // Malformed rule message
    const frame = encodeBuiltin(
      buildError(
        {
          code: "MALFORMED_MESSAGE",
          message: safeErrorMessage(error) || "Invalid rule message format"
        },
        { timestamp: this.now() }
      )
    );
    this.options.transport.sendToConnection(connection, frame);
  }
}
|
|
}
|
|
|
|
/**
 * Factory for `YonexusServerRuntime`; equivalent to calling the constructor.
 *
 * @param options - Configuration, store, transport and optional overrides.
 * @returns A new runtime that has not been started yet.
 */
export function createYonexusServerRuntime(
  options: YonexusServerRuntimeOptions
): YonexusServerRuntime {
  const runtime = new YonexusServerRuntime(options);
  return runtime;
}
|