From 02b7c72e70cc5c0607b3d950405bb1dabbffe17b Mon Sep 17 00:00:00 2001 From: hzhang Date: Fri, 15 May 2026 15:27:35 +0100 Subject: [PATCH] feat(guild): <@id> mention mechanism - parse <@user-id> outside backtick spans - general: message with an at-list wakes only the at'd users (else all) - report/triage/custom: mentions change nothing - discuss/work: mention by current speaker pushes a sub-rotation frame (atList = mentions - sender, intersected with channel members); single linear pass (real/no-reply/force-proceed), then pop back to the saved parent pointer (resumes at the pusher); nested frames supported Co-Authored-By: Claude Opus 4.7 (1M context) --- src/channels/mentions.ts | 25 ++++ src/channels/turn.service.ts | 171 +++++++++++++++++----- src/entities/channel-turn-state.entity.ts | 11 ++ src/messaging/messaging.controller.ts | 13 +- src/realtime/realtime.gateway.ts | 18 ++- 5 files changed, 193 insertions(+), 45 deletions(-) create mode 100644 src/channels/mentions.ts diff --git a/src/channels/mentions.ts b/src/channels/mentions.ts new file mode 100644 index 0000000..ad4193c --- /dev/null +++ b/src/channels/mentions.ts @@ -0,0 +1,25 @@ +// Parse <@user-id> mentions from message content. A mention does NOT count +// when it sits inside a backtick span (single ` or triple ``` — any backtick +// run toggles a code region). Returns unique ids in first-seen order. +export function parseMentions(content: string): string[] { + if (typeof content !== 'string' || !content) return []; + + // strip backtick-delimited regions: split on runs of backticks; odd + // segments (between an opening and closing run) are code -> dropped. + const segments = content.split(/`+/); + let outside = ''; + for (let i = 0; i < segments.length; i += 2) outside += segments[i] + ' '; + + const ids: string[] = []; + const seen = new Set(); + const re = /<@([^>`\s]+)>/g; + let m: RegExpExecArray | null; + while ((m = re.exec(outside)) !== null) { + const id = m[1]; + if (!seen.has(id)) { + seen.add(id); + ids.push(id); + } + } + return ids; +} diff --git a/src/channels/turn.service.ts b/src/channels/turn.service.ts index de00507..e7cf711 100644 --- a/src/channels/turn.service.ts +++ b/src/channels/turn.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { DataSource, EntityManager } from 'typeorm'; -import { ChannelTurnState } from '../entities/channel-turn-state.entity'; +import { ChannelTurnState, TurnFrame } from '../entities/channel-turn-state.entity'; import { ChannelMember } from '../entities/channel-member.entity'; import { computeShuffle } from './turn-shuffle'; @@ -18,11 +18,13 @@ export class TurnService { manager: EntityManager, channelId: string, ): Promise { - return manager + const state = await manager .createQueryBuilder(ChannelTurnState, 's') .setLock('pessimistic_write') .where('s.channelId = :channelId', { channelId }) .getOne(); + if (state && !Array.isArray(state.frames)) state.frames = []; + return state; } private async ensureState( @@ -31,7 +33,6 @@ export class TurnService { ): Promise { let state = await this.loadLocked(manager, channelId); if (state) return state; - // lazy init from current channel members (sorted by userId) const members = await manager.find(ChannelMember, { where: { channelId } }); const order = members.map((m) => m.userId).sort(); state = manager.create(ChannelTurnState, { @@ -41,35 +42,62 @@ export class TurnService { roundEvents: [], norepStreak: [], lastNormalSpeaker: null, + frames: [], }); return manager.save(ChannelTurnState, state); } - // Called when a discuss/work channel is created. + private frames(state: ChannelTurnState): TurnFrame[] { + if (!Array.isArray(state.frames)) state.frames = []; + return state.frames; + } + + // effective current speaker = top sub-frame's pointer, else root speaker + private effectiveCurrent(state: ChannelTurnState): string | null { + const fr = this.frames(state); + while (fr.length) { + const top = fr[fr.length - 1]; + if (!top.order.length) { + fr.pop(); + continue; + } + const idx = Math.min(top.idx, top.order.length - 1); + return top.order[idx]; + } + return state.currentSpeaker; + } + + // advance / pop the active sub-frame; returns the new effective speaker. + // A single linear pass: acting at the last index pops the frame. + private advanceSubFrame(state: ChannelTurnState): string | null { + const fr = this.frames(state); + const top = fr[fr.length - 1]; + if (top.idx >= top.order.length - 1) { + fr.pop(); + } else { + top.idx += 1; + } + return this.effectiveCurrent(state); + } + async initForChannel(channelId: string, memberUserIds: string[]): Promise { await this.dataSource.transaction(async (manager) => { const existing = await manager.findOne(ChannelTurnState, { where: { channelId } }); const order = [...new Set(memberUserIds)].sort(); + const base = { + orderUserIds: order, + currentSpeaker: null, + roundEvents: [] as ChannelTurnState['roundEvents'], + norepStreak: [] as string[], + lastNormalSpeaker: null, + frames: [] as TurnFrame[], + }; if (existing) { - existing.orderUserIds = order; - existing.currentSpeaker = null; - existing.roundEvents = []; - existing.norepStreak = []; - existing.lastNormalSpeaker = null; + Object.assign(existing, base); await manager.save(ChannelTurnState, existing); return; } - await manager.save( - ChannelTurnState, - manager.create(ChannelTurnState, { - channelId, - orderUserIds: order, - currentSpeaker: null, - roundEvents: [], - norepStreak: [], - lastNormalSpeaker: null, - }), - ); + await manager.save(ChannelTurnState, manager.create(ChannelTurnState, { channelId, ...base })); }); } @@ -77,7 +105,7 @@ export class TurnService { await this.dataSource.transaction(async (manager) => { const state = await this.ensureState(manager, channelId); if (!state.orderUserIds.includes(userId)) { - state.orderUserIds = [...state.orderUserIds, userId]; // append to tail + state.orderUserIds = [...state.orderUserIds, userId]; await manager.save(ChannelTurnState, state); } }); @@ -89,27 +117,68 @@ export class TurnService { if (!state) return; const order = state.orderUserIds; const idx = order.indexOf(userId); - if (idx === -1) return; - // if the leaver is the current speaker, the next one takes over - let nextCurrent = state.currentSpeaker; - if (state.currentSpeaker === userId) { - nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null; - if (nextCurrent === userId) nextCurrent = null; + + if (idx !== -1) { + let nextCurrent = state.currentSpeaker; + if (state.currentSpeaker === userId) { + nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null; + if (nextCurrent === userId) nextCurrent = null; + } + state.orderUserIds = order.filter((u) => u !== userId); + state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null; } - state.orderUserIds = order.filter((u) => u !== userId); state.norepStreak = state.norepStreak.filter((u) => u !== userId); - state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null; + + // strip the leaver from every sub-frame; drop emptied frames; clamp idx + const fr = this.frames(state) + .map((f) => ({ order: f.order.filter((u) => u !== userId), idx: f.idx })) + .filter((f) => f.order.length > 0) + .map((f) => ({ order: f.order, idx: Math.min(f.idx, f.order.length - 1) })); + state.frames = fr; await manager.save(ChannelTurnState, state); }); } - // A normal (non-command) message delivered to a discuss/work channel. - async onNormalMessage(channelId: string, authorUserId: string): Promise { + // A normal (non-command) message in a discuss/work channel. + // mentionIds = raw parsed mentions; the at-list is (mentions - author) + // intersected with channel members. + async onNormalMessage( + channelId: string, + authorUserId: string, + mentionIds: string[] = [], + ): Promise { return this.dataSource.transaction(async (manager) => { const state = await this.ensureState(manager, channelId); - // any normal message clears the cross-round /no-reply streak state.norepStreak = []; + const memberRows = await manager.find(ChannelMember, { where: { channelId } }); + const memberSet = new Set(memberRows.map((m) => m.userId)); + const atList = [...new Set(mentionIds)].filter( + (id) => id !== authorUserId && memberSet.has(id), + ); + + const fr = this.frames(state); + + // ---- a sub-frame is active + if (fr.length) { + const top = fr[fr.length - 1]; + const cur = top.order[Math.min(top.idx, top.order.length - 1)]; + if (authorUserId === cur) { + if (atList.length) { + fr.push({ order: atList, idx: 0 }); + await manager.save(ChannelTurnState, state); + return { wakeupUserId: atList[0] }; + } + const next = this.advanceSubFrame(state); + await manager.save(ChannelTurnState, state); + return { wakeupUserId: next }; + } + // queue-jump within the sub-frame: delivered, no advance, no push + await manager.save(ChannelTurnState, state); + return { wakeupUserId: null }; + } + + // ---- root rotation active const order = state.orderUserIds; const n = order.length; if (n <= 1) { @@ -119,7 +188,7 @@ export class TurnService { } if (state.currentSpeaker === null) { - // activation: mover goes to front, rotation starts at order[1] + // activation: mover to front, rotation starts at order[1] const newOrder = [authorUserId, ...order.filter((u) => u !== authorUserId)]; state.orderUserIds = newOrder; state.currentSpeaker = newOrder[1]; @@ -130,6 +199,14 @@ export class TurnService { } if (authorUserId === state.currentSpeaker) { + // current speaker mentioning -> push a sub-frame; root pointer (this + // speaker) is left as-is and resumes after the sub-frame pops + if (atList.length) { + fr.push({ order: atList, idx: 0 }); + await manager.save(ChannelTurnState, state); + return { wakeupUserId: atList[0] }; + } + const idx = order.indexOf(authorUserId); const isLast = idx === n - 1; state.roundEvents = [...state.roundEvents, { u: authorUserId, a: 'normal' }]; @@ -163,14 +240,23 @@ export class TurnService { }); } - // /no-reply command in a discuss/work channel. async onNoReply(channelId: string, senderUserId: string): Promise { return this.dataSource.transaction(async (manager) => { const state = await this.ensureState(manager, channelId); + const fr = this.frames(state); + + // sub-frame: /no-reply counts as "acted"; advance/pop, no shuffle/pause + if (fr.length) { + const top = fr[fr.length - 1]; + const cur = top.order[Math.min(top.idx, top.order.length - 1)]; + if (senderUserId !== cur) return { ack: null }; + const next = this.advanceSubFrame(state); + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: next } }; + } + const order = state.orderUserIds; const n = order.length; - - // only the current speaker's /no-reply has any effect if (n <= 1 || state.currentSpeaker === null || senderUserId !== state.currentSpeaker) { return { ack: null }; } @@ -182,7 +268,6 @@ export class TurnService { state.norepStreak = [...state.norepStreak, senderUserId]; } - // pause when every current member has consecutively /no-reply'd const allCovered = order.every((u) => state.norepStreak.includes(u)); if (allCovered) { state.currentSpeaker = null; @@ -214,11 +299,19 @@ export class TurnService { }); } - // /force-proceed command in a discuss/work channel: skip the stuck current - // speaker (not recorded, streak untouched), advance to the next one. async onForceProceed(channelId: string): Promise { return this.dataSource.transaction(async (manager) => { const state = await this.ensureState(manager, channelId); + const fr = this.frames(state); + + if (fr.length) { + const top = fr[fr.length - 1]; + if (!top.order.length) return { ack: null }; + const next = this.advanceSubFrame(state); + await manager.save(ChannelTurnState, state); + return { ack: { wakeupUserId: next } }; + } + const order = state.orderUserIds; const n = order.length; if (n <= 1 || state.currentSpeaker === null) return { ack: null }; diff --git a/src/entities/channel-turn-state.entity.ts b/src/entities/channel-turn-state.entity.ts index 817443b..3c4ee87 100644 --- a/src/entities/channel-turn-state.entity.ts +++ b/src/entities/channel-turn-state.entity.ts @@ -2,6 +2,11 @@ import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from export type RoundEvent = { u: string; a: 'normal' | 'noreply' }; +// A mention sub-rotation frame pushed on top of the root rotation. +// currentSpeaker of an active sub-frame = order[idx]. Single linear pass: +// after the member at the last index acts, the frame pops. +export type TurnFrame = { order: string[]; idx: number }; + // Per-channel rotation state for discuss/work x_type channels. // All mutations must be serialized per channel (pessimistic row lock). @Entity('channel_turn_state') @@ -36,6 +41,12 @@ export class ChannelTurnState { @Column({ name: 'last_normal_speaker', type: 'varchar', length: 64, nullable: true }) lastNormalSpeaker!: string | null; + // mention sub-rotation stack on top of the root rotation. Empty = root + // active. Top of stack is the active frame; the root rotation + // (order/currentSpeaker/round/streak) is paused while it is non-empty. + @Column({ name: 'frames', type: 'json', nullable: true }) + frames!: TurnFrame[] | null; + @UpdateDateColumn() updatedAt!: Date; } diff --git a/src/messaging/messaging.controller.ts b/src/messaging/messaging.controller.ts index c1b298c..5980b2a 100644 --- a/src/messaging/messaging.controller.ts +++ b/src/messaging/messaging.controller.ts @@ -18,6 +18,7 @@ import { Message } from '../entities/message.entity'; import { IdempotencyRecord } from '../entities/idempotency-record.entity'; import { WakeMapping } from '../entities/wake-mapping.entity'; import { parseSlashCommand } from '../channels/slash-commands'; +import { parseMentions } from '../channels/mentions'; import { TurnService } from '../channels/turn.service'; import { EventsService } from '../events/events.service'; import { clampLimit, computeNextExpectedSeq } from './pagination.util'; @@ -179,18 +180,24 @@ export class MessagingController { data: responseBody, }); + // mentions: <@id> outside backtick spans + const mentionIds = parseMentions(body.content ?? ''); + if (isRotating) { - // discuss/work: rotation decides the single wakeup target - const decision = await this.turn.onNormalMessage(channelId, authorUserId); + // discuss/work: rotation (incl. mention sub-frames) picks the target + const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds); await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId); } else { - // general/report/triage/custom: wakeup from x_type + wake_mapping + // general/report/triage/custom: wakeup from x_type + wake_mapping; + // general also honors the message's at-list const wakeRows = await this.wakeRepo.find({ where: { channelId } }); const wakeUserIds = new Set(wakeRows.map((w) => w.userId)); + const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId)); await this.realtime.emitMessageCreated(channelId, responseBody, { xType, authorUserId, wakeUserIds, + mentionUserIds, }); } diff --git a/src/realtime/realtime.gateway.ts b/src/realtime/realtime.gateway.ts index eb81632..2b5a01a 100644 --- a/src/realtime/realtime.gateway.ts +++ b/src/realtime/realtime.gateway.ts @@ -18,18 +18,24 @@ type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom'; // Precedence: // 1. the author never gets woken by their own message // 2. triage/custom: only wake users in the channel's wake_mapping -// 3. general: wake everyone +// (mentions change nothing here) +// 3. general: if the message has an at-list, wake only the at'd users; +// otherwise wake everyone // 4. report (and anything else): wake nobody export function computeWakeup(args: { xType: XType; recipientUserId: string; authorUserId: string; wakeUserIds: Set; + mentionUserIds?: Set; }): boolean { - const { xType, recipientUserId, authorUserId, wakeUserIds } = args; + const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args; if (recipientUserId === authorUserId) return false; switch (xType) { case 'general': + if (mentionUserIds && mentionUserIds.size > 0) { + return mentionUserIds.has(recipientUserId); + } return true; case 'triage': case 'custom': @@ -166,7 +172,12 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect async emitMessageCreated( channelId: string, data: Record, - ctx: { xType: XType; authorUserId: string; wakeUserIds: Set }, + ctx: { + xType: XType; + authorUserId: string; + wakeUserIds: Set; + mentionUserIds?: Set; + }, ): Promise { const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); for (const s of sockets) { @@ -176,6 +187,7 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect recipientUserId, authorUserId: ctx.authorUserId, wakeUserIds: ctx.wakeUserIds, + mentionUserIds: ctx.mentionUserIds, }); s.emit('message.created', { ...data, wakeup }); }