From 80ee9082f3b6757f0cc015c367d7c92b07c4530c Mon Sep 17 00:00:00 2001 From: hzhang Date: Sat, 23 May 2026 11:31:47 +0100 Subject: [PATCH] feat(guild): announce channel type + agent-presence + busy-discard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of DIALECTIC-V2 — adds Fabric infrastructure for system-broadcast channels with HF-status-aware delivery filtering. New channel x_type 'announce': - channels.entity.ts + channels.service.ts + realtime.gateway.ts enum + union extended. - computeDelivery() adds an 'announce' case: recipient with presence='busy' → 'skip' (discarded silently); other presences → 'observer' (delivered, no wake). System-broadcast semantics — agents proactively check their announce inbox when they're ready, not interrupted out of band. - messaging.controller POST guard: announce-type channels reject posts that don't present x-fabric-system-key header matching FABRIC_BACKEND_GUILD_SYSTEM_API_KEY env. Empty env = no system caller is valid (closed-by-default). New entity + module agent_presences: - agent-presence.entity.ts: per-user (userId PK) status enum (idle/on_call/busy/exhausted/offline/unknown), source tag, updatedAt - agent-presence.service.ts: getStatus/getStatusMap (bulk for delivery-time fanout) + setStatus (upsert) - agent-presence.controller.ts: GET + PUT /agents/:userId/presence - agent-presence.module.ts: TypeORM forFeature + wired into AppModule - buildTypeOrmConfig() entities list extended RealtimeGateway wiring: - New optional field on the gateway (typed loosely to avoid circular import). RealtimeModule.onModuleInit() assigns from the injected AgentPresenceService — degrades gracefully (no busy-discard, treat all as 'unknown') if presence wiring is ever removed. - emitMessageCreated pre-loads presence per fanout only when xType is 'announce' (other xTypes bypass the lookup entirely). Note: actual presence data writes come from Fabric.OpenclawPlugin's presence-sync loop (separate commit on that submodule); without it, all rows are 'unknown' and announce delivery falls through to the default observer behavior (no busy filtering). System-only POST gate is independent and works immediately. See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md sections 7 + 10 Phase 1. --- src/agents/agent-presence.controller.ts | 42 ++++++++++++++++++++ src/agents/agent-presence.module.ts | 13 ++++++ src/agents/agent-presence.service.ts | 53 +++++++++++++++++++++++++ src/app.module.ts | 2 + src/channels/channels.service.ts | 2 +- src/database.config.ts | 2 + src/entities/agent-presence.entity.ts | 35 ++++++++++++++++ src/entities/channel.entity.ts | 4 +- src/messaging/messaging.controller.ts | 18 +++++++++ src/realtime/realtime.gateway.ts | 48 ++++++++++++++++++++-- src/realtime/realtime.module.ts | 20 +++++++++- 11 files changed, 231 insertions(+), 8 deletions(-) create mode 100644 src/agents/agent-presence.controller.ts create mode 100644 src/agents/agent-presence.module.ts create mode 100644 src/agents/agent-presence.service.ts create mode 100644 src/entities/agent-presence.entity.ts diff --git a/src/agents/agent-presence.controller.ts b/src/agents/agent-presence.controller.ts new file mode 100644 index 0000000..c06bcb6 --- /dev/null +++ b/src/agents/agent-presence.controller.ts @@ -0,0 +1,42 @@ +import { BadRequestException, Body, Controller, Get, Param, Put } from '@nestjs/common'; +import { AgentPresenceService, PresenceStatus } from './agent-presence.service.js'; + +const VALID: PresenceStatus[] = ['idle', 'on_call', 'busy', 'exhausted', 'offline', 'unknown']; + +interface PutBody { + status?: string; + source?: string; +} + +@Controller('agents/:userId/presence') +export class AgentPresenceController { + constructor(private readonly svc: AgentPresenceService) {} + + /** + * Read a user's current presence cache row. + * Auth: ApiKeyGuard (global). Any introspected center user can read. + */ + @Get() + async get(@Param('userId') userId: string): Promise<{ userId: string; status: PresenceStatus }> { + const status = await this.svc.getStatus(userId); + return { userId, status }; + } + + /** + * Push a presence update. Called by Fabric.OpenclawPlugin's + * `presence-sync` loop on each delta. Auth: ApiKeyGuard (global) + + * the plugin uses its center-introspected api key. + * + * `source` is a debug tag describing who pushed (e.g. 'hf-plugin', + * 'manual'). Stored verbatim for trail. + */ + @Put() + async put(@Param('userId') userId: string, @Body() body: PutBody): Promise<{ userId: string; status: PresenceStatus }> { + if (!body?.status || !VALID.includes(body.status as PresenceStatus)) { + throw new BadRequestException(`status must be one of ${VALID.join('|')}`); + } + const source = (body.source ?? 'unknown').slice(0, 64); + const row = await this.svc.setStatus(userId, body.status as PresenceStatus, source); + return { userId: row.userId, status: row.status }; + } +} diff --git a/src/agents/agent-presence.module.ts b/src/agents/agent-presence.module.ts new file mode 100644 index 0000000..417c4c7 --- /dev/null +++ b/src/agents/agent-presence.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { AgentPresence } from '../entities/agent-presence.entity.js'; +import { AgentPresenceController } from './agent-presence.controller.js'; +import { AgentPresenceService } from './agent-presence.service.js'; + +@Module({ + imports: [TypeOrmModule.forFeature([AgentPresence])], + controllers: [AgentPresenceController], + providers: [AgentPresenceService], + exports: [AgentPresenceService], +}) +export class AgentPresenceModule {} diff --git a/src/agents/agent-presence.service.ts b/src/agents/agent-presence.service.ts new file mode 100644 index 0000000..70f1ab3 --- /dev/null +++ b/src/agents/agent-presence.service.ts @@ -0,0 +1,53 @@ +import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { AgentPresence } from '../entities/agent-presence.entity.js'; + +export type PresenceStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown'; + +@Injectable() +export class AgentPresenceService { + constructor( + @InjectRepository(AgentPresence) + private readonly repo: Repository, + ) {} + + /** + * Get a user's current presence. Returns 'unknown' if no row. + * Used by `RealtimeGateway` per-recipient when xType === 'announce'. + */ + async getStatus(userId: string): Promise { + if (!userId) return 'unknown'; + const row = await this.repo.findOne({ where: { userId } }); + return row?.status ?? 'unknown'; + } + + /** Bulk variant for delivery-time lookups across many recipients in one trip. */ + async getStatusMap(userIds: string[]): Promise> { + const out = new Map(); + for (const id of userIds) out.set(id, 'unknown'); + if (userIds.length === 0) return out; + const rows = await this.repo + .createQueryBuilder('p') + .where('p.userId IN (:...ids)', { ids: userIds }) + .getMany(); + for (const r of rows) out.set(r.userId, r.status); + return out; + } + + /** + * Upsert a user's presence. Source is a free-text tag for debugging + * (e.g. "hf-plugin", "manual", "test"). PUT /agents/:id/presence + * calls this; the plugin pushes only on diff so writes are sparse. + */ + async setStatus(userId: string, status: PresenceStatus, source: string): Promise { + const existing = await this.repo.findOne({ where: { userId } }); + if (existing) { + existing.status = status; + existing.source = source; + return this.repo.save(existing); + } + const row = this.repo.create({ userId, status, source }); + return this.repo.save(row); + } +} diff --git a/src/app.module.ts b/src/app.module.ts index a75b4dd..80aa88a 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -16,6 +16,7 @@ import { MembersModule } from './members/members.module.js'; import { FilesModule } from './files/files.module.js'; import { CanvasModule } from './canvas/canvas.module.js'; import { CommandsModule } from './commands/commands.module.js'; +import { AgentPresenceModule } from './agents/agent-presence.module.js'; @Module({ imports: [ @@ -30,6 +31,7 @@ import { CommandsModule } from './commands/commands.module.js'; FilesModule, CanvasModule, CommandsModule, + AgentPresenceModule, ], controllers: [HealthController, MetricsController], providers: [ diff --git a/src/channels/channels.service.ts b/src/channels/channels.service.ts index acb2810..3952e25 100644 --- a/src/channels/channels.service.ts +++ b/src/channels/channels.service.ts @@ -7,7 +7,7 @@ import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { TurnService } from './turn.service.js'; import { RealtimeGateway } from '../realtime/realtime.gateway.js'; -const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const; +const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm', 'announce'] as const; type XType = (typeof X_TYPES)[number]; type CreateChannelInput = { diff --git a/src/database.config.ts b/src/database.config.ts index a779bb4..07ca46e 100644 --- a/src/database.config.ts +++ b/src/database.config.ts @@ -14,6 +14,7 @@ import { IdempotencyRecord } from './entities/idempotency-record.entity.js'; import { StoredFile } from './entities/stored-file.entity.js'; import { ChannelCanvas } from './entities/channel-canvas.entity.js'; import { GuildCommand } from './entities/guild-command.entity.js'; +import { AgentPresence } from './entities/agent-presence.entity.js'; export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({ type: 'mysql', @@ -38,6 +39,7 @@ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({ StoredFile, ChannelCanvas, GuildCommand, + AgentPresence, ], synchronize: (process.env.FABRIC_BACKEND_GUILD_DB_SYNC ?? 'true') === 'true', logging: (process.env.FABRIC_BACKEND_GUILD_DB_LOGGING ?? 'false') === 'true', diff --git a/src/entities/agent-presence.entity.ts b/src/entities/agent-presence.entity.ts new file mode 100644 index 0000000..504b0f0 --- /dev/null +++ b/src/entities/agent-presence.entity.ts @@ -0,0 +1,35 @@ +import { Column, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm'; + +/** + * Per-user (typically agent) presence cache. + * + * Populated by Fabric.OpenclawPlugin's presence-sync loop: every ~30s + * it reads each connected agent's HF status from the cross-plugin + * `globalThis.__hfAgentStatus.get(agentId)` (exposed by + * HarborForge.OpenclawPlugin) and pushes diffs via + * `PUT /agents/:userId/presence`. + * + * Used by `RealtimeGateway.computeDelivery` for `announce`-type + * channels to skip delivery to recipients whose status is `busy`. + * Defaults to `unknown` if no row exists (treated as not-busy). + */ +@Entity('agent_presences') +export class AgentPresence { + // Same id as the Fabric Center user id (UUID v4 string, char(36)). + @PrimaryColumn({ type: 'char', length: 36 }) + userId!: string; + + @Column({ + type: 'enum', + enum: ['idle', 'on_call', 'busy', 'exhausted', 'offline', 'unknown'], + default: 'unknown', + }) + status!: 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown'; + + /** Free-text source tag for debugging ("hf-plugin", "manual", etc.). */ + @Column({ type: 'varchar', length: 64, default: 'unknown' }) + source!: string; + + @UpdateDateColumn() + updatedAt!: Date; +} diff --git a/src/entities/channel.entity.ts b/src/entities/channel.entity.ts index aef082a..d37c1da 100644 --- a/src/entities/channel.entity.ts +++ b/src/entities/channel.entity.ts @@ -16,9 +16,9 @@ export class Channel { @Column({ name: 'x_type', type: 'enum', - enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'], + enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm', 'announce'], }) - xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm'; + xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm' | 'announce'; @Column({ type: 'varchar', length: 16, default: 'text' }) kind!: 'text' | 'announcement'; diff --git a/src/messaging/messaging.controller.ts b/src/messaging/messaging.controller.ts index f74f919..fd2590a 100644 --- a/src/messaging/messaging.controller.ts +++ b/src/messaging/messaging.controller.ts @@ -158,6 +158,7 @@ export class MessagingController { @Body() body: CreateMessageDto, @Req() req: { userId?: string }, @Headers('idempotency-key') idempotencyKey?: string, + @Headers('x-fabric-system-key') systemKey?: string, ) { const scope = `POST:/channels/${channelId}/messages`; const existed = await this.getIdempotentResponse(scope, idempotencyKey); @@ -174,6 +175,23 @@ export class MessagingController { } const xType = channel.xType ?? 'general'; const isRotating = xType === 'discuss' || xType === 'work'; + + // announce channels: posts only allowed when the caller presents a + // valid system key (matches FABRIC_BACKEND_GUILD_SYSTEM_API_KEY env). + // The ApiKeyGuard has already validated user identity; this is an + // additional system-only gate on top. Non-system posts are silently + // discarded (return 403 + log) so misbehaving clients don't pollute + // the broadcast. + if (xType === 'announce') { + const expected = process.env.FABRIC_BACKEND_GUILD_SYSTEM_API_KEY ?? ''; + if (!expected || systemKey !== expected) { + // log + reject; treat empty env as "no system caller is ever valid" + throw new ForbiddenException({ + error: 'announce_system_only', + message: 'announce-type channels accept system-signed posts only', + }); + } + } const authorUserId = userId; // ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via diff --git a/src/realtime/realtime.gateway.ts b/src/realtime/realtime.gateway.ts index 85d0de4..eaffdc8 100644 --- a/src/realtime/realtime.gateway.ts +++ b/src/realtime/realtime.gateway.ts @@ -11,7 +11,17 @@ import { Logger } from '@nestjs/common'; import { Server, Socket } from 'socket.io'; import { introspectGuildToken } from '../common/center-auth.js'; -type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm'; +type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm' | 'announce'; + +/** + * Cross-presence info needed by `announce`-type delivery: a recipient + * with hf-side status === 'busy' has the message discarded silently + * (don't enter their session, no UI emit). Other statuses + non-announce + * channels are unaffected. Presence is sourced from the + * `agent_presences` table populated by Fabric.OpenclawPlugin's + * presence-sync loop (which reads from HF plugin's `__hfAgentStatus`). + */ +export type PresenceStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown'; /** * Per-recipient delivery decision for a non-rotating channel message. @@ -43,10 +53,12 @@ export interface ComputeDeliveryArgs { mentionUserIds?: Set; /** Single Center-scoped admin userId, or null. */ adminUserId?: string | null; + /** Recipient's current presence; only consulted for `announce` xType. Defaults to 'unknown' (treated as not-busy). */ + recipientPresence?: PresenceStatus; } export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision { - const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId } = args; + const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId, recipientPresence } = args; if (recipientUserId === authorUserId) return 'skip'; switch (xType) { @@ -67,6 +79,16 @@ export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision { return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer'; case 'dm': return 'wake'; + case 'announce': + // System-broadcast channels (e.g. Dialectic topic announcements). + // Recipients with HF status === 'busy' have the message discarded + // silently — busy agents should not be distracted by signup pings + // they can't act on. All other presences (idle/on_call/exhausted/ + // offline/unknown) get the message as 'observer' (no wake): the + // channel itself is browsable; agents proactively decide what to + // do with announcements when they next look at their inbox. + if (recipientPresence === 'busy') return 'skip'; + return 'observer'; default: // report (and anything else): deliver as observer, no wake return 'observer'; @@ -100,6 +122,12 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect private readonly logger = new Logger(RealtimeGateway.name); private readonly onlineUsers = new Set(); + // Optional: injected at module wiring time. Used by emitMessageCreated + // to pre-load recipient presence for announce-type channels. + // Typed loosely to avoid a circular import between realtime and agents + // modules; the actual interface lives in agents/agent-presence.service. + presence?: { getStatusMap(ids: string[]): Promise> }; + private userIdFromClient(client: Socket): string { const authUser = client.handshake.auth?.userId; const headerUser = client.handshake.headers['x-user-id']; @@ -225,7 +253,8 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect // Emits message.created per-recipient using the 3-state delivery // decision (wake / observer / skip). Skipped recipients receive // nothing — used by triage channels to keep non-on-duty / non-mention - // / non-admin users completely out of the loop. + // / non-admin users completely out of the loop, and by announce + // channels to suppress delivery to recipients whose presence is busy. async emitMessageCreated( channelId: string, data: Record, @@ -239,6 +268,18 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect }, ): Promise { const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); + + // For announce-type channels, pre-load presence for all recipients + // in one query so the per-recipient loop doesn't fan out to N round + // trips. For other xTypes, presence is irrelevant — skip the lookup. + let presenceMap: Map | undefined; + if (ctx.xType === 'announce' && this.presence) { + const recipientIds = sockets + .map((s) => (typeof s.data.userId === 'string' ? (s.data.userId as string) : '')) + .filter((id) => id && !id.startsWith('anon:')); + presenceMap = await this.presence.getStatusMap(recipientIds); + } + for (const s of sockets) { const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`; const decision = computeDelivery({ @@ -248,6 +289,7 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect wakeUserIds: ctx.wakeUserIds, mentionUserIds: ctx.mentionUserIds, adminUserId: ctx.adminUserId, + recipientPresence: presenceMap?.get(recipientUserId) ?? 'unknown', }); if (decision === 'skip') continue; s.emit('message.created', { diff --git a/src/realtime/realtime.module.ts b/src/realtime/realtime.module.ts index 12460ff..b91a060 100644 --- a/src/realtime/realtime.module.ts +++ b/src/realtime/realtime.module.ts @@ -1,9 +1,25 @@ -import { Global, Module } from '@nestjs/common'; +import { Global, Module, OnModuleInit } from '@nestjs/common'; import { RealtimeGateway } from './realtime.gateway.js'; +import { AgentPresenceModule } from '../agents/agent-presence.module.js'; +import { AgentPresenceService } from '../agents/agent-presence.service.js'; @Global() @Module({ + imports: [AgentPresenceModule], providers: [RealtimeGateway], exports: [RealtimeGateway], }) -export class RealtimeModule {} +export class RealtimeModule implements OnModuleInit { + // Wire presence into the gateway at startup. Using assignment (vs + // constructor injection) keeps the gateway free of the agents-module + // import — no risk of circular dependency, and announce-channel + // delivery degrades gracefully (presence stays undefined → 'unknown' + // status → no busy-discard) if AgentPresenceModule is ever removed. + constructor( + private readonly gateway: RealtimeGateway, + private readonly presence: AgentPresenceService, + ) {} + onModuleInit(): void { + this.gateway.presence = this.presence; + } +}