Compare commits
1 Commits
182cfb3c41
...
02b7c72e70
| Author | SHA1 | Date | |
|---|---|---|---|
| 02b7c72e70 |
25
src/channels/mentions.ts
Normal file
25
src/channels/mentions.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
// Parse <@user-id> mentions from message content. A mention does NOT count
|
||||
// when it sits inside a backtick span (single ` or triple ``` — any backtick
|
||||
// run toggles a code region). Returns unique ids in first-seen order.
|
||||
export function parseMentions(content: string): string[] {
|
||||
if (typeof content !== 'string' || !content) return [];
|
||||
|
||||
// strip backtick-delimited regions: split on runs of backticks; odd
|
||||
// segments (between an opening and closing run) are code -> dropped.
|
||||
const segments = content.split(/`+/);
|
||||
let outside = '';
|
||||
for (let i = 0; i < segments.length; i += 2) outside += segments[i] + ' ';
|
||||
|
||||
const ids: string[] = [];
|
||||
const seen = new Set<string>();
|
||||
const re = /<@([^>`\s]+)>/g;
|
||||
let m: RegExpExecArray | null;
|
||||
while ((m = re.exec(outside)) !== null) {
|
||||
const id = m[1];
|
||||
if (!seen.has(id)) {
|
||||
seen.add(id);
|
||||
ids.push(id);
|
||||
}
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { DataSource, EntityManager } from 'typeorm';
|
||||
import { ChannelTurnState } from '../entities/channel-turn-state.entity';
|
||||
import { ChannelTurnState, TurnFrame } from '../entities/channel-turn-state.entity';
|
||||
import { ChannelMember } from '../entities/channel-member.entity';
|
||||
import { computeShuffle } from './turn-shuffle';
|
||||
|
||||
@@ -18,11 +18,13 @@ export class TurnService {
|
||||
manager: EntityManager,
|
||||
channelId: string,
|
||||
): Promise<ChannelTurnState | null> {
|
||||
return manager
|
||||
const state = await manager
|
||||
.createQueryBuilder(ChannelTurnState, 's')
|
||||
.setLock('pessimistic_write')
|
||||
.where('s.channelId = :channelId', { channelId })
|
||||
.getOne();
|
||||
if (state && !Array.isArray(state.frames)) state.frames = [];
|
||||
return state;
|
||||
}
|
||||
|
||||
private async ensureState(
|
||||
@@ -31,7 +33,6 @@ export class TurnService {
|
||||
): Promise<ChannelTurnState> {
|
||||
let state = await this.loadLocked(manager, channelId);
|
||||
if (state) return state;
|
||||
// lazy init from current channel members (sorted by userId)
|
||||
const members = await manager.find(ChannelMember, { where: { channelId } });
|
||||
const order = members.map((m) => m.userId).sort();
|
||||
state = manager.create(ChannelTurnState, {
|
||||
@@ -41,35 +42,62 @@ export class TurnService {
|
||||
roundEvents: [],
|
||||
norepStreak: [],
|
||||
lastNormalSpeaker: null,
|
||||
frames: [],
|
||||
});
|
||||
return manager.save(ChannelTurnState, state);
|
||||
}
|
||||
|
||||
// Called when a discuss/work channel is created.
|
||||
private frames(state: ChannelTurnState): TurnFrame[] {
|
||||
if (!Array.isArray(state.frames)) state.frames = [];
|
||||
return state.frames;
|
||||
}
|
||||
|
||||
// effective current speaker = top sub-frame's pointer, else root speaker
|
||||
private effectiveCurrent(state: ChannelTurnState): string | null {
|
||||
const fr = this.frames(state);
|
||||
while (fr.length) {
|
||||
const top = fr[fr.length - 1];
|
||||
if (!top.order.length) {
|
||||
fr.pop();
|
||||
continue;
|
||||
}
|
||||
const idx = Math.min(top.idx, top.order.length - 1);
|
||||
return top.order[idx];
|
||||
}
|
||||
return state.currentSpeaker;
|
||||
}
|
||||
|
||||
// advance / pop the active sub-frame; returns the new effective speaker.
|
||||
// A single linear pass: acting at the last index pops the frame.
|
||||
private advanceSubFrame(state: ChannelTurnState): string | null {
|
||||
const fr = this.frames(state);
|
||||
const top = fr[fr.length - 1];
|
||||
if (top.idx >= top.order.length - 1) {
|
||||
fr.pop();
|
||||
} else {
|
||||
top.idx += 1;
|
||||
}
|
||||
return this.effectiveCurrent(state);
|
||||
}
|
||||
|
||||
async initForChannel(channelId: string, memberUserIds: string[]): Promise<void> {
|
||||
await this.dataSource.transaction(async (manager) => {
|
||||
const existing = await manager.findOne(ChannelTurnState, { where: { channelId } });
|
||||
const order = [...new Set(memberUserIds)].sort();
|
||||
const base = {
|
||||
orderUserIds: order,
|
||||
currentSpeaker: null,
|
||||
roundEvents: [] as ChannelTurnState['roundEvents'],
|
||||
norepStreak: [] as string[],
|
||||
lastNormalSpeaker: null,
|
||||
frames: [] as TurnFrame[],
|
||||
};
|
||||
if (existing) {
|
||||
existing.orderUserIds = order;
|
||||
existing.currentSpeaker = null;
|
||||
existing.roundEvents = [];
|
||||
existing.norepStreak = [];
|
||||
existing.lastNormalSpeaker = null;
|
||||
Object.assign(existing, base);
|
||||
await manager.save(ChannelTurnState, existing);
|
||||
return;
|
||||
}
|
||||
await manager.save(
|
||||
ChannelTurnState,
|
||||
manager.create(ChannelTurnState, {
|
||||
channelId,
|
||||
orderUserIds: order,
|
||||
currentSpeaker: null,
|
||||
roundEvents: [],
|
||||
norepStreak: [],
|
||||
lastNormalSpeaker: null,
|
||||
}),
|
||||
);
|
||||
await manager.save(ChannelTurnState, manager.create(ChannelTurnState, { channelId, ...base }));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -77,7 +105,7 @@ export class TurnService {
|
||||
await this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.ensureState(manager, channelId);
|
||||
if (!state.orderUserIds.includes(userId)) {
|
||||
state.orderUserIds = [...state.orderUserIds, userId]; // append to tail
|
||||
state.orderUserIds = [...state.orderUserIds, userId];
|
||||
await manager.save(ChannelTurnState, state);
|
||||
}
|
||||
});
|
||||
@@ -89,27 +117,68 @@ export class TurnService {
|
||||
if (!state) return;
|
||||
const order = state.orderUserIds;
|
||||
const idx = order.indexOf(userId);
|
||||
if (idx === -1) return;
|
||||
// if the leaver is the current speaker, the next one takes over
|
||||
let nextCurrent = state.currentSpeaker;
|
||||
if (state.currentSpeaker === userId) {
|
||||
nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null;
|
||||
if (nextCurrent === userId) nextCurrent = null;
|
||||
|
||||
if (idx !== -1) {
|
||||
let nextCurrent = state.currentSpeaker;
|
||||
if (state.currentSpeaker === userId) {
|
||||
nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null;
|
||||
if (nextCurrent === userId) nextCurrent = null;
|
||||
}
|
||||
state.orderUserIds = order.filter((u) => u !== userId);
|
||||
state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null;
|
||||
}
|
||||
state.orderUserIds = order.filter((u) => u !== userId);
|
||||
state.norepStreak = state.norepStreak.filter((u) => u !== userId);
|
||||
state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null;
|
||||
|
||||
// strip the leaver from every sub-frame; drop emptied frames; clamp idx
|
||||
const fr = this.frames(state)
|
||||
.map((f) => ({ order: f.order.filter((u) => u !== userId), idx: f.idx }))
|
||||
.filter((f) => f.order.length > 0)
|
||||
.map((f) => ({ order: f.order, idx: Math.min(f.idx, f.order.length - 1) }));
|
||||
state.frames = fr;
|
||||
await manager.save(ChannelTurnState, state);
|
||||
});
|
||||
}
|
||||
|
||||
// A normal (non-command) message delivered to a discuss/work channel.
|
||||
async onNormalMessage(channelId: string, authorUserId: string): Promise<TurnDecision> {
|
||||
// A normal (non-command) message in a discuss/work channel.
|
||||
// mentionIds = raw parsed mentions; the at-list is (mentions - author)
|
||||
// intersected with channel members.
|
||||
async onNormalMessage(
|
||||
channelId: string,
|
||||
authorUserId: string,
|
||||
mentionIds: string[] = [],
|
||||
): Promise<TurnDecision> {
|
||||
return this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.ensureState(manager, channelId);
|
||||
// any normal message clears the cross-round /no-reply streak
|
||||
state.norepStreak = [];
|
||||
|
||||
const memberRows = await manager.find(ChannelMember, { where: { channelId } });
|
||||
const memberSet = new Set(memberRows.map((m) => m.userId));
|
||||
const atList = [...new Set(mentionIds)].filter(
|
||||
(id) => id !== authorUserId && memberSet.has(id),
|
||||
);
|
||||
|
||||
const fr = this.frames(state);
|
||||
|
||||
// ---- a sub-frame is active
|
||||
if (fr.length) {
|
||||
const top = fr[fr.length - 1];
|
||||
const cur = top.order[Math.min(top.idx, top.order.length - 1)];
|
||||
if (authorUserId === cur) {
|
||||
if (atList.length) {
|
||||
fr.push({ order: atList, idx: 0 });
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: atList[0] };
|
||||
}
|
||||
const next = this.advanceSubFrame(state);
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: next };
|
||||
}
|
||||
// queue-jump within the sub-frame: delivered, no advance, no push
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: null };
|
||||
}
|
||||
|
||||
// ---- root rotation active
|
||||
const order = state.orderUserIds;
|
||||
const n = order.length;
|
||||
if (n <= 1) {
|
||||
@@ -119,7 +188,7 @@ export class TurnService {
|
||||
}
|
||||
|
||||
if (state.currentSpeaker === null) {
|
||||
// activation: mover goes to front, rotation starts at order[1]
|
||||
// activation: mover to front, rotation starts at order[1]
|
||||
const newOrder = [authorUserId, ...order.filter((u) => u !== authorUserId)];
|
||||
state.orderUserIds = newOrder;
|
||||
state.currentSpeaker = newOrder[1];
|
||||
@@ -130,6 +199,14 @@ export class TurnService {
|
||||
}
|
||||
|
||||
if (authorUserId === state.currentSpeaker) {
|
||||
// current speaker mentioning -> push a sub-frame; root pointer (this
|
||||
// speaker) is left as-is and resumes after the sub-frame pops
|
||||
if (atList.length) {
|
||||
fr.push({ order: atList, idx: 0 });
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { wakeupUserId: atList[0] };
|
||||
}
|
||||
|
||||
const idx = order.indexOf(authorUserId);
|
||||
const isLast = idx === n - 1;
|
||||
state.roundEvents = [...state.roundEvents, { u: authorUserId, a: 'normal' }];
|
||||
@@ -163,14 +240,23 @@ export class TurnService {
|
||||
});
|
||||
}
|
||||
|
||||
// /no-reply command in a discuss/work channel.
|
||||
async onNoReply(channelId: string, senderUserId: string): Promise<CommandDecision> {
|
||||
return this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.ensureState(manager, channelId);
|
||||
const fr = this.frames(state);
|
||||
|
||||
// sub-frame: /no-reply counts as "acted"; advance/pop, no shuffle/pause
|
||||
if (fr.length) {
|
||||
const top = fr[fr.length - 1];
|
||||
const cur = top.order[Math.min(top.idx, top.order.length - 1)];
|
||||
if (senderUserId !== cur) return { ack: null };
|
||||
const next = this.advanceSubFrame(state);
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: next } };
|
||||
}
|
||||
|
||||
const order = state.orderUserIds;
|
||||
const n = order.length;
|
||||
|
||||
// only the current speaker's /no-reply has any effect
|
||||
if (n <= 1 || state.currentSpeaker === null || senderUserId !== state.currentSpeaker) {
|
||||
return { ack: null };
|
||||
}
|
||||
@@ -182,7 +268,6 @@ export class TurnService {
|
||||
state.norepStreak = [...state.norepStreak, senderUserId];
|
||||
}
|
||||
|
||||
// pause when every current member has consecutively /no-reply'd
|
||||
const allCovered = order.every((u) => state.norepStreak.includes(u));
|
||||
if (allCovered) {
|
||||
state.currentSpeaker = null;
|
||||
@@ -214,11 +299,19 @@ export class TurnService {
|
||||
});
|
||||
}
|
||||
|
||||
// /force-proceed command in a discuss/work channel: skip the stuck current
|
||||
// speaker (not recorded, streak untouched), advance to the next one.
|
||||
async onForceProceed(channelId: string): Promise<CommandDecision> {
|
||||
return this.dataSource.transaction(async (manager) => {
|
||||
const state = await this.ensureState(manager, channelId);
|
||||
const fr = this.frames(state);
|
||||
|
||||
if (fr.length) {
|
||||
const top = fr[fr.length - 1];
|
||||
if (!top.order.length) return { ack: null };
|
||||
const next = this.advanceSubFrame(state);
|
||||
await manager.save(ChannelTurnState, state);
|
||||
return { ack: { wakeupUserId: next } };
|
||||
}
|
||||
|
||||
const order = state.orderUserIds;
|
||||
const n = order.length;
|
||||
if (n <= 1 || state.currentSpeaker === null) return { ack: null };
|
||||
|
||||
@@ -2,6 +2,11 @@ import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from
|
||||
|
||||
export type RoundEvent = { u: string; a: 'normal' | 'noreply' };
|
||||
|
||||
// A mention sub-rotation frame pushed on top of the root rotation.
|
||||
// currentSpeaker of an active sub-frame = order[idx]. Single linear pass:
|
||||
// after the member at the last index acts, the frame pops.
|
||||
export type TurnFrame = { order: string[]; idx: number };
|
||||
|
||||
// Per-channel rotation state for discuss/work x_type channels.
|
||||
// All mutations must be serialized per channel (pessimistic row lock).
|
||||
@Entity('channel_turn_state')
|
||||
@@ -36,6 +41,12 @@ export class ChannelTurnState {
|
||||
@Column({ name: 'last_normal_speaker', type: 'varchar', length: 64, nullable: true })
|
||||
lastNormalSpeaker!: string | null;
|
||||
|
||||
// mention sub-rotation stack on top of the root rotation. Empty = root
|
||||
// active. Top of stack is the active frame; the root rotation
|
||||
// (order/currentSpeaker/round/streak) is paused while it is non-empty.
|
||||
@Column({ name: 'frames', type: 'json', nullable: true })
|
||||
frames!: TurnFrame[] | null;
|
||||
|
||||
@UpdateDateColumn()
|
||||
updatedAt!: Date;
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import { Message } from '../entities/message.entity';
|
||||
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
|
||||
import { WakeMapping } from '../entities/wake-mapping.entity';
|
||||
import { parseSlashCommand } from '../channels/slash-commands';
|
||||
import { parseMentions } from '../channels/mentions';
|
||||
import { TurnService } from '../channels/turn.service';
|
||||
import { EventsService } from '../events/events.service';
|
||||
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
|
||||
@@ -179,18 +180,24 @@ export class MessagingController {
|
||||
data: responseBody,
|
||||
});
|
||||
|
||||
// mentions: <@id> outside backtick spans
|
||||
const mentionIds = parseMentions(body.content ?? '');
|
||||
|
||||
if (isRotating) {
|
||||
// discuss/work: rotation decides the single wakeup target
|
||||
const decision = await this.turn.onNormalMessage(channelId, authorUserId);
|
||||
// discuss/work: rotation (incl. mention sub-frames) picks the target
|
||||
const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
|
||||
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
|
||||
} else {
|
||||
// general/report/triage/custom: wakeup from x_type + wake_mapping
|
||||
// general/report/triage/custom: wakeup from x_type + wake_mapping;
|
||||
// general also honors the message's at-list
|
||||
const wakeRows = await this.wakeRepo.find({ where: { channelId } });
|
||||
const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
|
||||
const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
|
||||
await this.realtime.emitMessageCreated(channelId, responseBody, {
|
||||
xType,
|
||||
authorUserId,
|
||||
wakeUserIds,
|
||||
mentionUserIds,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -18,18 +18,24 @@ type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom';
|
||||
// Precedence:
|
||||
// 1. the author never gets woken by their own message
|
||||
// 2. triage/custom: only wake users in the channel's wake_mapping
|
||||
// 3. general: wake everyone
|
||||
// (mentions change nothing here)
|
||||
// 3. general: if the message has an at-list, wake only the at'd users;
|
||||
// otherwise wake everyone
|
||||
// 4. report (and anything else): wake nobody
|
||||
export function computeWakeup(args: {
|
||||
xType: XType;
|
||||
recipientUserId: string;
|
||||
authorUserId: string;
|
||||
wakeUserIds: Set<string>;
|
||||
mentionUserIds?: Set<string>;
|
||||
}): boolean {
|
||||
const { xType, recipientUserId, authorUserId, wakeUserIds } = args;
|
||||
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args;
|
||||
if (recipientUserId === authorUserId) return false;
|
||||
switch (xType) {
|
||||
case 'general':
|
||||
if (mentionUserIds && mentionUserIds.size > 0) {
|
||||
return mentionUserIds.has(recipientUserId);
|
||||
}
|
||||
return true;
|
||||
case 'triage':
|
||||
case 'custom':
|
||||
@@ -166,7 +172,12 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
||||
async emitMessageCreated(
|
||||
channelId: string,
|
||||
data: Record<string, unknown>,
|
||||
ctx: { xType: XType; authorUserId: string; wakeUserIds: Set<string> },
|
||||
ctx: {
|
||||
xType: XType;
|
||||
authorUserId: string;
|
||||
wakeUserIds: Set<string>;
|
||||
mentionUserIds?: Set<string>;
|
||||
},
|
||||
): Promise<void> {
|
||||
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
|
||||
for (const s of sockets) {
|
||||
@@ -176,6 +187,7 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
||||
recipientUserId,
|
||||
authorUserId: ctx.authorUserId,
|
||||
wakeUserIds: ctx.wakeUserIds,
|
||||
mentionUserIds: ctx.mentionUserIds,
|
||||
});
|
||||
s.emit('message.created', { ...data, wakeup });
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user