Handle heartbeat builtin messages
This commit is contained in:
@@ -9,6 +9,7 @@ import {
|
||||
buildAuthFailed,
|
||||
buildAuthSuccess,
|
||||
buildError,
|
||||
buildHeartbeatAck,
|
||||
buildHelloAck,
|
||||
buildPairFailed,
|
||||
buildPairRequest,
|
||||
@@ -20,7 +21,8 @@ import {
|
||||
isBuiltinMessage,
|
||||
isTimestampFresh,
|
||||
isValidAuthNonce,
|
||||
type AuthRequestPayload
|
||||
type AuthRequestPayload,
|
||||
type HeartbeatPayload
|
||||
} from "../../../Yonexus.Protocol/src/index.js";
|
||||
import type { YonexusServerConfig } from "./config.js";
|
||||
import {
|
||||
@@ -156,6 +158,14 @@ export class YonexusServerRuntime {
|
||||
connection,
|
||||
envelope as BuiltinEnvelope<"auth_request", AuthRequestPayload>
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (envelope.type === "heartbeat") {
|
||||
await this.handleHeartbeat(
|
||||
connection,
|
||||
envelope as BuiltinEnvelope<"heartbeat", HeartbeatPayload>
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -590,6 +600,75 @@ export class YonexusServerRuntime {
|
||||
}
|
||||
}
|
||||
|
||||
private async handleHeartbeat(
|
||||
connection: ClientConnection,
|
||||
envelope: BuiltinEnvelope<"heartbeat", HeartbeatPayload>
|
||||
): Promise<void> {
|
||||
const payload = envelope.payload;
|
||||
if (!payload) {
|
||||
this.options.transport.sendToConnection(
|
||||
connection,
|
||||
encodeBuiltin(
|
||||
buildError(
|
||||
{ code: "MALFORMED_MESSAGE", message: "heartbeat payload is required" },
|
||||
{ requestId: envelope.requestId, timestamp: this.now() }
|
||||
)
|
||||
)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const identifier = payload.identifier?.trim();
|
||||
if (!identifier || !this.options.config.followerIdentifiers.includes(identifier)) {
|
||||
this.options.transport.sendToConnection(
|
||||
connection,
|
||||
encodeBuiltin(
|
||||
buildError(
|
||||
{ code: "IDENTIFIER_NOT_ALLOWED", message: "identifier is not allowed" },
|
||||
{ requestId: envelope.requestId, timestamp: this.now() }
|
||||
)
|
||||
)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const session = this.registry.sessions.get(identifier);
|
||||
if (!session || !session.isAuthenticated) {
|
||||
this.options.transport.sendToConnection(
|
||||
connection,
|
||||
encodeBuiltin(
|
||||
buildError(
|
||||
{ code: "AUTH_FAILED", message: "heartbeat requires authentication" },
|
||||
{ requestId: envelope.requestId, timestamp: this.now() }
|
||||
)
|
||||
)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const record = this.ensureClientRecord(identifier);
|
||||
const now = this.now();
|
||||
record.lastHeartbeatAt = now;
|
||||
record.status = "online";
|
||||
record.updatedAt = now;
|
||||
session.lastActivityAt = now;
|
||||
|
||||
this.options.transport.sendToConnection(
|
||||
{ ...connection, identifier },
|
||||
encodeBuiltin(
|
||||
buildHeartbeatAck(
|
||||
{
|
||||
identifier,
|
||||
status: record.status
|
||||
},
|
||||
{ requestId: envelope.requestId, timestamp: now }
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
await this.persist();
|
||||
}
|
||||
|
||||
private async triggerRePairRequired(
|
||||
connection: ClientConnection,
|
||||
record: ClientRecord,
|
||||
|
||||
Reference in New Issue
Block a user