2 Commits

Author SHA1 Message Date
h z
5b835e0871 Merge pull request 'feat(realtime): push channel.joined/left events to user-scoped rooms' (#1) from feat/push-channel-membership-events into main 2026-05-21 07:12:51 +00:00
hanghang zhang
e33f1ecc53 feat(realtime): push channel.joined/left events to user-scoped rooms
Backend half of the plugin push-based channel sync (companion to
nav/Fabric.OpenclawPlugin#1 follow-up). Before this, the OpenClaw
fabric inbound had to poll `/api/channels?guildId=...` every 60s to
discover newly-joined channels (any DM another user just dragged the
agent into). Now the server tells the agent's socket directly so
sub/unsub is realtime.

Changes:
- realtime.gateway.ts:
  * handleConnection joins the socket into a `user:<userId>` room.
    All of a user's connected sockets now share that room.
  * New `emitToUser(userId, event, data)` helper that emits into
    that room. No-op for offline users (next connect resyncs via the
    plugin's initial channel-list fetch).
- channels.service.ts:
  * Inject RealtimeGateway (RealtimeModule is @Global, no module
    plumbing needed).
  * Private `notifyMembership(kind, channelId, userIds, extra)`
    helper that emits `channel.<kind>` (joined|left) with payload
    {channelId, userId, xType, occurredAt}.
  * create(): emit channel.joined to every seeded member (creator +
    explicit memberUserIds + triage on-duty).
  * joinChannel(): emit channel.joined to userId (only if the row was
    actually inserted, idempotent on existing membership).
  * leaveChannel(): emit channel.left to userId iff a row was
    actually deleted.

Event shape:
  {
    channelId: string,
    userId: string,
    xType?: string,
    occurredAt: ISO string,
  }

Client-side contract (fabric plugin):
  socket.on('channel.joined', m => socket.emit('join_channel', {channelId: m.channelId}))
  socket.on('channel.left',   m => socket.emit('leave_channel', {channelId: m.channelId}))

The plugin keeps its 60s polling resync as a safety net for missed
events (transient socket drops between emit and reconnect, partial
failures, etc).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:07:46 +01:00
21 changed files with 47 additions and 639 deletions

View File

@@ -1,42 +0,0 @@
import { BadRequestException, Body, Controller, Get, Param, Put } from '@nestjs/common';
import { AgentPresenceService, PresenceStatus } from './agent-presence.service.js';
const VALID: PresenceStatus[] = ['idle', 'on_call', 'busy', 'exhausted', 'offline', 'unknown'];
interface PutBody {
status?: string;
source?: string;
}
@Controller('agents/:userId/presence')
export class AgentPresenceController {
constructor(private readonly svc: AgentPresenceService) {}
/**
* Read a user's current presence cache row.
* Auth: ApiKeyGuard (global). Any introspected center user can read.
*/
@Get()
async get(@Param('userId') userId: string): Promise<{ userId: string; status: PresenceStatus }> {
const status = await this.svc.getStatus(userId);
return { userId, status };
}
/**
* Push a presence update. Called by Fabric.OpenclawPlugin's
* `presence-sync` loop on each delta. Auth: ApiKeyGuard (global) +
* the plugin uses its center-introspected api key.
*
* `source` is a debug tag describing who pushed (e.g. 'hf-plugin',
* 'manual'). Stored verbatim for trail.
*/
@Put()
async put(@Param('userId') userId: string, @Body() body: PutBody): Promise<{ userId: string; status: PresenceStatus }> {
if (!body?.status || !VALID.includes(body.status as PresenceStatus)) {
throw new BadRequestException(`status must be one of ${VALID.join('|')}`);
}
const source = (body.source ?? 'unknown').slice(0, 64);
const row = await this.svc.setStatus(userId, body.status as PresenceStatus, source);
return { userId: row.userId, status: row.status };
}
}

View File

@@ -1,13 +0,0 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AgentPresence } from '../entities/agent-presence.entity.js';
import { AgentPresenceController } from './agent-presence.controller.js';
import { AgentPresenceService } from './agent-presence.service.js';
@Module({
imports: [TypeOrmModule.forFeature([AgentPresence])],
controllers: [AgentPresenceController],
providers: [AgentPresenceService],
exports: [AgentPresenceService],
})
export class AgentPresenceModule {}

View File

@@ -1,61 +0,0 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { AgentPresence } from '../entities/agent-presence.entity.js';
export type PresenceStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown';
@Injectable()
export class AgentPresenceService {
constructor(
@InjectRepository(AgentPresence)
private readonly repo: Repository<AgentPresence>,
) {}
/**
* Get a user's current presence. Returns 'unknown' if no row.
* Used by `RealtimeGateway` per-recipient when xType === 'announce'.
*/
async getStatus(userId: string): Promise<PresenceStatus> {
if (!userId) return 'unknown';
const row = await this.repo.findOne({ where: { userId } });
return row?.status ?? 'unknown';
}
/** Bulk variant for delivery-time lookups across many recipients in one trip. */
async getStatusMap(userIds: string[]): Promise<Map<string, PresenceStatus>> {
const out = new Map<string, PresenceStatus>();
for (const id of userIds) out.set(id, 'unknown');
if (userIds.length === 0) return out;
const rows = await this.repo
.createQueryBuilder('p')
.where('p.userId IN (:...ids)', { ids: userIds })
.getMany();
for (const r of rows) out.set(r.userId, r.status);
return out;
}
/**
* Upsert a user's presence. Source is a free-text tag for debugging
* (e.g. "hf-plugin", "manual", "test"). PUT /agents/:id/presence
* calls this; the plugin pushes only on diff so writes are sparse.
*
* Implementation note: the older findOne+save split was a read-modify-
* write race — two concurrent first-time writes for the same userId
* would both read no row, both INSERT, second hits unique-key dup
* (`agent_presences.PRIMARY`) and 500s. Fabric.OpenclawPlugin's
* presence-sync occasionally fires two PUTs for the same agent within
* ~10 ms (tick overlap on its side — separate fix in the plugin),
* which surfaced this race in prod.
*
* `repo.upsert(values, conflictPaths)` compiles to MySQL
* `INSERT … ON DUPLICATE KEY UPDATE` and is atomic at the storage
* engine level — no read needed, no race window. We synthesize the
* returned entity from what we just wrote rather than round-tripping
* a SELECT — the controller only reads {userId, status} off it.
*/
async setStatus(userId: string, status: PresenceStatus, source: string): Promise<AgentPresence> {
await this.repo.upsert({ userId, status, source }, ['userId']);
return this.repo.create({ userId, status, source });
}
}

View File

@@ -16,7 +16,6 @@ import { MembersModule } from './members/members.module.js';
import { FilesModule } from './files/files.module.js'; import { FilesModule } from './files/files.module.js';
import { CanvasModule } from './canvas/canvas.module.js'; import { CanvasModule } from './canvas/canvas.module.js';
import { CommandsModule } from './commands/commands.module.js'; import { CommandsModule } from './commands/commands.module.js';
import { AgentPresenceModule } from './agents/agent-presence.module.js';
@Module({ @Module({
imports: [ imports: [
@@ -31,7 +30,6 @@ import { AgentPresenceModule } from './agents/agent-presence.module.js';
FilesModule, FilesModule,
CanvasModule, CanvasModule,
CommandsModule, CommandsModule,
AgentPresenceModule,
], ],
controllers: [HealthController, MetricsController], controllers: [HealthController, MetricsController],
providers: [ providers: [

View File

@@ -1,4 +1,4 @@
import { BadRequestException, Body, Controller, Get, Param, Patch, Post, Query, Req, UnauthorizedException } from '@nestjs/common'; import { Body, Controller, Get, Param, Post, Query, Req, UnauthorizedException } from '@nestjs/common';
import { ChannelsService } from './channels.service.js'; import { ChannelsService } from './channels.service.js';
// ApiKeyGuard attaches the introspected Center user id onto the request. // ApiKeyGuard attaches the introspected Center user id onto the request.
@@ -32,33 +32,11 @@ export class ChannelsController {
bypassUserIds: Array.isArray(body.bypassUserIds) bypassUserIds: Array.isArray(body.bypassUserIds)
? (body.bypassUserIds as string[]) ? (body.bypassUserIds as string[])
: [], : [],
purpose: body.purpose as string | undefined,
}, },
userId, userId,
); );
} }
// Patch a channel's free-form purpose. Body: { purpose: string }. Pass
// empty string to clear. Auth: channel member (or anyone for public
// channels, mirroring close()). Frontend doesn't call this today —
// intended for agent-side use (fabric-channel-set-purpose tool).
@Patch(':id')
patch(
@Req() req: AuthedRequest,
@Param('id') channelId: string,
@Body() body: Record<string, unknown>,
) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
// Only `purpose` is patchable today. Future patchable fields would
// get their own typed branch; we explicitly NOT allow {} no-op patches
// because that signals a caller bug.
if (typeof body.purpose !== 'string') {
throw new BadRequestException('purpose (string) is required');
}
return this.channelsService.updatePurpose(channelId, userId, body.purpose);
}
// Move an order member into the bypass list (discuss/work only). // Move an order member into the bypass list (discuss/work only).
@Post(':id/bypass') @Post(':id/bypass')
bypass( bypass(

View File

@@ -7,7 +7,7 @@ import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { TurnService } from './turn.service.js'; import { TurnService } from './turn.service.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js'; import { RealtimeGateway } from '../realtime/realtime.gateway.js';
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm', 'announce'] as const; const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const;
type XType = (typeof X_TYPES)[number]; type XType = (typeof X_TYPES)[number];
type CreateChannelInput = { type CreateChannelInput = {
@@ -24,10 +24,6 @@ type CreateChannelInput = {
// discuss/work only: members excluded from rotation (no wakeup unless // discuss/work only: members excluded from rotation (no wakeup unless
// @-mentioned). order and bypass partition the members disjointly. // @-mentioned). order and bypass partition the members disjointly.
bypassUserIds?: string[]; bypassUserIds?: string[];
// Free-form description of what this channel is for. Optional; agents
// typically fill it when creating, members can later edit via
// PATCH /api/channels/:id.
purpose?: string;
}; };
@Injectable() @Injectable()
@@ -170,9 +166,6 @@ export class ChannelsService {
// allowed (create() always makes a fresh one, no dedup). // allowed (create() always makes a fresh one, no dedup).
const isPublic = xType === 'dm' ? false : Boolean(input.isPublic); const isPublic = xType === 'dm' ? false : Boolean(input.isPublic);
const purposeRaw = String(input.purpose ?? '').trim();
const purpose = purposeRaw === '' ? null : purposeRaw;
const channel = await this.channelRepo.save( const channel = await this.channelRepo.save(
this.channelRepo.create({ this.channelRepo.create({
guildId, guildId,
@@ -181,7 +174,6 @@ export class ChannelsService {
kind: input.kind === 'announcement' ? 'announcement' : 'text', kind: input.kind === 'announcement' ? 'announcement' : 'text',
isPrivate: !isPublic, isPrivate: !isPublic,
isPublic, isPublic,
purpose,
lastSeq: 0, lastSeq: 0,
}), }),
); );
@@ -227,27 +219,6 @@ export class ChannelsService {
return channel; return channel;
} }
// Update a channel's free-form purpose. Any channel member may do this
// (or any guild user if the channel is public, mirroring closeChannel's
// member-or-public rule). Pass an empty string to clear.
async updatePurpose(channelId: string, actorUserId: string, purpose: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
const member = await this.memberRepo.findOne({ where: { channelId, userId: actorUserId } });
if (!member && !channel.isPublic) {
throw new ForbiddenException('not a channel member');
}
const trimmed = String(purpose ?? '').trim();
channel.purpose = trimmed === '' ? null : trimmed;
const saved = await this.channelRepo.save(channel);
return {
id: saved.id,
name: saved.name,
xType: saved.xType,
purpose: saved.purpose,
};
}
// Move an order member into the bypass list (discuss/work only). // Move an order member into the bypass list (discuss/work only).
// Any channel member may do this. // Any channel member may do this.
async moveToBypass(channelId: string, actorUserId: string, targetUserId: string) { async moveToBypass(channelId: string, actorUserId: string, targetUserId: string) {

View File

@@ -1,39 +0,0 @@
// Operator convenience: force-refresh the in-memory Center admin cache
// without waiting for the 1-day TTL. Used after `center user set-admin`
// to make new admin visible immediately to triage delivery.
//
// Usage (inside the deployed container):
// docker exec fabric-backend-guild node dist/cli/admin-refresh.js
//
// Prints the (possibly null) result as JSON. Exit 0 always — a "no
// admin" outcome is a valid state, not an error.
import 'reflect-metadata';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '../app.module.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
async function main() {
const app = await NestFactory.createApplicationContext(AppModule, { logger: ['error', 'warn'] });
try {
const cache = app.get(AdminCacheService);
const before = cache.snapshot();
const after = await cache.get(true);
process.stdout.write(
JSON.stringify({
ok: true,
before,
after,
changed: JSON.stringify(before) !== JSON.stringify(after),
}) + '\n',
);
} finally {
await app.close();
}
}
void main().catch((error: unknown) => {
const message = error instanceof Error ? error.message : 'unknown error';
process.stderr.write(JSON.stringify({ ok: false, error: message }) + '\n');
process.exit(1);
});

View File

@@ -8,12 +8,19 @@ import {
Req, Req,
UnauthorizedException, UnauthorizedException,
} from '@nestjs/common'; } from '@nestjs/common';
import { timingSafeEqual } from 'node:crypto';
import { CommandsService } from './commands.service.js'; import { CommandsService } from './commands.service.js';
import { SyncCommandsDto } from './dto.sync-commands.dto.js'; import { SyncCommandsDto } from './dto.sync-commands.dto.js';
import { safeEqual } from '../common/safe-equal.js';
type AuthedRequest = { userId?: string }; type AuthedRequest = { userId?: string };
function safeEqual(a: string, b: string): boolean {
const ab = Buffer.from(a);
const bb = Buffer.from(b);
if (ab.length !== bb.length) return false;
return timingSafeEqual(ab, bb);
}
@Controller('commands') @Controller('commands')
export class CommandsController { export class CommandsController {
constructor(private readonly commands: CommandsService) {} constructor(private readonly commands: CommandsService) {}

View File

@@ -1,73 +0,0 @@
/**
* Center-scoped admin cache.
*
* Holds the at-most-one admin user (email + userId) fetched from Center.
* Used to decide who to deliver triage messages to as a silent observer
* (wake=false), regardless of on-duty / mention status.
*
* Refresh policy (per spec, 2026-05-22):
* • TTL = 1 day. Center admin changes are rare; agents tolerate a
* day's stale cache without surprises
* • on first lookup the cache lazy-fetches
* • cli `admin refresh` forces an out-of-band refresh without waiting
* for TTL expiry
*
* Failure mode: a Center fetch error is treated identically to "no
* admin" — guild keeps operating without an observer. The cache holds
* the failed-fetch decision for the same TTL so we don't hammer Center.
*/
import { Injectable, Logger } from '@nestjs/common';
import { fetchAdminEmail } from './center-auth.js';
const ADMIN_CACHE_TTL_MS = 24 * 60 * 60 * 1000;
export interface CachedAdmin {
email: string;
userId: string;
}
@Injectable()
export class AdminCacheService {
private readonly logger = new Logger(AdminCacheService.name);
private cached: CachedAdmin | null = null;
private cachedAt = 0;
private inflight: Promise<CachedAdmin | null> | null = null;
/**
* Return the cached admin, fetching from Center if the cache is empty
* or older than the TTL. Returns null if no admin is set.
*
* `force=true` bypasses the cache and refreshes immediately — used by
* the cli refresh command.
*/
async get(force = false): Promise<CachedAdmin | null> {
const fresh = Date.now() - this.cachedAt < ADMIN_CACHE_TTL_MS;
if (!force && this.cachedAt > 0 && fresh) {
return this.cached;
}
if (this.inflight) return this.inflight;
this.inflight = (async () => {
try {
const result = await fetchAdminEmail();
this.cached = result;
this.cachedAt = Date.now();
this.logger.log(
`admin cache refreshed: ${result ? `${result.email} (${result.userId})` : 'no admin set'}`,
);
return result;
} finally {
this.inflight = null;
}
})();
return this.inflight;
}
/** Snapshot of the cached admin (no fetch). Returns null if not yet
* populated. Used by the hot delivery path which doesn't want to
* block on a Center round-trip. */
snapshot(): CachedAdmin | null {
return this.cached;
}
}

View File

@@ -5,7 +5,6 @@ import {
UnauthorizedException, UnauthorizedException,
} from '@nestjs/common'; } from '@nestjs/common';
import { introspectGuildToken } from './center-auth.js'; import { introspectGuildToken } from './center-auth.js';
import { safeEqual } from './safe-equal.js';
@Injectable() @Injectable()
export class ApiKeyGuard implements CanActivate { export class ApiKeyGuard implements CanActivate {
@@ -22,25 +21,6 @@ export class ApiKeyGuard implements CanActivate {
return true; return true;
} }
// System-key bypass: when a caller presents x-fabric-system-key
// matching FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY (intentionally the
// same shared secret as x-commands-sync-key — both legitimate
// consumers are Fabric.OpenclawPlugin), skip the Bearer requirement
// and mark this as a system caller. Downstream handlers (e.g.
// messaging.controller POST /channels/:id/messages) gate on
// req.isSystem to take the system-author code path.
//
// Empty env → bypass disabled (no system caller ever valid; closed
// by default). Header carries the secret as-is; we constant-time
// compare against the env value.
const sysExpected = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
const sysHeader = req.headers['x-fabric-system-key'];
const sysProvided = Array.isArray(sysHeader) ? sysHeader[0] : sysHeader;
if (sysExpected && sysProvided && safeEqual(sysProvided, sysExpected)) {
(req as { isSystem?: boolean }).isSystem = true;
return true;
}
const auth = req.headers['authorization']; const auth = req.headers['authorization'];
const authValue = Array.isArray(auth) ? auth[0] : auth; const authValue = Array.isArray(auth) ? auth[0] : auth;
let token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : ''; let token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';

View File

@@ -26,31 +26,6 @@ export async function introspectGuildToken(token: string): Promise<{ active: boo
}; };
} }
/**
* Fetch the single Center-scoped admin user (if any).
* Same x-api-key auth as introspect / resolve-names.
* Returns `null` when no admin is set OR the request fails (treated
* identically — the guild simply falls back to "no admin observer").
*/
export async function fetchAdminEmail(): Promise<{ email: string; userId: string } | null> {
const centerBaseUrl = process.env.FABRIC_BACKEND_GUILD_CENTER_BASE_URL;
const centerApiKey = process.env.FABRIC_BACKEND_GUILD_CENTER_API_KEY;
if (!centerBaseUrl || !centerApiKey) return null;
try {
const res = await fetch(`${centerBaseUrl}/api/auth/admin-email`, {
method: 'GET',
headers: { 'x-api-key': centerApiKey },
});
if (!res.ok) return null;
const data = (await res.json()) as { email?: string; userId?: string } | null;
if (!data || !data.email || !data.userId) return null;
return { email: data.email, userId: data.userId };
} catch {
return null;
}
}
// Resolve <@user.name:NAME> names to userIds within this guild node via // Resolve <@user.name:NAME> names to userIds within this guild node via
// Center. Unresolved names are simply absent from the returned map. // Center. Unresolved names are simply absent from the returned map.
export async function resolveUserNames(names: string[]): Promise<Record<string, string>> { export async function resolveUserNames(names: string[]): Promise<Record<string, string>> {

View File

@@ -1,25 +0,0 @@
import { describe, it, expect } from 'vitest';
import { safeEqual } from './safe-equal.js';
describe('safeEqual', () => {
it('returns true for identical non-empty strings', () => {
expect(safeEqual('abc123', 'abc123')).toBe(true);
});
it('returns false for different strings of same length', () => {
expect(safeEqual('abc123', 'abc124')).toBe(false);
});
it('returns false for differing lengths', () => {
expect(safeEqual('abc', 'abcd')).toBe(false);
});
it('handles empty strings', () => {
// both empty is technically equal — but downstream callers should
// explicitly check for empty expected before invoking. We just
// document the constant-time-comparison primitive's behavior.
expect(safeEqual('', '')).toBe(true);
expect(safeEqual('a', '')).toBe(false);
expect(safeEqual('', 'a')).toBe(false);
});
});

View File

@@ -1,12 +0,0 @@
import { timingSafeEqual } from 'node:crypto';
// Constant-time string comparison. Returns false for length mismatch (the
// length difference itself is observable, but the per-byte loop isn't).
// Used for shared-secret header checks (commands-sync-key, system-key,
// etc.) to keep timing-oracle attacks off the table.
export function safeEqual(a: string, b: string): boolean {
const ab = Buffer.from(a);
const bb = Buffer.from(b);
if (ab.length !== bb.length) return false;
return timingSafeEqual(ab, bb);
}

View File

@@ -14,7 +14,6 @@ import { IdempotencyRecord } from './entities/idempotency-record.entity.js';
import { StoredFile } from './entities/stored-file.entity.js'; import { StoredFile } from './entities/stored-file.entity.js';
import { ChannelCanvas } from './entities/channel-canvas.entity.js'; import { ChannelCanvas } from './entities/channel-canvas.entity.js';
import { GuildCommand } from './entities/guild-command.entity.js'; import { GuildCommand } from './entities/guild-command.entity.js';
import { AgentPresence } from './entities/agent-presence.entity.js';
export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
type: 'mysql', type: 'mysql',
@@ -39,7 +38,6 @@ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
StoredFile, StoredFile,
ChannelCanvas, ChannelCanvas,
GuildCommand, GuildCommand,
AgentPresence,
], ],
synchronize: (process.env.FABRIC_BACKEND_GUILD_DB_SYNC ?? 'true') === 'true', synchronize: (process.env.FABRIC_BACKEND_GUILD_DB_SYNC ?? 'true') === 'true',
logging: (process.env.FABRIC_BACKEND_GUILD_DB_LOGGING ?? 'false') === 'true', logging: (process.env.FABRIC_BACKEND_GUILD_DB_LOGGING ?? 'false') === 'true',

View File

@@ -1,35 +0,0 @@
import { Column, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm';
/**
* Per-user (typically agent) presence cache.
*
* Populated by Fabric.OpenclawPlugin's presence-sync loop: every ~30s
* it reads each connected agent's HF status from the cross-plugin
* `globalThis.__hfAgentStatus.get(agentId)` (exposed by
* HarborForge.OpenclawPlugin) and pushes diffs via
* `PUT /agents/:userId/presence`.
*
* Used by `RealtimeGateway.computeDelivery` for `announce`-type
* channels to skip delivery to recipients whose status is `busy`.
* Defaults to `unknown` if no row exists (treated as not-busy).
*/
@Entity('agent_presences')
export class AgentPresence {
// Same id as the Fabric Center user id (UUID v4 string, char(36)).
@PrimaryColumn({ type: 'char', length: 36 })
userId!: string;
@Column({
type: 'enum',
enum: ['idle', 'on_call', 'busy', 'exhausted', 'offline', 'unknown'],
default: 'unknown',
})
status!: 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown';
/** Free-text source tag for debugging ("hf-plugin", "manual", etc.). */
@Column({ type: 'varchar', length: 64, default: 'unknown' })
source!: string;
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -16,23 +16,13 @@ export class Channel {
@Column({ @Column({
name: 'x_type', name: 'x_type',
type: 'enum', type: 'enum',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm', 'announce'], enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
}) })
xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm' | 'announce'; xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
@Column({ type: 'varchar', length: 16, default: 'text' }) @Column({ type: 'varchar', length: 16, default: 'text' })
kind!: 'text' | 'announcement'; kind!: 'text' | 'announcement';
// Free-form description of what this channel is for — what topics get
// posted, who participates, why it exists. Surfaced via GET /api/channels
// so agents can pick a channel by intent ("which announce channel is for
// debate broadcasts?") without channel id hard-coded into workflows.
// Any channel member can set it via PATCH /api/channels/:id (writes
// require membership the same way moveToBypass / close do). The frontend
// create form does NOT post this today — purpose stays optional.
@Column({ type: 'text', nullable: true })
purpose!: string | null;
@Column({ type: 'boolean', default: false }) @Column({ type: 'boolean', default: false })
isPrivate!: boolean; isPrivate!: boolean;

View File

@@ -56,14 +56,4 @@ export class CreateMessageDto {
@IsString() @IsString()
@MaxLength(64) @MaxLength(64)
authorUserId?: string; authorUserId?: string;
// System-author path only (x-fabric-system-key gated). When set, the
// message is delivered via emitMessageTargeted so this single recipient
// gets wakeup=true; everyone else in the channel sees wakeup=false. For
// regular (user-bearer) posts this field is ignored. Used by
// close-sub-discussion to precisely wake the host on callback.
@IsOptional()
@IsString()
@MaxLength(64)
wakeupUserId?: string;
} }

View File

@@ -21,7 +21,6 @@ import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js'; import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
import { parseSlashCommand } from '../channels/slash-commands.js'; import { parseSlashCommand } from '../channels/slash-commands.js';
import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js'; import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js';
import { resolveUserNames } from '../common/center-auth.js'; import { resolveUserNames } from '../common/center-auth.js';
@@ -51,7 +50,6 @@ export class MessagingController {
private readonly turn: TurnService, private readonly turn: TurnService,
private readonly events: EventsService, private readonly events: EventsService,
private readonly realtime: RealtimeGateway, private readonly realtime: RealtimeGateway,
private readonly adminCache: AdminCacheService,
) {} ) {}
private async getIdempotentResponse( private async getIdempotentResponse(
@@ -156,78 +154,16 @@ export class MessagingController {
async create( async create(
@Param('id') channelId: string, @Param('id') channelId: string,
@Body() body: CreateMessageDto, @Body() body: CreateMessageDto,
@Req() req: { userId?: string; isSystem?: boolean }, @Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string, @Headers('idempotency-key') idempotencyKey?: string,
) { ) {
const scope = `POST:/channels/${channelId}/messages`; const scope = `POST:/channels/${channelId}/messages`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey); const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed; if (existed) return existed;
// System caller (ApiKeyGuard set isSystem from x-fabric-system-key):
// skip the per-user participant check; resolve channel directly.
// System posts are allowed into any non-closed channel — used by
// Fabric.OpenclawPlugin to write `close-sub-discussion` callbacks
// back to a parent channel that the host agent may not be currently
// "in" from the backend's perspective, and to deliver guide-injected
// system intros into sub-discussion channels without needing to log
// in as a real user. Author is a sentinel UUID that no real user
// ever has; `wakeupUserId` (optional) lets the caller precisely wake
// one recipient (e.g. the host of a closing sub-discussion).
if (req.isSystem) {
const sysChannel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!sysChannel) throw new NotFoundException('channel not found');
if (sysChannel.closed) {
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
}
const SYSTEM_USER_ID = '00000000-0000-0000-0000-000000000000';
let sysContent = body.content ?? '';
const sysNames = extractNameMentions(sysContent);
if (sysNames.length) {
const nameMap = await resolveUserNames(sysNames);
sysContent = replaceNameMentions(sysContent, nameMap);
}
const sysMessage = await this.persistMessage(channelId, {
authorUserId: SYSTEM_USER_ID,
content: sysContent,
clientMessageId: body.clientMessageId,
replyToMessageId: body.replyToMessageId,
mentions: body.mentions,
attachments: body.attachments,
});
const sysView = this.toView(sysMessage) as Record<string, unknown>;
await this.saveIdempotentResponse(scope, idempotencyKey, sysView);
await this.events.emit({
eventType: 'message.created',
channelId,
actorId: SYSTEM_USER_ID,
data: sysView,
});
// wakeupUserId set -> emitMessageTargeted wakes exactly that user
// (everyone else gets the same message with wakeup=false).
// wakeupUserId omitted/null -> emitMessageCreated routes via the
// channel's xType-specific 3-state delivery with empty wakeSet, so
// nobody is woken (the message lands in history only).
const wakeupUserId = typeof body.wakeupUserId === 'string' ? body.wakeupUserId.trim() : '';
if (wakeupUserId) {
await this.realtime.emitMessageTargeted(channelId, sysView, wakeupUserId);
} else {
await this.realtime.emitMessageCreated(channelId, sysView, {
xType: sysChannel.xType ?? 'general',
authorUserId: SYSTEM_USER_ID,
wakeUserIds: new Set<string>(),
});
}
return sysView;
}
// Guild C-1: caller must be a participant of the channel, and the // Guild C-1: caller must be a participant of the channel, and the
// author is always the authenticated user — body.authorUserId is // author is always the authenticated user — body.authorUserId is
// ignored so a caller can never post as someone else. // ignored so a caller can never post as someone else.
//
// announce channels: any participant can POST. Use case is one-off
// recruitment / broadcast messages posted by the agent that just
// created the originating topic (e.g. dialectic invites). No
// server-side privileged path — author is always a real user.
const userId = String(req.userId ?? ''); const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user'); if (!userId) throw new ForbiddenException('missing user');
const channel = await this.assertParticipant(channelId, userId); const channel = await this.assertParticipant(channelId, userId);
@@ -289,19 +225,16 @@ export class MessagingController {
const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds); const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId); await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
} else { } else {
// general/report/triage/custom: 3-state delivery // general/report/triage/custom: wakeup from x_type + wake_mapping;
// (wake / observer / skip) — see realtime.gateway.computeDelivery. // general also honors the message's at-list
// Center-scoped admin (cached, 1d TTL) gets `observer` on triage.
const wakeRows = await this.wakeRepo.find({ where: { channelId } }); const wakeRows = await this.wakeRepo.find({ where: { channelId } });
const wakeUserIds = new Set(wakeRows.map((w) => w.userId)); const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId)); const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
const admin = await this.adminCache.get();
await this.realtime.emitMessageCreated(channelId, responseBody, { await this.realtime.emitMessageCreated(channelId, responseBody, {
xType, xType,
authorUserId, authorUserId,
wakeUserIds, wakeUserIds,
mentionUserIds, mentionUserIds,
adminUserId: admin?.userId ?? null,
}); });
} }

View File

@@ -6,12 +6,9 @@ import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js'; import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
@Module({ @Module({
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])], imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])],
controllers: [MessagingController], controllers: [MessagingController],
providers: [AdminCacheService],
exports: [AdminCacheService],
}) })
export class MessagingModule {} export class MessagingModule {}

View File

@@ -11,94 +11,17 @@ import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io'; import { Server, Socket } from 'socket.io';
import { introspectGuildToken } from '../common/center-auth.js'; import { introspectGuildToken } from '../common/center-auth.js';
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm' | 'announce'; type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
/** // Wakeup for non-rotating channels only (general/report/triage/custom).
* Cross-presence info needed by `announce`-type delivery: a recipient // discuss/work go through TurnService + emitMessageTargeted, never here.
* with hf-side status === 'busy' has the message discarded silently // Precedence:
* (don't enter their session, no UI emit). Other statuses + non-announce // 1. the author never gets woken by their own message
* channels are unaffected. Presence is sourced from the // 2. triage/custom: only wake users in the channel's wake_mapping
* `agent_presences` table populated by Fabric.OpenclawPlugin's // (mentions change nothing here)
* presence-sync loop (which reads from HF plugin's `__hfAgentStatus`). // 3. general: if the message has an at-list, wake only the at'd users;
*/ // otherwise wake everyone
export type PresenceStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown'; // 4. report (and anything else): wake nobody
/**
* Per-recipient delivery decision for a non-rotating channel message.
*
* • `wake` — push the event AND wake the recipient (model turn fires)
* • `observer` — push the event with wakeup=false (silent; UI displays
* but the openclaw plugin records-only without dispatch). Currently
* used for the Center admin observing triage traffic
* • `skip` — don't even emit the event to this recipient
*
* Wakeup-only channels (general/report/dm/custom) never return
* 'observer'; the legacy behaviour is preserved end-to-end.
*
* Precedence for triage (the only place 'skip' / 'observer' fire):
* 1. author never gets back their own message
* 2. wake_mapping (on-duty) → wake
* 3. mention → wake (NEW: was 'skip' before — see Fabric PR 'triage
* mention exception')
* 4. admin (Center-scoped, at most one) → observer
* 5. everyone else → skip (was 'deliver, wakeup=false' before)
*/
export type DeliveryDecision = 'wake' | 'observer' | 'skip';
export interface ComputeDeliveryArgs {
xType: XType;
recipientUserId: string;
authorUserId: string;
wakeUserIds: Set<string>;
mentionUserIds?: Set<string>;
/** Single Center-scoped admin userId, or null. */
adminUserId?: string | null;
/** Recipient's current presence; only consulted for `announce` xType. Defaults to 'unknown' (treated as not-busy). */
recipientPresence?: PresenceStatus;
}
export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision {
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId, recipientPresence } = args;
if (recipientUserId === authorUserId) return 'skip';
switch (xType) {
case 'triage':
if (wakeUserIds.has(recipientUserId)) return 'wake';
if (mentionUserIds?.has(recipientUserId)) return 'wake';
if (adminUserId && recipientUserId === adminUserId) return 'observer';
return 'skip';
case 'general':
if (mentionUserIds && mentionUserIds.size > 0) {
return mentionUserIds.has(recipientUserId) ? 'wake' : 'observer';
}
return 'wake';
case 'custom':
// wake_mapping decides who wakes; everyone else still sees the
// message (observer) — preserves the legacy "deliver to all, wake
// some" contract for custom channels.
return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer';
case 'dm':
return 'wake';
case 'announce':
// System-broadcast channels (e.g. Dialectic topic announcements).
// Recipients with HF status === 'busy' have the message discarded
// silently — busy agents should not be distracted by signup pings
// they can't act on. All other presences (idle/on_call/exhausted/
// offline/unknown) get the message as 'observer' (no wake): the
// channel itself is browsable; agents proactively decide what to
// do with announcements when they next look at their inbox.
if (recipientPresence === 'busy') return 'skip';
return 'observer';
default:
// report (and anything else): deliver as observer, no wake
return 'observer';
}
}
/**
* @deprecated Use computeDelivery (returns 3-state). Kept for any
* external callers; treats 'observer' and 'skip' both as `false`.
*/
export function computeWakeup(args: { export function computeWakeup(args: {
xType: XType; xType: XType;
recipientUserId: string; recipientUserId: string;
@@ -106,7 +29,23 @@ export function computeWakeup(args: {
wakeUserIds: Set<string>; wakeUserIds: Set<string>;
mentionUserIds?: Set<string>; mentionUserIds?: Set<string>;
}): boolean { }): boolean {
return computeDelivery(args) === 'wake'; const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args;
if (recipientUserId === authorUserId) return false;
switch (xType) {
case 'general':
if (mentionUserIds && mentionUserIds.size > 0) {
return mentionUserIds.has(recipientUserId);
}
return true;
case 'triage':
case 'custom':
return wakeUserIds.has(recipientUserId);
case 'dm':
// 1:1 conversation: every non-author participant is always woken.
return true;
default:
return false;
}
} }
@WebSocketGateway({ @WebSocketGateway({
@@ -122,12 +61,6 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
private readonly logger = new Logger(RealtimeGateway.name); private readonly logger = new Logger(RealtimeGateway.name);
private readonly onlineUsers = new Set<string>(); private readonly onlineUsers = new Set<string>();
// Optional: injected at module wiring time. Used by emitMessageCreated
// to pre-load recipient presence for announce-type channels.
// Typed loosely to avoid a circular import between realtime and agents
// modules; the actual interface lives in agents/agent-presence.service.
presence?: { getStatusMap(ids: string[]): Promise<Map<string, PresenceStatus>> };
private userIdFromClient(client: Socket): string { private userIdFromClient(client: Socket): string {
const authUser = client.handshake.auth?.userId; const authUser = client.handshake.auth?.userId;
const headerUser = client.handshake.headers['x-user-id']; const headerUser = client.handshake.headers['x-user-id'];
@@ -250,11 +183,7 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
this.server.to(`user:${userId}`).emit(event, data); this.server.to(`user:${userId}`).emit(event, data);
} }
// Emits message.created per-recipient using the 3-state delivery // Emits message.created per-recipient so each carries its own `wakeup` flag.
// decision (wake / observer / skip). Skipped recipients receive
// nothing — used by triage channels to keep non-on-duty / non-mention
// / non-admin users completely out of the loop, and by announce
// channels to suppress delivery to recipients whose presence is busy.
async emitMessageCreated( async emitMessageCreated(
channelId: string, channelId: string,
data: Record<string, unknown>, data: Record<string, unknown>,
@@ -263,41 +192,19 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
authorUserId: string; authorUserId: string;
wakeUserIds: Set<string>; wakeUserIds: Set<string>;
mentionUserIds?: Set<string>; mentionUserIds?: Set<string>;
/** Single Center-scoped admin userId (or null). */
adminUserId?: string | null;
}, },
): Promise<void> { ): Promise<void> {
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
// For announce-type channels, pre-load presence for all recipients
// in one query so the per-recipient loop doesn't fan out to N round
// trips. For other xTypes, presence is irrelevant — skip the lookup.
let presenceMap: Map<string, PresenceStatus> | undefined;
if (ctx.xType === 'announce' && this.presence) {
const recipientIds = sockets
.map((s) => (typeof s.data.userId === 'string' ? (s.data.userId as string) : ''))
.filter((id) => id && !id.startsWith('anon:'));
presenceMap = await this.presence.getStatusMap(recipientIds);
}
for (const s of sockets) { for (const s of sockets) {
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`; const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
const decision = computeDelivery({ const wakeup = computeWakeup({
xType: ctx.xType, xType: ctx.xType,
recipientUserId, recipientUserId,
authorUserId: ctx.authorUserId, authorUserId: ctx.authorUserId,
wakeUserIds: ctx.wakeUserIds, wakeUserIds: ctx.wakeUserIds,
mentionUserIds: ctx.mentionUserIds, mentionUserIds: ctx.mentionUserIds,
adminUserId: ctx.adminUserId,
recipientPresence: presenceMap?.get(recipientUserId) ?? 'unknown',
});
if (decision === 'skip') continue;
s.emit('message.created', {
...data,
channelId,
wakeup: decision === 'wake',
xType: ctx.xType,
}); });
s.emit('message.created', { ...data, channelId, wakeup, xType: ctx.xType });
} }
} }

View File

@@ -1,25 +1,9 @@
import { Global, Module, OnModuleInit } from '@nestjs/common'; import { Global, Module } from '@nestjs/common';
import { RealtimeGateway } from './realtime.gateway.js'; import { RealtimeGateway } from './realtime.gateway.js';
import { AgentPresenceModule } from '../agents/agent-presence.module.js';
import { AgentPresenceService } from '../agents/agent-presence.service.js';
@Global() @Global()
@Module({ @Module({
imports: [AgentPresenceModule],
providers: [RealtimeGateway], providers: [RealtimeGateway],
exports: [RealtimeGateway], exports: [RealtimeGateway],
}) })
export class RealtimeModule implements OnModuleInit { export class RealtimeModule {}
// Wire presence into the gateway at startup. Using assignment (vs
// constructor injection) keeps the gateway free of the agents-module
// import — no risk of circular dependency, and announce-channel
// delivery degrades gracefully (presence stays undefined → 'unknown'
// status → no busy-discard) if AgentPresenceModule is ever removed.
constructor(
private readonly gateway: RealtimeGateway,
private readonly presence: AgentPresenceService,
) {}
onModuleInit(): void {
this.gateway.presence = this.presence;
}
}