diff --git a/src/channels/channels.service.ts b/src/channels/channels.service.ts index e7575bc..acb2810 100644 --- a/src/channels/channels.service.ts +++ b/src/channels/channels.service.ts @@ -5,6 +5,7 @@ import { Channel } from '../entities/channel.entity.js'; import { ChannelMember } from '../entities/channel-member.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { TurnService } from './turn.service.js'; +import { RealtimeGateway } from '../realtime/realtime.gateway.js'; const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const; type XType = (typeof X_TYPES)[number]; @@ -35,8 +36,34 @@ export class ChannelsService { @InjectRepository(WakeMapping) private readonly wakeRepo: Repository, private readonly turnService: TurnService, + // RealtimeGateway is provided by the global RealtimeModule. Used to + // push channel.joined / channel.left so connected clients (e.g. the + // OpenClaw fabric plugin) can sub/unsub socket.io rooms immediately + // instead of waiting for the polling fallback. + private readonly realtime: RealtimeGateway, ) {} + // Push a channel membership change to each affected user's socket-room. + // Best-effort: offline users see the new state on their next connect + // (the inbound runs an initial channel-list fetch on connect). + private notifyMembership( + kind: 'joined' | 'left', + channelId: string, + userIds: string[] | Set, + extra: Record = {}, + ): void { + const ids = userIds instanceof Set ? [...userIds] : userIds; + const payload = { + channelId, + ...extra, + occurredAt: new Date().toISOString(), + }; + for (const u of ids) { + if (!u) continue; + this.realtime.emitToUser(u, `channel.${kind}`, { ...payload, userId: u }); + } + } + // Channels visible to a user within a guild: // - every public channel of the guild (incl. ones created before the user // joined the guild), OR @@ -93,6 +120,7 @@ export class ChannelsService { if (channel.xType === 'discuss' || channel.xType === 'work') { await this.turnService.onMemberAdded(channelId, userId); } + this.notifyMembership('joined', channelId, [userId], { xType: channel.xType }); } return { status: 'ok', channelId, userId, member: true }; } @@ -102,11 +130,14 @@ export class ChannelsService { if (!channel) throw new NotFoundException('channel not found'); // remove every channel-scoped row that references this user - await this.memberRepo.delete({ channelId, userId }); + const deleted = await this.memberRepo.delete({ channelId, userId }); await this.wakeRepo.delete({ channelId, userId }); if (channel.xType === 'discuss' || channel.xType === 'work') { await this.turnService.onMemberRemoved(channelId, userId); } + if ((deleted.affected ?? 0) > 0) { + this.notifyMembership('left', channelId, [userId], { xType: channel.xType }); + } return { status: 'ok', channelId, userId, member: false }; } @@ -160,6 +191,12 @@ export class ChannelsService { [...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })), ); + // Push channel.joined to every seeded member (creator + invitees + + // triage on-duty) so their connected sockets sub the new room + // immediately. Skips offline users — next connect's channel-list + // fetch covers them. + this.notifyMembership('joined', channel.id, memberIds, { xType }); + // wake_mapping: triage -> the on-duty user; custom -> each listener const wakeUserIds = new Set(); if (xType === 'triage') wakeUserIds.add(onDuty); diff --git a/src/realtime/realtime.gateway.ts b/src/realtime/realtime.gateway.ts index ce680cf..9808ccf 100644 --- a/src/realtime/realtime.gateway.ts +++ b/src/realtime/realtime.gateway.ts @@ -96,6 +96,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect const userId = result.user.id || this.userIdFromClient(client); client.data.userId = userId; this.onlineUsers.add(userId); + // Per-user room: lets server code emit user-scoped events (e.g. + // channel.joined when membership changes) without bookkeeping a + // userId→sockets map. All of this user's sockets receive the event. + client.join(`user:${userId}`); this.server.emit('presence.online', { userId, onlineCount: this.onlineUsers.size, @@ -171,6 +175,14 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect this.server.to(`channel:${channelId}`).emit(event, data); } + // Emit a user-scoped event to all sockets currently connected for `userId` + // (via the `user:` room joined in handleConnection). No-op for + // offline users — the next connect's initial channel-list fetch covers it. + emitToUser(userId: string, event: string, data: Record): void { + if (!userId) return; + this.server.to(`user:${userId}`).emit(event, data); + } + // Emits message.created per-recipient so each carries its own `wakeup` flag. async emitMessageCreated( channelId: string,