feat(guild): <@id> mention mechanism

- parse <@user-id> outside backtick spans
- general: message with an at-list wakes only the at'd users (else all)
- report/triage/custom: mentions change nothing
- discuss/work: mention by current speaker pushes a sub-rotation frame
  (atList = mentions - sender, intersected with channel members); single
  linear pass (real/no-reply/force-proceed), then pop back to the saved
  parent pointer (resumes at the pusher); nested frames supported

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
h z
2026-05-15 15:27:35 +01:00
parent 182cfb3c41
commit 02b7c72e70
5 changed files with 193 additions and 45 deletions

25
src/channels/mentions.ts Normal file
View File

@@ -0,0 +1,25 @@
// Parse <@user-id> mentions from message content. A mention does NOT count
// when it sits inside a backtick span (single ` or triple ``` — any backtick
// run toggles a code region). Returns unique ids in first-seen order.
export function parseMentions(content: string): string[] {
if (typeof content !== 'string' || !content) return [];
// strip backtick-delimited regions: split on runs of backticks; odd
// segments (between an opening and closing run) are code -> dropped.
const segments = content.split(/`+/);
let outside = '';
for (let i = 0; i < segments.length; i += 2) outside += segments[i] + ' ';
const ids: string[] = [];
const seen = new Set<string>();
const re = /<@([^>`\s]+)>/g;
let m: RegExpExecArray | null;
while ((m = re.exec(outside)) !== null) {
const id = m[1];
if (!seen.has(id)) {
seen.add(id);
ids.push(id);
}
}
return ids;
}

View File

@@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';
import { DataSource, EntityManager } from 'typeorm';
import { ChannelTurnState } from '../entities/channel-turn-state.entity';
import { ChannelTurnState, TurnFrame } from '../entities/channel-turn-state.entity';
import { ChannelMember } from '../entities/channel-member.entity';
import { computeShuffle } from './turn-shuffle';
@@ -18,11 +18,13 @@ export class TurnService {
manager: EntityManager,
channelId: string,
): Promise<ChannelTurnState | null> {
return manager
const state = await manager
.createQueryBuilder(ChannelTurnState, 's')
.setLock('pessimistic_write')
.where('s.channelId = :channelId', { channelId })
.getOne();
if (state && !Array.isArray(state.frames)) state.frames = [];
return state;
}
private async ensureState(
@@ -31,7 +33,6 @@ export class TurnService {
): Promise<ChannelTurnState> {
let state = await this.loadLocked(manager, channelId);
if (state) return state;
// lazy init from current channel members (sorted by userId)
const members = await manager.find(ChannelMember, { where: { channelId } });
const order = members.map((m) => m.userId).sort();
state = manager.create(ChannelTurnState, {
@@ -41,35 +42,62 @@ export class TurnService {
roundEvents: [],
norepStreak: [],
lastNormalSpeaker: null,
frames: [],
});
return manager.save(ChannelTurnState, state);
}
// Called when a discuss/work channel is created.
private frames(state: ChannelTurnState): TurnFrame[] {
if (!Array.isArray(state.frames)) state.frames = [];
return state.frames;
}
// effective current speaker = top sub-frame's pointer, else root speaker
private effectiveCurrent(state: ChannelTurnState): string | null {
const fr = this.frames(state);
while (fr.length) {
const top = fr[fr.length - 1];
if (!top.order.length) {
fr.pop();
continue;
}
const idx = Math.min(top.idx, top.order.length - 1);
return top.order[idx];
}
return state.currentSpeaker;
}
// advance / pop the active sub-frame; returns the new effective speaker.
// A single linear pass: acting at the last index pops the frame.
private advanceSubFrame(state: ChannelTurnState): string | null {
const fr = this.frames(state);
const top = fr[fr.length - 1];
if (top.idx >= top.order.length - 1) {
fr.pop();
} else {
top.idx += 1;
}
return this.effectiveCurrent(state);
}
async initForChannel(channelId: string, memberUserIds: string[]): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const existing = await manager.findOne(ChannelTurnState, { where: { channelId } });
const order = [...new Set(memberUserIds)].sort();
const base = {
orderUserIds: order,
currentSpeaker: null,
roundEvents: [] as ChannelTurnState['roundEvents'],
norepStreak: [] as string[],
lastNormalSpeaker: null,
frames: [] as TurnFrame[],
};
if (existing) {
existing.orderUserIds = order;
existing.currentSpeaker = null;
existing.roundEvents = [];
existing.norepStreak = [];
existing.lastNormalSpeaker = null;
Object.assign(existing, base);
await manager.save(ChannelTurnState, existing);
return;
}
await manager.save(
ChannelTurnState,
manager.create(ChannelTurnState, {
channelId,
orderUserIds: order,
currentSpeaker: null,
roundEvents: [],
norepStreak: [],
lastNormalSpeaker: null,
}),
);
await manager.save(ChannelTurnState, manager.create(ChannelTurnState, { channelId, ...base }));
});
}
@@ -77,7 +105,7 @@ export class TurnService {
await this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
if (!state.orderUserIds.includes(userId)) {
state.orderUserIds = [...state.orderUserIds, userId]; // append to tail
state.orderUserIds = [...state.orderUserIds, userId];
await manager.save(ChannelTurnState, state);
}
});
@@ -89,27 +117,68 @@ export class TurnService {
if (!state) return;
const order = state.orderUserIds;
const idx = order.indexOf(userId);
if (idx === -1) return;
// if the leaver is the current speaker, the next one takes over
if (idx !== -1) {
let nextCurrent = state.currentSpeaker;
if (state.currentSpeaker === userId) {
nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null;
if (nextCurrent === userId) nextCurrent = null;
}
state.orderUserIds = order.filter((u) => u !== userId);
state.norepStreak = state.norepStreak.filter((u) => u !== userId);
state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null;
}
state.norepStreak = state.norepStreak.filter((u) => u !== userId);
// strip the leaver from every sub-frame; drop emptied frames; clamp idx
const fr = this.frames(state)
.map((f) => ({ order: f.order.filter((u) => u !== userId), idx: f.idx }))
.filter((f) => f.order.length > 0)
.map((f) => ({ order: f.order, idx: Math.min(f.idx, f.order.length - 1) }));
state.frames = fr;
await manager.save(ChannelTurnState, state);
});
}
// A normal (non-command) message delivered to a discuss/work channel.
async onNormalMessage(channelId: string, authorUserId: string): Promise<TurnDecision> {
// A normal (non-command) message in a discuss/work channel.
// mentionIds = raw parsed mentions; the at-list is (mentions - author)
// intersected with channel members.
async onNormalMessage(
channelId: string,
authorUserId: string,
mentionIds: string[] = [],
): Promise<TurnDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
// any normal message clears the cross-round /no-reply streak
state.norepStreak = [];
const memberRows = await manager.find(ChannelMember, { where: { channelId } });
const memberSet = new Set(memberRows.map((m) => m.userId));
const atList = [...new Set(mentionIds)].filter(
(id) => id !== authorUserId && memberSet.has(id),
);
const fr = this.frames(state);
// ---- a sub-frame is active
if (fr.length) {
const top = fr[fr.length - 1];
const cur = top.order[Math.min(top.idx, top.order.length - 1)];
if (authorUserId === cur) {
if (atList.length) {
fr.push({ order: atList, idx: 0 });
await manager.save(ChannelTurnState, state);
return { wakeupUserId: atList[0] };
}
const next = this.advanceSubFrame(state);
await manager.save(ChannelTurnState, state);
return { wakeupUserId: next };
}
// queue-jump within the sub-frame: delivered, no advance, no push
await manager.save(ChannelTurnState, state);
return { wakeupUserId: null };
}
// ---- root rotation active
const order = state.orderUserIds;
const n = order.length;
if (n <= 1) {
@@ -119,7 +188,7 @@ export class TurnService {
}
if (state.currentSpeaker === null) {
// activation: mover goes to front, rotation starts at order[1]
// activation: mover to front, rotation starts at order[1]
const newOrder = [authorUserId, ...order.filter((u) => u !== authorUserId)];
state.orderUserIds = newOrder;
state.currentSpeaker = newOrder[1];
@@ -130,6 +199,14 @@ export class TurnService {
}
if (authorUserId === state.currentSpeaker) {
// current speaker mentioning -> push a sub-frame; root pointer (this
// speaker) is left as-is and resumes after the sub-frame pops
if (atList.length) {
fr.push({ order: atList, idx: 0 });
await manager.save(ChannelTurnState, state);
return { wakeupUserId: atList[0] };
}
const idx = order.indexOf(authorUserId);
const isLast = idx === n - 1;
state.roundEvents = [...state.roundEvents, { u: authorUserId, a: 'normal' }];
@@ -163,14 +240,23 @@ export class TurnService {
});
}
// /no-reply command in a discuss/work channel.
async onNoReply(channelId: string, senderUserId: string): Promise<CommandDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
const fr = this.frames(state);
// sub-frame: /no-reply counts as "acted"; advance/pop, no shuffle/pause
if (fr.length) {
const top = fr[fr.length - 1];
const cur = top.order[Math.min(top.idx, top.order.length - 1)];
if (senderUserId !== cur) return { ack: null };
const next = this.advanceSubFrame(state);
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: next } };
}
const order = state.orderUserIds;
const n = order.length;
// only the current speaker's /no-reply has any effect
if (n <= 1 || state.currentSpeaker === null || senderUserId !== state.currentSpeaker) {
return { ack: null };
}
@@ -182,7 +268,6 @@ export class TurnService {
state.norepStreak = [...state.norepStreak, senderUserId];
}
// pause when every current member has consecutively /no-reply'd
const allCovered = order.every((u) => state.norepStreak.includes(u));
if (allCovered) {
state.currentSpeaker = null;
@@ -214,11 +299,19 @@ export class TurnService {
});
}
// /force-proceed command in a discuss/work channel: skip the stuck current
// speaker (not recorded, streak untouched), advance to the next one.
async onForceProceed(channelId: string): Promise<CommandDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
const fr = this.frames(state);
if (fr.length) {
const top = fr[fr.length - 1];
if (!top.order.length) return { ack: null };
const next = this.advanceSubFrame(state);
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: next } };
}
const order = state.orderUserIds;
const n = order.length;
if (n <= 1 || state.currentSpeaker === null) return { ack: null };

View File

@@ -2,6 +2,11 @@ import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from
export type RoundEvent = { u: string; a: 'normal' | 'noreply' };
// A mention sub-rotation frame pushed on top of the root rotation.
// currentSpeaker of an active sub-frame = order[idx]. Single linear pass:
// after the member at the last index acts, the frame pops.
export type TurnFrame = { order: string[]; idx: number };
// Per-channel rotation state for discuss/work x_type channels.
// All mutations must be serialized per channel (pessimistic row lock).
@Entity('channel_turn_state')
@@ -36,6 +41,12 @@ export class ChannelTurnState {
@Column({ name: 'last_normal_speaker', type: 'varchar', length: 64, nullable: true })
lastNormalSpeaker!: string | null;
// mention sub-rotation stack on top of the root rotation. Empty = root
// active. Top of stack is the active frame; the root rotation
// (order/currentSpeaker/round/streak) is paused while it is non-empty.
@Column({ name: 'frames', type: 'json', nullable: true })
frames!: TurnFrame[] | null;
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -18,6 +18,7 @@ import { Message } from '../entities/message.entity';
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
import { WakeMapping } from '../entities/wake-mapping.entity';
import { parseSlashCommand } from '../channels/slash-commands';
import { parseMentions } from '../channels/mentions';
import { TurnService } from '../channels/turn.service';
import { EventsService } from '../events/events.service';
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
@@ -179,18 +180,24 @@ export class MessagingController {
data: responseBody,
});
// mentions: <@id> outside backtick spans
const mentionIds = parseMentions(body.content ?? '');
if (isRotating) {
// discuss/work: rotation decides the single wakeup target
const decision = await this.turn.onNormalMessage(channelId, authorUserId);
// discuss/work: rotation (incl. mention sub-frames) picks the target
const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
} else {
// general/report/triage/custom: wakeup from x_type + wake_mapping
// general/report/triage/custom: wakeup from x_type + wake_mapping;
// general also honors the message's at-list
const wakeRows = await this.wakeRepo.find({ where: { channelId } });
const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
await this.realtime.emitMessageCreated(channelId, responseBody, {
xType,
authorUserId,
wakeUserIds,
mentionUserIds,
});
}

View File

@@ -18,18 +18,24 @@ type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom';
// Precedence:
// 1. the author never gets woken by their own message
// 2. triage/custom: only wake users in the channel's wake_mapping
// 3. general: wake everyone
// (mentions change nothing here)
// 3. general: if the message has an at-list, wake only the at'd users;
// otherwise wake everyone
// 4. report (and anything else): wake nobody
export function computeWakeup(args: {
xType: XType;
recipientUserId: string;
authorUserId: string;
wakeUserIds: Set<string>;
mentionUserIds?: Set<string>;
}): boolean {
const { xType, recipientUserId, authorUserId, wakeUserIds } = args;
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args;
if (recipientUserId === authorUserId) return false;
switch (xType) {
case 'general':
if (mentionUserIds && mentionUserIds.size > 0) {
return mentionUserIds.has(recipientUserId);
}
return true;
case 'triage':
case 'custom':
@@ -166,7 +172,12 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
async emitMessageCreated(
channelId: string,
data: Record<string, unknown>,
ctx: { xType: XType; authorUserId: string; wakeUserIds: Set<string> },
ctx: {
xType: XType;
authorUserId: string;
wakeUserIds: Set<string>;
mentionUserIds?: Set<string>;
},
): Promise<void> {
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
for (const s of sockets) {
@@ -176,6 +187,7 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
recipientUserId,
authorUserId: ctx.authorUserId,
wakeUserIds: ctx.wakeUserIds,
mentionUserIds: ctx.mentionUserIds,
});
s.emit('message.created', { ...data, wakeup });
}