feat(realtime): push channel.joined/left events to user-scoped rooms #1
@@ -5,6 +5,7 @@ import { Channel } from '../entities/channel.entity.js';
|
|||||||
import { ChannelMember } from '../entities/channel-member.entity.js';
|
import { ChannelMember } from '../entities/channel-member.entity.js';
|
||||||
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
||||||
import { TurnService } from './turn.service.js';
|
import { TurnService } from './turn.service.js';
|
||||||
|
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
|
||||||
|
|
||||||
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const;
|
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const;
|
||||||
type XType = (typeof X_TYPES)[number];
|
type XType = (typeof X_TYPES)[number];
|
||||||
@@ -35,8 +36,34 @@ export class ChannelsService {
|
|||||||
@InjectRepository(WakeMapping)
|
@InjectRepository(WakeMapping)
|
||||||
private readonly wakeRepo: Repository<WakeMapping>,
|
private readonly wakeRepo: Repository<WakeMapping>,
|
||||||
private readonly turnService: TurnService,
|
private readonly turnService: TurnService,
|
||||||
|
// RealtimeGateway is provided by the global RealtimeModule. Used to
|
||||||
|
// push channel.joined / channel.left so connected clients (e.g. the
|
||||||
|
// OpenClaw fabric plugin) can sub/unsub socket.io rooms immediately
|
||||||
|
// instead of waiting for the polling fallback.
|
||||||
|
private readonly realtime: RealtimeGateway,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
// Push a channel membership change to each affected user's socket-room.
|
||||||
|
// Best-effort: offline users see the new state on their next connect
|
||||||
|
// (the inbound runs an initial channel-list fetch on connect).
|
||||||
|
private notifyMembership(
|
||||||
|
kind: 'joined' | 'left',
|
||||||
|
channelId: string,
|
||||||
|
userIds: string[] | Set<string>,
|
||||||
|
extra: Record<string, unknown> = {},
|
||||||
|
): void {
|
||||||
|
const ids = userIds instanceof Set ? [...userIds] : userIds;
|
||||||
|
const payload = {
|
||||||
|
channelId,
|
||||||
|
...extra,
|
||||||
|
occurredAt: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
for (const u of ids) {
|
||||||
|
if (!u) continue;
|
||||||
|
this.realtime.emitToUser(u, `channel.${kind}`, { ...payload, userId: u });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Channels visible to a user within a guild:
|
// Channels visible to a user within a guild:
|
||||||
// - every public channel of the guild (incl. ones created before the user
|
// - every public channel of the guild (incl. ones created before the user
|
||||||
// joined the guild), OR
|
// joined the guild), OR
|
||||||
@@ -93,6 +120,7 @@ export class ChannelsService {
|
|||||||
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
||||||
await this.turnService.onMemberAdded(channelId, userId);
|
await this.turnService.onMemberAdded(channelId, userId);
|
||||||
}
|
}
|
||||||
|
this.notifyMembership('joined', channelId, [userId], { xType: channel.xType });
|
||||||
}
|
}
|
||||||
return { status: 'ok', channelId, userId, member: true };
|
return { status: 'ok', channelId, userId, member: true };
|
||||||
}
|
}
|
||||||
@@ -102,11 +130,14 @@ export class ChannelsService {
|
|||||||
if (!channel) throw new NotFoundException('channel not found');
|
if (!channel) throw new NotFoundException('channel not found');
|
||||||
|
|
||||||
// remove every channel-scoped row that references this user
|
// remove every channel-scoped row that references this user
|
||||||
await this.memberRepo.delete({ channelId, userId });
|
const deleted = await this.memberRepo.delete({ channelId, userId });
|
||||||
await this.wakeRepo.delete({ channelId, userId });
|
await this.wakeRepo.delete({ channelId, userId });
|
||||||
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
||||||
await this.turnService.onMemberRemoved(channelId, userId);
|
await this.turnService.onMemberRemoved(channelId, userId);
|
||||||
}
|
}
|
||||||
|
if ((deleted.affected ?? 0) > 0) {
|
||||||
|
this.notifyMembership('left', channelId, [userId], { xType: channel.xType });
|
||||||
|
}
|
||||||
return { status: 'ok', channelId, userId, member: false };
|
return { status: 'ok', channelId, userId, member: false };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -160,6 +191,12 @@ export class ChannelsService {
|
|||||||
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
|
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Push channel.joined to every seeded member (creator + invitees +
|
||||||
|
// triage on-duty) so their connected sockets sub the new room
|
||||||
|
// immediately. Skips offline users — next connect's channel-list
|
||||||
|
// fetch covers them.
|
||||||
|
this.notifyMembership('joined', channel.id, memberIds, { xType });
|
||||||
|
|
||||||
// wake_mapping: triage -> the on-duty user; custom -> each listener
|
// wake_mapping: triage -> the on-duty user; custom -> each listener
|
||||||
const wakeUserIds = new Set<string>();
|
const wakeUserIds = new Set<string>();
|
||||||
if (xType === 'triage') wakeUserIds.add(onDuty);
|
if (xType === 'triage') wakeUserIds.add(onDuty);
|
||||||
|
|||||||
@@ -96,6 +96,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
|||||||
const userId = result.user.id || this.userIdFromClient(client);
|
const userId = result.user.id || this.userIdFromClient(client);
|
||||||
client.data.userId = userId;
|
client.data.userId = userId;
|
||||||
this.onlineUsers.add(userId);
|
this.onlineUsers.add(userId);
|
||||||
|
// Per-user room: lets server code emit user-scoped events (e.g.
|
||||||
|
// channel.joined when membership changes) without bookkeeping a
|
||||||
|
// userId→sockets map. All of this user's sockets receive the event.
|
||||||
|
client.join(`user:${userId}`);
|
||||||
this.server.emit('presence.online', {
|
this.server.emit('presence.online', {
|
||||||
userId,
|
userId,
|
||||||
onlineCount: this.onlineUsers.size,
|
onlineCount: this.onlineUsers.size,
|
||||||
@@ -171,6 +175,14 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
|||||||
this.server.to(`channel:${channelId}`).emit(event, data);
|
this.server.to(`channel:${channelId}`).emit(event, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Emit a user-scoped event to all sockets currently connected for `userId`
|
||||||
|
// (via the `user:<userId>` room joined in handleConnection). No-op for
|
||||||
|
// offline users — the next connect's initial channel-list fetch covers it.
|
||||||
|
emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
|
||||||
|
if (!userId) return;
|
||||||
|
this.server.to(`user:${userId}`).emit(event, data);
|
||||||
|
}
|
||||||
|
|
||||||
// Emits message.created per-recipient so each carries its own `wakeup` flag.
|
// Emits message.created per-recipient so each carries its own `wakeup` flag.
|
||||||
async emitMessageCreated(
|
async emitMessageCreated(
|
||||||
channelId: string,
|
channelId: string,
|
||||||
|
|||||||
Reference in New Issue
Block a user