diff --git a/src/app.module.ts b/src/app.module.ts index 3432acd..b55fc8d 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -8,6 +8,7 @@ import { MetricsService } from './common/metrics.service'; import { ApiKeyGuard } from './common/api-key.guard'; import { GuildsModule } from './guilds/guilds.module'; import { ChannelsModule } from './channels/channels.module'; +import { TurnModule } from './channels/turn.module'; import { MessagingModule } from './messaging/messaging.module'; import { EventsModule } from './events/events.module'; import { RealtimeModule } from './realtime/realtime.module'; @@ -19,6 +20,7 @@ import { MembersModule } from './members/members.module'; EventsModule, RealtimeModule, GuildsModule, + TurnModule, ChannelsModule, MembersModule, MessagingModule, diff --git a/src/channels/channels.controller.ts b/src/channels/channels.controller.ts index b78b527..651ce7e 100644 --- a/src/channels/channels.controller.ts +++ b/src/channels/channels.controller.ts @@ -1,4 +1,4 @@ -import { Body, Controller, Get, Post, Query, Req, UnauthorizedException } from '@nestjs/common'; +import { Body, Controller, Get, Param, Post, Query, Req, UnauthorizedException } from '@nestjs/common'; import { ChannelsService } from './channels.service'; // ApiKeyGuard attaches the introspected Center user id onto the request. @@ -27,8 +27,24 @@ export class ChannelsController { xType: body.xType as string | undefined, isPublic: Boolean(body.isPublic), memberUserIds: Array.isArray(body.memberUserIds) ? (body.memberUserIds as string[]) : [], + onDuty: body.onDuty as string | undefined, + listeners: Array.isArray(body.listeners) ? (body.listeners as string[]) : [], }, userId, ); } + + @Post(':id/join') + join(@Req() req: AuthedRequest, @Param('id') channelId: string) { + const userId = req.userId ?? ''; + if (!userId) throw new UnauthorizedException('missing user'); + return this.channelsService.joinChannel(channelId, userId); + } + + @Post(':id/leave') + leave(@Req() req: AuthedRequest, @Param('id') channelId: string) { + const userId = req.userId ?? ''; + if (!userId) throw new UnauthorizedException('missing user'); + return this.channelsService.leaveChannel(channelId, userId); + } } diff --git a/src/channels/channels.module.ts b/src/channels/channels.module.ts index 922d050..b0a23e0 100644 --- a/src/channels/channels.module.ts +++ b/src/channels/channels.module.ts @@ -3,10 +3,11 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { ChannelsController } from './channels.controller'; import { Channel } from '../entities/channel.entity'; import { ChannelMember } from '../entities/channel-member.entity'; +import { WakeMapping } from '../entities/wake-mapping.entity'; import { ChannelsService } from './channels.service'; @Module({ - imports: [TypeOrmModule.forFeature([Channel, ChannelMember])], + imports: [TypeOrmModule.forFeature([Channel, ChannelMember, WakeMapping])], controllers: [ChannelsController], providers: [ChannelsService], exports: [ChannelsService], diff --git a/src/channels/channels.service.ts b/src/channels/channels.service.ts index 68b0ec1..cffbb13 100644 --- a/src/channels/channels.service.ts +++ b/src/channels/channels.service.ts @@ -1,8 +1,10 @@ -import { BadRequestException, Injectable } from '@nestjs/common'; +import { BadRequestException, ForbiddenException, Injectable, NotFoundException } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { In, Repository } from 'typeorm'; import { Channel } from '../entities/channel.entity'; import { ChannelMember } from '../entities/channel-member.entity'; +import { WakeMapping } from '../entities/wake-mapping.entity'; +import { TurnService } from './turn.service'; const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom'] as const; type XType = (typeof X_TYPES)[number]; @@ -14,6 +16,10 @@ type CreateChannelInput = { xType?: string; isPublic?: boolean; memberUserIds?: string[]; + // required when xType === 'triage': the on-duty user for this channel + onDuty?: string; + // optional when xType === 'custom': users to wake on this channel + listeners?: string[]; }; @Injectable() @@ -23,6 +29,9 @@ export class ChannelsService { private readonly channelRepo: Repository, @InjectRepository(ChannelMember) private readonly memberRepo: Repository, + @InjectRepository(WakeMapping) + private readonly wakeRepo: Repository, + private readonly turnService: TurnService, ) {} // Channels visible to a user within a guild: @@ -42,7 +51,39 @@ export class ChannelsService { }); const memberChannelIds = new Set(memberRows.map((m) => m.channelId)); - return all.filter((c) => c.isPublic || memberChannelIds.has(c.id)); + return all + .filter((c) => c.isPublic || memberChannelIds.has(c.id)) + .map((c) => ({ ...c, isMember: memberChannelIds.has(c.id) })); + } + + async joinChannel(channelId: string, userId: string) { + const channel = await this.channelRepo.findOne({ where: { id: channelId } }); + if (!channel) throw new NotFoundException('channel not found'); + + const existing = await this.memberRepo.findOne({ where: { channelId, userId } }); + if (!existing) { + if (!channel.isPublic) { + throw new ForbiddenException('cannot join a non-public channel'); + } + await this.memberRepo.save(this.memberRepo.create({ channelId, userId })); + if (channel.xType === 'discuss' || channel.xType === 'work') { + await this.turnService.onMemberAdded(channelId, userId); + } + } + return { status: 'ok', channelId, userId, member: true }; + } + + async leaveChannel(channelId: string, userId: string) { + const channel = await this.channelRepo.findOne({ where: { id: channelId } }); + if (!channel) throw new NotFoundException('channel not found'); + + // remove every channel-scoped row that references this user + await this.memberRepo.delete({ channelId, userId }); + await this.wakeRepo.delete({ channelId, userId }); + if (channel.xType === 'discuss' || channel.xType === 'work') { + await this.turnService.onMemberRemoved(channelId, userId); + } + return { status: 'ok', channelId, userId, member: false }; } async create(input: CreateChannelInput, creatorUserId: string) { @@ -57,6 +98,14 @@ export class ChannelsService { throw new BadRequestException(`xType must be one of: ${X_TYPES.join(', ')}`); } + const onDuty = String(input.onDuty ?? '').trim(); + if (xType === 'triage' && !onDuty) { + throw new BadRequestException('onDuty is required for triage channels'); + } + const listeners = (input.listeners ?? []) + .map((x) => String(x ?? '').trim()) + .filter(Boolean); + const channel = await this.channelRepo.save( this.channelRepo.create({ guildId, @@ -75,10 +124,29 @@ export class ChannelsService { const trimmed = String(id ?? '').trim(); if (trimmed) memberIds.add(trimmed); } + // triage: the on-duty user is auto-added to the channel if not already in it + if (xType === 'triage') memberIds.add(onDuty); + await this.memberRepo.save( [...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })), ); + // wake_mapping: triage -> the on-duty user; custom -> each listener + const wakeUserIds = new Set(); + if (xType === 'triage') wakeUserIds.add(onDuty); + if (xType === 'custom') listeners.forEach((l) => wakeUserIds.add(l)); + if (wakeUserIds.size) { + await this.wakeRepo.save( + [...wakeUserIds].map((userId) => this.wakeRepo.create({ channelId: channel.id, userId })), + ); + } + + // discuss/work: initialize rotation state (order = members sorted by id, + // currentSpeaker = null until someone proactively speaks) + if (xType === 'discuss' || xType === 'work') { + await this.turnService.initForChannel(channel.id, [...memberIds]); + } + return channel; } } diff --git a/src/channels/slash-commands.ts b/src/channels/slash-commands.ts new file mode 100644 index 0000000..f21a385 --- /dev/null +++ b/src/channels/slash-commands.ts @@ -0,0 +1,32 @@ +// Registry of known slash commands. Only a message whose content matches a +// REGISTERED command name is treated as a command (intercepted, never +// delivered). Anything else that merely starts with '/' (e.g. /etc/passwd) +// is delivered as a normal message. + +export const SLASH_COMMANDS = ['no-reply', 'force-proceed'] as const; +export type SlashCommandName = (typeof SLASH_COMMANDS)[number]; + +export type ParsedCommand = { + name: SlashCommandName; + opts: string[]; +}; + +// Matches: / optionally followed by whitespace + opts. The name must be +// a registered command (case-sensitive, exact) for it to count as a command. +export function parseSlashCommand(content: string): ParsedCommand | null { + if (typeof content !== 'string') return null; + const trimmed = content.trim(); + if (!trimmed.startsWith('/')) return null; + + const m = /^\/(\S+)(?:\s+([\s\S]*))?$/.exec(trimmed); + if (!m) return null; + + const name = m[1]; + if (!(SLASH_COMMANDS as readonly string[]).includes(name)) return null; + + const opts = (m[2] ?? '').trim(); + return { + name: name as SlashCommandName, + opts: opts ? opts.split(/\s+/) : [], + }; +} diff --git a/src/channels/turn-shuffle.ts b/src/channels/turn-shuffle.ts new file mode 100644 index 0000000..95f1f06 --- /dev/null +++ b/src/channels/turn-shuffle.ts @@ -0,0 +1,52 @@ +import { RoundEvent } from '../entities/channel-turn-state.entity'; + +export type ShuffleResult = { paused: true } | { paused: false; newOrder: string[] }; + +function shuffleInPlace(arr: T[]): T[] { + for (let i = arr.length - 1; i > 0; i--) { + const j = Math.floor(Math.random() * (i + 1)); + [arr[i], arr[j]] = [arr[j], arr[i]]; + } + return arr; +} + +// End-of-round shuffle. +// - tail = the *last contiguous run* of /no-reply in the round's turn events +// (anchor = first of that run; kept in event order) +// - head = every current member not in tail, shuffled randomly +// - constraint: head[0] !== D, where D = the last member who delivered a +// normal message in the round +// - if the constraint is unsatisfiable (head empty, or head === [D]) the +// rotation pauses instead of shuffling (per spec B.3) +export function computeShuffle(roundEvents: RoundEvent[], currentMembers: string[]): ShuffleResult { + const memberSet = new Set(currentMembers); + + // trailing contiguous /no-reply run, in event order, limited to current members + const tail: string[] = []; + for (let i = roundEvents.length - 1; i >= 0; i--) { + if (roundEvents[i].a !== 'noreply') break; + if (memberSet.has(roundEvents[i].u)) tail.unshift(roundEvents[i].u); + } + const tailSet = new Set(tail); + + // D = last normal speaker in the round (the one right before the trailing run) + let d: string | null = null; + for (let i = roundEvents.length - 1; i >= 0; i--) { + if (roundEvents[i].a === 'normal' && memberSet.has(roundEvents[i].u)) { + d = roundEvents[i].u; + break; + } + } + + const head = currentMembers.filter((u) => !tailSet.has(u)); + if (head.length === 0) return { paused: true }; + if (head.length === 1 && head[0] === d) return { paused: true }; + + shuffleInPlace(head); + if (head[0] === d) { + // length >= 2 here (the [d] singleton case returned paused above) + [head[0], head[1]] = [head[1], head[0]]; + } + + return { paused: false, newOrder: [...head, ...tail] }; +} diff --git a/src/channels/turn.module.ts b/src/channels/turn.module.ts new file mode 100644 index 0000000..8d96dc7 --- /dev/null +++ b/src/channels/turn.module.ts @@ -0,0 +1,9 @@ +import { Global, Module } from '@nestjs/common'; +import { TurnService } from './turn.service'; + +@Global() +@Module({ + providers: [TurnService], + exports: [TurnService], +}) +export class TurnModule {} diff --git a/src/channels/turn.service.ts b/src/channels/turn.service.ts new file mode 100644 index 0000000..de00507 --- /dev/null +++ b/src/channels/turn.service.ts @@ -0,0 +1,251 @@ +import { Injectable } from '@nestjs/common'; +import { DataSource, EntityManager } from 'typeorm'; +import { ChannelTurnState } from '../entities/channel-turn-state.entity'; +import { ChannelMember } from '../entities/channel-member.entity'; +import { computeShuffle } from './turn-shuffle'; + +// wakeupUserId: the single user who should receive wakeup=true on the +// resulting push (null = nobody / paused). For commands, `ack` present means +// the controller must emit a guild-authored /ack message. +export type TurnDecision = { wakeupUserId: string | null }; +export type CommandDecision = { ack: TurnDecision | null }; + +@Injectable() +export class TurnService { + constructor(private readonly dataSource: DataSource) {} + + private async loadLocked( + manager: EntityManager, + channelId: string, + ): Promise { + return manager + .createQueryBuilder(ChannelTurnState, 's') + .setLock('pessimistic_write') + .where('s.channelId = :channelId', { channelId }) + .getOne(); + } + + private async ensureState( + manager: EntityManager, + channelId: string, + ): Promise { + let state = await this.loadLocked(manager, channelId); + if (state) return state; + // lazy init from current channel members (sorted by userId) + const members = await manager.find(ChannelMember, { where: { channelId } }); + const order = members.map((m) => m.userId).sort(); + state = manager.create(ChannelTurnState, { + channelId, + orderUserIds: order, + currentSpeaker: null, + roundEvents: [], + norepStreak: [], + lastNormalSpeaker: null, + }); + return manager.save(ChannelTurnState, state); + } + + // Called when a discuss/work channel is created. + async initForChannel(channelId: string, memberUserIds: string[]): Promise { + await this.dataSource.transaction(async (manager) => { + const existing = await manager.findOne(ChannelTurnState, { where: { channelId } }); + const order = [...new Set(memberUserIds)].sort(); + if (existing) { + existing.orderUserIds = order; + existing.currentSpeaker = null; + existing.roundEvents = []; + existing.norepStreak = []; + existing.lastNormalSpeaker = null; + await manager.save(ChannelTurnState, existing); + return; + } + await manager.save( + ChannelTurnState, + manager.create(ChannelTurnState, { + channelId, + orderUserIds: order, + currentSpeaker: null, + roundEvents: [], + norepStreak: [], + lastNormalSpeaker: null, + }), + ); + }); + } + + async onMemberAdded(channelId: string, userId: string): Promise { + await this.dataSource.transaction(async (manager) => { + const state = await this.ensureState(manager, channelId); + if (!state.orderUserIds.includes(userId)) { + state.orderUserIds = [...state.orderUserIds, userId]; // append to tail + await manager.save(ChannelTurnState, state); + } + }); + } + + async onMemberRemoved(channelId: string, userId: string): Promise { + await this.dataSource.transaction(async (manager) => { + const state = await this.loadLocked(manager, channelId); + if (!state) return; + const order = state.orderUserIds; + const idx = order.indexOf(userId); + if (idx === -1) return; + // if the leaver is the current speaker, the next one takes over + let nextCurrent = state.currentSpeaker; + if (state.currentSpeaker === userId) { + nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null; + if (nextCurrent === userId) nextCurrent = null; + } + state.orderUserIds = order.filter((u) => u !== userId); + state.norepStreak = state.norepStreak.filter((u) => u !== userId); + state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null; + await manager.save(ChannelTurnState, state); + }); + } + + // A normal (non-command) message delivered to a discuss/work channel. + async onNormalMessage(channelId: string, authorUserId: string): Promise { + return this.dataSource.transaction(async (manager) => { + const state = await this.ensureState(manager, channelId); + // any normal message clears the cross-round /no-reply streak + state.norepStreak = []; + + const order = state.orderUserIds; + const n = order.length; + if (n <= 1) { + state.currentSpeaker = null; + await manager.save(ChannelTurnState, state); + return { wakeupUserId: null }; + } + + if (state.currentSpeaker === null) { + // activation: mover goes to front, rotation starts at order[1] + const newOrder = [authorUserId, ...order.filter((u) => u !== authorUserId)]; + state.orderUserIds = newOrder; + state.currentSpeaker = newOrder[1]; + state.roundEvents = [{ u: authorUserId, a: 'normal' }]; + state.lastNormalSpeaker = authorUserId; + await manager.save(ChannelTurnState, state); + return { wakeupUserId: newOrder[1] }; + } + + if (authorUserId === state.currentSpeaker) { + const idx = order.indexOf(authorUserId); + const isLast = idx === n - 1; + state.roundEvents = [...state.roundEvents, { u: authorUserId, a: 'normal' }]; + state.lastNormalSpeaker = authorUserId; + + if (isLast) { + const res = computeShuffle(state.roundEvents, order); + if (res.paused) { + state.currentSpeaker = null; + state.roundEvents = []; + await manager.save(ChannelTurnState, state); + return { wakeupUserId: null }; + } + state.orderUserIds = res.newOrder; + state.roundEvents = []; + state.lastNormalSpeaker = null; + state.currentSpeaker = res.newOrder[0]; + await manager.save(ChannelTurnState, state); + return { wakeupUserId: res.newOrder[0] }; + } + + const succ = order[idx + 1]; + state.currentSpeaker = succ; + await manager.save(ChannelTurnState, state); + return { wakeupUserId: succ }; + } + + // queue-jump normal message: no advance, nobody woken + await manager.save(ChannelTurnState, state); + return { wakeupUserId: null }; + }); + } + + // /no-reply command in a discuss/work channel. + async onNoReply(channelId: string, senderUserId: string): Promise { + return this.dataSource.transaction(async (manager) => { + const state = await this.ensureState(manager, channelId); + const order = state.orderUserIds; + const n = order.length; + + // only the current speaker's /no-reply has any effect + if (n <= 1 || state.currentSpeaker === null || senderUserId !== state.currentSpeaker) { + return { ack: null }; + } + + const idx = order.indexOf(senderUserId); + const isLast = idx === n - 1; + state.roundEvents = [...state.roundEvents, { u: senderUserId, a: 'noreply' }]; + if (!state.norepStreak.includes(senderUserId)) { + state.norepStreak = [...state.norepStreak, senderUserId]; + } + + // pause when every current member has consecutively /no-reply'd + const allCovered = order.every((u) => state.norepStreak.includes(u)); + if (allCovered) { + state.currentSpeaker = null; + state.roundEvents = []; + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: null } }; + } + + if (isLast) { + const res = computeShuffle(state.roundEvents, order); + if (res.paused) { + state.currentSpeaker = null; + state.roundEvents = []; + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: null } }; + } + state.orderUserIds = res.newOrder; + state.roundEvents = []; + state.lastNormalSpeaker = null; + state.currentSpeaker = res.newOrder[0]; + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: res.newOrder[0] } }; + } + + const succ = order[idx + 1]; + state.currentSpeaker = succ; + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: succ } }; + }); + } + + // /force-proceed command in a discuss/work channel: skip the stuck current + // speaker (not recorded, streak untouched), advance to the next one. + async onForceProceed(channelId: string): Promise { + return this.dataSource.transaction(async (manager) => { + const state = await this.ensureState(manager, channelId); + const order = state.orderUserIds; + const n = order.length; + if (n <= 1 || state.currentSpeaker === null) return { ack: null }; + + const idx = order.indexOf(state.currentSpeaker); + const isLast = idx === n - 1; + + if (isLast) { + const res = computeShuffle(state.roundEvents, order); + if (res.paused) { + state.currentSpeaker = null; + state.roundEvents = []; + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: null } }; + } + state.orderUserIds = res.newOrder; + state.roundEvents = []; + state.lastNormalSpeaker = null; + state.currentSpeaker = res.newOrder[0]; + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: res.newOrder[0] } }; + } + + const succ = order[idx + 1]; + state.currentSpeaker = succ; + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: succ } }; + }); + } +} diff --git a/src/database.config.ts b/src/database.config.ts index 6d8b917..62e2ee2 100644 --- a/src/database.config.ts +++ b/src/database.config.ts @@ -2,6 +2,8 @@ import { TypeOrmModuleOptions } from '@nestjs/typeorm'; import { Guild } from './entities/guild.entity'; import { Channel } from './entities/channel.entity'; import { ChannelMember } from './entities/channel-member.entity'; +import { WakeMapping } from './entities/wake-mapping.entity'; +import { ChannelTurnState } from './entities/channel-turn-state.entity'; import { Message } from './entities/message.entity'; import { DmConversation } from './entities/dm-conversation.entity'; import { DmParticipant } from './entities/dm-participant.entity'; @@ -21,6 +23,8 @@ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({ Guild, Channel, ChannelMember, + WakeMapping, + ChannelTurnState, Message, DmConversation, DmParticipant, diff --git a/src/entities/channel-turn-state.entity.ts b/src/entities/channel-turn-state.entity.ts new file mode 100644 index 0000000..817443b --- /dev/null +++ b/src/entities/channel-turn-state.entity.ts @@ -0,0 +1,41 @@ +import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm'; + +export type RoundEvent = { u: string; a: 'normal' | 'noreply' }; + +// Per-channel rotation state for discuss/work x_type channels. +// All mutations must be serialized per channel (pessimistic row lock). +@Entity('channel_turn_state') +export class ChannelTurnState { + @PrimaryGeneratedColumn('uuid') + id!: string; + + @Index({ unique: true }) + @Column({ name: 'channel_id', type: 'char', length: 36 }) + channelId!: string; + + // speaking order; userIds + @Column({ name: 'order_user_ids', type: 'json' }) + orderUserIds!: string[]; + + // null = paused (created, or all-members-consecutively-/no-reply) + @Column({ name: 'current_speaker', type: 'varchar', length: 64, nullable: true }) + currentSpeaker!: string | null; + + // ordered turn actions of the *current* round (skipped/queue-jumps excluded); + // used to compute the shuffle tail/anchor + @Column({ name: 'round_events', type: 'json' }) + roundEvents!: RoundEvent[]; + + // distinct userIds that consecutively replied /no-reply via rotation since + // the last normal message; persists ACROSS rounds; reset by any normal msg. + // When it covers every current member -> pause. + @Column({ name: 'norep_streak', type: 'json' }) + norepStreak!: string[]; + + // last member who delivered a normal message in the current round (D) + @Column({ name: 'last_normal_speaker', type: 'varchar', length: 64, nullable: true }) + lastNormalSpeaker!: string | null; + + @UpdateDateColumn() + updatedAt!: Date; +} diff --git a/src/entities/wake-mapping.entity.ts b/src/entities/wake-mapping.entity.ts new file mode 100644 index 0000000..25f6098 --- /dev/null +++ b/src/entities/wake-mapping.entity.ts @@ -0,0 +1,19 @@ +import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm'; + +@Entity('wake_mapping') +@Index(['channelId', 'userId'], { unique: true }) +@Index(['userId']) +export class WakeMapping { + @PrimaryGeneratedColumn('uuid') + id!: string; + + @Index() + @Column({ name: 'channel_id', type: 'char', length: 36 }) + channelId!: string; + + @Column({ name: 'user_id', type: 'varchar', length: 64 }) + userId!: string; + + @CreateDateColumn() + createdAt!: Date; +} diff --git a/src/messaging/messaging.controller.ts b/src/messaging/messaging.controller.ts index 9e88c89..c1b298c 100644 --- a/src/messaging/messaging.controller.ts +++ b/src/messaging/messaging.controller.ts @@ -16,6 +16,9 @@ import { CreateMessageDto } from './dto.create-message.dto'; import { Channel } from '../entities/channel.entity'; import { Message } from '../entities/message.entity'; import { IdempotencyRecord } from '../entities/idempotency-record.entity'; +import { WakeMapping } from '../entities/wake-mapping.entity'; +import { parseSlashCommand } from '../channels/slash-commands'; +import { TurnService } from '../channels/turn.service'; import { EventsService } from '../events/events.service'; import { clampLimit, computeNextExpectedSeq } from './pagination.util'; import { RealtimeGateway } from '../realtime/realtime.gateway'; @@ -34,6 +37,9 @@ export class MessagingController { private readonly messageRepo: Repository, @InjectRepository(IdempotencyRecord) private readonly idemRepo: Repository, + @InjectRepository(WakeMapping) + private readonly wakeRepo: Repository, + private readonly turn: TurnService, private readonly events: EventsService, private readonly realtime: RealtimeGateway, ) {} @@ -77,6 +83,52 @@ export class MessagingController { }; } + // Persists one message (allocates a seq under a channel row lock) and + // returns its view. Used for normal messages and for guild /ack messages. + private async persistMessage( + channelId: string, + input: { authorUserId: string; content: string; clientMessageId?: string | null; replyToMessageId?: string | null; mentions?: string[]; attachments?: Array<{ url: string; name?: string; mimeType?: string }> }, + ): Promise { + return this.dataSource.transaction(async (manager) => { + const channel = await manager.findOne(Channel, { + where: { id: channelId }, + lock: { mode: 'pessimistic_write' }, + }); + if (!channel) { + throw new NotFoundException('channel not found'); + } + const nextSeq = channel.lastSeq + 1; + channel.lastSeq = nextSeq; + await manager.save(Channel, channel); + + const messageId = input.clientMessageId ?? `m-${channelId}-${nextSeq}`; + const row = manager.create(Message, { + messageId, + channelId, + conversationId: null, + authorUserId: input.authorUserId, + seq: nextSeq, + content: input.content, + replyToMessageId: input.replyToMessageId ?? null, + mentions: input.mentions ?? [], + attachments: input.attachments ?? [], + editedAt: null, + deletedAt: null, + isDeleted: false, + }); + return manager.save(Message, row); + }); + } + + // Emits a guild-authored /ack message to the channel; wakeup=true only for + // the new current speaker (null = nobody). One message-id; persisted. + private async emitAck(channelId: string, wakeupUserId: string | null): Promise { + const ack = await this.persistMessage(channelId, { authorUserId: 'guild', content: '/ack' }); + const body = this.toView(ack) as Record; + await this.events.emit({ eventType: 'message.created', channelId, actorId: 'guild', data: body }); + await this.realtime.emitMessageTargeted(channelId, body, wakeupUserId); + } + @Post() async create( @Param('id') channelId: string, @@ -87,35 +139,34 @@ export class MessagingController { const existed = await this.getIdempotentResponse(scope, idempotencyKey); if (existed) return existed; - const message = await this.dataSource.transaction(async (manager) => { - const channel = await manager.findOne(Channel, { - where: { id: channelId }, - lock: { mode: 'pessimistic_write' }, - }); - if (!channel) { - throw new NotFoundException('channel not found'); + const channel = await this.channelRepo.findOne({ where: { id: channelId } }); + if (!channel) throw new NotFoundException('channel not found'); + const xType = channel.xType ?? 'general'; + const isRotating = xType === 'discuss' || xType === 'work'; + const authorUserId = String(body.authorUserId ?? 'anonymous'); + + // ---- command interception: registered slash commands are never delivered + const cmd = parseSlashCommand(body.content ?? ''); + if (cmd) { + if (isRotating && cmd.name === 'no-reply') { + const { ack } = await this.turn.onNoReply(channelId, authorUserId); + if (ack) await this.emitAck(channelId, ack.wakeupUserId); + } else if (isRotating && cmd.name === 'force-proceed') { + const { ack } = await this.turn.onForceProceed(channelId); + if (ack) await this.emitAck(channelId, ack.wakeupUserId); } + // non-rotating channels (or no effect): swallowed, nothing delivered + return { status: 'command', command: cmd.name }; + } - const nextSeq = channel.lastSeq + 1; - channel.lastSeq = nextSeq; - await manager.save(Channel, channel); - - const messageId = body.clientMessageId ?? `m-${channelId}-${nextSeq}`; - const row = manager.create(Message, { - messageId, - channelId, - conversationId: null, - authorUserId: body.authorUserId ?? 'anonymous', - seq: nextSeq, - content: body.content, - replyToMessageId: body.replyToMessageId ?? null, - mentions: body.mentions ?? [], - attachments: body.attachments ?? [], - editedAt: null, - deletedAt: null, - isDeleted: false, - }); - return manager.save(Message, row); + // ---- normal message + const message = await this.persistMessage(channelId, { + authorUserId, + content: body.content, + clientMessageId: body.clientMessageId, + replyToMessageId: body.replyToMessageId, + mentions: body.mentions, + attachments: body.attachments, }); const responseBody = this.toView(message) as Record; @@ -124,10 +175,24 @@ export class MessagingController { await this.events.emit({ eventType: 'message.created', channelId, - actorId: body.authorUserId ?? 'anonymous', + actorId: authorUserId, data: responseBody, }); - this.realtime.emitChannelEvent(channelId, 'message.created', responseBody); + + if (isRotating) { + // discuss/work: rotation decides the single wakeup target + const decision = await this.turn.onNormalMessage(channelId, authorUserId); + await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId); + } else { + // general/report/triage/custom: wakeup from x_type + wake_mapping + const wakeRows = await this.wakeRepo.find({ where: { channelId } }); + const wakeUserIds = new Set(wakeRows.map((w) => w.userId)); + await this.realtime.emitMessageCreated(channelId, responseBody, { + xType, + authorUserId, + wakeUserIds, + }); + } return responseBody; } diff --git a/src/messaging/messaging.module.ts b/src/messaging/messaging.module.ts index 3cba2a6..0ac6d6a 100644 --- a/src/messaging/messaging.module.ts +++ b/src/messaging/messaging.module.ts @@ -4,9 +4,10 @@ import { MessagingController } from './messaging.controller'; import { Channel } from '../entities/channel.entity'; import { Message } from '../entities/message.entity'; import { IdempotencyRecord } from '../entities/idempotency-record.entity'; +import { WakeMapping } from '../entities/wake-mapping.entity'; @Module({ - imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord])], + imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord, WakeMapping])], controllers: [MessagingController], }) export class MessagingModule {} diff --git a/src/realtime/realtime.gateway.ts b/src/realtime/realtime.gateway.ts index 825cd6c..eb81632 100644 --- a/src/realtime/realtime.gateway.ts +++ b/src/realtime/realtime.gateway.ts @@ -11,6 +11,34 @@ import { Logger } from '@nestjs/common'; import { Server, Socket } from 'socket.io'; import { introspectGuildToken } from '../common/center-auth'; +type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom'; + +// Wakeup for non-rotating channels only (general/report/triage/custom). +// discuss/work go through TurnService + emitMessageTargeted, never here. +// Precedence: +// 1. the author never gets woken by their own message +// 2. triage/custom: only wake users in the channel's wake_mapping +// 3. general: wake everyone +// 4. report (and anything else): wake nobody +export function computeWakeup(args: { + xType: XType; + recipientUserId: string; + authorUserId: string; + wakeUserIds: Set; +}): boolean { + const { xType, recipientUserId, authorUserId, wakeUserIds } = args; + if (recipientUserId === authorUserId) return false; + switch (xType) { + case 'general': + return true; + case 'triage': + case 'custom': + return wakeUserIds.has(recipientUserId); + default: + return false; + } +} + @WebSocketGateway({ namespace: '/realtime', cors: { @@ -133,4 +161,38 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect emitChannelEvent(channelId: string, event: string, data: Record): void { this.server.to(`channel:${channelId}`).emit(event, data); } + + // Emits message.created per-recipient so each carries its own `wakeup` flag. + async emitMessageCreated( + channelId: string, + data: Record, + ctx: { xType: XType; authorUserId: string; wakeUserIds: Set }, + ): Promise { + const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); + for (const s of sockets) { + const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`; + const wakeup = computeWakeup({ + xType: ctx.xType, + recipientUserId, + authorUserId: ctx.authorUserId, + wakeUserIds: ctx.wakeUserIds, + }); + s.emit('message.created', { ...data, wakeup }); + } + } + + // discuss/work + /ack: exactly one recipient (the new current speaker) gets + // wakeup=true; everyone else false. One message-id; metadata at push only. + async emitMessageTargeted( + channelId: string, + data: Record, + wakeupUserId: string | null, + ): Promise { + const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); + for (const s of sockets) { + const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`; + const wakeup = wakeupUserId !== null && recipientUserId === wakeupUserId; + s.emit('message.created', { ...data, wakeup }); + } + } }