188 lines
5.1 KiB
TypeScript
188 lines
5.1 KiB
TypeScript
import {
|
|
Body,
|
|
ConflictException,
|
|
Controller,
|
|
DefaultValuePipe,
|
|
ForbiddenException,
|
|
Get,
|
|
Headers,
|
|
HttpException,
|
|
NotFoundException,
|
|
Param,
|
|
ParseIntPipe,
|
|
Patch,
|
|
Post,
|
|
Query,
|
|
Req,
|
|
} from '@nestjs/common';
|
|
import { InjectRepository } from '@nestjs/typeorm';
|
|
import { Repository } from 'typeorm';
|
|
import * as bcrypt from 'bcryptjs';
|
|
import { randomBytes } from 'crypto';
|
|
import { GuildNode } from '../entities/guild-node.entity';
|
|
import { AuditService } from '../audit/audit.service';
|
|
import { RegisterNodeDto } from './dto.register-node.dto';
|
|
import { UpdateNodeStatusDto } from './dto.update-node-status.dto';
|
|
import { FABRIC_PROTOCOL_VERSION, normalizeVersion } from '../common/version';
|
|
|
|
/**
 * HTTP controller for guild nodes, mounted under `/nodes`.
 *
 * Routes:
 * - `POST /nodes/register` — create a node and issue its API key (loopback callers only).
 * - `POST /nodes/:nodeId/heartbeat` — record a heartbeat for a node.
 * - `PATCH /nodes/:nodeId/status` — overwrite a node's status.
 * - `GET /nodes` — paginated listing, newest first.
 *
 * Each of the three mutating routes writes one audit record via `AuditService`.
 */
@Controller('nodes')
export class NodesController {
  /** Remote addresses the register route treats as a localhost caller. */
  private static readonly LOOPBACK_ADDRESSES: ReadonlySet<string> = new Set([
    '127.0.0.1',
    '::1',
    '::ffff:127.0.0.1',
  ]);

  constructor(
    @InjectRepository(GuildNode)
    private readonly nodeRepo: Repository<GuildNode>,
    private readonly audit: AuditService,
  ) {}

  /**
   * Registers a new node and returns its freshly generated API key.
   *
   * Checks run in this order: caller address, protocol version, `nodeId`
   * uniqueness, `endpoint` uniqueness. Only the bcrypt hash of the key is
   * stored; the raw key appears in this response and nowhere else in this
   * controller.
   *
   * @param req - Request object; only `ip` and `socket.remoteAddress` are read.
   * @param body - Registration payload (`nodeId`, `name`, `endpoint` are used).
   * @param fabricVersion - Value of the `x-fabric-version` header, if present.
   * @returns Acceptance envelope with the negotiated version, the node's public
   *   fields and the raw API key.
   * @throws ForbiddenException when the caller address is not a loopback address.
   * @throws HttpException (400) when the normalized version differs from
   *   `FABRIC_PROTOCOL_VERSION`.
   * @throws ConflictException when `nodeId` or `endpoint` is already registered.
   */
  @Post('register')
  async register(
    @Req() req: { ip?: string; socket?: { remoteAddress?: string } },
    @Body() body: RegisterNodeDto,
    @Headers('x-fabric-version') fabricVersion?: string,
  ) {
    const remoteAddress = (req.ip ?? req.socket?.remoteAddress ?? '').toLowerCase();
    if (!NodesController.LOOPBACK_ADDRESSES.has(remoteAddress)) {
      throw new ForbiddenException('register endpoint only allows localhost caller');
    }

    const requestedVersion = normalizeVersion(fabricVersion);
    if (requestedVersion !== FABRIC_PROTOCOL_VERSION) {
      throw new HttpException(
        {
          error: {
            code: 'FABRIC_VERSION_NOT_SUPPORTED',
            message: `unsupported protocol version: ${requestedVersion}`,
            retryable: false,
          },
          supportedVersion: FABRIC_PROTOCOL_VERSION,
        },
        400,
      );
    }

    // NOTE(review): these lookups and the save below are not atomic, so two
    // concurrent registrations can both pass the checks. Presumably the table
    // has unique constraints as a backstop — confirm against the entity.
    const existedByNodeId = await this.nodeRepo.findOne({
      where: { nodeId: body.nodeId },
    });
    if (existedByNodeId) {
      throw new ConflictException('nodeId already exists');
    }

    const existedByEndpoint = await this.nodeRepo.findOne({
      where: { endpoint: body.endpoint },
    });
    if (existedByEndpoint) {
      throw new ConflictException('endpoint already exists');
    }

    // 24 random bytes, hex-encoded, behind a fixed "gk_" prefix.
    const rawApiKey = `gk_${randomBytes(24).toString('hex')}`;
    const apiKeyHash = await bcrypt.hash(rawApiKey, 10);

    const saved = await this.nodeRepo.save(
      this.nodeRepo.create({
        nodeId: body.nodeId,
        name: body.name,
        endpoint: body.endpoint,
        status: 'active',
        apiKeyHash,
      }),
    );

    // NOTE(review): if this audit write rejects, the node is already persisted
    // but the raw key below is never returned to the caller.
    await this.audit.write({
      action: 'node.register',
      targetType: 'node',
      targetId: saved.nodeId,
      detail: JSON.stringify({ endpoint: saved.endpoint }),
    });

    return {
      status: 'accepted',
      negotiatedVersion: FABRIC_PROTOCOL_VERSION,
      node: this.toPublicNode(saved),
      apiKey: rawApiKey,
    };
  }

  /**
   * Records a heartbeat: stamps `lastHeartbeatAt` with the current time and
   * sets the status to `'active'` unless the node is `'revoked'`, in which
   * case the status is left as is (the timestamp is still updated).
   *
   * @param nodeId - Public node identifier from the route.
   * @returns The node id, its resulting status and the stored heartbeat time.
   * @throws NotFoundException when no node has this `nodeId`.
   */
  @Post(':nodeId/heartbeat')
  async heartbeat(@Param('nodeId') nodeId: string) {
    const node = await this.findNodeOrFail(nodeId);

    node.lastHeartbeatAt = new Date();
    if (node.status !== 'revoked') {
      node.status = 'active';
    }

    const saved = await this.nodeRepo.save(node);
    await this.audit.write({
      action: 'node.heartbeat',
      targetType: 'node',
      targetId: saved.nodeId,
      detail: JSON.stringify({ status: saved.status }),
    });

    return {
      status: 'ok',
      nodeId: saved.nodeId,
      nodeStatus: saved.status,
      lastHeartbeatAt: saved.lastHeartbeatAt,
    };
  }

  /**
   * Overwrites a node's status with the value from the request body.
   *
   * @param nodeId - Public node identifier from the route.
   * @param body - Payload carrying the new `status`.
   * @returns The node's public fields after the update.
   * @throws NotFoundException when no node has this `nodeId`.
   */
  @Patch(':nodeId/status')
  async updateStatus(
    @Param('nodeId') nodeId: string,
    @Body() body: UpdateNodeStatusDto,
  ) {
    const node = await this.findNodeOrFail(nodeId);

    node.status = body.status;
    const saved = await this.nodeRepo.save(node);
    await this.audit.write({
      action: 'node.status.update',
      targetType: 'node',
      targetId: saved.nodeId,
      detail: JSON.stringify({ status: saved.status }),
    });

    return this.toPublicNode(saved);
  }

  /**
   * Lists nodes ordered by `createdAt` descending.
   *
   * `page` values below 1 are treated as 1. `pageSize` values below 1 fall
   * back to 20 and values above 100 are capped at 100.
   *
   * Items carry every loaded entity field except `apiKeyHash`, which is
   * removed so the listing never exposes credential hashes. Items are plain
   * objects rather than `GuildNode` instances.
   * NOTE(review): if the entity already hides `apiKeyHash` from default
   * selects, the stripping is a harmless no-op — confirm against the entity.
   *
   * @param page - 1-based page number (defaults to 1).
   * @param pageSize - Items per page (defaults to 20).
   * @returns The page of items plus paging metadata; `totalPages` is at least 1.
   */
  @Get()
  async list(
    @Query('page', new DefaultValuePipe(1), ParseIntPipe) page: number,
    @Query('pageSize', new DefaultValuePipe(20), ParseIntPipe) pageSize: number,
  ) {
    const safePage = page < 1 ? 1 : page;
    const safePageSize = pageSize < 1 ? 20 : Math.min(pageSize, 100);

    const [nodes, total] = await this.nodeRepo.findAndCount({
      order: { createdAt: 'DESC' },
      skip: (safePage - 1) * safePageSize,
      take: safePageSize,
    });

    // Drop the credential hash; keep every other field for existing clients.
    const items = nodes.map(
      ({ apiKeyHash: _apiKeyHash, ...publicFields }) => publicFields,
    );

    return {
      items,
      page: safePage,
      pageSize: safePageSize,
      total,
      totalPages: Math.max(1, Math.ceil(total / safePageSize)),
    };
  }

  /** Loads a node by its public `nodeId`, or throws `NotFoundException`. */
  private async findNodeOrFail(nodeId: string) {
    const node = await this.nodeRepo.findOne({ where: { nodeId } });
    if (!node) {
      throw new NotFoundException('node not found');
    }
    return node;
  }

  /** Projects a node onto the public fields returned by register and status update. */
  private toPublicNode(node: GuildNode) {
    return {
      id: node.id,
      nodeId: node.nodeId,
      name: node.name,
      endpoint: node.endpoint,
      status: node.status,
    };
  }
}
|