import {
  ConnectedSocket,
  MessageBody,
  OnGatewayConnection,
  OnGatewayDisconnect,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { introspectGuildToken } from '../common/center-auth.js';

type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';

/**
 * Decides whether one recipient should be woken by a newly created message.
 *
 * Used for non-rotating channels; discuss/work go through TurnService +
 * `emitMessageTargeted` and fall into the "wake nobody" default here.
 *
 * Precedence:
 *  1. the author is never woken by their own message
 *  2. triage/custom: only users in the channel's wake mapping are woken
 *     (mentions change nothing here)
 *  3. general: if the message has a non-empty at-list, only the at'd users are
 *     woken; otherwise everyone is
 *  4. dm: every non-author participant is woken
 *  5. report (and anything else): nobody is woken
 *
 * @param args.xType channel type of the message
 * @param args.recipientUserId user the decision is being made for
 * @param args.authorUserId user who wrote the message
 * @param args.wakeUserIds users to wake for triage/custom channels
 * @param args.mentionUserIds users at'd in the message; only consulted for `general`
 * @returns `true` when the recipient should be woken
 */
export function computeWakeup(args: {
  xType: XType;
  recipientUserId: string;
  authorUserId: string;
  wakeUserIds: Set<string>;
  mentionUserIds?: Set<string>;
}): boolean {
  const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args;
  if (recipientUserId === authorUserId) return false;

  switch (xType) {
    case 'general':
      if (mentionUserIds && mentionUserIds.size > 0) {
        return mentionUserIds.has(recipientUserId);
      }
      return true;
    case 'triage':
    case 'custom':
      return wakeUserIds.has(recipientUserId);
    case 'dm':
      // 1:1 conversation: every non-author participant is always woken.
      return true;
    default:
      return false;
  }
}

/**
 * Socket.IO gateway on the `/realtime` namespace: authenticates sockets with a
 * bearer token, tracks presence, relays typing indicators and pushes
 * `message.created` events into `channel:<id>` rooms.
 */
// NOTE(review): `origin: '*'` accepts browser connections from any origin —
// confirm this is intended for production.
@WebSocketGateway({
  namespace: '/realtime',
  cors: {
    origin: '*',
  },
})
export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server!: Server;

  private readonly logger = new Logger(RealtimeGateway.name);

  // userId -> number of currently authenticated sockets for that user.
  // `size` is therefore the number of distinct online users.
  private readonly onlineUsers = new Map<string, number>();

  /** Returns the authenticated user id stored on a socket, or `anon:<socket id>` when none is set. */
  private userIdOf(socket: { id: string; data: { userId?: unknown } }): string {
    const { userId } = socket.data;
    return typeof userId === 'string' ? userId : `anon:${socket.id}`;
  }

  /** Extracts the bearer token from `handshake.auth.token` or the `Authorization` header; `''` when absent. */
  private bearerTokenFrom(client: Socket): string {
    const authToken = client.handshake.auth?.token;
    if (typeof authToken === 'string') return authToken;

    const headerAuth = client.handshake.headers['authorization'];
    if (typeof headerAuth === 'string' && headerAuth.startsWith('Bearer ')) {
      return headerAuth.slice(7);
    }
    return '';
  }

  /** Runs token introspection, turning a thrown error into `null` so the caller can reject the socket. */
  private async introspect(bearer: string, socketId: string) {
    try {
      return await introspectGuildToken(bearer);
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.warn(`token introspection failed for ${socketId}: ${reason}`);
      return null;
    }
  }

  /**
   * Authenticates a new socket. Sockets without a token, with an inactive
   * token, or whose introspection fails are disconnected. On success the user
   * id is stored on `client.data.userId` and `presence.online` is broadcast.
   */
  async handleConnection(client: Socket): Promise<void> {
    const bearer = this.bearerTokenFrom(client);
    if (!bearer) {
      this.logger.warn(`socket rejected (missing token): ${client.id}`);
      client.disconnect(true);
      return;
    }

    const result = await this.introspect(bearer, client.id);
    if (!result?.active || !result.user?.id) {
      this.logger.warn(`socket rejected: ${client.id}`);
      client.disconnect(true);
      return;
    }

    // The client may have gone away while introspection was in flight; its
    // disconnect hook has already run, so registering presence now would
    // leave the user marked online with nothing to ever remove them.
    if (!client.connected) {
      this.logger.log(`socket closed before authentication completed: ${client.id}`);
      return;
    }

    this.logger.log(`socket connected: ${client.id}`);
    const userId = result.user.id;
    client.data.userId = userId;
    this.onlineUsers.set(userId, (this.onlineUsers.get(userId) ?? 0) + 1);

    this.server.emit('presence.online', {
      userId,
      onlineCount: this.onlineUsers.size,
      occurredAt: new Date().toISOString(),
    });
  }

  /**
   * Releases a socket's presence slot. `presence.offline` is broadcast only
   * when the user's last authenticated socket closes; sockets that never
   * authenticated produce no presence event.
   */
  handleDisconnect(client: Socket): void {
    this.logger.log(`socket disconnected: ${client.id}`);

    const userId: unknown = client.data.userId;
    if (typeof userId !== 'string') return; // rejected or never authenticated

    const remaining = (this.onlineUsers.get(userId) ?? 1) - 1;
    if (remaining > 0) {
      // The user still has other open sockets (e.g. another tab or device).
      this.onlineUsers.set(userId, remaining);
      return;
    }

    this.onlineUsers.delete(userId);
    this.server.emit('presence.offline', {
      userId,
      onlineCount: this.onlineUsers.size,
      occurredAt: new Date().toISOString(),
    });
  }

  /** Subscribes the socket to a channel room. Returns `{ ok: false }` when no channel id is given. */
  // NOTE(review): no membership check is visible here — any connected socket
  // can join any channel room. Confirm authorization is enforced elsewhere.
  @SubscribeMessage('join_channel')
  joinChannel(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    if (!body?.channelId) return { ok: false };
    void client.join(`channel:${body.channelId}`);
    return { ok: true };
  }

  /** Unsubscribes the socket from a channel room. Returns `{ ok: false }` when no channel id is given. */
  @SubscribeMessage('leave_channel')
  leaveChannel(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    if (!body?.channelId) return { ok: false };
    void client.leave(`channel:${body.channelId}`);
    return { ok: true };
  }

  /** Relays a typing indicator to the channel room; shared by `typing.start` and `typing.stop`. */
  private broadcastTyping(
    client: Socket,
    event: 'typing.start' | 'typing.stop',
    channelId: string | undefined,
  ): { ok: boolean } {
    if (!channelId) return { ok: false };
    this.server.to(`channel:${channelId}`).emit(event, {
      channelId,
      userId: this.userIdOf(client),
      occurredAt: new Date().toISOString(),
    });
    return { ok: true };
  }

  /** Broadcasts `typing.start` for the sender to the given channel. */
  @SubscribeMessage('typing.start')
  typingStart(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    return this.broadcastTyping(client, 'typing.start', body?.channelId);
  }

  /** Broadcasts `typing.stop` for the sender to the given channel. */
  @SubscribeMessage('typing.stop')
  typingStop(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    return this.broadcastTyping(client, 'typing.stop', body?.channelId);
  }

  /** Emits an arbitrary event with the same payload to every socket in the channel room. */
  emitChannelEvent(channelId: string, event: string, data: Record<string, unknown>): void {
    this.server.to(`channel:${channelId}`).emit(event, data);
  }

  /**
   * Emits `message.created` per recipient so each copy carries its own
   * `wakeup` flag, computed by `computeWakeup` from the given context.
   */
  async emitMessageCreated(
    channelId: string,
    data: Record<string, unknown>,
    ctx: {
      xType: XType;
      authorUserId: string;
      wakeUserIds: Set<string>;
      mentionUserIds?: Set<string>;
    },
  ): Promise<void> {
    const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
    for (const s of sockets) {
      const wakeup = computeWakeup({
        xType: ctx.xType,
        recipientUserId: this.userIdOf(s),
        authorUserId: ctx.authorUserId,
        wakeUserIds: ctx.wakeUserIds,
        mentionUserIds: ctx.mentionUserIds,
      });
      s.emit('message.created', { ...data, channelId, wakeup, xType: ctx.xType });
    }
  }

  /**
   * discuss/work + /ack: exactly one recipient (the new current speaker) gets
   * `wakeup=true`; everyone else gets `false`. Passing `null` wakes nobody.
   * One message-id; metadata at push only.
   */
  async emitMessageTargeted(
    channelId: string,
    data: Record<string, unknown>,
    wakeupUserId: string | null,
  ): Promise<void> {
    const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
    for (const s of sockets) {
      const wakeup = wakeupUserId !== null && this.userIdOf(s) === wakeupUserId;
      s.emit('message.created', { ...data, channelId, wakeup });
    }
  }
}