Compare commits
1 Commits
605d3ac092
...
6b993522cf
| Author | SHA1 | Date | |
|---|---|---|---|
| 6b993522cf |
@@ -8,6 +8,7 @@ import { MetricsService } from './common/metrics.service';
|
||||
import { ApiKeyGuard } from './common/api-key.guard';
|
||||
import { GuildsModule } from './guilds/guilds.module';
|
||||
import { ChannelsModule } from './channels/channels.module';
|
||||
import { TurnModule } from './channels/turn.module';
|
||||
import { MessagingModule } from './messaging/messaging.module';
|
||||
import { EventsModule } from './events/events.module';
|
||||
import { RealtimeModule } from './realtime/realtime.module';
|
||||
@@ -19,6 +20,7 @@ import { MembersModule } from './members/members.module';
|
||||
EventsModule,
|
||||
RealtimeModule,
|
||||
GuildsModule,
|
||||
TurnModule,
|
||||
ChannelsModule,
|
||||
MembersModule,
|
||||
MessagingModule,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Body, Controller, Get, Post, Query, Req, UnauthorizedException } from '@nestjs/common';
|
||||
import { Body, Controller, Get, Param, Post, Query, Req, UnauthorizedException } from '@nestjs/common';
|
||||
import { ChannelsService } from './channels.service';
|
||||
|
||||
// ApiKeyGuard attaches the introspected Center user id onto the request.
|
||||
@@ -27,8 +27,24 @@ export class ChannelsController {
|
||||
xType: body.xType as string | undefined,
|
||||
isPublic: Boolean(body.isPublic),
|
||||
memberUserIds: Array.isArray(body.memberUserIds) ? (body.memberUserIds as string[]) : [],
|
||||
onDuty: body.onDuty as string | undefined,
|
||||
listeners: Array.isArray(body.listeners) ? (body.listeners as string[]) : [],
|
||||
},
|
||||
userId,
|
||||
);
|
||||
}
|
||||
|
||||
@Post(':id/join')
|
||||
join(@Req() req: AuthedRequest, @Param('id') channelId: string) {
|
||||
const userId = req.userId ?? '';
|
||||
if (!userId) throw new UnauthorizedException('missing user');
|
||||
return this.channelsService.joinChannel(channelId, userId);
|
||||
}
|
||||
|
||||
@Post(':id/leave')
|
||||
leave(@Req() req: AuthedRequest, @Param('id') channelId: string) {
|
||||
const userId = req.userId ?? '';
|
||||
if (!userId) throw new UnauthorizedException('missing user');
|
||||
return this.channelsService.leaveChannel(channelId, userId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,10 +3,11 @@ import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
import { ChannelsController } from './channels.controller';
|
||||
import { Channel } from '../entities/channel.entity';
|
||||
import { ChannelMember } from '../entities/channel-member.entity';
|
||||
import { WakeMapping } from '../entities/wake-mapping.entity';
|
||||
import { ChannelsService } from './channels.service';
|
||||
|
||||
@Module({
|
||||
imports: [TypeOrmModule.forFeature([Channel, ChannelMember])],
|
||||
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, WakeMapping])],
|
||||
controllers: [ChannelsController],
|
||||
providers: [ChannelsService],
|
||||
exports: [ChannelsService],
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { BadRequestException, ForbiddenException, Injectable, NotFoundException } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { In, Repository } from 'typeorm';
|
||||
import { Channel } from '../entities/channel.entity';
|
||||
import { ChannelMember } from '../entities/channel-member.entity';
|
||||
import { WakeMapping } from '../entities/wake-mapping.entity';
|
||||
import { TurnService } from './turn.service';
|
||||
|
||||
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom'] as const;
|
||||
type XType = (typeof X_TYPES)[number];
|
||||
@@ -14,6 +16,10 @@ type CreateChannelInput = {
|
||||
xType?: string;
|
||||
isPublic?: boolean;
|
||||
memberUserIds?: string[];
|
||||
// required when xType === 'triage': the on-duty user for this channel
|
||||
onDuty?: string;
|
||||
// optional when xType === 'custom': users to wake on this channel
|
||||
listeners?: string[];
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
@@ -23,6 +29,9 @@ export class ChannelsService {
|
||||
private readonly channelRepo: Repository<Channel>,
|
||||
@InjectRepository(ChannelMember)
|
||||
private readonly memberRepo: Repository<ChannelMember>,
|
||||
@InjectRepository(WakeMapping)
|
||||
private readonly wakeRepo: Repository<WakeMapping>,
|
||||
private readonly turnService: TurnService,
|
||||
) {}
|
||||
|
||||
// Channels visible to a user within a guild:
|
||||
@@ -42,7 +51,39 @@ export class ChannelsService {
|
||||
});
|
||||
const memberChannelIds = new Set(memberRows.map((m) => m.channelId));
|
||||
|
||||
return all.filter((c) => c.isPublic || memberChannelIds.has(c.id));
|
||||
return all
|
||||
.filter((c) => c.isPublic || memberChannelIds.has(c.id))
|
||||
.map((c) => ({ ...c, isMember: memberChannelIds.has(c.id) }));
|
||||
}
|
||||
|
||||
async joinChannel(channelId: string, userId: string) {
|
||||
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
|
||||
if (!channel) throw new NotFoundException('channel not found');
|
||||
|
||||
const existing = await this.memberRepo.findOne({ where: { channelId, userId } });
|
||||
if (!existing) {
|
||||
if (!channel.isPublic) {
|
||||
throw new ForbiddenException('cannot join a non-public channel');
|
||||
}
|
||||
await this.memberRepo.save(this.memberRepo.create({ channelId, userId }));
|
||||
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
||||
await this.turnService.onMemberAdded(channelId, userId);
|
||||
}
|
||||
}
|
||||
return { status: 'ok', channelId, userId, member: true };
|
||||
}
|
||||
|
||||
async leaveChannel(channelId: string, userId: string) {
|
||||
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
|
||||
if (!channel) throw new NotFoundException('channel not found');
|
||||
|
||||
// remove every channel-scoped row that references this user
|
||||
await this.memberRepo.delete({ channelId, userId });
|
||||
await this.wakeRepo.delete({ channelId, userId });
|
||||
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
||||
await this.turnService.onMemberRemoved(channelId, userId);
|
||||
}
|
||||
return { status: 'ok', channelId, userId, member: false };
|
||||
}
|
||||
|
||||
async create(input: CreateChannelInput, creatorUserId: string) {
|
||||
@@ -57,6 +98,14 @@ export class ChannelsService {
|
||||
throw new BadRequestException(`xType must be one of: ${X_TYPES.join(', ')}`);
|
||||
}
|
||||
|
||||
const onDuty = String(input.onDuty ?? '').trim();
|
||||
if (xType === 'triage' && !onDuty) {
|
||||
throw new BadRequestException('onDuty is required for triage channels');
|
||||
}
|
||||
const listeners = (input.listeners ?? [])
|
||||
.map((x) => String(x ?? '').trim())
|
||||
.filter(Boolean);
|
||||
|
||||
const channel = await this.channelRepo.save(
|
||||
this.channelRepo.create({
|
||||
guildId,
|
||||
@@ -75,10 +124,29 @@ export class ChannelsService {
|
||||
const trimmed = String(id ?? '').trim();
|
||||
if (trimmed) memberIds.add(trimmed);
|
||||
}
|
||||
// triage: the on-duty user is auto-added to the channel if not already in it
|
||||
if (xType === 'triage') memberIds.add(onDuty);
|
||||
|
||||
await this.memberRepo.save(
|
||||
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
|
||||
);
|
||||
|
||||
// wake_mapping: triage -> the on-duty user; custom -> each listener
|
||||
const wakeUserIds = new Set<string>();
|
||||
if (xType === 'triage') wakeUserIds.add(onDuty);
|
||||
if (xType === 'custom') listeners.forEach((l) => wakeUserIds.add(l));
|
||||
if (wakeUserIds.size) {
|
||||
await this.wakeRepo.save(
|
||||
[...wakeUserIds].map((userId) => this.wakeRepo.create({ channelId: channel.id, userId })),
|
||||
);
|
||||
}
|
||||
|
||||
// discuss/work: initialize rotation state (order = members sorted by id,
|
||||
// currentSpeaker = null until someone proactively speaks)
|
||||
if (xType === 'discuss' || xType === 'work') {
|
||||
await this.turnService.initForChannel(channel.id, [...memberIds]);
|
||||
}
|
||||
|
||||
return channel;
|
||||
}
|
||||
}
|
||||
|
||||
32
src/channels/slash-commands.ts
Normal file
32
src/channels/slash-commands.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
// Registry of known slash commands. Only a message whose content matches a
|
||||
// REGISTERED command name is treated as a command (intercepted, never
|
||||
// delivered). Anything else that merely starts with '/' (e.g. /etc/passwd)
|
||||
// is delivered as a normal message.
|
||||
|
||||
export const SLASH_COMMANDS = ['no-reply', 'force-proceed'] as const;
|
||||
export type SlashCommandName = (typeof SLASH_COMMANDS)[number];
|
||||
|
||||
export type ParsedCommand = {
|
||||
name: SlashCommandName;
|
||||
opts: string[];
|
||||
};
|
||||
|
||||
// Matches: /<name> optionally followed by whitespace + opts. The name must be
|
||||
// a registered command (case-sensitive, exact) for it to count as a command.
|
||||
export function parseSlashCommand(content: string): ParsedCommand | null {
|
||||
if (typeof content !== 'string') return null;
|
||||
const trimmed = content.trim();
|
||||
if (!trimmed.startsWith('/')) return null;
|
||||
|
||||
const m = /^\/(\S+)(?:\s+([\s\S]*))?$/.exec(trimmed);
|
||||
if (!m) return null;
|
||||
|
||||
const name = m[1];
|
||||
if (!(SLASH_COMMANDS as readonly string[]).includes(name)) return null;
|
||||
|
||||
const opts = (m[2] ?? '').trim();
|
||||
return {
|
||||
name: name as SlashCommandName,
|
||||
opts: opts ? opts.split(/\s+/) : [],
|
||||
};
|
||||
}
|
||||
52
src/channels/turn-shuffle.ts
Normal file
52
src/channels/turn-shuffle.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
import { RoundEvent } from '../entities/channel-turn-state.entity';
|
||||
|
||||
export type ShuffleResult = { paused: true } | { paused: false; newOrder: string[] };
|
||||
|
||||
function shuffleInPlace<T>(arr: T[]): T[] {
|
||||
for (let i = arr.length - 1; i > 0; i--) {
|
||||
const j = Math.floor(Math.random() * (i + 1));
|
||||
[arr[i], arr[j]] = [arr[j], arr[i]];
|
||||
}
|
||||
return arr;
|
||||
}
|
||||
|
||||
// End-of-round shuffle.
|
||||
// - tail = the *last contiguous run* of /no-reply in the round's turn events
|
||||
// (anchor = first of that run; kept in event order)
|
||||
// - head = every current member not in tail, shuffled randomly
|
||||
// - constraint: head[0] !== D, where D = the last member who delivered a
|
||||
// normal message in the round
|
||||
// - if the constraint is unsatisfiable (head empty, or head === [D]) the
|
||||
// rotation pauses instead of shuffling (per spec B.3)
|
||||
export function computeShuffle(roundEvents: RoundEvent[], currentMembers: string[]): ShuffleResult {
|
||||
const memberSet = new Set(currentMembers);
|
||||
|
||||
// trailing contiguous /no-reply run, in event order, limited to current members
|
||||
const tail: string[] = [];
|
||||
for (let i = roundEvents.length - 1; i >= 0; i--) {
|
||||
if (roundEvents[i].a !== 'noreply') break;
|
||||
if (memberSet.has(roundEvents[i].u)) tail.unshift(roundEvents[i].u);
|
||||
}
|
||||
const tailSet = new Set(tail);
|
||||
|
||||
// D = last normal speaker in the round (the one right before the trailing run)
|
||||
let d: string | null = null;
|
||||
for (let i = roundEvents.length - 1; i >= 0; i--) {
|
||||
if (roundEvents[i].a === 'normal' && memberSet.has(roundEvents[i].u)) {
|
||||
d = roundEvents[i].u;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const head = currentMembers.filter((u) => !tailSet.has(u));
|
||||
if (head.length === 0) return { paused: true };
|
||||
if (head.length === 1 && head[0] === d) return { paused: true };
|
||||
|
||||
shuffleInPlace(head);
|
||||
if (head[0] === d) {
|
||||
// length >= 2 here (the [d] singleton case returned paused above)
|
||||
[head[0], head[1]] = [head[1], head[0]];
|
||||
}
|
||||
|
||||
return { paused: false, newOrder: [...head, ...tail] };
|
||||
}
|
||||
9
src/channels/turn.module.ts
Normal file
9
src/channels/turn.module.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { Global, Module } from '@nestjs/common';
|
||||
import { TurnService } from './turn.service';
|
||||
|
||||
@Global()
|
||||
@Module({
|
||||
providers: [TurnService],
|
||||
exports: [TurnService],
|
||||
})
|
||||
export class TurnModule {}
|
||||
251
src/channels/turn.service.ts
Normal file
251
src/channels/turn.service.ts
Normal file
@@ -0,0 +1,251 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { DataSource, EntityManager } from 'typeorm';
|
||||
import { ChannelTurnState } from '../entities/channel-turn-state.entity';
|
||||
import { ChannelMember } from '../entities/channel-member.entity';
|
||||
import { computeShuffle } from './turn-shuffle';
|
||||
|
||||
// wakeupUserId: the single user who should receive wakeup=true on the
|
||||
// resulting push (null = nobody / paused). For commands, `ack` present means
|
||||
// the controller must emit a guild-authored /ack message.
|
||||
export type TurnDecision = { wakeupUserId: string | null };
|
||||
export type CommandDecision = { ack: TurnDecision | null };
|
||||
|
||||
@Injectable()
|
||||
export class TurnService {
|
||||
constructor(private readonly dataSource: DataSource) {}
|
||||
|
||||
private async loadLocked(
|
||||
manager: EntityManager,
|
||||
channelId: string,
|
||||
): Promise<ChannelTurnState | null> {
|
||||
return manager
|
||||
.createQueryBuilder(ChannelTurnState, 's')
|
||||
.setLock('pessimistic_write')
|
||||
.where('s.channelId = :channelId', { channelId })
|
||||
.getOne();
|
||||
}
|
||||
|
||||
private async ensureState(
|
||||
manager: EntityManager,
|
||||
channelId: string,
|
||||
): Promise<ChannelTurnState> {
|
||||
let state = await this.loadLocked(manager, channelId);
|
||||
if (state) return state;
|
||||
// lazy init from current channel members (sorted by userId)
|
||||
const members = await manager.find(ChannelMember, { where: { channelId } });
|
||||
const order = members.map((m) => m.userId).sort();
|
||||
state = manager.create(ChannelTurnState, {
|
||||
channelId,
|
||||
orderUserIds: order,
|
||||
currentSpeaker: null,
|
||||
roundEvents: [],
|
||||
norepStreak: [],
|
||||
lastNormalSpeaker: null,
|
||||
});
|
||||
return manager.save(ChannelTurnState, state);
|
||||
}
|
||||
|
||||
// Called when a discuss/work channel is created.
|
||||
async initForChannel(channelId: string, memberUserIds: string[]): Promise<void> {
|
||||
await this.dataSource.transaction(async (manager) => {
|
||||
const existing = await manager.findOne(ChannelTurnState, { where: { channelId } });
|
||||
const order = [...new Set(memberUserIds)].sort();
|
||||
if (existing) {
|
||||
existing.orderUserIds = order;
|
||||
existing.currentSpeaker = null;
|
||||
existing.roundEvents = [];
|
||||
existing.norepStreak = [];
|
||||
existing.lastNormalSpeaker = null;
|
||||
await manager.save(ChannelTurnState, existing);
|
||||
return;
|
||||
}
|
||||
await manager.save(
|
||||
ChannelTurnState,
|
||||
manager.create(ChannelTurnState, {
|
||||
channelId,
|
||||
orderUserIds: order,
|
||||
currentSpeaker: null,
|
||||
roundEvents: [],
|
||||
norepStreak: [],
|
||||
lastNormalSpeaker: null,
|
||||
}),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
async onMemberAdded(channelId: string, userId: string): Promise<void> {
|
||||
await this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.ensureState(manager, channelId);
|
||||
if (!state.orderUserIds.includes(userId)) {
|
||||
state.orderUserIds = [...state.orderUserIds, userId]; // append to tail
|
||||
await manager.save(ChannelTurnState, state);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async onMemberRemoved(channelId: string, userId: string): Promise<void> {
|
||||
await this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.loadLocked(manager, channelId);
|
||||
if (!state) return;
|
||||
const order = state.orderUserIds;
|
||||
const idx = order.indexOf(userId);
|
||||
if (idx === -1) return;
|
||||
// if the leaver is the current speaker, the next one takes over
|
||||
let nextCurrent = state.currentSpeaker;
|
||||
if (state.currentSpeaker === userId) {
|
||||
nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null;
|
||||
if (nextCurrent === userId) nextCurrent = null;
|
||||
}
|
||||
state.orderUserIds = order.filter((u) => u !== userId);
|
||||
state.norepStreak = state.norepStreak.filter((u) => u !== userId);
|
||||
state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null;
|
||||
await manager.save(ChannelTurnState, state);
|
||||
});
|
||||
}
|
||||
|
||||
// A normal (non-command) message delivered to a discuss/work channel.
|
||||
async onNormalMessage(channelId: string, authorUserId: string): Promise<TurnDecision> {
|
||||
return this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.ensureState(manager, channelId);
|
||||
// any normal message clears the cross-round /no-reply streak
|
||||
state.norepStreak = [];
|
||||
|
||||
const order = state.orderUserIds;
|
||||
const n = order.length;
|
||||
if (n <= 1) {
|
||||
state.currentSpeaker = null;
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: null };
|
||||
}
|
||||
|
||||
if (state.currentSpeaker === null) {
|
||||
// activation: mover goes to front, rotation starts at order[1]
|
||||
const newOrder = [authorUserId, ...order.filter((u) => u !== authorUserId)];
|
||||
state.orderUserIds = newOrder;
|
||||
state.currentSpeaker = newOrder[1];
|
||||
state.roundEvents = [{ u: authorUserId, a: 'normal' }];
|
||||
state.lastNormalSpeaker = authorUserId;
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: newOrder[1] };
|
||||
}
|
||||
|
||||
if (authorUserId === state.currentSpeaker) {
|
||||
const idx = order.indexOf(authorUserId);
|
||||
const isLast = idx === n - 1;
|
||||
state.roundEvents = [...state.roundEvents, { u: authorUserId, a: 'normal' }];
|
||||
state.lastNormalSpeaker = authorUserId;
|
||||
|
||||
if (isLast) {
|
||||
const res = computeShuffle(state.roundEvents, order);
|
||||
if (res.paused) {
|
||||
state.currentSpeaker = null;
|
||||
state.roundEvents = [];
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: null };
|
||||
}
|
||||
state.orderUserIds = res.newOrder;
|
||||
state.roundEvents = [];
|
||||
state.lastNormalSpeaker = null;
|
||||
state.currentSpeaker = res.newOrder[0];
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: res.newOrder[0] };
|
||||
}
|
||||
|
||||
const succ = order[idx + 1];
|
||||
state.currentSpeaker = succ;
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: succ };
|
||||
}
|
||||
|
||||
// queue-jump normal message: no advance, nobody woken
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: null };
|
||||
});
|
||||
}
|
||||
|
||||
// /no-reply command in a discuss/work channel.
|
||||
async onNoReply(channelId: string, senderUserId: string): Promise<CommandDecision> {
|
||||
return this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.ensureState(manager, channelId);
|
||||
const order = state.orderUserIds;
|
||||
const n = order.length;
|
||||
|
||||
// only the current speaker's /no-reply has any effect
|
||||
if (n <= 1 || state.currentSpeaker === null || senderUserId !== state.currentSpeaker) {
|
||||
return { ack: null };
|
||||
}
|
||||
|
||||
const idx = order.indexOf(senderUserId);
|
||||
const isLast = idx === n - 1;
|
||||
state.roundEvents = [...state.roundEvents, { u: senderUserId, a: 'noreply' }];
|
||||
if (!state.norepStreak.includes(senderUserId)) {
|
||||
state.norepStreak = [...state.norepStreak, senderUserId];
|
||||
}
|
||||
|
||||
// pause when every current member has consecutively /no-reply'd
|
||||
const allCovered = order.every((u) => state.norepStreak.includes(u));
|
||||
if (allCovered) {
|
||||
state.currentSpeaker = null;
|
||||
state.roundEvents = [];
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: null } };
|
||||
}
|
||||
|
||||
if (isLast) {
|
||||
const res = computeShuffle(state.roundEvents, order);
|
||||
if (res.paused) {
|
||||
state.currentSpeaker = null;
|
||||
state.roundEvents = [];
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: null } };
|
||||
}
|
||||
state.orderUserIds = res.newOrder;
|
||||
state.roundEvents = [];
|
||||
state.lastNormalSpeaker = null;
|
||||
state.currentSpeaker = res.newOrder[0];
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: res.newOrder[0] } };
|
||||
}
|
||||
|
||||
const succ = order[idx + 1];
|
||||
state.currentSpeaker = succ;
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: succ } };
|
||||
});
|
||||
}
|
||||
|
||||
// /force-proceed command in a discuss/work channel: skip the stuck current
|
||||
// speaker (not recorded, streak untouched), advance to the next one.
|
||||
async onForceProceed(channelId: string): Promise<CommandDecision> {
|
||||
return this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.ensureState(manager, channelId);
|
||||
const order = state.orderUserIds;
|
||||
const n = order.length;
|
||||
if (n <= 1 || state.currentSpeaker === null) return { ack: null };
|
||||
|
||||
const idx = order.indexOf(state.currentSpeaker);
|
||||
const isLast = idx === n - 1;
|
||||
|
||||
if (isLast) {
|
||||
const res = computeShuffle(state.roundEvents, order);
|
||||
if (res.paused) {
|
||||
state.currentSpeaker = null;
|
||||
state.roundEvents = [];
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: null } };
|
||||
}
|
||||
state.orderUserIds = res.newOrder;
|
||||
state.roundEvents = [];
|
||||
state.lastNormalSpeaker = null;
|
||||
state.currentSpeaker = res.newOrder[0];
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: res.newOrder[0] } };
|
||||
}
|
||||
|
||||
const succ = order[idx + 1];
|
||||
state.currentSpeaker = succ;
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: succ } };
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,8 @@ import { TypeOrmModuleOptions } from '@nestjs/typeorm';
|
||||
import { Guild } from './entities/guild.entity';
|
||||
import { Channel } from './entities/channel.entity';
|
||||
import { ChannelMember } from './entities/channel-member.entity';
|
||||
import { WakeMapping } from './entities/wake-mapping.entity';
|
||||
import { ChannelTurnState } from './entities/channel-turn-state.entity';
|
||||
import { Message } from './entities/message.entity';
|
||||
import { DmConversation } from './entities/dm-conversation.entity';
|
||||
import { DmParticipant } from './entities/dm-participant.entity';
|
||||
@@ -21,6 +23,8 @@ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
|
||||
Guild,
|
||||
Channel,
|
||||
ChannelMember,
|
||||
WakeMapping,
|
||||
ChannelTurnState,
|
||||
Message,
|
||||
DmConversation,
|
||||
DmParticipant,
|
||||
|
||||
41
src/entities/channel-turn-state.entity.ts
Normal file
41
src/entities/channel-turn-state.entity.ts
Normal file
@@ -0,0 +1,41 @@
|
||||
import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';
|
||||
|
||||
export type RoundEvent = { u: string; a: 'normal' | 'noreply' };
|
||||
|
||||
// Per-channel rotation state for discuss/work x_type channels.
|
||||
// All mutations must be serialized per channel (pessimistic row lock).
|
||||
@Entity('channel_turn_state')
|
||||
export class ChannelTurnState {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
id!: string;
|
||||
|
||||
@Index({ unique: true })
|
||||
@Column({ name: 'channel_id', type: 'char', length: 36 })
|
||||
channelId!: string;
|
||||
|
||||
// speaking order; userIds
|
||||
@Column({ name: 'order_user_ids', type: 'json' })
|
||||
orderUserIds!: string[];
|
||||
|
||||
// null = paused (created, or all-members-consecutively-/no-reply)
|
||||
@Column({ name: 'current_speaker', type: 'varchar', length: 64, nullable: true })
|
||||
currentSpeaker!: string | null;
|
||||
|
||||
// ordered turn actions of the *current* round (skipped/queue-jumps excluded);
|
||||
// used to compute the shuffle tail/anchor
|
||||
@Column({ name: 'round_events', type: 'json' })
|
||||
roundEvents!: RoundEvent[];
|
||||
|
||||
// distinct userIds that consecutively replied /no-reply via rotation since
|
||||
// the last normal message; persists ACROSS rounds; reset by any normal msg.
|
||||
// When it covers every current member -> pause.
|
||||
@Column({ name: 'norep_streak', type: 'json' })
|
||||
norepStreak!: string[];
|
||||
|
||||
// last member who delivered a normal message in the current round (D)
|
||||
@Column({ name: 'last_normal_speaker', type: 'varchar', length: 64, nullable: true })
|
||||
lastNormalSpeaker!: string | null;
|
||||
|
||||
@UpdateDateColumn()
|
||||
updatedAt!: Date;
|
||||
}
|
||||
19
src/entities/wake-mapping.entity.ts
Normal file
19
src/entities/wake-mapping.entity.ts
Normal file
@@ -0,0 +1,19 @@
|
||||
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
|
||||
|
||||
@Entity('wake_mapping')
|
||||
@Index(['channelId', 'userId'], { unique: true })
|
||||
@Index(['userId'])
|
||||
export class WakeMapping {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
id!: string;
|
||||
|
||||
@Index()
|
||||
@Column({ name: 'channel_id', type: 'char', length: 36 })
|
||||
channelId!: string;
|
||||
|
||||
@Column({ name: 'user_id', type: 'varchar', length: 64 })
|
||||
userId!: string;
|
||||
|
||||
@CreateDateColumn()
|
||||
createdAt!: Date;
|
||||
}
|
||||
@@ -16,6 +16,9 @@ import { CreateMessageDto } from './dto.create-message.dto';
|
||||
import { Channel } from '../entities/channel.entity';
|
||||
import { Message } from '../entities/message.entity';
|
||||
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
|
||||
import { WakeMapping } from '../entities/wake-mapping.entity';
|
||||
import { parseSlashCommand } from '../channels/slash-commands';
|
||||
import { TurnService } from '../channels/turn.service';
|
||||
import { EventsService } from '../events/events.service';
|
||||
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
|
||||
import { RealtimeGateway } from '../realtime/realtime.gateway';
|
||||
@@ -34,6 +37,9 @@ export class MessagingController {
|
||||
private readonly messageRepo: Repository<Message>,
|
||||
@InjectRepository(IdempotencyRecord)
|
||||
private readonly idemRepo: Repository<IdempotencyRecord>,
|
||||
@InjectRepository(WakeMapping)
|
||||
private readonly wakeRepo: Repository<WakeMapping>,
|
||||
private readonly turn: TurnService,
|
||||
private readonly events: EventsService,
|
||||
private readonly realtime: RealtimeGateway,
|
||||
) {}
|
||||
@@ -77,6 +83,52 @@ export class MessagingController {
|
||||
};
|
||||
}
|
||||
|
||||
// Persists one message (allocates a seq under a channel row lock) and
|
||||
// returns its view. Used for normal messages and for guild /ack messages.
|
||||
private async persistMessage(
|
||||
channelId: string,
|
||||
input: { authorUserId: string; content: string; clientMessageId?: string | null; replyToMessageId?: string | null; mentions?: string[]; attachments?: Array<{ url: string; name?: string; mimeType?: string }> },
|
||||
): Promise<Message> {
|
||||
return this.dataSource.transaction(async (manager) => {
|
||||
const channel = await manager.findOne(Channel, {
|
||||
where: { id: channelId },
|
||||
lock: { mode: 'pessimistic_write' },
|
||||
});
|
||||
if (!channel) {
|
||||
throw new NotFoundException('channel not found');
|
||||
}
|
||||
const nextSeq = channel.lastSeq + 1;
|
||||
channel.lastSeq = nextSeq;
|
||||
await manager.save(Channel, channel);
|
||||
|
||||
const messageId = input.clientMessageId ?? `m-${channelId}-${nextSeq}`;
|
||||
const row = manager.create(Message, {
|
||||
messageId,
|
||||
channelId,
|
||||
conversationId: null,
|
||||
authorUserId: input.authorUserId,
|
||||
seq: nextSeq,
|
||||
content: input.content,
|
||||
replyToMessageId: input.replyToMessageId ?? null,
|
||||
mentions: input.mentions ?? [],
|
||||
attachments: input.attachments ?? [],
|
||||
editedAt: null,
|
||||
deletedAt: null,
|
||||
isDeleted: false,
|
||||
});
|
||||
return manager.save(Message, row);
|
||||
});
|
||||
}
|
||||
|
||||
// Emits a guild-authored /ack message to the channel; wakeup=true only for
|
||||
// the new current speaker (null = nobody). One message-id; persisted.
|
||||
private async emitAck(channelId: string, wakeupUserId: string | null): Promise<void> {
|
||||
const ack = await this.persistMessage(channelId, { authorUserId: 'guild', content: '/ack' });
|
||||
const body = this.toView(ack) as Record<string, unknown>;
|
||||
await this.events.emit({ eventType: 'message.created', channelId, actorId: 'guild', data: body });
|
||||
await this.realtime.emitMessageTargeted(channelId, body, wakeupUserId);
|
||||
}
|
||||
|
||||
@Post()
|
||||
async create(
|
||||
@Param('id') channelId: string,
|
||||
@@ -87,35 +139,34 @@ export class MessagingController {
|
||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||
if (existed) return existed;
|
||||
|
||||
const message = await this.dataSource.transaction(async (manager) => {
|
||||
const channel = await manager.findOne(Channel, {
|
||||
where: { id: channelId },
|
||||
lock: { mode: 'pessimistic_write' },
|
||||
});
|
||||
if (!channel) {
|
||||
throw new NotFoundException('channel not found');
|
||||
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
|
||||
if (!channel) throw new NotFoundException('channel not found');
|
||||
const xType = channel.xType ?? 'general';
|
||||
const isRotating = xType === 'discuss' || xType === 'work';
|
||||
const authorUserId = String(body.authorUserId ?? 'anonymous');
|
||||
|
||||
// ---- command interception: registered slash commands are never delivered
|
||||
const cmd = parseSlashCommand(body.content ?? '');
|
||||
if (cmd) {
|
||||
if (isRotating && cmd.name === 'no-reply') {
|
||||
const { ack } = await this.turn.onNoReply(channelId, authorUserId);
|
||||
if (ack) await this.emitAck(channelId, ack.wakeupUserId);
|
||||
} else if (isRotating && cmd.name === 'force-proceed') {
|
||||
const { ack } = await this.turn.onForceProceed(channelId);
|
||||
if (ack) await this.emitAck(channelId, ack.wakeupUserId);
|
||||
}
|
||||
// non-rotating channels (or no effect): swallowed, nothing delivered
|
||||
return { status: 'command', command: cmd.name };
|
||||
}
|
||||
|
||||
const nextSeq = channel.lastSeq + 1;
|
||||
channel.lastSeq = nextSeq;
|
||||
await manager.save(Channel, channel);
|
||||
|
||||
const messageId = body.clientMessageId ?? `m-${channelId}-${nextSeq}`;
|
||||
const row = manager.create(Message, {
|
||||
messageId,
|
||||
channelId,
|
||||
conversationId: null,
|
||||
authorUserId: body.authorUserId ?? 'anonymous',
|
||||
seq: nextSeq,
|
||||
content: body.content,
|
||||
replyToMessageId: body.replyToMessageId ?? null,
|
||||
mentions: body.mentions ?? [],
|
||||
attachments: body.attachments ?? [],
|
||||
editedAt: null,
|
||||
deletedAt: null,
|
||||
isDeleted: false,
|
||||
});
|
||||
return manager.save(Message, row);
|
||||
// ---- normal message
|
||||
const message = await this.persistMessage(channelId, {
|
||||
authorUserId,
|
||||
content: body.content,
|
||||
clientMessageId: body.clientMessageId,
|
||||
replyToMessageId: body.replyToMessageId,
|
||||
mentions: body.mentions,
|
||||
attachments: body.attachments,
|
||||
});
|
||||
|
||||
const responseBody = this.toView(message) as Record<string, unknown>;
|
||||
@@ -124,10 +175,24 @@ export class MessagingController {
|
||||
await this.events.emit({
|
||||
eventType: 'message.created',
|
||||
channelId,
|
||||
actorId: body.authorUserId ?? 'anonymous',
|
||||
actorId: authorUserId,
|
||||
data: responseBody,
|
||||
});
|
||||
this.realtime.emitChannelEvent(channelId, 'message.created', responseBody);
|
||||
|
||||
if (isRotating) {
|
||||
// discuss/work: rotation decides the single wakeup target
|
||||
const decision = await this.turn.onNormalMessage(channelId, authorUserId);
|
||||
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
|
||||
} else {
|
||||
// general/report/triage/custom: wakeup from x_type + wake_mapping
|
||||
const wakeRows = await this.wakeRepo.find({ where: { channelId } });
|
||||
const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
|
||||
await this.realtime.emitMessageCreated(channelId, responseBody, {
|
||||
xType,
|
||||
authorUserId,
|
||||
wakeUserIds,
|
||||
});
|
||||
}
|
||||
|
||||
return responseBody;
|
||||
}
|
||||
|
||||
@@ -4,9 +4,10 @@ import { MessagingController } from './messaging.controller';
|
||||
import { Channel } from '../entities/channel.entity';
|
||||
import { Message } from '../entities/message.entity';
|
||||
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
|
||||
import { WakeMapping } from '../entities/wake-mapping.entity';
|
||||
|
||||
@Module({
|
||||
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord])],
|
||||
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord, WakeMapping])],
|
||||
controllers: [MessagingController],
|
||||
})
|
||||
export class MessagingModule {}
|
||||
|
||||
@@ -11,6 +11,34 @@ import { Logger } from '@nestjs/common';
|
||||
import { Server, Socket } from 'socket.io';
|
||||
import { introspectGuildToken } from '../common/center-auth';
|
||||
|
||||
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom';
|
||||
|
||||
// Wakeup for non-rotating channels only (general/report/triage/custom).
|
||||
// discuss/work go through TurnService + emitMessageTargeted, never here.
|
||||
// Precedence:
|
||||
// 1. the author never gets woken by their own message
|
||||
// 2. triage/custom: only wake users in the channel's wake_mapping
|
||||
// 3. general: wake everyone
|
||||
// 4. report (and anything else): wake nobody
|
||||
export function computeWakeup(args: {
|
||||
xType: XType;
|
||||
recipientUserId: string;
|
||||
authorUserId: string;
|
||||
wakeUserIds: Set<string>;
|
||||
}): boolean {
|
||||
const { xType, recipientUserId, authorUserId, wakeUserIds } = args;
|
||||
if (recipientUserId === authorUserId) return false;
|
||||
switch (xType) {
|
||||
case 'general':
|
||||
return true;
|
||||
case 'triage':
|
||||
case 'custom':
|
||||
return wakeUserIds.has(recipientUserId);
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@WebSocketGateway({
|
||||
namespace: '/realtime',
|
||||
cors: {
|
||||
@@ -133,4 +161,38 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
||||
emitChannelEvent(channelId: string, event: string, data: Record<string, unknown>): void {
|
||||
this.server.to(`channel:${channelId}`).emit(event, data);
|
||||
}
|
||||
|
||||
// Emits message.created per-recipient so each carries its own `wakeup` flag.
|
||||
async emitMessageCreated(
|
||||
channelId: string,
|
||||
data: Record<string, unknown>,
|
||||
ctx: { xType: XType; authorUserId: string; wakeUserIds: Set<string> },
|
||||
): Promise<void> {
|
||||
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
|
||||
for (const s of sockets) {
|
||||
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
|
||||
const wakeup = computeWakeup({
|
||||
xType: ctx.xType,
|
||||
recipientUserId,
|
||||
authorUserId: ctx.authorUserId,
|
||||
wakeUserIds: ctx.wakeUserIds,
|
||||
});
|
||||
s.emit('message.created', { ...data, wakeup });
|
||||
}
|
||||
}
|
||||
|
||||
// discuss/work + /ack: exactly one recipient (the new current speaker) gets
|
||||
// wakeup=true; everyone else false. One message-id; metadata at push only.
|
||||
async emitMessageTargeted(
|
||||
channelId: string,
|
||||
data: Record<string, unknown>,
|
||||
wakeupUserId: string | null,
|
||||
): Promise<void> {
|
||||
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
|
||||
for (const s of sockets) {
|
||||
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
|
||||
const wakeup = wakeupUserId !== null && recipientUserId === wakeupUserId;
|
||||
s.emit('message.created', { ...data, wakeup });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user