Files
Fabric.Backend.Guild/src/realtime/realtime.gateway.ts
hzhang 30069377e7 feat(realtime): push channel.joined/left events to user-scoped rooms
Backend half of the plugin's push-based channel sync (companion to the
nav/Fabric.OpenclawPlugin#1 follow-up). Before this change, the OpenClaw
fabric inbound had to poll `/api/channels?guildId=...` every 60s to
discover newly joined channels (e.g. a DM that another user had just
dragged the agent into). Now the server notifies the agent's socket
directly, so sub/unsub happens in real time.

Changes:
- realtime.gateway.ts:
  * handleConnection joins the socket into a `user:<userId>` room.
    All of a user's connected sockets now share that room.
  * New `emitToUser(userId, event, data)` helper that emits into
    that room. No-op for offline users (next connect resyncs via the
    plugin's initial channel-list fetch).
- channels.service.ts:
  * Inject RealtimeGateway (RealtimeModule is @Global, no module
    plumbing needed).
  * Private `notifyMembership(kind, channelId, userIds, extra)`
    helper that emits `channel.<kind>` (joined|left) with payload
    {channelId, userId, xType, occurredAt}.
  * create(): emit channel.joined to every seeded member (creator +
    explicit memberUserIds + triage on-duty).
  * joinChannel(): emit channel.joined to userId (only if the row was
    actually inserted, idempotent on existing membership).
  * leaveChannel(): emit channel.left to userId iff a row was
    actually deleted.

Event shape:
  {
    channelId: string,
    userId: string,
    xType?: string,
    occurredAt: ISO string,
  }

Client-side contract (fabric plugin):
  socket.on('channel.joined', m => socket.emit('join_channel', {channelId: m.channelId}))
  socket.on('channel.left',   m => socket.emit('leave_channel', {channelId: m.channelId}))

The plugin keeps its 60s polling resync as a safety net for missed
events (transient socket drops between emit and reconnect, partial
failures, etc).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:07:46 +01:00

226 lines
7.5 KiB
TypeScript

import {
ConnectedSocket,
MessageBody,
OnGatewayConnection,
OnGatewayDisconnect,
SubscribeMessage,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { introspectGuildToken } from '../common/center-auth.js';
type XType =
  | 'general'
  | 'work'
  | 'report'
  | 'discuss'
  | 'triage'
  | 'custom'
  | 'dm';

// Decides whether one recipient should be woken by a message.
// Intended for the non-rotating channel types; discuss/work are delivered
// through TurnService + emitMessageTargeted and, if passed here anyway,
// fall through to "wake nobody".
// Rules, in order:
//   1. the author is never woken by their own message
//   2. dm: every non-author participant is woken
//   3. triage/custom: only recipients listed in wakeUserIds (the channel's
//      wake mapping) are woken; mentions are ignored
//   4. general: with a non-empty mention set only the mentioned users are
//      woken; with no mentions everyone is
//   5. report and every other type: nobody is woken
export function computeWakeup(args: {
  xType: XType;
  recipientUserId: string;
  authorUserId: string;
  wakeUserIds: Set<string>;
  mentionUserIds?: Set<string>;
}): boolean {
  const recipient = args.recipientUserId;
  if (recipient === args.authorUserId) {
    return false;
  }

  const channelType = args.xType;
  if (channelType === 'dm') {
    return true;
  }
  if (channelType === 'triage' || channelType === 'custom') {
    return args.wakeUserIds.has(recipient);
  }
  if (channelType === 'general') {
    const mentions = args.mentionUserIds;
    // A missing or empty mention set means "wake everyone".
    return !mentions || mentions.size === 0 || mentions.has(recipient);
  }
  return false;
}
// Socket.IO gateway for the `/realtime` namespace.
//
// Responsibilities visible in this class:
//   - authenticate each socket with a bearer token on connect
//   - track which users have at least one live socket and broadcast
//     presence.online / presence.offline
//   - let sockets subscribe to `channel:<channelId>` rooms and relay typing
//     indicators into them
//   - give server-side code helpers to emit to a channel room, to a user's
//     `user:<userId>` room, or per-recipient with an individual wakeup flag
@WebSocketGateway({
  namespace: '/realtime',
  cors: {
    // NOTE(review): wildcard origin — confirm this is intended outside
    // development.
    origin: '*',
  },
})
export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server!: Server;

  private readonly logger = new Logger(RealtimeGateway.name);

  // userId → ids of that user's currently connected sockets. A user counts as
  // online while the set is non-empty, so closing one of several tabs or
  // devices does not mark the user offline.
  private readonly onlineUsers = new Map<string, Set<string>>();

  // Reads a client-supplied user id from the handshake (auth.userId first,
  // then the x-user-id header), falling back to `anon:<socketId>`.
  // The value is NOT verified; it is only used as a fallback below.
  private userIdFromClient(client: Socket): string {
    const authUser = client.handshake.auth?.userId;
    const headerUser = client.handshake.headers['x-user-id'];
    const userId =
      typeof authUser === 'string' ? authUser : Array.isArray(headerUser) ? headerUser[0] : headerUser;
    return userId && typeof userId === 'string' && userId.trim() !== '' ? userId : `anon:${client.id}`;
  }

  // User id stored on the socket by handleConnection, or `anon:<socketId>`
  // when none was stored.
  private socketUserId(socket: { id: string; data: { userId?: unknown } }): string {
    const userId = socket.data.userId;
    return typeof userId === 'string' ? userId : `anon:${socket.id}`;
  }

  // Extracts a usable channel id from a client payload. The payload comes
  // from the wire, so the declared type is not trusted: anything other than
  // a non-empty string yields null.
  private channelIdFrom(body: { channelId?: unknown } | null | undefined): string | null {
    const channelId: unknown = body?.channelId;
    return typeof channelId === 'string' && channelId !== '' ? channelId : null;
  }

  // Authenticates a new socket and registers it for presence and user-scoped
  // events. The bearer token is taken from handshake.auth.token or from an
  // `Authorization: Bearer …` header; the socket is disconnected when the
  // token is missing, introspection fails, or the result is not active /
  // carries no user id.
  async handleConnection(client: Socket): Promise<void> {
    const authToken = client.handshake.auth?.token;
    const headerAuth = client.handshake.headers['authorization'];
    const bearer =
      typeof authToken === 'string'
        ? authToken
        : typeof headerAuth === 'string' && headerAuth.startsWith('Bearer ')
          ? headerAuth.slice(7)
          : '';
    if (!bearer) {
      this.logger.warn(`socket rejected (missing token): ${client.id}`);
      client.disconnect(true);
      return;
    }

    let result: Awaited<ReturnType<typeof introspectGuildToken>>;
    try {
      result = await introspectGuildToken(bearer);
    } catch (err: unknown) {
      // Fail closed: an introspection error must not leave an
      // unauthenticated socket connected.
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.warn(`socket rejected (introspection failed: ${reason}): ${client.id}`);
      client.disconnect(true);
      return;
    }
    if (!result.active || !result.user?.id) {
      this.logger.warn(`socket rejected: ${client.id}`);
      client.disconnect(true);
      return;
    }

    // The client may have gone away while the token was being introspected.
    // Its disconnect has then already been handled, so registering it now
    // would leave a user that is never removed from the online map.
    if (!client.connected) {
      this.logger.log(`socket closed before authentication completed: ${client.id}`);
      return;
    }

    this.logger.log(`socket connected: ${client.id}`);
    // The guard above guarantees result.user.id is truthy, so the
    // client-supplied fallback is not expected to be reached.
    const userId = result.user.id || this.userIdFromClient(client);
    client.data.userId = userId;

    let socketIds = this.onlineUsers.get(userId);
    if (!socketIds) {
      socketIds = new Set<string>();
      this.onlineUsers.set(userId, socketIds);
    }
    socketIds.add(client.id);

    // Per-user room: lets server code emit user-scoped events (e.g.
    // channel.joined when membership changes) without bookkeeping a
    // userId→sockets map. All of this user's sockets receive the event.
    void client.join(`user:${userId}`);

    // Broadcast on every connect (not only the user's first socket) so the
    // connecting client also receives the current onlineCount.
    this.server.emit('presence.online', {
      userId,
      onlineCount: this.onlineUsers.size,
      occurredAt: new Date().toISOString(),
    });
  }

  // Unregisters a socket. presence.offline is broadcast only when this was
  // the user's last connected socket.
  handleDisconnect(client: Socket): void {
    this.logger.log(`socket disconnected: ${client.id}`);

    const userId: unknown = client.data.userId;
    // Sockets rejected in handleConnection never got a userId and were never
    // announced online, so there is nothing to retract.
    if (typeof userId !== 'string') return;

    const socketIds = this.onlineUsers.get(userId);
    socketIds?.delete(client.id);
    if (socketIds && socketIds.size > 0) return;

    this.onlineUsers.delete(userId);
    this.server.emit('presence.offline', {
      userId,
      onlineCount: this.onlineUsers.size,
      occurredAt: new Date().toISOString(),
    });
  }

  // Subscribes the socket to `channel:<channelId>`.
  // NOTE(review): no channel-membership check is visible here; any
  // authenticated socket can join any channel room — confirm authorization
  // is enforced elsewhere.
  @SubscribeMessage('join_channel')
  joinChannel(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    const channelId = this.channelIdFrom(body);
    if (channelId === null) return { ok: false };
    void client.join(`channel:${channelId}`);
    return { ok: true };
  }

  // Unsubscribes the socket from `channel:<channelId>`.
  @SubscribeMessage('leave_channel')
  leaveChannel(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    const channelId = this.channelIdFrom(body);
    if (channelId === null) return { ok: false };
    void client.leave(`channel:${channelId}`);
    return { ok: true };
  }

  // Relays a "started typing" indicator to the channel room.
  @SubscribeMessage('typing.start')
  typingStart(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    return this.relayTyping('typing.start', client, body);
  }

  // Relays a "stopped typing" indicator to the channel room.
  @SubscribeMessage('typing.stop')
  typingStop(
    @ConnectedSocket() client: Socket,
    @MessageBody() body: { channelId?: string },
  ): { ok: boolean } {
    return this.relayTyping('typing.stop', client, body);
  }

  // Shared implementation of typing.start / typing.stop: emits `event` with
  // {channelId, userId, occurredAt} to every socket in the channel room.
  private relayTyping(
    event: 'typing.start' | 'typing.stop',
    client: Socket,
    body: { channelId?: string },
  ): { ok: boolean } {
    const channelId = this.channelIdFrom(body);
    if (channelId === null) return { ok: false };
    this.server.to(`channel:${channelId}`).emit(event, {
      channelId,
      userId: this.socketUserId(client),
      occurredAt: new Date().toISOString(),
    });
    return { ok: true };
  }

  // Emits `event` with `data` to every socket in `channel:<channelId>`.
  emitChannelEvent(channelId: string, event: string, data: Record<string, unknown>): void {
    this.server.to(`channel:${channelId}`).emit(event, data);
  }

  // Emit a user-scoped event to all sockets currently connected for `userId`
  // (via the `user:<userId>` room joined in handleConnection). Nothing is
  // delivered when the user has no connected socket or `userId` is empty.
  emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
    if (!userId) return;
    this.server.to(`user:${userId}`).emit(event, data);
  }

  // Emits message.created per-recipient so each carries its own `wakeup`
  // flag, computed by computeWakeup from the channel type, author, wake set
  // and mentions. The payload is `data` plus channelId, wakeup and xType.
  async emitMessageCreated(
    channelId: string,
    data: Record<string, unknown>,
    ctx: {
      xType: XType;
      authorUserId: string;
      wakeUserIds: Set<string>;
      mentionUserIds?: Set<string>;
    },
  ): Promise<void> {
    const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
    for (const s of sockets) {
      const wakeup = computeWakeup({
        xType: ctx.xType,
        recipientUserId: this.socketUserId(s),
        authorUserId: ctx.authorUserId,
        wakeUserIds: ctx.wakeUserIds,
        mentionUserIds: ctx.mentionUserIds,
      });
      s.emit('message.created', { ...data, channelId, wakeup, xType: ctx.xType });
    }
  }

  // discuss/work + /ack: exactly one recipient (the new current speaker) gets
  // wakeup=true; everyone else false. Passing null wakes nobody.
  async emitMessageTargeted(
    channelId: string,
    data: Record<string, unknown>,
    wakeupUserId: string | null,
  ): Promise<void> {
    const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
    for (const s of sockets) {
      const wakeup = wakeupUserId !== null && this.socketUserId(s) === wakeupUserId;
      s.emit('message.created', { ...data, channelId, wakeup });
    }
  }
}