feat: bootstrap from Fabric monorepo
This commit is contained in:
59
src/messaging/dto.create-message.dto.ts
Normal file
59
src/messaging/dto.create-message.dto.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import {
|
||||
ArrayMaxSize,
|
||||
IsArray,
|
||||
IsOptional,
|
||||
IsString,
|
||||
MaxLength,
|
||||
ValidateNested,
|
||||
} from 'class-validator';
|
||||
import { Type } from 'class-transformer';
|
||||
|
||||
class AttachmentDto {
|
||||
@IsString()
|
||||
@MaxLength(2048)
|
||||
url!: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@MaxLength(255)
|
||||
name?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@MaxLength(100)
|
||||
mimeType?: string;
|
||||
}
|
||||
|
||||
export class CreateMessageDto {
|
||||
@IsString()
|
||||
@MaxLength(4000)
|
||||
content!: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@MaxLength(80)
|
||||
clientMessageId?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@MaxLength(80)
|
||||
replyToMessageId?: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsArray()
|
||||
@ArrayMaxSize(50)
|
||||
@IsString({ each: true })
|
||||
mentions?: string[];
|
||||
|
||||
@IsOptional()
|
||||
@IsArray()
|
||||
@ArrayMaxSize(10)
|
||||
@ValidateNested({ each: true })
|
||||
@Type(() => AttachmentDto)
|
||||
attachments?: AttachmentDto[];
|
||||
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@MaxLength(64)
|
||||
authorUserId?: string;
|
||||
}
|
||||
278
src/messaging/messaging.controller.ts
Normal file
278
src/messaging/messaging.controller.ts
Normal file
@@ -0,0 +1,278 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
Delete,
|
||||
Get,
|
||||
Headers,
|
||||
NotFoundException,
|
||||
Param,
|
||||
Patch,
|
||||
Post,
|
||||
Query,
|
||||
} from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { DataSource, Repository } from 'typeorm';
|
||||
import { CreateMessageDto } from './dto.create-message.dto';
|
||||
import { Channel } from '../entities/channel.entity';
|
||||
import { Message } from '../entities/message.entity';
|
||||
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
|
||||
import { EventsService } from '../events/events.service';
|
||||
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
|
||||
import { RealtimeGateway } from '../realtime/realtime.gateway';
|
||||
|
||||
const EDIT_WINDOW_MS = 15 * 60 * 1000;
|
||||
const DEFAULT_PAGE_LIMIT = 50;
|
||||
const MAX_PAGE_LIMIT = 200;
|
||||
|
||||
@Controller('channels/:id/messages')
|
||||
export class MessagingController {
|
||||
constructor(
|
||||
private readonly dataSource: DataSource,
|
||||
@InjectRepository(Channel)
|
||||
private readonly channelRepo: Repository<Channel>,
|
||||
@InjectRepository(Message)
|
||||
private readonly messageRepo: Repository<Message>,
|
||||
@InjectRepository(IdempotencyRecord)
|
||||
private readonly idemRepo: Repository<IdempotencyRecord>,
|
||||
private readonly events: EventsService,
|
||||
private readonly realtime: RealtimeGateway,
|
||||
) {}
|
||||
|
||||
private async getIdempotentResponse(
|
||||
scope: string,
|
||||
idempotencyKey?: string,
|
||||
): Promise<Record<string, unknown> | null> {
|
||||
if (!idempotencyKey) return null;
|
||||
const row = await this.idemRepo.findOne({ where: { scope, idempotencyKey } });
|
||||
return row?.responseBody ?? null;
|
||||
}
|
||||
|
||||
private async saveIdempotentResponse(
|
||||
scope: string,
|
||||
idempotencyKey: string | undefined,
|
||||
responseBody: Record<string, unknown>,
|
||||
): Promise<void> {
|
||||
if (!idempotencyKey) return;
|
||||
const row = this.idemRepo.create({
|
||||
scope,
|
||||
idempotencyKey,
|
||||
responseBody,
|
||||
});
|
||||
await this.idemRepo.save(row);
|
||||
}
|
||||
|
||||
private toView(m: Message) {
|
||||
return {
|
||||
messageId: m.messageId,
|
||||
seq: m.seq,
|
||||
content: m.content,
|
||||
authorUserId: m.authorUserId,
|
||||
replyToMessageId: m.replyToMessageId,
|
||||
mentions: m.mentions ?? [],
|
||||
attachments: m.attachments ?? [],
|
||||
createdAt: m.createdAt.toISOString(),
|
||||
editedAt: m.editedAt ? m.editedAt.toISOString() : null,
|
||||
deletedAt: m.deletedAt ? m.deletedAt.toISOString() : null,
|
||||
isDeleted: m.isDeleted,
|
||||
};
|
||||
}
|
||||
|
||||
@Post()
|
||||
async create(
|
||||
@Param('id') channelId: string,
|
||||
@Body() body: CreateMessageDto,
|
||||
@Headers('idempotency-key') idempotencyKey?: string,
|
||||
) {
|
||||
const scope = `POST:/channels/${channelId}/messages`;
|
||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||
if (existed) return existed;
|
||||
|
||||
const message = await this.dataSource.transaction(async (manager) => {
|
||||
const channel = await manager.findOne(Channel, {
|
||||
where: { id: channelId },
|
||||
lock: { mode: 'pessimistic_write' },
|
||||
});
|
||||
if (!channel) {
|
||||
throw new NotFoundException('channel not found');
|
||||
}
|
||||
|
||||
const nextSeq = channel.lastSeq + 1;
|
||||
channel.lastSeq = nextSeq;
|
||||
await manager.save(Channel, channel);
|
||||
|
||||
const messageId = body.clientMessageId ?? `m-${channelId}-${nextSeq}`;
|
||||
const row = manager.create(Message, {
|
||||
messageId,
|
||||
channelId,
|
||||
conversationId: null,
|
||||
authorUserId: body.authorUserId ?? 'anonymous',
|
||||
seq: nextSeq,
|
||||
content: body.content,
|
||||
replyToMessageId: body.replyToMessageId ?? null,
|
||||
mentions: body.mentions ?? [],
|
||||
attachments: body.attachments ?? [],
|
||||
editedAt: null,
|
||||
deletedAt: null,
|
||||
isDeleted: false,
|
||||
});
|
||||
return manager.save(Message, row);
|
||||
});
|
||||
|
||||
const responseBody = this.toView(message) as Record<string, unknown>;
|
||||
await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);
|
||||
|
||||
await this.events.emit({
|
||||
eventType: 'message.created',
|
||||
channelId,
|
||||
actorId: body.authorUserId ?? 'anonymous',
|
||||
data: responseBody,
|
||||
});
|
||||
this.realtime.emitChannelEvent(channelId, 'message.created', responseBody);
|
||||
|
||||
return responseBody;
|
||||
}
|
||||
|
||||
@Patch(':messageId')
|
||||
async edit(
|
||||
@Param('id') channelId: string,
|
||||
@Param('messageId') messageId: string,
|
||||
@Body() body: { content?: string },
|
||||
@Headers('idempotency-key') idempotencyKey?: string,
|
||||
) {
|
||||
const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
|
||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||
if (existed) return existed;
|
||||
|
||||
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
|
||||
if (!item) return { status: 'not_found' };
|
||||
|
||||
const now = Date.now();
|
||||
const createdAt = new Date(item.createdAt).getTime();
|
||||
if (now - createdAt > EDIT_WINDOW_MS) {
|
||||
return { status: 'edit_window_expired', messageId };
|
||||
}
|
||||
|
||||
item.content = body.content ?? item.content;
|
||||
item.editedAt = new Date();
|
||||
const saved = await this.messageRepo.save(item);
|
||||
const responseBody = this.toView(saved) as Record<string, unknown>;
|
||||
await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);
|
||||
|
||||
await this.events.emit({
|
||||
eventType: 'message.updated',
|
||||
channelId,
|
||||
actorId: saved.authorUserId,
|
||||
data: responseBody,
|
||||
});
|
||||
this.realtime.emitChannelEvent(channelId, 'message.updated', responseBody);
|
||||
|
||||
return responseBody;
|
||||
}
|
||||
|
||||
@Delete(':messageId')
|
||||
async remove(
|
||||
@Param('id') channelId: string,
|
||||
@Param('messageId') messageId: string,
|
||||
@Headers('idempotency-key') idempotencyKey?: string,
|
||||
) {
|
||||
const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
|
||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||
if (existed) return existed;
|
||||
|
||||
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
|
||||
if (!item) return { status: 'not_found' };
|
||||
|
||||
item.isDeleted = true;
|
||||
item.deletedAt = new Date();
|
||||
item.content = '[deleted]';
|
||||
item.mentions = [];
|
||||
item.attachments = [];
|
||||
await this.messageRepo.save(item);
|
||||
|
||||
const responseBody = {
|
||||
status: 'deleted',
|
||||
mode: 'soft',
|
||||
messageId,
|
||||
} as Record<string, unknown>;
|
||||
await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);
|
||||
|
||||
await this.events.emit({
|
||||
eventType: 'message.deleted',
|
||||
channelId,
|
||||
actorId: item.authorUserId,
|
||||
data: {
|
||||
messageId,
|
||||
seq: item.seq,
|
||||
deletedAt: item.deletedAt?.toISOString() ?? null,
|
||||
},
|
||||
});
|
||||
this.realtime.emitChannelEvent(channelId, 'message.deleted', {
|
||||
messageId,
|
||||
seq: item.seq,
|
||||
deletedAt: item.deletedAt?.toISOString() ?? null,
|
||||
});
|
||||
|
||||
return responseBody;
|
||||
}
|
||||
|
||||
@Get()
|
||||
async listBySeq(
|
||||
@Param('id') channelId: string,
|
||||
@Query('seq_from') seqFrom?: string,
|
||||
@Query('seq_to') seqTo?: string,
|
||||
@Query('limit') limit?: string,
|
||||
) {
|
||||
const from = seqFrom ? Number(seqFrom) : 1;
|
||||
const to = seqTo ? Number(seqTo) : Number.MAX_SAFE_INTEGER;
|
||||
const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);
|
||||
|
||||
if (from > to) {
|
||||
return {
|
||||
items: [],
|
||||
page: {
|
||||
seqFrom: from,
|
||||
seqTo: to,
|
||||
limit: safeLimit,
|
||||
returned: 0,
|
||||
hasMore: false,
|
||||
nextExpectedSeq: from,
|
||||
highestCommittedSeq: 0,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
|
||||
if (!channel) {
|
||||
throw new NotFoundException('channel not found');
|
||||
}
|
||||
|
||||
const qb = this.messageRepo
|
||||
.createQueryBuilder('m')
|
||||
.where('m.channelId = :channelId', { channelId })
|
||||
.andWhere('m.seq >= :from', { from })
|
||||
.andWhere('m.seq <= :to', { to })
|
||||
.orderBy('m.seq', 'ASC');
|
||||
|
||||
const total = await qb.getCount();
|
||||
const rows = await qb.limit(safeLimit).getMany();
|
||||
const items = rows.map((m) => this.toView(m));
|
||||
|
||||
const nextExpectedSeq = computeNextExpectedSeq(
|
||||
from,
|
||||
rows.map((row) => row.seq),
|
||||
);
|
||||
|
||||
return {
|
||||
items,
|
||||
page: {
|
||||
seqFrom: from,
|
||||
seqTo: to,
|
||||
limit: safeLimit,
|
||||
returned: items.length,
|
||||
hasMore: total > items.length,
|
||||
nextExpectedSeq,
|
||||
highestCommittedSeq: channel.lastSeq,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
12
src/messaging/messaging.module.ts
Normal file
12
src/messaging/messaging.module.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
import { MessagingController } from './messaging.controller';
|
||||
import { Channel } from '../entities/channel.entity';
|
||||
import { Message } from '../entities/message.entity';
|
||||
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
|
||||
|
||||
@Module({
|
||||
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord])],
|
||||
controllers: [MessagingController],
|
||||
})
|
||||
export class MessagingModule {}
|
||||
15
src/messaging/pagination.util.spec.ts
Normal file
15
src/messaging/pagination.util.spec.ts
Normal file
@@ -0,0 +1,15 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
|
||||
|
||||
describe('pagination utils', () => {
|
||||
it('clamps limit safely', () => {
|
||||
expect(clampLimit(undefined, 50, 200)).toBe(50);
|
||||
expect(clampLimit('500', 50, 200)).toBe(200);
|
||||
expect(clampLimit('-1', 50, 200)).toBe(50);
|
||||
});
|
||||
|
||||
it('computes next expected seq', () => {
|
||||
expect(computeNextExpectedSeq(1, [1, 2, 3])).toBe(4);
|
||||
expect(computeNextExpectedSeq(1, [1, 3, 4])).toBe(2);
|
||||
});
|
||||
});
|
||||
14
src/messaging/pagination.util.ts
Normal file
14
src/messaging/pagination.util.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
export function clampLimit(input: string | undefined, defaultLimit: number, maxLimit: number): number {
|
||||
const requested = input ? Number(input) : defaultLimit;
|
||||
if (!Number.isFinite(requested) || requested <= 0) return defaultLimit;
|
||||
return Math.min(requested, maxLimit);
|
||||
}
|
||||
|
||||
export function computeNextExpectedSeq(from: number, seqs: number[]): number {
|
||||
let next = from;
|
||||
for (const seq of seqs) {
|
||||
if (seq > next) break;
|
||||
if (seq === next) next += 1;
|
||||
}
|
||||
return next;
|
||||
}
|
||||
Reference in New Issue
Block a user