feat(realtime): push channel.joined/left events to user-scoped rooms #1

Merged
hzhang merged 1 commits from feat/push-channel-membership-events into main 2026-05-21 07:12:51 +00:00
2 changed files with 50 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js'; import { ChannelMember } from '../entities/channel-member.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { TurnService } from './turn.service.js'; import { TurnService } from './turn.service.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const; const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const;
type XType = (typeof X_TYPES)[number]; type XType = (typeof X_TYPES)[number];
@@ -35,8 +36,34 @@ export class ChannelsService {
@InjectRepository(WakeMapping) @InjectRepository(WakeMapping)
private readonly wakeRepo: Repository<WakeMapping>, private readonly wakeRepo: Repository<WakeMapping>,
private readonly turnService: TurnService, private readonly turnService: TurnService,
// RealtimeGateway is provided by the global RealtimeModule. Used to
// push channel.joined / channel.left so connected clients (e.g. the
// OpenClaw fabric plugin) can sub/unsub socket.io rooms immediately
// instead of waiting for the polling fallback.
private readonly realtime: RealtimeGateway,
) {} ) {}
// Push a channel membership change to each affected user's socket-room.
// Best-effort: offline users see the new state on their next connect
// (the inbound runs an initial channel-list fetch on connect).
private notifyMembership(
kind: 'joined' | 'left',
channelId: string,
userIds: string[] | Set<string>,
extra: Record<string, unknown> = {},
): void {
const ids = userIds instanceof Set ? [...userIds] : userIds;
const payload = {
channelId,
...extra,
occurredAt: new Date().toISOString(),
};
for (const u of ids) {
if (!u) continue;
this.realtime.emitToUser(u, `channel.${kind}`, { ...payload, userId: u });
}
}
// Channels visible to a user within a guild: // Channels visible to a user within a guild:
// - every public channel of the guild (incl. ones created before the user // - every public channel of the guild (incl. ones created before the user
// joined the guild), OR // joined the guild), OR
@@ -93,6 +120,7 @@ export class ChannelsService {
if (channel.xType === 'discuss' || channel.xType === 'work') { if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberAdded(channelId, userId); await this.turnService.onMemberAdded(channelId, userId);
} }
this.notifyMembership('joined', channelId, [userId], { xType: channel.xType });
} }
return { status: 'ok', channelId, userId, member: true }; return { status: 'ok', channelId, userId, member: true };
} }
@@ -102,11 +130,14 @@ export class ChannelsService {
if (!channel) throw new NotFoundException('channel not found'); if (!channel) throw new NotFoundException('channel not found');
// remove every channel-scoped row that references this user // remove every channel-scoped row that references this user
await this.memberRepo.delete({ channelId, userId }); const deleted = await this.memberRepo.delete({ channelId, userId });
await this.wakeRepo.delete({ channelId, userId }); await this.wakeRepo.delete({ channelId, userId });
if (channel.xType === 'discuss' || channel.xType === 'work') { if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberRemoved(channelId, userId); await this.turnService.onMemberRemoved(channelId, userId);
} }
if ((deleted.affected ?? 0) > 0) {
this.notifyMembership('left', channelId, [userId], { xType: channel.xType });
}
return { status: 'ok', channelId, userId, member: false }; return { status: 'ok', channelId, userId, member: false };
} }
@@ -160,6 +191,12 @@ export class ChannelsService {
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })), [...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
); );
// Push channel.joined to every seeded member (creator + invitees +
// triage on-duty) so their connected sockets sub the new room
// immediately. Skips offline users — next connect's channel-list
// fetch covers them.
this.notifyMembership('joined', channel.id, memberIds, { xType });
// wake_mapping: triage -> the on-duty user; custom -> each listener // wake_mapping: triage -> the on-duty user; custom -> each listener
const wakeUserIds = new Set<string>(); const wakeUserIds = new Set<string>();
if (xType === 'triage') wakeUserIds.add(onDuty); if (xType === 'triage') wakeUserIds.add(onDuty);

View File

@@ -96,6 +96,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
const userId = result.user.id || this.userIdFromClient(client); const userId = result.user.id || this.userIdFromClient(client);
client.data.userId = userId; client.data.userId = userId;
this.onlineUsers.add(userId); this.onlineUsers.add(userId);
// Per-user room: lets server code emit user-scoped events (e.g.
// channel.joined when membership changes) without bookkeeping a
// userId→sockets map. All of this user's sockets receive the event.
client.join(`user:${userId}`);
this.server.emit('presence.online', { this.server.emit('presence.online', {
userId, userId,
onlineCount: this.onlineUsers.size, onlineCount: this.onlineUsers.size,
@@ -171,6 +175,14 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
this.server.to(`channel:${channelId}`).emit(event, data); this.server.to(`channel:${channelId}`).emit(event, data);
} }
// Emit a user-scoped event to all sockets currently connected for `userId`
// (via the `user:<userId>` room joined in handleConnection). No-op for
// offline users — the next connect's initial channel-list fetch covers it.
emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
if (!userId) return;
this.server.to(`user:${userId}`).emit(event, data);
}
// Emits message.created per-recipient so each carries its own `wakeup` flag. // Emits message.created per-recipient so each carries its own `wakeup` flag.
async emitMessageCreated( async emitMessageCreated(
channelId: string, channelId: string,