diff --git a/src/cli/admin-refresh.ts b/src/cli/admin-refresh.ts new file mode 100644 index 0000000..56af30d --- /dev/null +++ b/src/cli/admin-refresh.ts @@ -0,0 +1,39 @@ +// Operator convenience: force-refresh the in-memory Center admin cache +// without waiting for the 1-day TTL. Used after `center user set-admin` +// to make new admin visible immediately to triage delivery. +// +// Usage (inside the deployed container): +// docker exec fabric-backend-guild node dist/cli/admin-refresh.js +// +// Prints the (possibly null) result as JSON. Exit 0 always — a "no +// admin" outcome is a valid state, not an error. + +import 'reflect-metadata'; +import { NestFactory } from '@nestjs/core'; +import { AppModule } from '../app.module.js'; +import { AdminCacheService } from '../common/admin-cache.service.js'; + +async function main() { + const app = await NestFactory.createApplicationContext(AppModule, { logger: ['error', 'warn'] }); + try { + const cache = app.get(AdminCacheService); + const before = cache.snapshot(); + const after = await cache.get(true); + process.stdout.write( + JSON.stringify({ + ok: true, + before, + after, + changed: JSON.stringify(before) !== JSON.stringify(after), + }) + '\n', + ); + } finally { + await app.close(); + } +} + +void main().catch((error: unknown) => { + const message = error instanceof Error ? error.message : 'unknown error'; + process.stderr.write(JSON.stringify({ ok: false, error: message }) + '\n'); + process.exit(1); +}); diff --git a/src/common/admin-cache.service.ts b/src/common/admin-cache.service.ts new file mode 100644 index 0000000..bfbf310 --- /dev/null +++ b/src/common/admin-cache.service.ts @@ -0,0 +1,73 @@ +/** + * Center-scoped admin cache. + * + * Holds the at-most-one admin user (email + userId) fetched from Center. + * Used to decide who to deliver triage messages to as a silent observer + * (wake=false), regardless of on-duty / mention status. + * + * Refresh policy (per spec, 2026-05-22): + * • TTL = 1 day. Center admin changes are rare; agents tolerate a + * day's stale cache without surprises + * • on first lookup the cache lazy-fetches + * • cli `admin refresh` forces an out-of-band refresh without waiting + * for TTL expiry + * + * Failure mode: a Center fetch error is treated identically to "no + * admin" — guild keeps operating without an observer. The cache holds + * the failed-fetch decision for the same TTL so we don't hammer Center. + */ + +import { Injectable, Logger } from '@nestjs/common'; +import { fetchAdminEmail } from './center-auth.js'; + +const ADMIN_CACHE_TTL_MS = 24 * 60 * 60 * 1000; + +export interface CachedAdmin { + email: string; + userId: string; +} + +@Injectable() +export class AdminCacheService { + private readonly logger = new Logger(AdminCacheService.name); + private cached: CachedAdmin | null = null; + private cachedAt = 0; + private inflight: Promise | null = null; + + /** + * Return the cached admin, fetching from Center if the cache is empty + * or older than the TTL. Returns null if no admin is set. + * + * `force=true` bypasses the cache and refreshes immediately — used by + * the cli refresh command. + */ + async get(force = false): Promise { + const fresh = Date.now() - this.cachedAt < ADMIN_CACHE_TTL_MS; + if (!force && this.cachedAt > 0 && fresh) { + return this.cached; + } + if (this.inflight) return this.inflight; + + this.inflight = (async () => { + try { + const result = await fetchAdminEmail(); + this.cached = result; + this.cachedAt = Date.now(); + this.logger.log( + `admin cache refreshed: ${result ? `${result.email} (${result.userId})` : 'no admin set'}`, + ); + return result; + } finally { + this.inflight = null; + } + })(); + return this.inflight; + } + + /** Snapshot of the cached admin (no fetch). Returns null if not yet + * populated. Used by the hot delivery path which doesn't want to + * block on a Center round-trip. */ + snapshot(): CachedAdmin | null { + return this.cached; + } +} diff --git a/src/common/center-auth.ts b/src/common/center-auth.ts index 295d7d8..3a611d9 100644 --- a/src/common/center-auth.ts +++ b/src/common/center-auth.ts @@ -26,6 +26,31 @@ export async function introspectGuildToken(token: string): Promise<{ active: boo }; } +/** + * Fetch the single Center-scoped admin user (if any). + * Same x-api-key auth as introspect / resolve-names. + * Returns `null` when no admin is set OR the request fails (treated + * identically — the guild simply falls back to "no admin observer"). + */ +export async function fetchAdminEmail(): Promise<{ email: string; userId: string } | null> { + const centerBaseUrl = process.env.FABRIC_BACKEND_GUILD_CENTER_BASE_URL; + const centerApiKey = process.env.FABRIC_BACKEND_GUILD_CENTER_API_KEY; + if (!centerBaseUrl || !centerApiKey) return null; + + try { + const res = await fetch(`${centerBaseUrl}/api/auth/admin-email`, { + method: 'GET', + headers: { 'x-api-key': centerApiKey }, + }); + if (!res.ok) return null; + const data = (await res.json()) as { email?: string; userId?: string } | null; + if (!data || !data.email || !data.userId) return null; + return { email: data.email, userId: data.userId }; + } catch { + return null; + } +} + // Resolve <@user.name:NAME> names to userIds within this guild node via // Center. Unresolved names are simply absent from the returned map. export async function resolveUserNames(names: string[]): Promise> { diff --git a/src/messaging/messaging.controller.ts b/src/messaging/messaging.controller.ts index e40d57c..f74f919 100644 --- a/src/messaging/messaging.controller.ts +++ b/src/messaging/messaging.controller.ts @@ -21,6 +21,7 @@ import { ChannelMember } from '../entities/channel-member.entity.js'; import { Message } from '../entities/message.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js'; +import { AdminCacheService } from '../common/admin-cache.service.js'; import { parseSlashCommand } from '../channels/slash-commands.js'; import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js'; import { resolveUserNames } from '../common/center-auth.js'; @@ -50,6 +51,7 @@ export class MessagingController { private readonly turn: TurnService, private readonly events: EventsService, private readonly realtime: RealtimeGateway, + private readonly adminCache: AdminCacheService, ) {} private async getIdempotentResponse( @@ -225,16 +227,19 @@ export class MessagingController { const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds); await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId); } else { - // general/report/triage/custom: wakeup from x_type + wake_mapping; - // general also honors the message's at-list + // general/report/triage/custom: 3-state delivery + // (wake / observer / skip) — see realtime.gateway.computeDelivery. + // Center-scoped admin (cached, 1d TTL) gets `observer` on triage. const wakeRows = await this.wakeRepo.find({ where: { channelId } }); const wakeUserIds = new Set(wakeRows.map((w) => w.userId)); const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId)); + const admin = await this.adminCache.get(); await this.realtime.emitMessageCreated(channelId, responseBody, { xType, authorUserId, wakeUserIds, mentionUserIds, + adminUserId: admin?.userId ?? null, }); } diff --git a/src/messaging/messaging.module.ts b/src/messaging/messaging.module.ts index 9a57969..3895bd9 100644 --- a/src/messaging/messaging.module.ts +++ b/src/messaging/messaging.module.ts @@ -6,9 +6,12 @@ import { ChannelMember } from '../entities/channel-member.entity.js'; import { Message } from '../entities/message.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js'; +import { AdminCacheService } from '../common/admin-cache.service.js'; @Module({ imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])], controllers: [MessagingController], + providers: [AdminCacheService], + exports: [AdminCacheService], }) export class MessagingModule {} diff --git a/src/realtime/realtime.gateway.ts b/src/realtime/realtime.gateway.ts index 9808ccf..85d0de4 100644 --- a/src/realtime/realtime.gateway.ts +++ b/src/realtime/realtime.gateway.ts @@ -13,15 +13,70 @@ import { introspectGuildToken } from '../common/center-auth.js'; type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm'; -// Wakeup for non-rotating channels only (general/report/triage/custom). -// discuss/work go through TurnService + emitMessageTargeted, never here. -// Precedence: -// 1. the author never gets woken by their own message -// 2. triage/custom: only wake users in the channel's wake_mapping -// (mentions change nothing here) -// 3. general: if the message has an at-list, wake only the at'd users; -// otherwise wake everyone -// 4. report (and anything else): wake nobody +/** + * Per-recipient delivery decision for a non-rotating channel message. + * + * • `wake` — push the event AND wake the recipient (model turn fires) + * • `observer` — push the event with wakeup=false (silent; UI displays + * but the openclaw plugin records-only without dispatch). Currently + * used for the Center admin observing triage traffic + * • `skip` — don't even emit the event to this recipient + * + * Wakeup-only channels (general/report/dm/custom) never return + * 'observer'; the legacy behaviour is preserved end-to-end. + * + * Precedence for triage (the only place 'skip' / 'observer' fire): + * 1. author never gets back their own message + * 2. wake_mapping (on-duty) → wake + * 3. mention → wake (NEW: was 'skip' before — see Fabric PR 'triage + * mention exception') + * 4. admin (Center-scoped, at most one) → observer + * 5. everyone else → skip (was 'deliver, wakeup=false' before) + */ +export type DeliveryDecision = 'wake' | 'observer' | 'skip'; + +export interface ComputeDeliveryArgs { + xType: XType; + recipientUserId: string; + authorUserId: string; + wakeUserIds: Set; + mentionUserIds?: Set; + /** Single Center-scoped admin userId, or null. */ + adminUserId?: string | null; +} + +export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision { + const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId } = args; + if (recipientUserId === authorUserId) return 'skip'; + + switch (xType) { + case 'triage': + if (wakeUserIds.has(recipientUserId)) return 'wake'; + if (mentionUserIds?.has(recipientUserId)) return 'wake'; + if (adminUserId && recipientUserId === adminUserId) return 'observer'; + return 'skip'; + case 'general': + if (mentionUserIds && mentionUserIds.size > 0) { + return mentionUserIds.has(recipientUserId) ? 'wake' : 'observer'; + } + return 'wake'; + case 'custom': + // wake_mapping decides who wakes; everyone else still sees the + // message (observer) — preserves the legacy "deliver to all, wake + // some" contract for custom channels. + return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer'; + case 'dm': + return 'wake'; + default: + // report (and anything else): deliver as observer, no wake + return 'observer'; + } +} + +/** + * @deprecated Use computeDelivery (returns 3-state). Kept for any + * external callers; treats 'observer' and 'skip' both as `false`. + */ export function computeWakeup(args: { xType: XType; recipientUserId: string; @@ -29,23 +84,7 @@ export function computeWakeup(args: { wakeUserIds: Set; mentionUserIds?: Set; }): boolean { - const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args; - if (recipientUserId === authorUserId) return false; - switch (xType) { - case 'general': - if (mentionUserIds && mentionUserIds.size > 0) { - return mentionUserIds.has(recipientUserId); - } - return true; - case 'triage': - case 'custom': - return wakeUserIds.has(recipientUserId); - case 'dm': - // 1:1 conversation: every non-author participant is always woken. - return true; - default: - return false; - } + return computeDelivery(args) === 'wake'; } @WebSocketGateway({ @@ -183,7 +222,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect this.server.to(`user:${userId}`).emit(event, data); } - // Emits message.created per-recipient so each carries its own `wakeup` flag. + // Emits message.created per-recipient using the 3-state delivery + // decision (wake / observer / skip). Skipped recipients receive + // nothing — used by triage channels to keep non-on-duty / non-mention + // / non-admin users completely out of the loop. async emitMessageCreated( channelId: string, data: Record, @@ -192,19 +234,28 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect authorUserId: string; wakeUserIds: Set; mentionUserIds?: Set; + /** Single Center-scoped admin userId (or null). */ + adminUserId?: string | null; }, ): Promise { const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); for (const s of sockets) { const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`; - const wakeup = computeWakeup({ + const decision = computeDelivery({ xType: ctx.xType, recipientUserId, authorUserId: ctx.authorUserId, wakeUserIds: ctx.wakeUserIds, mentionUserIds: ctx.mentionUserIds, + adminUserId: ctx.adminUserId, + }); + if (decision === 'skip') continue; + s.emit('message.created', { + ...data, + channelId, + wakeup: decision === 'wake', + xType: ctx.xType, }); - s.emit('message.created', { ...data, channelId, wakeup, xType: ctx.xType }); } }