From 7cb046d785775e2c5325842183b2c82f0f1a8e16 Mon Sep 17 00:00:00 2001 From: hzhang Date: Fri, 22 May 2026 22:14:05 +0100 Subject: [PATCH] feat(triage): 3-state delivery + admin observer + admin cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Triage channels now compute a 3-state delivery decision per recipient (wake / observer / skip) instead of the binary wakeup flag, and route according to: 1. author never gets back their own message → skip 2. wake_mapping member (on-duty) → wake 3. mention (NEW: was 'skip' for triage before) → wake 4. Center-scoped admin (at most 1) → observer 5. anyone else → skip (was 'deliver wake=false') Skipping means the websocket emit is omitted entirely — the recipient's openclaw plugin never sees the message and the agent's session stays free of background noise. Observer means delivered with wakeup=false (silent UI / no model dispatch on the plugin side). ## What this PR ships ### realtime/realtime.gateway.ts - new `computeDelivery()` returns DeliveryDecision = 'wake'|'observer'|'skip' - old `computeWakeup()` kept as a deprecated wrapper for callers that still want the boolean answer (treats observer + skip as false) - `emitMessageCreated` accepts `adminUserId?: string|null` and now short-circuits on 'skip' (no socket emit at all) - general kept its current behavior; custom kept its current behavior (members not in wake_mapping become observer instead of `wake=false`) — the user-visible bit is just that the response field is the same `wakeup: boolean`; the explicit 'skip' is new for triage ### common/center-auth.ts - `fetchAdminEmail()` calls GET `${center}/auth/admin-email` with the existing x-api-key (same auth as introspect/resolve-names). Returns `{email, userId}` or `null` on either "no admin" or any error ### common/admin-cache.service.ts (NEW) - `AdminCacheService` — in-memory cache, 1-day TTL, lazy refresh. `get(force=true)` bypasses TTL for cli-triggered refresh - exposed by MessagingModule ### messaging/messaging.controller.ts - non-rotating branch threads `adminUserId` into emitMessageCreated ### cli/admin-refresh.ts (NEW) - `node dist/cli/admin-refresh.js` — force-refresh cache and print before/after JSON. Use after a Center `user set-admin` so triage delivery picks up the new admin without waiting for 24h TTL 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cli/admin-refresh.ts | 39 +++++++++ src/common/admin-cache.service.ts | 73 +++++++++++++++++ src/common/center-auth.ts | 25 ++++++ src/messaging/messaging.controller.ts | 9 ++- src/messaging/messaging.module.ts | 3 + src/realtime/realtime.gateway.ts | 109 +++++++++++++++++++------- 6 files changed, 227 insertions(+), 31 deletions(-) create mode 100644 src/cli/admin-refresh.ts create mode 100644 src/common/admin-cache.service.ts diff --git a/src/cli/admin-refresh.ts b/src/cli/admin-refresh.ts new file mode 100644 index 0000000..56af30d --- /dev/null +++ b/src/cli/admin-refresh.ts @@ -0,0 +1,39 @@ +// Operator convenience: force-refresh the in-memory Center admin cache +// without waiting for the 1-day TTL. Used after `center user set-admin` +// to make new admin visible immediately to triage delivery. +// +// Usage (inside the deployed container): +// docker exec fabric-backend-guild node dist/cli/admin-refresh.js +// +// Prints the (possibly null) result as JSON. Exit 0 always — a "no +// admin" outcome is a valid state, not an error. + +import 'reflect-metadata'; +import { NestFactory } from '@nestjs/core'; +import { AppModule } from '../app.module.js'; +import { AdminCacheService } from '../common/admin-cache.service.js'; + +async function main() { + const app = await NestFactory.createApplicationContext(AppModule, { logger: ['error', 'warn'] }); + try { + const cache = app.get(AdminCacheService); + const before = cache.snapshot(); + const after = await cache.get(true); + process.stdout.write( + JSON.stringify({ + ok: true, + before, + after, + changed: JSON.stringify(before) !== JSON.stringify(after), + }) + '\n', + ); + } finally { + await app.close(); + } +} + +void main().catch((error: unknown) => { + const message = error instanceof Error ? error.message : 'unknown error'; + process.stderr.write(JSON.stringify({ ok: false, error: message }) + '\n'); + process.exit(1); +}); diff --git a/src/common/admin-cache.service.ts b/src/common/admin-cache.service.ts new file mode 100644 index 0000000..bfbf310 --- /dev/null +++ b/src/common/admin-cache.service.ts @@ -0,0 +1,73 @@ +/** + * Center-scoped admin cache. + * + * Holds the at-most-one admin user (email + userId) fetched from Center. + * Used to decide who to deliver triage messages to as a silent observer + * (wake=false), regardless of on-duty / mention status. + * + * Refresh policy (per spec, 2026-05-22): + * • TTL = 1 day. Center admin changes are rare; agents tolerate a + * day's stale cache without surprises + * • on first lookup the cache lazy-fetches + * • cli `admin refresh` forces an out-of-band refresh without waiting + * for TTL expiry + * + * Failure mode: a Center fetch error is treated identically to "no + * admin" — guild keeps operating without an observer. The cache holds + * the failed-fetch decision for the same TTL so we don't hammer Center. + */ + +import { Injectable, Logger } from '@nestjs/common'; +import { fetchAdminEmail } from './center-auth.js'; + +const ADMIN_CACHE_TTL_MS = 24 * 60 * 60 * 1000; + +export interface CachedAdmin { + email: string; + userId: string; +} + +@Injectable() +export class AdminCacheService { + private readonly logger = new Logger(AdminCacheService.name); + private cached: CachedAdmin | null = null; + private cachedAt = 0; + private inflight: Promise | null = null; + + /** + * Return the cached admin, fetching from Center if the cache is empty + * or older than the TTL. Returns null if no admin is set. + * + * `force=true` bypasses the cache and refreshes immediately — used by + * the cli refresh command. + */ + async get(force = false): Promise { + const fresh = Date.now() - this.cachedAt < ADMIN_CACHE_TTL_MS; + if (!force && this.cachedAt > 0 && fresh) { + return this.cached; + } + if (this.inflight) return this.inflight; + + this.inflight = (async () => { + try { + const result = await fetchAdminEmail(); + this.cached = result; + this.cachedAt = Date.now(); + this.logger.log( + `admin cache refreshed: ${result ? `${result.email} (${result.userId})` : 'no admin set'}`, + ); + return result; + } finally { + this.inflight = null; + } + })(); + return this.inflight; + } + + /** Snapshot of the cached admin (no fetch). Returns null if not yet + * populated. Used by the hot delivery path which doesn't want to + * block on a Center round-trip. */ + snapshot(): CachedAdmin | null { + return this.cached; + } +} diff --git a/src/common/center-auth.ts b/src/common/center-auth.ts index 295d7d8..3a611d9 100644 --- a/src/common/center-auth.ts +++ b/src/common/center-auth.ts @@ -26,6 +26,31 @@ export async function introspectGuildToken(token: string): Promise<{ active: boo }; } +/** + * Fetch the single Center-scoped admin user (if any). + * Same x-api-key auth as introspect / resolve-names. + * Returns `null` when no admin is set OR the request fails (treated + * identically — the guild simply falls back to "no admin observer"). + */ +export async function fetchAdminEmail(): Promise<{ email: string; userId: string } | null> { + const centerBaseUrl = process.env.FABRIC_BACKEND_GUILD_CENTER_BASE_URL; + const centerApiKey = process.env.FABRIC_BACKEND_GUILD_CENTER_API_KEY; + if (!centerBaseUrl || !centerApiKey) return null; + + try { + const res = await fetch(`${centerBaseUrl}/api/auth/admin-email`, { + method: 'GET', + headers: { 'x-api-key': centerApiKey }, + }); + if (!res.ok) return null; + const data = (await res.json()) as { email?: string; userId?: string } | null; + if (!data || !data.email || !data.userId) return null; + return { email: data.email, userId: data.userId }; + } catch { + return null; + } +} + // Resolve <@user.name:NAME> names to userIds within this guild node via // Center. Unresolved names are simply absent from the returned map. export async function resolveUserNames(names: string[]): Promise> { diff --git a/src/messaging/messaging.controller.ts b/src/messaging/messaging.controller.ts index e40d57c..f74f919 100644 --- a/src/messaging/messaging.controller.ts +++ b/src/messaging/messaging.controller.ts @@ -21,6 +21,7 @@ import { ChannelMember } from '../entities/channel-member.entity.js'; import { Message } from '../entities/message.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js'; +import { AdminCacheService } from '../common/admin-cache.service.js'; import { parseSlashCommand } from '../channels/slash-commands.js'; import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js'; import { resolveUserNames } from '../common/center-auth.js'; @@ -50,6 +51,7 @@ export class MessagingController { private readonly turn: TurnService, private readonly events: EventsService, private readonly realtime: RealtimeGateway, + private readonly adminCache: AdminCacheService, ) {} private async getIdempotentResponse( @@ -225,16 +227,19 @@ export class MessagingController { const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds); await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId); } else { - // general/report/triage/custom: wakeup from x_type + wake_mapping; - // general also honors the message's at-list + // general/report/triage/custom: 3-state delivery + // (wake / observer / skip) — see realtime.gateway.computeDelivery. + // Center-scoped admin (cached, 1d TTL) gets `observer` on triage. const wakeRows = await this.wakeRepo.find({ where: { channelId } }); const wakeUserIds = new Set(wakeRows.map((w) => w.userId)); const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId)); + const admin = await this.adminCache.get(); await this.realtime.emitMessageCreated(channelId, responseBody, { xType, authorUserId, wakeUserIds, mentionUserIds, + adminUserId: admin?.userId ?? null, }); } diff --git a/src/messaging/messaging.module.ts b/src/messaging/messaging.module.ts index 9a57969..3895bd9 100644 --- a/src/messaging/messaging.module.ts +++ b/src/messaging/messaging.module.ts @@ -6,9 +6,12 @@ import { ChannelMember } from '../entities/channel-member.entity.js'; import { Message } from '../entities/message.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js'; +import { AdminCacheService } from '../common/admin-cache.service.js'; @Module({ imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])], controllers: [MessagingController], + providers: [AdminCacheService], + exports: [AdminCacheService], }) export class MessagingModule {} diff --git a/src/realtime/realtime.gateway.ts b/src/realtime/realtime.gateway.ts index 9808ccf..85d0de4 100644 --- a/src/realtime/realtime.gateway.ts +++ b/src/realtime/realtime.gateway.ts @@ -13,15 +13,70 @@ import { introspectGuildToken } from '../common/center-auth.js'; type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm'; -// Wakeup for non-rotating channels only (general/report/triage/custom). -// discuss/work go through TurnService + emitMessageTargeted, never here. -// Precedence: -// 1. the author never gets woken by their own message -// 2. triage/custom: only wake users in the channel's wake_mapping -// (mentions change nothing here) -// 3. general: if the message has an at-list, wake only the at'd users; -// otherwise wake everyone -// 4. report (and anything else): wake nobody +/** + * Per-recipient delivery decision for a non-rotating channel message. + * + * • `wake` — push the event AND wake the recipient (model turn fires) + * • `observer` — push the event with wakeup=false (silent; UI displays + * but the openclaw plugin records-only without dispatch). Currently + * used for the Center admin observing triage traffic + * • `skip` — don't even emit the event to this recipient + * + * Wakeup-only channels (general/report/dm/custom) never return + * 'observer'; the legacy behaviour is preserved end-to-end. + * + * Precedence for triage (the only place 'skip' / 'observer' fire): + * 1. author never gets back their own message + * 2. wake_mapping (on-duty) → wake + * 3. mention → wake (NEW: was 'skip' before — see Fabric PR 'triage + * mention exception') + * 4. admin (Center-scoped, at most one) → observer + * 5. everyone else → skip (was 'deliver, wakeup=false' before) + */ +export type DeliveryDecision = 'wake' | 'observer' | 'skip'; + +export interface ComputeDeliveryArgs { + xType: XType; + recipientUserId: string; + authorUserId: string; + wakeUserIds: Set; + mentionUserIds?: Set; + /** Single Center-scoped admin userId, or null. */ + adminUserId?: string | null; +} + +export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision { + const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId } = args; + if (recipientUserId === authorUserId) return 'skip'; + + switch (xType) { + case 'triage': + if (wakeUserIds.has(recipientUserId)) return 'wake'; + if (mentionUserIds?.has(recipientUserId)) return 'wake'; + if (adminUserId && recipientUserId === adminUserId) return 'observer'; + return 'skip'; + case 'general': + if (mentionUserIds && mentionUserIds.size > 0) { + return mentionUserIds.has(recipientUserId) ? 'wake' : 'observer'; + } + return 'wake'; + case 'custom': + // wake_mapping decides who wakes; everyone else still sees the + // message (observer) — preserves the legacy "deliver to all, wake + // some" contract for custom channels. + return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer'; + case 'dm': + return 'wake'; + default: + // report (and anything else): deliver as observer, no wake + return 'observer'; + } +} + +/** + * @deprecated Use computeDelivery (returns 3-state). Kept for any + * external callers; treats 'observer' and 'skip' both as `false`. + */ export function computeWakeup(args: { xType: XType; recipientUserId: string; @@ -29,23 +84,7 @@ export function computeWakeup(args: { wakeUserIds: Set; mentionUserIds?: Set; }): boolean { - const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args; - if (recipientUserId === authorUserId) return false; - switch (xType) { - case 'general': - if (mentionUserIds && mentionUserIds.size > 0) { - return mentionUserIds.has(recipientUserId); - } - return true; - case 'triage': - case 'custom': - return wakeUserIds.has(recipientUserId); - case 'dm': - // 1:1 conversation: every non-author participant is always woken. - return true; - default: - return false; - } + return computeDelivery(args) === 'wake'; } @WebSocketGateway({ @@ -183,7 +222,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect this.server.to(`user:${userId}`).emit(event, data); } - // Emits message.created per-recipient so each carries its own `wakeup` flag. + // Emits message.created per-recipient using the 3-state delivery + // decision (wake / observer / skip). Skipped recipients receive + // nothing — used by triage channels to keep non-on-duty / non-mention + // / non-admin users completely out of the loop. async emitMessageCreated( channelId: string, data: Record, @@ -192,19 +234,28 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect authorUserId: string; wakeUserIds: Set; mentionUserIds?: Set; + /** Single Center-scoped admin userId (or null). */ + adminUserId?: string | null; }, ): Promise { const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); for (const s of sockets) { const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`; - const wakeup = computeWakeup({ + const decision = computeDelivery({ xType: ctx.xType, recipientUserId, authorUserId: ctx.authorUserId, wakeUserIds: ctx.wakeUserIds, mentionUserIds: ctx.mentionUserIds, + adminUserId: ctx.adminUserId, + }); + if (decision === 'skip') continue; + s.emit('message.created', { + ...data, + channelId, + wakeup: decision === 'wake', + xType: ctx.xType, }); - s.emit('message.created', { ...data, channelId, wakeup, xType: ctx.xType }); } }