feat(guild-messaging): switch seq allocation to DB transaction with row lock
This commit is contained in:
@@ -9,6 +9,10 @@ export class Message {
|
|||||||
@PrimaryGeneratedColumn('uuid')
|
@PrimaryGeneratedColumn('uuid')
|
||||||
id!: string;
|
id!: string;
|
||||||
|
|
||||||
|
@Index()
|
||||||
|
@Column({ type: 'varchar', length: 80, unique: true })
|
||||||
|
messageId!: string;
|
||||||
|
|
||||||
@Index()
|
@Index()
|
||||||
@Column({ type: 'char', length: 36, nullable: true })
|
@Column({ type: 'char', length: 36, nullable: true })
|
||||||
channelId!: string | null;
|
channelId!: string | null;
|
||||||
@@ -26,6 +30,24 @@ export class Message {
|
|||||||
@Column({ type: 'text' })
|
@Column({ type: 'text' })
|
||||||
content!: string;
|
content!: string;
|
||||||
|
|
||||||
|
@Column({ type: 'varchar', length: 80, nullable: true })
|
||||||
|
replyToMessageId!: string | null;
|
||||||
|
|
||||||
|
@Column({ type: 'json', nullable: true })
|
||||||
|
mentions!: string[] | null;
|
||||||
|
|
||||||
|
@Column({ type: 'json', nullable: true })
|
||||||
|
attachments!: Array<{ url: string; name?: string; mimeType?: string }> | null;
|
||||||
|
|
||||||
|
@Column({ type: 'datetime', nullable: true })
|
||||||
|
editedAt!: Date | null;
|
||||||
|
|
||||||
|
@Column({ type: 'datetime', nullable: true })
|
||||||
|
deletedAt!: Date | null;
|
||||||
|
|
||||||
|
@Column({ type: 'boolean', default: false })
|
||||||
|
isDeleted!: boolean;
|
||||||
|
|
||||||
@CreateDateColumn()
|
@CreateDateColumn()
|
||||||
@Index()
|
@Index()
|
||||||
createdAt!: Date;
|
createdAt!: Date;
|
||||||
|
|||||||
@@ -1,19 +1,19 @@
|
|||||||
import { Body, Controller, Delete, Get, Param, Patch, Post, Query } from '@nestjs/common';
|
import {
|
||||||
|
Body,
|
||||||
|
Controller,
|
||||||
|
Delete,
|
||||||
|
Get,
|
||||||
|
NotFoundException,
|
||||||
|
Param,
|
||||||
|
Patch,
|
||||||
|
Post,
|
||||||
|
Query,
|
||||||
|
} from '@nestjs/common';
|
||||||
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
|
import { DataSource, Repository } from 'typeorm';
|
||||||
import { CreateMessageDto } from './dto.create-message.dto';
|
import { CreateMessageDto } from './dto.create-message.dto';
|
||||||
|
import { Channel } from '../entities/channel.entity';
|
||||||
type Message = {
|
import { Message } from '../entities/message.entity';
|
||||||
messageId: string;
|
|
||||||
seq: number;
|
|
||||||
content: string;
|
|
||||||
authorUserId: string;
|
|
||||||
replyToMessageId: string | null;
|
|
||||||
mentions: string[];
|
|
||||||
attachments: Array<{ url: string; name?: string; mimeType?: string }>;
|
|
||||||
createdAt: string;
|
|
||||||
editedAt: string | null;
|
|
||||||
deletedAt: string | null;
|
|
||||||
isDeleted: boolean;
|
|
||||||
};
|
|
||||||
|
|
||||||
const EDIT_WINDOW_MS = 15 * 60 * 1000;
|
const EDIT_WINDOW_MS = 15 * 60 * 1000;
|
||||||
const DEFAULT_PAGE_LIMIT = 50;
|
const DEFAULT_PAGE_LIMIT = 50;
|
||||||
@@ -21,39 +21,73 @@ const MAX_PAGE_LIMIT = 200;
|
|||||||
|
|
||||||
@Controller('channels/:id/messages')
|
@Controller('channels/:id/messages')
|
||||||
export class MessagingController {
|
export class MessagingController {
|
||||||
private seqByChannel = new Map<string, number>();
|
constructor(
|
||||||
private messagesByChannel = new Map<string, Message[]>();
|
private readonly dataSource: DataSource,
|
||||||
|
@InjectRepository(Channel)
|
||||||
|
private readonly channelRepo: Repository<Channel>,
|
||||||
|
@InjectRepository(Message)
|
||||||
|
private readonly messageRepo: Repository<Message>,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
private toView(m: Message) {
|
||||||
|
return {
|
||||||
|
messageId: m.messageId,
|
||||||
|
seq: m.seq,
|
||||||
|
content: m.content,
|
||||||
|
authorUserId: m.authorUserId,
|
||||||
|
replyToMessageId: m.replyToMessageId,
|
||||||
|
mentions: m.mentions ?? [],
|
||||||
|
attachments: m.attachments ?? [],
|
||||||
|
createdAt: m.createdAt.toISOString(),
|
||||||
|
editedAt: m.editedAt ? m.editedAt.toISOString() : null,
|
||||||
|
deletedAt: m.deletedAt ? m.deletedAt.toISOString() : null,
|
||||||
|
isDeleted: m.isDeleted,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
@Post()
|
@Post()
|
||||||
create(@Param('id') channelId: string, @Body() body: CreateMessageDto) {
|
async create(@Param('id') channelId: string, @Body() body: CreateMessageDto) {
|
||||||
const next = (this.seqByChannel.get(channelId) ?? 0) + 1;
|
const message = await this.dataSource.transaction(async (manager) => {
|
||||||
this.seqByChannel.set(channelId, next);
|
const channel = await manager.findOne(Channel, {
|
||||||
|
where: { id: channelId },
|
||||||
|
lock: { mode: 'pessimistic_write' },
|
||||||
|
});
|
||||||
|
if (!channel) {
|
||||||
|
throw new NotFoundException('channel not found');
|
||||||
|
}
|
||||||
|
|
||||||
const message: Message = {
|
const nextSeq = channel.lastSeq + 1;
|
||||||
messageId: body.clientMessageId ?? `m-${channelId}-${next}`,
|
channel.lastSeq = nextSeq;
|
||||||
seq: next,
|
await manager.save(Channel, channel);
|
||||||
content: body.content,
|
|
||||||
authorUserId: body.authorUserId ?? 'anonymous',
|
|
||||||
replyToMessageId: body.replyToMessageId ?? null,
|
|
||||||
mentions: body.mentions ?? [],
|
|
||||||
attachments: body.attachments ?? [],
|
|
||||||
createdAt: new Date().toISOString(),
|
|
||||||
editedAt: null,
|
|
||||||
deletedAt: null,
|
|
||||||
isDeleted: false,
|
|
||||||
};
|
|
||||||
|
|
||||||
const arr = this.messagesByChannel.get(channelId) ?? [];
|
const messageId = body.clientMessageId ?? `m-${channelId}-${nextSeq}`;
|
||||||
arr.push(message);
|
const row = manager.create(Message, {
|
||||||
this.messagesByChannel.set(channelId, arr);
|
messageId,
|
||||||
|
channelId,
|
||||||
|
conversationId: null,
|
||||||
|
authorUserId: body.authorUserId ?? 'anonymous',
|
||||||
|
seq: nextSeq,
|
||||||
|
content: body.content,
|
||||||
|
replyToMessageId: body.replyToMessageId ?? null,
|
||||||
|
mentions: body.mentions ?? [],
|
||||||
|
attachments: body.attachments ?? [],
|
||||||
|
editedAt: null,
|
||||||
|
deletedAt: null,
|
||||||
|
isDeleted: false,
|
||||||
|
});
|
||||||
|
return manager.save(Message, row);
|
||||||
|
});
|
||||||
|
|
||||||
return message;
|
return this.toView(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Patch(':messageId')
|
@Patch(':messageId')
|
||||||
edit(@Param('id') channelId: string, @Param('messageId') messageId: string, @Body() body: { content?: string }) {
|
async edit(
|
||||||
const arr = this.messagesByChannel.get(channelId) ?? [];
|
@Param('id') channelId: string,
|
||||||
const item = arr.find((m) => m.messageId === messageId);
|
@Param('messageId') messageId: string,
|
||||||
|
@Body() body: { content?: string },
|
||||||
|
) {
|
||||||
|
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
|
||||||
if (!item) return { status: 'not_found' };
|
if (!item) return { status: 'not_found' };
|
||||||
|
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
@@ -63,27 +97,28 @@ export class MessagingController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
item.content = body.content ?? item.content;
|
item.content = body.content ?? item.content;
|
||||||
item.editedAt = new Date().toISOString();
|
item.editedAt = new Date();
|
||||||
return item;
|
const saved = await this.messageRepo.save(item);
|
||||||
|
return this.toView(saved);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Delete(':messageId')
|
@Delete(':messageId')
|
||||||
remove(@Param('id') channelId: string, @Param('messageId') messageId: string) {
|
async remove(@Param('id') channelId: string, @Param('messageId') messageId: string) {
|
||||||
const arr = this.messagesByChannel.get(channelId) ?? [];
|
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
|
||||||
const item = arr.find((m) => m.messageId === messageId);
|
|
||||||
if (!item) return { status: 'not_found' };
|
if (!item) return { status: 'not_found' };
|
||||||
|
|
||||||
item.isDeleted = true;
|
item.isDeleted = true;
|
||||||
item.deletedAt = new Date().toISOString();
|
item.deletedAt = new Date();
|
||||||
item.content = '[deleted]';
|
item.content = '[deleted]';
|
||||||
item.mentions = [];
|
item.mentions = [];
|
||||||
item.attachments = [];
|
item.attachments = [];
|
||||||
|
await this.messageRepo.save(item);
|
||||||
|
|
||||||
return { status: 'deleted', mode: 'soft', messageId };
|
return { status: 'deleted', mode: 'soft', messageId };
|
||||||
}
|
}
|
||||||
|
|
||||||
@Get()
|
@Get()
|
||||||
listBySeq(
|
async listBySeq(
|
||||||
@Param('id') channelId: string,
|
@Param('id') channelId: string,
|
||||||
@Query('seq_from') seqFrom?: string,
|
@Query('seq_from') seqFrom?: string,
|
||||||
@Query('seq_to') seqTo?: string,
|
@Query('seq_to') seqTo?: string,
|
||||||
@@ -96,9 +131,17 @@ export class MessagingController {
|
|||||||
Number.isFinite(requestedLimit) && requestedLimit > 0
|
Number.isFinite(requestedLimit) && requestedLimit > 0
|
||||||
? Math.min(requestedLimit, MAX_PAGE_LIMIT)
|
? Math.min(requestedLimit, MAX_PAGE_LIMIT)
|
||||||
: DEFAULT_PAGE_LIMIT;
|
: DEFAULT_PAGE_LIMIT;
|
||||||
const arr = this.messagesByChannel.get(channelId) ?? [];
|
|
||||||
const filtered = arr.filter((m) => m.seq >= from && m.seq <= to);
|
const qb = this.messageRepo
|
||||||
const items = filtered.slice(0, safeLimit);
|
.createQueryBuilder('m')
|
||||||
|
.where('m.channelId = :channelId', { channelId })
|
||||||
|
.andWhere('m.seq >= :from', { from })
|
||||||
|
.andWhere('m.seq <= :to', { to })
|
||||||
|
.orderBy('m.seq', 'ASC');
|
||||||
|
|
||||||
|
const total = await qb.getCount();
|
||||||
|
const rows = await qb.limit(safeLimit).getMany();
|
||||||
|
const items = rows.map((m) => this.toView(m));
|
||||||
|
|
||||||
return {
|
return {
|
||||||
items,
|
items,
|
||||||
@@ -107,7 +150,7 @@ export class MessagingController {
|
|||||||
seqTo: to,
|
seqTo: to,
|
||||||
limit: safeLimit,
|
limit: safeLimit,
|
||||||
returned: items.length,
|
returned: items.length,
|
||||||
hasMore: filtered.length > items.length,
|
hasMore: total > items.length,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,11 @@
|
|||||||
import { Module } from '@nestjs/common';
|
import { Module } from '@nestjs/common';
|
||||||
|
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||||
import { MessagingController } from './messaging.controller';
|
import { MessagingController } from './messaging.controller';
|
||||||
|
import { Channel } from '../entities/channel.entity';
|
||||||
|
import { Message } from '../entities/message.entity';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
|
imports: [TypeOrmModule.forFeature([Channel, Message])],
|
||||||
controllers: [MessagingController],
|
controllers: [MessagingController],
|
||||||
})
|
})
|
||||||
export class MessagingModule {}
|
export class MessagingModule {}
|
||||||
|
|||||||
@@ -48,7 +48,7 @@
|
|||||||
- [x] 编辑消息(可编辑窗口策略先简化)
|
- [x] 编辑消息(可编辑窗口策略先简化)
|
||||||
- [x] 删除消息(软删 vs 硬删,先定策略)
|
- [x] 删除消息(软删 vs 硬删,先定策略)
|
||||||
- [x] `GET messages` 分页(seq 区间 + limit)
|
- [x] `GET messages` 分页(seq 区间 + limit)
|
||||||
- [ ] seq 分配改为 DB 原子方案(避免并发冲突)
|
- [x] seq 分配改为 DB 原子方案(避免并发冲突)
|
||||||
|
|
||||||
### 2.3 一致性与回补
|
### 2.3 一致性与回补
|
||||||
- [ ] 回补接口:`seq_from/seq_to`
|
- [ ] 回补接口:`seq_from/seq_to`
|
||||||
|
|||||||
Reference in New Issue
Block a user