feat(guild): wake_mapping, per-recipient wakeup, discuss/work turn engine, channel join/leave

- wake_mapping table; triage onDuty (auto-added member) / custom listeners
- per-recipient wakeup metadata on message.created (one message-id; added
  only at push). Rules: author=false; triage/custom=wake_mapping only;
  general=all; report=none
- discuss/work rotation: channel_turn_state (order/currentSpeaker/round
  events/cross-round no-reply streak); null activation, queue-jump,
  /no-reply pass, all-/no-reply pause, end-of-round shuffle (trailing
  no-reply run to tail, head shuffled, first != last normal speaker)
- slash-command registry (/no-reply, /force-proceed); registered commands
  intercepted and never delivered; guild-authored /ack persisted
- POST /channels/:id/join|leave; leave cleans channel_members, wake_mapping
  and turn-state order

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
h z
2026-05-15 14:51:09 +01:00
parent 605d3ac092
commit 6b993522cf
14 changed files with 657 additions and 34 deletions

View File

@@ -8,6 +8,7 @@ import { MetricsService } from './common/metrics.service';
import { ApiKeyGuard } from './common/api-key.guard';
import { GuildsModule } from './guilds/guilds.module';
import { ChannelsModule } from './channels/channels.module';
import { TurnModule } from './channels/turn.module';
import { MessagingModule } from './messaging/messaging.module';
import { EventsModule } from './events/events.module';
import { RealtimeModule } from './realtime/realtime.module';
@@ -19,6 +20,7 @@ import { MembersModule } from './members/members.module';
EventsModule,
RealtimeModule,
GuildsModule,
TurnModule,
ChannelsModule,
MembersModule,
MessagingModule,

View File

@@ -1,4 +1,4 @@
import { Body, Controller, Get, Post, Query, Req, UnauthorizedException } from '@nestjs/common';
import { Body, Controller, Get, Param, Post, Query, Req, UnauthorizedException } from '@nestjs/common';
import { ChannelsService } from './channels.service';
// ApiKeyGuard attaches the introspected Center user id onto the request.
@@ -27,8 +27,24 @@ export class ChannelsController {
xType: body.xType as string | undefined,
isPublic: Boolean(body.isPublic),
memberUserIds: Array.isArray(body.memberUserIds) ? (body.memberUserIds as string[]) : [],
onDuty: body.onDuty as string | undefined,
listeners: Array.isArray(body.listeners) ? (body.listeners as string[]) : [],
},
userId,
);
}
@Post(':id/join')
join(@Req() req: AuthedRequest, @Param('id') channelId: string) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.joinChannel(channelId, userId);
}
@Post(':id/leave')
leave(@Req() req: AuthedRequest, @Param('id') channelId: string) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.leaveChannel(channelId, userId);
}
}

View File

@@ -3,10 +3,11 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { ChannelsController } from './channels.controller';
import { Channel } from '../entities/channel.entity';
import { ChannelMember } from '../entities/channel-member.entity';
import { WakeMapping } from '../entities/wake-mapping.entity';
import { ChannelsService } from './channels.service';
@Module({
imports: [TypeOrmModule.forFeature([Channel, ChannelMember])],
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, WakeMapping])],
controllers: [ChannelsController],
providers: [ChannelsService],
exports: [ChannelsService],

View File

@@ -1,8 +1,10 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { BadRequestException, ForbiddenException, Injectable, NotFoundException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { In, Repository } from 'typeorm';
import { Channel } from '../entities/channel.entity';
import { ChannelMember } from '../entities/channel-member.entity';
import { WakeMapping } from '../entities/wake-mapping.entity';
import { TurnService } from './turn.service';
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom'] as const;
type XType = (typeof X_TYPES)[number];
@@ -14,6 +16,10 @@ type CreateChannelInput = {
xType?: string;
isPublic?: boolean;
memberUserIds?: string[];
// required when xType === 'triage': the on-duty user for this channel
onDuty?: string;
// optional when xType === 'custom': users to wake on this channel
listeners?: string[];
};
@Injectable()
@@ -23,6 +29,9 @@ export class ChannelsService {
private readonly channelRepo: Repository<Channel>,
@InjectRepository(ChannelMember)
private readonly memberRepo: Repository<ChannelMember>,
@InjectRepository(WakeMapping)
private readonly wakeRepo: Repository<WakeMapping>,
private readonly turnService: TurnService,
) {}
// Channels visible to a user within a guild:
@@ -42,7 +51,39 @@ export class ChannelsService {
});
const memberChannelIds = new Set(memberRows.map((m) => m.channelId));
return all.filter((c) => c.isPublic || memberChannelIds.has(c.id));
return all
.filter((c) => c.isPublic || memberChannelIds.has(c.id))
.map((c) => ({ ...c, isMember: memberChannelIds.has(c.id) }));
}
async joinChannel(channelId: string, userId: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
const existing = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!existing) {
if (!channel.isPublic) {
throw new ForbiddenException('cannot join a non-public channel');
}
await this.memberRepo.save(this.memberRepo.create({ channelId, userId }));
if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberAdded(channelId, userId);
}
}
return { status: 'ok', channelId, userId, member: true };
}
async leaveChannel(channelId: string, userId: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
// remove every channel-scoped row that references this user
await this.memberRepo.delete({ channelId, userId });
await this.wakeRepo.delete({ channelId, userId });
if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberRemoved(channelId, userId);
}
return { status: 'ok', channelId, userId, member: false };
}
async create(input: CreateChannelInput, creatorUserId: string) {
@@ -57,6 +98,14 @@ export class ChannelsService {
throw new BadRequestException(`xType must be one of: ${X_TYPES.join(', ')}`);
}
const onDuty = String(input.onDuty ?? '').trim();
if (xType === 'triage' && !onDuty) {
throw new BadRequestException('onDuty is required for triage channels');
}
const listeners = (input.listeners ?? [])
.map((x) => String(x ?? '').trim())
.filter(Boolean);
const channel = await this.channelRepo.save(
this.channelRepo.create({
guildId,
@@ -75,10 +124,29 @@ export class ChannelsService {
const trimmed = String(id ?? '').trim();
if (trimmed) memberIds.add(trimmed);
}
// triage: the on-duty user is auto-added to the channel if not already in it
if (xType === 'triage') memberIds.add(onDuty);
await this.memberRepo.save(
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
);
// wake_mapping: triage -> the on-duty user; custom -> each listener
const wakeUserIds = new Set<string>();
if (xType === 'triage') wakeUserIds.add(onDuty);
if (xType === 'custom') listeners.forEach((l) => wakeUserIds.add(l));
if (wakeUserIds.size) {
await this.wakeRepo.save(
[...wakeUserIds].map((userId) => this.wakeRepo.create({ channelId: channel.id, userId })),
);
}
// discuss/work: initialize rotation state (order = members sorted by id,
// currentSpeaker = null until someone proactively speaks)
if (xType === 'discuss' || xType === 'work') {
await this.turnService.initForChannel(channel.id, [...memberIds]);
}
return channel;
}
}

View File

@@ -0,0 +1,32 @@
// Registry of known slash commands. Only a message whose content matches a
// REGISTERED command name is treated as a command (intercepted, never
// delivered). Anything else that merely starts with '/' (e.g. /etc/passwd)
// is delivered as a normal message.
export const SLASH_COMMANDS = ['no-reply', 'force-proceed'] as const;
export type SlashCommandName = (typeof SLASH_COMMANDS)[number];
export type ParsedCommand = {
name: SlashCommandName;
opts: string[];
};
// Matches: /<name> optionally followed by whitespace + opts. The name must be
// a registered command (case-sensitive, exact) for it to count as a command.
export function parseSlashCommand(content: string): ParsedCommand | null {
if (typeof content !== 'string') return null;
const trimmed = content.trim();
if (!trimmed.startsWith('/')) return null;
const m = /^\/(\S+)(?:\s+([\s\S]*))?$/.exec(trimmed);
if (!m) return null;
const name = m[1];
if (!(SLASH_COMMANDS as readonly string[]).includes(name)) return null;
const opts = (m[2] ?? '').trim();
return {
name: name as SlashCommandName,
opts: opts ? opts.split(/\s+/) : [],
};
}

View File

@@ -0,0 +1,52 @@
import { RoundEvent } from '../entities/channel-turn-state.entity';
export type ShuffleResult = { paused: true } | { paused: false; newOrder: string[] };
function shuffleInPlace<T>(arr: T[]): T[] {
for (let i = arr.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[arr[i], arr[j]] = [arr[j], arr[i]];
}
return arr;
}
// End-of-round shuffle.
// - tail = the *last contiguous run* of /no-reply in the round's turn events
// (anchor = first of that run; kept in event order)
// - head = every current member not in tail, shuffled randomly
// - constraint: head[0] !== D, where D = the last member who delivered a
// normal message in the round
// - if the constraint is unsatisfiable (head empty, or head === [D]) the
// rotation pauses instead of shuffling (per spec B.3)
export function computeShuffle(roundEvents: RoundEvent[], currentMembers: string[]): ShuffleResult {
const memberSet = new Set(currentMembers);
// trailing contiguous /no-reply run, in event order, limited to current members
const tail: string[] = [];
for (let i = roundEvents.length - 1; i >= 0; i--) {
if (roundEvents[i].a !== 'noreply') break;
if (memberSet.has(roundEvents[i].u)) tail.unshift(roundEvents[i].u);
}
const tailSet = new Set(tail);
// D = last normal speaker in the round (the one right before the trailing run)
let d: string | null = null;
for (let i = roundEvents.length - 1; i >= 0; i--) {
if (roundEvents[i].a === 'normal' && memberSet.has(roundEvents[i].u)) {
d = roundEvents[i].u;
break;
}
}
const head = currentMembers.filter((u) => !tailSet.has(u));
if (head.length === 0) return { paused: true };
if (head.length === 1 && head[0] === d) return { paused: true };
shuffleInPlace(head);
if (head[0] === d) {
// length >= 2 here (the [d] singleton case returned paused above)
[head[0], head[1]] = [head[1], head[0]];
}
return { paused: false, newOrder: [...head, ...tail] };
}

View File

@@ -0,0 +1,9 @@
import { Global, Module } from '@nestjs/common';
import { TurnService } from './turn.service';
@Global()
@Module({
providers: [TurnService],
exports: [TurnService],
})
export class TurnModule {}

View File

@@ -0,0 +1,251 @@
import { Injectable } from '@nestjs/common';
import { DataSource, EntityManager } from 'typeorm';
import { ChannelTurnState } from '../entities/channel-turn-state.entity';
import { ChannelMember } from '../entities/channel-member.entity';
import { computeShuffle } from './turn-shuffle';
// wakeupUserId: the single user who should receive wakeup=true on the
// resulting push (null = nobody / paused). For commands, `ack` present means
// the controller must emit a guild-authored /ack message.
export type TurnDecision = { wakeupUserId: string | null };
export type CommandDecision = { ack: TurnDecision | null };
@Injectable()
export class TurnService {
constructor(private readonly dataSource: DataSource) {}
private async loadLocked(
manager: EntityManager,
channelId: string,
): Promise<ChannelTurnState | null> {
return manager
.createQueryBuilder(ChannelTurnState, 's')
.setLock('pessimistic_write')
.where('s.channelId = :channelId', { channelId })
.getOne();
}
private async ensureState(
manager: EntityManager,
channelId: string,
): Promise<ChannelTurnState> {
let state = await this.loadLocked(manager, channelId);
if (state) return state;
// lazy init from current channel members (sorted by userId)
const members = await manager.find(ChannelMember, { where: { channelId } });
const order = members.map((m) => m.userId).sort();
state = manager.create(ChannelTurnState, {
channelId,
orderUserIds: order,
currentSpeaker: null,
roundEvents: [],
norepStreak: [],
lastNormalSpeaker: null,
});
return manager.save(ChannelTurnState, state);
}
// Called when a discuss/work channel is created.
async initForChannel(channelId: string, memberUserIds: string[]): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const existing = await manager.findOne(ChannelTurnState, { where: { channelId } });
const order = [...new Set(memberUserIds)].sort();
if (existing) {
existing.orderUserIds = order;
existing.currentSpeaker = null;
existing.roundEvents = [];
existing.norepStreak = [];
existing.lastNormalSpeaker = null;
await manager.save(ChannelTurnState, existing);
return;
}
await manager.save(
ChannelTurnState,
manager.create(ChannelTurnState, {
channelId,
orderUserIds: order,
currentSpeaker: null,
roundEvents: [],
norepStreak: [],
lastNormalSpeaker: null,
}),
);
});
}
async onMemberAdded(channelId: string, userId: string): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
if (!state.orderUserIds.includes(userId)) {
state.orderUserIds = [...state.orderUserIds, userId]; // append to tail
await manager.save(ChannelTurnState, state);
}
});
}
async onMemberRemoved(channelId: string, userId: string): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const state = await this.loadLocked(manager, channelId);
if (!state) return;
const order = state.orderUserIds;
const idx = order.indexOf(userId);
if (idx === -1) return;
// if the leaver is the current speaker, the next one takes over
let nextCurrent = state.currentSpeaker;
if (state.currentSpeaker === userId) {
nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null;
if (nextCurrent === userId) nextCurrent = null;
}
state.orderUserIds = order.filter((u) => u !== userId);
state.norepStreak = state.norepStreak.filter((u) => u !== userId);
state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null;
await manager.save(ChannelTurnState, state);
});
}
// A normal (non-command) message delivered to a discuss/work channel.
async onNormalMessage(channelId: string, authorUserId: string): Promise<TurnDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
// any normal message clears the cross-round /no-reply streak
state.norepStreak = [];
const order = state.orderUserIds;
const n = order.length;
if (n <= 1) {
state.currentSpeaker = null;
await manager.save(ChannelTurnState, state);
return { wakeupUserId: null };
}
if (state.currentSpeaker === null) {
// activation: mover goes to front, rotation starts at order[1]
const newOrder = [authorUserId, ...order.filter((u) => u !== authorUserId)];
state.orderUserIds = newOrder;
state.currentSpeaker = newOrder[1];
state.roundEvents = [{ u: authorUserId, a: 'normal' }];
state.lastNormalSpeaker = authorUserId;
await manager.save(ChannelTurnState, state);
return { wakeupUserId: newOrder[1] };
}
if (authorUserId === state.currentSpeaker) {
const idx = order.indexOf(authorUserId);
const isLast = idx === n - 1;
state.roundEvents = [...state.roundEvents, { u: authorUserId, a: 'normal' }];
state.lastNormalSpeaker = authorUserId;
if (isLast) {
const res = computeShuffle(state.roundEvents, order);
if (res.paused) {
state.currentSpeaker = null;
state.roundEvents = [];
await manager.save(ChannelTurnState, state);
return { wakeupUserId: null };
}
state.orderUserIds = res.newOrder;
state.roundEvents = [];
state.lastNormalSpeaker = null;
state.currentSpeaker = res.newOrder[0];
await manager.save(ChannelTurnState, state);
return { wakeupUserId: res.newOrder[0] };
}
const succ = order[idx + 1];
state.currentSpeaker = succ;
await manager.save(ChannelTurnState, state);
return { wakeupUserId: succ };
}
// queue-jump normal message: no advance, nobody woken
await manager.save(ChannelTurnState, state);
return { wakeupUserId: null };
});
}
// /no-reply command in a discuss/work channel.
async onNoReply(channelId: string, senderUserId: string): Promise<CommandDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
const order = state.orderUserIds;
const n = order.length;
// only the current speaker's /no-reply has any effect
if (n <= 1 || state.currentSpeaker === null || senderUserId !== state.currentSpeaker) {
return { ack: null };
}
const idx = order.indexOf(senderUserId);
const isLast = idx === n - 1;
state.roundEvents = [...state.roundEvents, { u: senderUserId, a: 'noreply' }];
if (!state.norepStreak.includes(senderUserId)) {
state.norepStreak = [...state.norepStreak, senderUserId];
}
// pause when every current member has consecutively /no-reply'd
const allCovered = order.every((u) => state.norepStreak.includes(u));
if (allCovered) {
state.currentSpeaker = null;
state.roundEvents = [];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: null } };
}
if (isLast) {
const res = computeShuffle(state.roundEvents, order);
if (res.paused) {
state.currentSpeaker = null;
state.roundEvents = [];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: null } };
}
state.orderUserIds = res.newOrder;
state.roundEvents = [];
state.lastNormalSpeaker = null;
state.currentSpeaker = res.newOrder[0];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: res.newOrder[0] } };
}
const succ = order[idx + 1];
state.currentSpeaker = succ;
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: succ } };
});
}
// /force-proceed command in a discuss/work channel: skip the stuck current
// speaker (not recorded, streak untouched), advance to the next one.
async onForceProceed(channelId: string): Promise<CommandDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
const order = state.orderUserIds;
const n = order.length;
if (n <= 1 || state.currentSpeaker === null) return { ack: null };
const idx = order.indexOf(state.currentSpeaker);
const isLast = idx === n - 1;
if (isLast) {
const res = computeShuffle(state.roundEvents, order);
if (res.paused) {
state.currentSpeaker = null;
state.roundEvents = [];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: null } };
}
state.orderUserIds = res.newOrder;
state.roundEvents = [];
state.lastNormalSpeaker = null;
state.currentSpeaker = res.newOrder[0];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: res.newOrder[0] } };
}
const succ = order[idx + 1];
state.currentSpeaker = succ;
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: succ } };
});
}
}

View File

@@ -2,6 +2,8 @@ import { TypeOrmModuleOptions } from '@nestjs/typeorm';
import { Guild } from './entities/guild.entity';
import { Channel } from './entities/channel.entity';
import { ChannelMember } from './entities/channel-member.entity';
import { WakeMapping } from './entities/wake-mapping.entity';
import { ChannelTurnState } from './entities/channel-turn-state.entity';
import { Message } from './entities/message.entity';
import { DmConversation } from './entities/dm-conversation.entity';
import { DmParticipant } from './entities/dm-participant.entity';
@@ -21,6 +23,8 @@ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
Guild,
Channel,
ChannelMember,
WakeMapping,
ChannelTurnState,
Message,
DmConversation,
DmParticipant,

View File

@@ -0,0 +1,41 @@
import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';
export type RoundEvent = { u: string; a: 'normal' | 'noreply' };
// Per-channel rotation state for discuss/work x_type channels.
// All mutations must be serialized per channel (pessimistic row lock).
@Entity('channel_turn_state')
export class ChannelTurnState {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index({ unique: true })
@Column({ name: 'channel_id', type: 'char', length: 36 })
channelId!: string;
// speaking order; userIds
@Column({ name: 'order_user_ids', type: 'json' })
orderUserIds!: string[];
// null = paused (created, or all-members-consecutively-/no-reply)
@Column({ name: 'current_speaker', type: 'varchar', length: 64, nullable: true })
currentSpeaker!: string | null;
// ordered turn actions of the *current* round (skipped/queue-jumps excluded);
// used to compute the shuffle tail/anchor
@Column({ name: 'round_events', type: 'json' })
roundEvents!: RoundEvent[];
// distinct userIds that consecutively replied /no-reply via rotation since
// the last normal message; persists ACROSS rounds; reset by any normal msg.
// When it covers every current member -> pause.
@Column({ name: 'norep_streak', type: 'json' })
norepStreak!: string[];
// last member who delivered a normal message in the current round (D)
@Column({ name: 'last_normal_speaker', type: 'varchar', length: 64, nullable: true })
lastNormalSpeaker!: string | null;
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -0,0 +1,19 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
@Entity('wake_mapping')
@Index(['channelId', 'userId'], { unique: true })
@Index(['userId'])
export class WakeMapping {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index()
@Column({ name: 'channel_id', type: 'char', length: 36 })
channelId!: string;
@Column({ name: 'user_id', type: 'varchar', length: 64 })
userId!: string;
@CreateDateColumn()
createdAt!: Date;
}

View File

@@ -16,6 +16,9 @@ import { CreateMessageDto } from './dto.create-message.dto';
import { Channel } from '../entities/channel.entity';
import { Message } from '../entities/message.entity';
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
import { WakeMapping } from '../entities/wake-mapping.entity';
import { parseSlashCommand } from '../channels/slash-commands';
import { TurnService } from '../channels/turn.service';
import { EventsService } from '../events/events.service';
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
import { RealtimeGateway } from '../realtime/realtime.gateway';
@@ -34,6 +37,9 @@ export class MessagingController {
private readonly messageRepo: Repository<Message>,
@InjectRepository(IdempotencyRecord)
private readonly idemRepo: Repository<IdempotencyRecord>,
@InjectRepository(WakeMapping)
private readonly wakeRepo: Repository<WakeMapping>,
private readonly turn: TurnService,
private readonly events: EventsService,
private readonly realtime: RealtimeGateway,
) {}
@@ -77,6 +83,52 @@ export class MessagingController {
};
}
// Persists one message (allocates a seq under a channel row lock) and
// returns its view. Used for normal messages and for guild /ack messages.
private async persistMessage(
channelId: string,
input: { authorUserId: string; content: string; clientMessageId?: string | null; replyToMessageId?: string | null; mentions?: string[]; attachments?: Array<{ url: string; name?: string; mimeType?: string }> },
): Promise<Message> {
return this.dataSource.transaction(async (manager) => {
const channel = await manager.findOne(Channel, {
where: { id: channelId },
lock: { mode: 'pessimistic_write' },
});
if (!channel) {
throw new NotFoundException('channel not found');
}
const nextSeq = channel.lastSeq + 1;
channel.lastSeq = nextSeq;
await manager.save(Channel, channel);
const messageId = input.clientMessageId ?? `m-${channelId}-${nextSeq}`;
const row = manager.create(Message, {
messageId,
channelId,
conversationId: null,
authorUserId: input.authorUserId,
seq: nextSeq,
content: input.content,
replyToMessageId: input.replyToMessageId ?? null,
mentions: input.mentions ?? [],
attachments: input.attachments ?? [],
editedAt: null,
deletedAt: null,
isDeleted: false,
});
return manager.save(Message, row);
});
}
// Emits a guild-authored /ack message to the channel; wakeup=true only for
// the new current speaker (null = nobody). One message-id; persisted.
private async emitAck(channelId: string, wakeupUserId: string | null): Promise<void> {
const ack = await this.persistMessage(channelId, { authorUserId: 'guild', content: '/ack' });
const body = this.toView(ack) as Record<string, unknown>;
await this.events.emit({ eventType: 'message.created', channelId, actorId: 'guild', data: body });
await this.realtime.emitMessageTargeted(channelId, body, wakeupUserId);
}
@Post()
async create(
@Param('id') channelId: string,
@@ -87,35 +139,34 @@ export class MessagingController {
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
const message = await this.dataSource.transaction(async (manager) => {
const channel = await manager.findOne(Channel, {
where: { id: channelId },
lock: { mode: 'pessimistic_write' },
});
if (!channel) {
throw new NotFoundException('channel not found');
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
const xType = channel.xType ?? 'general';
const isRotating = xType === 'discuss' || xType === 'work';
const authorUserId = String(body.authorUserId ?? 'anonymous');
// ---- command interception: registered slash commands are never delivered
const cmd = parseSlashCommand(body.content ?? '');
if (cmd) {
if (isRotating && cmd.name === 'no-reply') {
const { ack } = await this.turn.onNoReply(channelId, authorUserId);
if (ack) await this.emitAck(channelId, ack.wakeupUserId);
} else if (isRotating && cmd.name === 'force-proceed') {
const { ack } = await this.turn.onForceProceed(channelId);
if (ack) await this.emitAck(channelId, ack.wakeupUserId);
}
// non-rotating channels (or no effect): swallowed, nothing delivered
return { status: 'command', command: cmd.name };
}
const nextSeq = channel.lastSeq + 1;
channel.lastSeq = nextSeq;
await manager.save(Channel, channel);
const messageId = body.clientMessageId ?? `m-${channelId}-${nextSeq}`;
const row = manager.create(Message, {
messageId,
channelId,
conversationId: null,
authorUserId: body.authorUserId ?? 'anonymous',
seq: nextSeq,
content: body.content,
replyToMessageId: body.replyToMessageId ?? null,
mentions: body.mentions ?? [],
attachments: body.attachments ?? [],
editedAt: null,
deletedAt: null,
isDeleted: false,
});
return manager.save(Message, row);
// ---- normal message
const message = await this.persistMessage(channelId, {
authorUserId,
content: body.content,
clientMessageId: body.clientMessageId,
replyToMessageId: body.replyToMessageId,
mentions: body.mentions,
attachments: body.attachments,
});
const responseBody = this.toView(message) as Record<string, unknown>;
@@ -124,10 +175,24 @@ export class MessagingController {
await this.events.emit({
eventType: 'message.created',
channelId,
actorId: body.authorUserId ?? 'anonymous',
actorId: authorUserId,
data: responseBody,
});
this.realtime.emitChannelEvent(channelId, 'message.created', responseBody);
if (isRotating) {
// discuss/work: rotation decides the single wakeup target
const decision = await this.turn.onNormalMessage(channelId, authorUserId);
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
} else {
// general/report/triage/custom: wakeup from x_type + wake_mapping
const wakeRows = await this.wakeRepo.find({ where: { channelId } });
const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
await this.realtime.emitMessageCreated(channelId, responseBody, {
xType,
authorUserId,
wakeUserIds,
});
}
return responseBody;
}

View File

@@ -4,9 +4,10 @@ import { MessagingController } from './messaging.controller';
import { Channel } from '../entities/channel.entity';
import { Message } from '../entities/message.entity';
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
import { WakeMapping } from '../entities/wake-mapping.entity';
@Module({
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord])],
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord, WakeMapping])],
controllers: [MessagingController],
})
export class MessagingModule {}

View File

@@ -11,6 +11,34 @@ import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { introspectGuildToken } from '../common/center-auth';
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom';
// Wakeup for non-rotating channels only (general/report/triage/custom).
// discuss/work go through TurnService + emitMessageTargeted, never here.
// Precedence:
// 1. the author never gets woken by their own message
// 2. triage/custom: only wake users in the channel's wake_mapping
// 3. general: wake everyone
// 4. report (and anything else): wake nobody
export function computeWakeup(args: {
xType: XType;
recipientUserId: string;
authorUserId: string;
wakeUserIds: Set<string>;
}): boolean {
const { xType, recipientUserId, authorUserId, wakeUserIds } = args;
if (recipientUserId === authorUserId) return false;
switch (xType) {
case 'general':
return true;
case 'triage':
case 'custom':
return wakeUserIds.has(recipientUserId);
default:
return false;
}
}
@WebSocketGateway({
namespace: '/realtime',
cors: {
@@ -133,4 +161,38 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
emitChannelEvent(channelId: string, event: string, data: Record<string, unknown>): void {
this.server.to(`channel:${channelId}`).emit(event, data);
}
// Emits message.created per-recipient so each carries its own `wakeup` flag.
async emitMessageCreated(
channelId: string,
data: Record<string, unknown>,
ctx: { xType: XType; authorUserId: string; wakeUserIds: Set<string> },
): Promise<void> {
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
for (const s of sockets) {
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
const wakeup = computeWakeup({
xType: ctx.xType,
recipientUserId,
authorUserId: ctx.authorUserId,
wakeUserIds: ctx.wakeUserIds,
});
s.emit('message.created', { ...data, wakeup });
}
}
// discuss/work + /ack: exactly one recipient (the new current speaker) gets
// wakeup=true; everyone else false. One message-id; metadata at push only.
async emitMessageTargeted(
channelId: string,
data: Record<string, unknown>,
wakeupUserId: string | null,
): Promise<void> {
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
for (const s of sockets) {
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
const wakeup = wakeupUserId !== null && recipientUserId === wakeupUserId;
s.emit('message.created', { ...data, wakeup });
}
}
}