2 Commits

Author SHA1 Message Date
h z
5b835e0871 Merge pull request 'feat(realtime): push channel.joined/left events to user-scoped rooms' (#1) from feat/push-channel-membership-events into main 2026-05-21 07:12:51 +00:00
hanghang zhang
e33f1ecc53 feat(realtime): push channel.joined/left events to user-scoped rooms
Backend half of the plugin push-based channel sync (companion to
nav/Fabric.OpenclawPlugin#1 follow-up). Before this, the OpenClaw
fabric inbound had to poll `/api/channels?guildId=...` every 60s to
discover newly-joined channels (any DM another user just dragged the
agent into). Now the server tells the agent's socket directly so
sub/unsub is realtime.

Changes:
- realtime.gateway.ts:
  * handleConnection joins the socket into a `user:<userId>` room.
    All of a user's connected sockets now share that room.
  * New `emitToUser(userId, event, data)` helper that emits into
    that room. No-op for offline users (next connect resyncs via the
    plugin's initial channel-list fetch).
- channels.service.ts:
  * Inject RealtimeGateway (RealtimeModule is @Global, no module
    plumbing needed).
  * Private `notifyMembership(kind, channelId, userIds, extra)`
    helper that emits `channel.<kind>` (joined|left) with payload
    {channelId, userId, xType, occurredAt}.
  * create(): emit channel.joined to every seeded member (creator +
    explicit memberUserIds + triage on-duty).
  * joinChannel(): emit channel.joined to userId (only if the row was
    actually inserted, idempotent on existing membership).
  * leaveChannel(): emit channel.left to userId iff a row was
    actually deleted.

Event shape:
  {
    channelId: string,
    userId: string,
    xType?: string,
    occurredAt: ISO string,
  }

Client-side contract (fabric plugin):
  socket.on('channel.joined', m => socket.emit('join_channel', {channelId: m.channelId}))
  socket.on('channel.left',   m => socket.emit('leave_channel', {channelId: m.channelId}))

The plugin keeps its 60s polling resync as a safety net for missed
events (transient socket drops between emit and reconnect, partial
failures, etc).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:07:46 +01:00
6 changed files with 31 additions and 227 deletions

View File

@@ -1,39 +0,0 @@
// Operator convenience: force-refresh the in-memory Center admin cache
// without waiting for the 1-day TTL. Used after `center user set-admin`
// to make new admin visible immediately to triage delivery.
//
// Usage (inside the deployed container):
// docker exec fabric-backend-guild node dist/cli/admin-refresh.js
//
// Prints the (possibly null) result as JSON. Exit 0 always — a "no
// admin" outcome is a valid state, not an error.
import 'reflect-metadata';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '../app.module.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
async function main() {
const app = await NestFactory.createApplicationContext(AppModule, { logger: ['error', 'warn'] });
try {
const cache = app.get(AdminCacheService);
const before = cache.snapshot();
const after = await cache.get(true);
process.stdout.write(
JSON.stringify({
ok: true,
before,
after,
changed: JSON.stringify(before) !== JSON.stringify(after),
}) + '\n',
);
} finally {
await app.close();
}
}
void main().catch((error: unknown) => {
const message = error instanceof Error ? error.message : 'unknown error';
process.stderr.write(JSON.stringify({ ok: false, error: message }) + '\n');
process.exit(1);
});

View File

@@ -1,73 +0,0 @@
/**
* Center-scoped admin cache.
*
* Holds the at-most-one admin user (email + userId) fetched from Center.
* Used to decide who to deliver triage messages to as a silent observer
* (wake=false), regardless of on-duty / mention status.
*
* Refresh policy (per spec, 2026-05-22):
* • TTL = 1 day. Center admin changes are rare; agents tolerate a
* day's stale cache without surprises
* • on first lookup the cache lazy-fetches
* • cli `admin refresh` forces an out-of-band refresh without waiting
* for TTL expiry
*
* Failure mode: a Center fetch error is treated identically to "no
* admin" — guild keeps operating without an observer. The cache holds
* the failed-fetch decision for the same TTL so we don't hammer Center.
*/
import { Injectable, Logger } from '@nestjs/common';
import { fetchAdminEmail } from './center-auth.js';
const ADMIN_CACHE_TTL_MS = 24 * 60 * 60 * 1000;
export interface CachedAdmin {
email: string;
userId: string;
}
@Injectable()
export class AdminCacheService {
private readonly logger = new Logger(AdminCacheService.name);
private cached: CachedAdmin | null = null;
private cachedAt = 0;
private inflight: Promise<CachedAdmin | null> | null = null;
/**
* Return the cached admin, fetching from Center if the cache is empty
* or older than the TTL. Returns null if no admin is set.
*
* `force=true` bypasses the cache and refreshes immediately — used by
* the cli refresh command.
*/
async get(force = false): Promise<CachedAdmin | null> {
const fresh = Date.now() - this.cachedAt < ADMIN_CACHE_TTL_MS;
if (!force && this.cachedAt > 0 && fresh) {
return this.cached;
}
if (this.inflight) return this.inflight;
this.inflight = (async () => {
try {
const result = await fetchAdminEmail();
this.cached = result;
this.cachedAt = Date.now();
this.logger.log(
`admin cache refreshed: ${result ? `${result.email} (${result.userId})` : 'no admin set'}`,
);
return result;
} finally {
this.inflight = null;
}
})();
return this.inflight;
}
/** Snapshot of the cached admin (no fetch). Returns null if not yet
* populated. Used by the hot delivery path which doesn't want to
* block on a Center round-trip. */
snapshot(): CachedAdmin | null {
return this.cached;
}
}

View File

@@ -26,31 +26,6 @@ export async function introspectGuildToken(token: string): Promise<{ active: boo
}; };
} }
/**
* Fetch the single Center-scoped admin user (if any).
* Same x-api-key auth as introspect / resolve-names.
* Returns `null` when no admin is set OR the request fails (treated
* identically — the guild simply falls back to "no admin observer").
*/
export async function fetchAdminEmail(): Promise<{ email: string; userId: string } | null> {
const centerBaseUrl = process.env.FABRIC_BACKEND_GUILD_CENTER_BASE_URL;
const centerApiKey = process.env.FABRIC_BACKEND_GUILD_CENTER_API_KEY;
if (!centerBaseUrl || !centerApiKey) return null;
try {
const res = await fetch(`${centerBaseUrl}/api/auth/admin-email`, {
method: 'GET',
headers: { 'x-api-key': centerApiKey },
});
if (!res.ok) return null;
const data = (await res.json()) as { email?: string; userId?: string } | null;
if (!data || !data.email || !data.userId) return null;
return { email: data.email, userId: data.userId };
} catch {
return null;
}
}
// Resolve <@user.name:NAME> names to userIds within this guild node via // Resolve <@user.name:NAME> names to userIds within this guild node via
// Center. Unresolved names are simply absent from the returned map. // Center. Unresolved names are simply absent from the returned map.
export async function resolveUserNames(names: string[]): Promise<Record<string, string>> { export async function resolveUserNames(names: string[]): Promise<Record<string, string>> {

View File

@@ -21,7 +21,6 @@ import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js'; import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
import { parseSlashCommand } from '../channels/slash-commands.js'; import { parseSlashCommand } from '../channels/slash-commands.js';
import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js'; import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js';
import { resolveUserNames } from '../common/center-auth.js'; import { resolveUserNames } from '../common/center-auth.js';
@@ -51,7 +50,6 @@ export class MessagingController {
private readonly turn: TurnService, private readonly turn: TurnService,
private readonly events: EventsService, private readonly events: EventsService,
private readonly realtime: RealtimeGateway, private readonly realtime: RealtimeGateway,
private readonly adminCache: AdminCacheService,
) {} ) {}
private async getIdempotentResponse( private async getIdempotentResponse(
@@ -227,19 +225,16 @@ export class MessagingController {
const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds); const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId); await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
} else { } else {
// general/report/triage/custom: 3-state delivery // general/report/triage/custom: wakeup from x_type + wake_mapping;
// (wake / observer / skip) — see realtime.gateway.computeDelivery. // general also honors the message's at-list
// Center-scoped admin (cached, 1d TTL) gets `observer` on triage.
const wakeRows = await this.wakeRepo.find({ where: { channelId } }); const wakeRows = await this.wakeRepo.find({ where: { channelId } });
const wakeUserIds = new Set(wakeRows.map((w) => w.userId)); const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId)); const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
const admin = await this.adminCache.get();
await this.realtime.emitMessageCreated(channelId, responseBody, { await this.realtime.emitMessageCreated(channelId, responseBody, {
xType, xType,
authorUserId, authorUserId,
wakeUserIds, wakeUserIds,
mentionUserIds, mentionUserIds,
adminUserId: admin?.userId ?? null,
}); });
} }

View File

@@ -6,12 +6,9 @@ import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js'; import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
@Module({ @Module({
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])], imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])],
controllers: [MessagingController], controllers: [MessagingController],
providers: [AdminCacheService],
exports: [AdminCacheService],
}) })
export class MessagingModule {} export class MessagingModule {}

View File

@@ -13,70 +13,15 @@ import { introspectGuildToken } from '../common/center-auth.js';
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm'; type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
/** // Wakeup for non-rotating channels only (general/report/triage/custom).
* Per-recipient delivery decision for a non-rotating channel message. // discuss/work go through TurnService + emitMessageTargeted, never here.
* // Precedence:
* • `wake` — push the event AND wake the recipient (model turn fires) // 1. the author never gets woken by their own message
* • `observer` — push the event with wakeup=false (silent; UI displays // 2. triage/custom: only wake users in the channel's wake_mapping
* but the openclaw plugin records-only without dispatch). Currently // (mentions change nothing here)
* used for the Center admin observing triage traffic // 3. general: if the message has an at-list, wake only the at'd users;
* • `skip` — don't even emit the event to this recipient // otherwise wake everyone
* // 4. report (and anything else): wake nobody
* Wakeup-only channels (general/report/dm/custom) never return
* 'observer'; the legacy behaviour is preserved end-to-end.
*
* Precedence for triage (the only place 'skip' / 'observer' fire):
* 1. author never gets back their own message
* 2. wake_mapping (on-duty) → wake
* 3. mention → wake (NEW: was 'skip' before — see Fabric PR 'triage
* mention exception')
* 4. admin (Center-scoped, at most one) → observer
* 5. everyone else → skip (was 'deliver, wakeup=false' before)
*/
export type DeliveryDecision = 'wake' | 'observer' | 'skip';
export interface ComputeDeliveryArgs {
xType: XType;
recipientUserId: string;
authorUserId: string;
wakeUserIds: Set<string>;
mentionUserIds?: Set<string>;
/** Single Center-scoped admin userId, or null. */
adminUserId?: string | null;
}
export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision {
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId } = args;
if (recipientUserId === authorUserId) return 'skip';
switch (xType) {
case 'triage':
if (wakeUserIds.has(recipientUserId)) return 'wake';
if (mentionUserIds?.has(recipientUserId)) return 'wake';
if (adminUserId && recipientUserId === adminUserId) return 'observer';
return 'skip';
case 'general':
if (mentionUserIds && mentionUserIds.size > 0) {
return mentionUserIds.has(recipientUserId) ? 'wake' : 'observer';
}
return 'wake';
case 'custom':
// wake_mapping decides who wakes; everyone else still sees the
// message (observer) — preserves the legacy "deliver to all, wake
// some" contract for custom channels.
return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer';
case 'dm':
return 'wake';
default:
// report (and anything else): deliver as observer, no wake
return 'observer';
}
}
/**
* @deprecated Use computeDelivery (returns 3-state). Kept for any
* external callers; treats 'observer' and 'skip' both as `false`.
*/
export function computeWakeup(args: { export function computeWakeup(args: {
xType: XType; xType: XType;
recipientUserId: string; recipientUserId: string;
@@ -84,7 +29,23 @@ export function computeWakeup(args: {
wakeUserIds: Set<string>; wakeUserIds: Set<string>;
mentionUserIds?: Set<string>; mentionUserIds?: Set<string>;
}): boolean { }): boolean {
return computeDelivery(args) === 'wake'; const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args;
if (recipientUserId === authorUserId) return false;
switch (xType) {
case 'general':
if (mentionUserIds && mentionUserIds.size > 0) {
return mentionUserIds.has(recipientUserId);
}
return true;
case 'triage':
case 'custom':
return wakeUserIds.has(recipientUserId);
case 'dm':
// 1:1 conversation: every non-author participant is always woken.
return true;
default:
return false;
}
} }
@WebSocketGateway({ @WebSocketGateway({
@@ -222,10 +183,7 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
this.server.to(`user:${userId}`).emit(event, data); this.server.to(`user:${userId}`).emit(event, data);
} }
// Emits message.created per-recipient using the 3-state delivery // Emits message.created per-recipient so each carries its own `wakeup` flag.
// decision (wake / observer / skip). Skipped recipients receive
// nothing — used by triage channels to keep non-on-duty / non-mention
// / non-admin users completely out of the loop.
async emitMessageCreated( async emitMessageCreated(
channelId: string, channelId: string,
data: Record<string, unknown>, data: Record<string, unknown>,
@@ -234,28 +192,19 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
authorUserId: string; authorUserId: string;
wakeUserIds: Set<string>; wakeUserIds: Set<string>;
mentionUserIds?: Set<string>; mentionUserIds?: Set<string>;
/** Single Center-scoped admin userId (or null). */
adminUserId?: string | null;
}, },
): Promise<void> { ): Promise<void> {
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
for (const s of sockets) { for (const s of sockets) {
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`; const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
const decision = computeDelivery({ const wakeup = computeWakeup({
xType: ctx.xType, xType: ctx.xType,
recipientUserId, recipientUserId,
authorUserId: ctx.authorUserId, authorUserId: ctx.authorUserId,
wakeUserIds: ctx.wakeUserIds, wakeUserIds: ctx.wakeUserIds,
mentionUserIds: ctx.mentionUserIds, mentionUserIds: ctx.mentionUserIds,
adminUserId: ctx.adminUserId,
});
if (decision === 'skip') continue;
s.emit('message.created', {
...data,
channelId,
wakeup: decision === 'wake',
xType: ctx.xType,
}); });
s.emit('message.created', { ...data, channelId, wakeup, xType: ctx.xType });
} }
} }