feat(realtime): push channel.joined/left events to user-scoped rooms
Backend half of the plugin push-based channel sync (companion to nav/Fabric.OpenclawPlugin#1 follow-up). Before this, the OpenClaw fabric inbound had to poll `/api/channels?guildId=...` every 60s to discover newly-joined channels (any DM another user just dragged the agent into). Now the server tells the agent's socket directly so sub/unsub is realtime. Changes: - realtime.gateway.ts: * handleConnection joins the socket into a `user:<userId>` room. All of a user's connected sockets now share that room. * New `emitToUser(userId, event, data)` helper that emits into that room. No-op for offline users (next connect resyncs via the plugin's initial channel-list fetch). - channels.service.ts: * Inject RealtimeGateway (RealtimeModule is @Global, no module plumbing needed). * Private `notifyMembership(kind, channelId, userIds, extra)` helper that emits `channel.<kind>` (joined|left) with payload {channelId, userId, xType, occurredAt}. * create(): emit channel.joined to every seeded member (creator + explicit memberUserIds + triage on-duty). * joinChannel(): emit channel.joined to userId (only if the row was actually inserted, idempotent on existing membership). * leaveChannel(): emit channel.left to userId iff a row was actually deleted. Event shape: { channelId: string, userId: string, xType?: string, occurredAt: ISO string, } Client-side contract (fabric plugin): socket.on('channel.joined', m => socket.emit('join_channel', {channelId: m.channelId})) socket.on('channel.left', m => socket.emit('leave_channel', {channelId: m.channelId})) The plugin keeps its 60s polling resync as a safety net for missed events (transient socket drops between emit and reconnect, partial failures, etc). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -96,6 +96,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
||||
const userId = result.user.id || this.userIdFromClient(client);
|
||||
client.data.userId = userId;
|
||||
this.onlineUsers.add(userId);
|
||||
// Per-user room: lets server code emit user-scoped events (e.g.
|
||||
// channel.joined when membership changes) without bookkeeping a
|
||||
// userId→sockets map. All of this user's sockets receive the event.
|
||||
client.join(`user:${userId}`);
|
||||
this.server.emit('presence.online', {
|
||||
userId,
|
||||
onlineCount: this.onlineUsers.size,
|
||||
@@ -171,6 +175,14 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
||||
this.server.to(`channel:${channelId}`).emit(event, data);
|
||||
}
|
||||
|
||||
// Emit a user-scoped event to all sockets currently connected for `userId`
|
||||
// (via the `user:<userId>` room joined in handleConnection). No-op for
|
||||
// offline users — the next connect's initial channel-list fetch covers it.
|
||||
emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
|
||||
if (!userId) return;
|
||||
this.server.to(`user:${userId}`).emit(event, data);
|
||||
}
|
||||
|
||||
// Emits message.created per-recipient so each carries its own `wakeup` flag.
|
||||
async emitMessageCreated(
|
||||
channelId: string,
|
||||
|
||||
Reference in New Issue
Block a user