import {
  ConnectedSocket,
  MessageBody,
  OnGatewayConnection,
  OnGatewayDisconnect,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { introspectGuildToken } from '../common/center-auth.js';

type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';

/**
 * Per-recipient delivery decision for a non-rotating channel message.
 *
 *  • `wake`     — push the event AND wake the recipient (model turn fires)
 *  • `observer` — push the event with wakeup=false (silent; UI displays
 *                 but the openclaw plugin records-only without dispatch)
 *  • `skip`     — don't even emit the event to this recipient
 *
 * On every channel type the author never gets back their own message
 * (`skip`). Beyond that, the decision depends on the channel type:
 *
 *  - `triage`, in precedence order:
 *      1. wake_mapping (on-duty) → wake
 *      2. mention → wake (NEW: was 'skip' before — see Fabric PR 'triage
 *         mention exception')
 *      3. admin (Center-scoped, at most one) → observer
 *      4. everyone else → skip (was 'deliver, wakeup=false' before)
 *  - `general`: without mentions everyone wakes; when the message has
 *    mentions, mentioned users wake and everyone else is an observer.
 *  - `custom`: wake_mapping → wake, everyone else → observer.
 *  - `dm`: wake.
 *  - `report` and any other type: observer.
 */
export type DeliveryDecision = 'wake' | 'observer' | 'skip';

/** Inputs to {@link computeDelivery} for a single (message, recipient) pair. */
export interface ComputeDeliveryArgs {
  /** Type of the channel the message was posted in. */
  xType: XType;
  /** User the decision is being made for. */
  recipientUserId: string;
  /** User who posted the message. */
  authorUserId: string;
  /** Users selected by the channel's wake_mapping. */
  wakeUserIds: Set<string>;
  /** Users mentioned in the message, if any. */
  mentionUserIds?: Set<string>;
  /** Single Center-scoped admin userId, or null. */
  adminUserId?: string | null;
}

/**
 * Resolves the user id attached to a socket, falling back to the
 * `anon:<socketId>` placeholder when the socket carries no string userId.
 */
function socketUserId(socket: { id: string; data: { userId?: unknown } }): string {
  const { userId } = socket.data;
  return typeof userId === 'string' ? userId : `anon:${socket.id}`;
}

/**
 * Decides whether `recipientUserId` is woken by, silently shown, or not sent
 * a message at all. See {@link DeliveryDecision} for the per-channel rules.
 *
 * @param args - channel type, recipient/author ids and the wake / mention /
 *   admin sets that apply to the message.
 * @returns the delivery decision for this recipient.
 */
export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision {
  const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId } = args;

  // The author never receives their own message, whatever the channel type.
  if (recipientUserId === authorUserId) return 'skip';

  switch (xType) {
    case 'triage':
      if (wakeUserIds.has(recipientUserId)) return 'wake';
      if (mentionUserIds?.has(recipientUserId)) return 'wake';
      if (adminUserId && recipientUserId === adminUserId) return 'observer';
      return 'skip';
    case 'general':
      // Mentions narrow the wake set; without mentions everybody wakes.
      if (mentionUserIds && mentionUserIds.size > 0) {
        return mentionUserIds.has(recipientUserId) ? 'wake' : 'observer';
      }
      return 'wake';
    case 'custom':
      // wake_mapping decides who wakes; everyone else still sees the
      // message (observer) — preserves the legacy "deliver to all, wake
      // some" contract for custom channels.
      return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer';
    case 'dm':
      return 'wake';
    default:
      // report (and anything else): deliver as observer, no wake
      return 'observer';
  }
}

/**
 * @deprecated Use computeDelivery (returns 3-state). Kept for any
 * external callers; treats 'observer' and 'skip' both as `false`.
 */
export function computeWakeup(args: {
  xType: XType;
  recipientUserId: string;
  authorUserId: string;
  wakeUserIds: Set<string>;
  mentionUserIds?: Set<string>;
}): boolean {
  return computeDelivery(args) === 'wake';
}

/**
 * Socket.IO gateway on the `/realtime` namespace: authenticates sockets with
 * a bearer token, tracks presence, relays typing indicators and pushes
 * channel / user scoped events.
 */
@WebSocketGateway({
  namespace: '/realtime',
  cors: {
    origin: '*',
  },
})
export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server!: Server;

  private readonly logger = new Logger(RealtimeGateway.name);

  /** User ids with at least one authenticated socket on this instance. */
  private readonly onlineUsers = new Set<string>();

  /**
   * Number of authenticated sockets per user id. A user may hold several
   * sockets at once (handleConnection joins them all to one `user:` room),
   * so the user only goes offline when the last of them disconnects.
   */
  private readonly socketCounts = new Map<string, number>();

  /**
   * Reads a user id supplied by the client in the handshake (`auth.userId`,
   * then the `x-user-id` header), falling back to `anon:<socketId>`.
   */
  private userIdFromClient(client: Socket): string {
    const authUser = client.handshake.auth?.userId;
    const headerUser = client.handshake.headers['x-user-id'];
    const userId =
      typeof authUser === 'string'
        ? authUser
        : Array.isArray(headerUser)
          ? headerUser[0]
          : headerUser;
    return userId && typeof userId === 'string' && userId.trim() !== ''
      ? userId
      : `anon:${client.id}`;
  }

  /**
   * Introspects the bearer token, converting a thrown / rejected
   * introspection into `null` so the caller can reject the socket instead of
   * leaving it connected and unauthenticated.
   */
  private async introspectOrNull(bearer: string, clientId: string) {
    try {
      return await introspectGuildToken(bearer);
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.warn(`socket rejected (introspection failed): ${clientId}: ${reason}`);
      return null;
    }
  }

  /**
   * Authenticates a new socket. The bearer token is taken from
   * `handshake.auth.token` or the `Authorization: Bearer …` header; sockets
   * without a token, with an inactive token, or whose introspection fails
   * are disconnected. Accepted sockets are tagged with their user id, joined
   * to the per-user room and announced via `presence.online`.
   */
  async handleConnection(client: Socket): Promise<void> {
    const authToken = client.handshake.auth?.token;
    const headerAuth = client.handshake.headers['authorization'];
    const bearer =
      typeof authToken === 'string'
        ? authToken
        : typeof headerAuth === 'string' && headerAuth.startsWith('Bearer ')
          ? headerAuth.slice(7)
          : '';
    if (!bearer) {
      this.logger.warn(`socket rejected (missing token): ${client.id}`);
      client.disconnect(true);
      return;
    }

    const result = await this.introspectOrNull(bearer, client.id);
    if (!result) {
      // Failure already logged by introspectOrNull.
      client.disconnect(true);
      return;
    }
    if (!result.active || !result.user?.id) {
      this.logger.warn(`socket rejected: ${client.id}`);
      client.disconnect(true);
      return;
    }

    // The socket may have gone away while introspection was in flight; its
    // disconnect hook has then already run, so registering it now would
    // leave the user marked online with nothing to ever clear it.
    if (client.disconnected) {
      this.logger.warn(`socket closed during authentication: ${client.id}`);
      return;
    }

    this.logger.log(`socket connected: ${client.id}`);
    // The guard above guarantees `result.user.id` is truthy, so the
    // client-supplied fallback is not taken in practice.
    const userId = result.user.id || this.userIdFromClient(client);
    client.data.userId = userId;
    this.socketCounts.set(userId, (this.socketCounts.get(userId) ?? 0) + 1);
    this.onlineUsers.add(userId);
    // Per-user room: lets server code emit user-scoped events (e.g.
    // channel.joined when membership changes) without bookkeeping a
    // userId→sockets map. All of this user's sockets receive the event.
    client.join(`user:${userId}`);
    this.server.emit('presence.online', {
      userId,
      onlineCount: this.onlineUsers.size,
      occurredAt: new Date().toISOString(),
    });
  }

  /**
   * Releases a socket's presence registration. `presence.offline` is
   * broadcast only when the user's last authenticated socket goes away;
   * sockets that never completed authentication are ignored.
   */
  handleDisconnect(client: Socket): void {
    this.logger.log(`socket disconnected: ${client.id}`);

    const userId: unknown = client.data.userId;
    // Never registered (rejected or dropped before authentication finished):
    // there is no presence to revoke.
    if (typeof userId !== 'string') return;

    const remaining = (this.socketCounts.get(userId) ?? 1) - 1;
    if (remaining > 0) {
      // The user still has other live sockets, so they remain online.
      this.socketCounts.set(userId, remaining);
      return;
    }

    this.socketCounts.delete(userId);
    this.onlineUsers.delete(userId);
    this.server.emit('presence.offline', {
      userId,
      onlineCount: this.onlineUsers.size,
      occurredAt: new Date().toISOString(),
    });
  }

  /**
   * Subscribes the socket to a channel's room.
   *
   * NOTE(review): no channel-membership check is visible here, and the
   * handler does not require the socket to have finished authentication —
   * confirm authorization is enforced elsewhere.
   */
  @SubscribeMessage('join_channel')
  joinChannel(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    if (!body?.channelId) return { ok: false };
    client.join(`channel:${body.channelId}`);
    return { ok: true };
  }

  /** Unsubscribes the socket from a channel's room. */
  @SubscribeMessage('leave_channel')
  leaveChannel(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    if (!body?.channelId) return { ok: false };
    client.leave(`channel:${body.channelId}`);
    return { ok: true };
  }

  /** Broadcasts `typing.start` for the sender to the channel's room. */
  @SubscribeMessage('typing.start')
  typingStart(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    return this.relayTyping('typing.start', client, body);
  }

  /** Broadcasts `typing.stop` for the sender to the channel's room. */
  @SubscribeMessage('typing.stop')
  typingStop(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    return this.relayTyping('typing.stop', client, body);
  }

  /** Shared body of the typing handlers: validate, then fan out to the room. */
  private relayTyping(
    event: 'typing.start' | 'typing.stop',
    client: Socket,
    body: { channelId?: string },
  ): { ok: boolean } {
    if (!body?.channelId) return { ok: false };
    this.server.to(`channel:${body.channelId}`).emit(event, {
      channelId: body.channelId,
      userId: socketUserId(client),
      occurredAt: new Date().toISOString(),
    });
    return { ok: true };
  }

  /** Emits `event` with `data` to every socket in the channel's room. */
  emitChannelEvent(channelId: string, event: string, data: Record<string, unknown>): void {
    this.server.to(`channel:${channelId}`).emit(event, data);
  }

  // Emit a user-scoped event to all sockets currently connected for `userId`
  // (via the `user:${userId}` room joined in handleConnection). No-op for
  // offline users — the next connect's initial channel-list fetch covers it.
  emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
    if (!userId) return;
    this.server.to(`user:${userId}`).emit(event, data);
  }

  // Emits message.created per-recipient using the 3-state delivery
  // decision (wake / observer / skip). Skipped recipients receive
  // nothing — used by triage channels to keep non-on-duty / non-mention
  // / non-admin users completely out of the loop.
  async emitMessageCreated(
    channelId: string,
    data: Record<string, unknown>,
    ctx: {
      xType: XType;
      authorUserId: string;
      wakeUserIds: Set<string>;
      mentionUserIds?: Set<string>;
      /** Single Center-scoped admin userId (or null). */
      adminUserId?: string | null;
    },
  ): Promise<void> {
    const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
    for (const s of sockets) {
      const decision = computeDelivery({
        xType: ctx.xType,
        recipientUserId: socketUserId(s),
        authorUserId: ctx.authorUserId,
        wakeUserIds: ctx.wakeUserIds,
        mentionUserIds: ctx.mentionUserIds,
        adminUserId: ctx.adminUserId,
      });
      if (decision === 'skip') continue;
      s.emit('message.created', {
        ...data,
        channelId,
        wakeup: decision === 'wake',
        xType: ctx.xType,
      });
    }
  }

  // discuss/work + /ack: exactly one recipient (the new current speaker) gets
  // wakeup=true; everyone else false. One message-id; metadata at push only.
  async emitMessageTargeted(
    channelId: string,
    data: Record<string, unknown>,
    wakeupUserId: string | null,
  ): Promise<void> {
    const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
    for (const s of sockets) {
      const wakeup = wakeupUserId !== null && socketUserId(s) === wakeupUserId;
      s.emit('message.created', { ...data, channelId, wakeup });
    }
  }
}