import {
  BadRequestException,
  Body,
  Controller,
  Delete,
  Get,
  Headers,
  NotFoundException,
  Param,
  Patch,
  Post,
  Query,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { DataSource, Repository } from 'typeorm';
import { CreateMessageDto } from './dto.create-message.dto';
import { Channel } from '../entities/channel.entity';
import { Message } from '../entities/message.entity';
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
import { EventsService } from '../events/events.service';
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
import { RealtimeGateway } from '../realtime/realtime.gateway';

/** Messages may only be edited within 15 minutes of their `createdAt`. */
const EDIT_WINDOW_MS = 15 * 60 * 1000;
/** Default and maximum page sizes handed to `clampLimit` by `listBySeq`. */
const DEFAULT_PAGE_LIMIT = 50;
const MAX_PAGE_LIMIT = 200;

/**
 * Parses an optional sequence-bound query parameter.
 *
 * A missing or empty value yields `fallback`. Anything that does not convert
 * to a finite number is rejected, so `NaN`/`Infinity` never reach the query
 * builder or the response payload.
 *
 * @param name Query parameter name, used in the error message.
 * @param raw Raw query string value, if supplied.
 * @param fallback Value to use when `raw` is absent or empty.
 * @throws BadRequestException when `raw` is present but not a finite number.
 */
function parseSeqBound(
  name: string,
  raw: string | undefined,
  fallback: number,
): number {
  if (!raw) return fallback;
  const value = Number(raw);
  if (!Number.isFinite(value)) {
    throw new BadRequestException(`${name} must be a finite number`);
  }
  return value;
}

/**
 * HTTP endpoints for the messages of a single channel
 * (`channels/:id/messages`): create, edit, soft-delete and list by sequence
 * number.
 *
 * Mutating endpoints accept an optional `idempotency-key` header. When a
 * response is already stored for the same scope and key it is returned as-is
 * and no further work is done.
 */
@Controller('channels/:id/messages')
export class MessagingController {
  constructor(
    private readonly dataSource: DataSource,
    @InjectRepository(Channel)
    private readonly channelRepo: Repository<Channel>,
    @InjectRepository(Message)
    private readonly messageRepo: Repository<Message>,
    @InjectRepository(IdempotencyRecord)
    private readonly idemRepo: Repository<IdempotencyRecord>,
    private readonly events: EventsService,
    private readonly realtime: RealtimeGateway,
  ) {}

  /**
   * Looks up a previously stored response for `scope` + `idempotencyKey`.
   *
   * @returns The stored response body, or `null` when no key was supplied or
   *   nothing is stored for it.
   */
  private async getIdempotentResponse(
    scope: string,
    idempotencyKey?: string,
  ): Promise<Record<string, unknown> | null> {
    if (!idempotencyKey) return null;
    const row = await this.idemRepo.findOne({ where: { scope, idempotencyKey } });
    return row?.responseBody ?? null;
  }

  /**
   * Stores `responseBody` under `scope` + `idempotencyKey` so that a repeated
   * request can be answered from the record. Does nothing when no key was
   * supplied.
   */
  private async saveIdempotentResponse(
    scope: string,
    idempotencyKey: string | undefined,
    responseBody: Record<string, unknown>,
  ): Promise<void> {
    if (!idempotencyKey) return;
    const row = this.idemRepo.create({
      scope,
      idempotencyKey,
      responseBody,
    });
    await this.idemRepo.save(row);
  }

  /**
   * Maps a `Message` entity to its API representation: dates become ISO-8601
   * strings (or `null`), and missing `mentions`/`attachments` become empty
   * arrays.
   */
  private toView(m: Message) {
    return {
      messageId: m.messageId,
      seq: m.seq,
      content: m.content,
      authorUserId: m.authorUserId,
      replyToMessageId: m.replyToMessageId,
      mentions: m.mentions ?? [],
      attachments: m.attachments ?? [],
      createdAt: m.createdAt.toISOString(),
      editedAt: m.editedAt ? m.editedAt.toISOString() : null,
      deletedAt: m.deletedAt ? m.deletedAt.toISOString() : null,
      isDeleted: m.isDeleted,
    };
  }

  /**
   * Creates a message in the channel and assigns it the next sequence number.
   *
   * The channel row is loaded with a pessimistic write lock inside a
   * transaction; `lastSeq` is incremented and the message is inserted in that
   * same transaction. Afterwards the response is stored for idempotency and a
   * `message.created` event is emitted to both the events service and the
   * realtime gateway.
   *
   * @throws NotFoundException when the channel does not exist.
   */
  @Post()
  async create(
    @Param('id') channelId: string,
    @Body() body: CreateMessageDto,
    @Headers('idempotency-key') idempotencyKey?: string,
  ) {
    const scope = `POST:/channels/${channelId}/messages`;
    // NOTE(review): the idempotency lookup happens before the transaction and
    // the save after it, so two concurrent requests carrying the same key can
    // both miss the lookup and both insert a message — confirm whether a
    // uniqueness constraint elsewhere covers this.
    const existed = await this.getIdempotentResponse(scope, idempotencyKey);
    if (existed) return existed;

    const message = await this.dataSource.transaction(async (manager) => {
      const channel = await manager.findOne(Channel, {
        where: { id: channelId },
        lock: { mode: 'pessimistic_write' },
      });
      if (!channel) {
        throw new NotFoundException('channel not found');
      }

      const nextSeq = channel.lastSeq + 1;
      channel.lastSeq = nextSeq;
      await manager.save(Channel, channel);

      // A client-supplied id wins; otherwise derive one from channel + seq.
      const messageId = body.clientMessageId ?? `m-${channelId}-${nextSeq}`;
      const row = manager.create(Message, {
        messageId,
        channelId,
        conversationId: null,
        authorUserId: body.authorUserId ?? 'anonymous',
        seq: nextSeq,
        content: body.content,
        replyToMessageId: body.replyToMessageId ?? null,
        mentions: body.mentions ?? [],
        attachments: body.attachments ?? [],
        editedAt: null,
        deletedAt: null,
        isDeleted: false,
      });
      return manager.save(Message, row);
    });

    const responseBody: Record<string, unknown> = this.toView(message);
    await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);
    await this.events.emit({
      eventType: 'message.created',
      channelId,
      actorId: body.authorUserId ?? 'anonymous',
      data: responseBody,
    });
    this.realtime.emitChannelEvent(channelId, 'message.created', responseBody);
    return responseBody;
  }

  /**
   * Replaces the content of a message, provided it was created no more than
   * `EDIT_WINDOW_MS` ago, and emits `message.updated`.
   *
   * A missing message and an expired edit window are reported through a
   * `status` field in the response body rather than by throwing; that shape
   * is kept unchanged for existing clients.
   */
  @Patch(':messageId')
  async edit(
    @Param('id') channelId: string,
    @Param('messageId') messageId: string,
    @Body() body: { content?: string },
    @Headers('idempotency-key') idempotencyKey?: string,
  ) {
    const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
    const existed = await this.getIdempotentResponse(scope, idempotencyKey);
    if (existed) return existed;

    const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
    if (!item) return { status: 'not_found' };

    // NOTE(review): `isDeleted` is not checked here, so a soft-deleted message
    // can still receive new content — confirm whether that is intended.
    const now = Date.now();
    const createdAt = new Date(item.createdAt).getTime();
    if (now - createdAt > EDIT_WINDOW_MS) {
      return { status: 'edit_window_expired', messageId };
    }

    // An omitted `content` keeps the existing text but still stamps `editedAt`.
    item.content = body.content ?? item.content;
    item.editedAt = new Date();
    const saved = await this.messageRepo.save(item);

    const responseBody: Record<string, unknown> = this.toView(saved);
    await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);
    await this.events.emit({
      eventType: 'message.updated',
      channelId,
      actorId: saved.authorUserId,
      data: responseBody,
    });
    this.realtime.emitChannelEvent(channelId, 'message.updated', responseBody);
    return responseBody;
  }

  /**
   * Soft-deletes a message: marks it deleted, replaces its content with
   * `'[deleted]'`, clears mentions and attachments, and emits
   * `message.deleted`.
   *
   * A missing message is reported as `{ status: 'not_found' }` in the
   * response body rather than by throwing.
   */
  @Delete(':messageId')
  async remove(
    @Param('id') channelId: string,
    @Param('messageId') messageId: string,
    @Headers('idempotency-key') idempotencyKey?: string,
  ) {
    const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
    const existed = await this.getIdempotentResponse(scope, idempotencyKey);
    if (existed) return existed;

    const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
    if (!item) return { status: 'not_found' };

    item.isDeleted = true;
    item.deletedAt = new Date();
    item.content = '[deleted]';
    item.mentions = [];
    item.attachments = [];
    await this.messageRepo.save(item);

    const responseBody: Record<string, unknown> = {
      status: 'deleted',
      mode: 'soft',
      messageId,
    };
    await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);

    // Build the payload once; both sinks receive the same data.
    const eventData = {
      messageId,
      seq: item.seq,
      deletedAt: item.deletedAt?.toISOString() ?? null,
    };
    await this.events.emit({
      eventType: 'message.deleted',
      channelId,
      actorId: item.authorUserId,
      data: eventData,
    });
    this.realtime.emitChannelEvent(channelId, 'message.deleted', eventData);
    return responseBody;
  }

  /**
   * Lists the channel's messages whose `seq` lies in `[seq_from, seq_to]`,
   * ordered by ascending `seq` and capped at the clamped `limit`.
   *
   * `seq_from` defaults to 1 and `seq_to` to `Number.MAX_SAFE_INTEGER`. The
   * `page` object echoes the effective bounds and limit, and reports whether
   * more rows matched than were returned, the value produced by
   * `computeNextExpectedSeq`, and the channel's `lastSeq`.
   *
   * @throws BadRequestException when `seq_from` or `seq_to` is not a finite
   *   number.
   * @throws NotFoundException when the channel does not exist.
   */
  @Get()
  async listBySeq(
    @Param('id') channelId: string,
    @Query('seq_from') seqFrom?: string,
    @Query('seq_to') seqTo?: string,
    @Query('limit') limit?: string,
  ) {
    const from = parseSeqBound('seq_from', seqFrom, 1);
    const to = parseSeqBound('seq_to', seqTo, Number.MAX_SAFE_INTEGER);
    const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);

    // Resolve the channel before any shortcut so an unknown channel is always
    // a 404 and `highestCommittedSeq` always reflects the real channel state.
    const channel = await this.channelRepo.findOne({ where: { id: channelId } });
    if (!channel) {
      throw new NotFoundException('channel not found');
    }

    if (from > to) {
      // Empty range: nothing can match, so skip the message queries.
      return {
        items: [],
        page: {
          seqFrom: from,
          seqTo: to,
          limit: safeLimit,
          returned: 0,
          hasMore: false,
          nextExpectedSeq: from,
          highestCommittedSeq: channel.lastSeq,
        },
      };
    }

    const qb = this.messageRepo
      .createQueryBuilder('m')
      .where('m.channelId = :channelId', { channelId })
      .andWhere('m.seq >= :from', { from })
      .andWhere('m.seq <= :to', { to })
      .orderBy('m.seq', 'ASC');

    // Count before applying the limit so `hasMore` reflects the whole range.
    const total = await qb.getCount();
    const rows = await qb.limit(safeLimit).getMany();
    const items = rows.map((m) => this.toView(m));
    const nextExpectedSeq = computeNextExpectedSeq(
      from,
      rows.map((row) => row.seq),
    );

    return {
      items,
      page: {
        seqFrom: from,
        seqTo: to,
        limit: safeLimit,
        returned: items.length,
        hasMore: total > items.length,
        nextExpectedSeq,
        highestCommittedSeq: channel.lastSeq,
      },
    };
  }
}