6 Commits

Author SHA1 Message Date
7cb046d785 feat(triage): 3-state delivery + admin observer + admin cache
Triage channels now compute a 3-state delivery decision per recipient
(wake / observer / skip) instead of the binary wakeup flag, and route
according to:

  1. author never gets back their own message            → skip
  2. wake_mapping member (on-duty)                       → wake
  3. mention (NEW: was 'skip' for triage before)         → wake
  4. Center-scoped admin (at most 1)                     → observer
  5. anyone else                                         → skip
                                                         (was 'deliver wake=false')

Skipping means the websocket emit is omitted entirely — the recipient's
openclaw plugin never sees the message and the agent's session stays
free of background noise. Observer means delivered with wakeup=false
(silent UI / no model dispatch on the plugin side).

## What this PR ships

### realtime/realtime.gateway.ts
- new `computeDelivery()` returns DeliveryDecision = 'wake'|'observer'|'skip'
- old `computeWakeup()` kept as a deprecated wrapper for callers that
  still want the boolean answer (treats observer + skip as false)
- `emitMessageCreated` accepts `adminUserId?: string|null` and now
  short-circuits on 'skip' (no socket emit at all)
- general kept its current behavior; custom kept its current behavior
  (members not in wake_mapping become observer instead of `wake=false`)
  — the user-visible bit is just that the response field is the same
  `wakeup: boolean`; the explicit 'skip' is new for triage

### common/center-auth.ts
- `fetchAdminEmail()` calls GET `${center}/auth/admin-email` with the
  existing x-api-key (same auth as introspect/resolve-names). Returns
  `{email, userId}` or `null` on either "no admin" or any error

### common/admin-cache.service.ts (NEW)
- `AdminCacheService` — in-memory cache, 1-day TTL, lazy refresh.
  `get(force=true)` bypasses TTL for cli-triggered refresh
- exposed by MessagingModule

### messaging/messaging.controller.ts
- non-rotating branch threads `adminUserId` into emitMessageCreated

### cli/admin-refresh.ts (NEW)
- `node dist/cli/admin-refresh.js` — force-refresh cache and print
  before/after JSON. Use after a Center `user set-admin` so triage
  delivery picks up the new admin without waiting for 24h TTL

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:14:05 +01:00
e635faea9c Merge pull request 'feat(realtime): push channel.joined/left events to user-scoped rooms' (#1) from feat/push-channel-membership-events into main 2026-05-21 07:12:51 +00:00
30069377e7 feat(realtime): push channel.joined/left events to user-scoped rooms
Backend half of the plugin push-based channel sync (companion to
nav/Fabric.OpenclawPlugin#1 follow-up). Before this, the OpenClaw
fabric inbound had to poll `/api/channels?guildId=...` every 60s to
discover newly-joined channels (any DM another user just dragged the
agent into). Now the server tells the agent's socket directly so
sub/unsub is realtime.

Changes:
- realtime.gateway.ts:
  * handleConnection joins the socket into a `user:<userId>` room.
    All of a user's connected sockets now share that room.
  * New `emitToUser(userId, event, data)` helper that emits into
    that room. No-op for offline users (next connect resyncs via the
    plugin's initial channel-list fetch).
- channels.service.ts:
  * Inject RealtimeGateway (RealtimeModule is @Global, no module
    plumbing needed).
  * Private `notifyMembership(kind, channelId, userIds, extra)`
    helper that emits `channel.<kind>` (joined|left) with payload
    {channelId, userId, xType, occurredAt}.
  * create(): emit channel.joined to every seeded member (creator +
    explicit memberUserIds + triage on-duty).
  * joinChannel(): emit channel.joined to userId (only if the row was
    actually inserted, idempotent on existing membership).
  * leaveChannel(): emit channel.left to userId iff a row was
    actually deleted.

Event shape:
  {
    channelId: string,
    userId: string,
    xType?: string,
    occurredAt: ISO string,
  }

Client-side contract (fabric plugin):
  socket.on('channel.joined', m => socket.emit('join_channel', {channelId: m.channelId}))
  socket.on('channel.left',   m => socket.emit('leave_channel', {channelId: m.channelId}))

The plugin keeps its 60s polling resync as a safety net for missed
events (transient socket drops between emit and reconnect, partial
failures, etc).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:07:46 +01:00
b1f7467161 feat(guild): add 'dm' x-type (private 1:1, always-wake)
channel enum + X_TYPES + realtime XType gain 'dm'. dm channels are
forced private (never public) and non-unique (no dedup; create()
always makes a fresh one). computeWakeup: dm wakes every non-author
participant unconditionally (no rotation / no wake_mapping). The
message.created realtime payload now carries xType so the plugin can
treat dm specially.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 09:18:19 +01:00
7e944a08f6 feat(ops): CLI to print the commands-sync key (Guild C-2)
node dist/cli/print-commands-sync-key.js (npm run print:commands-key)
outputs FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY as the process sees it,
so an operator can docker-exec it on the deployed guild and copy the
value into the plugin's FABRIC_COMMANDS_SYNC_KEY. --export prints a
ready-to-paste assignment; exits 1 when unset (fallback mode).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:28:23 +01:00
e45ad91340 fix(security): close Critical IDOR/authz gaps (C-1/C-2)
C-1: messaging endpoints now enforce channel participation (public
     channels open; private require channel_members). authorUserId is
     forced to the authenticated user (no more author spoofing); edit/
     delete require message-author ownership; history read gated too.
C-2: PUT /commands body strictly validated + size-capped via
     SyncCommandsDto (kills catalog poisoning / DoS). Optional
     FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY restricts the write to the
     plugin when set; never weaker than before when unset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:47:08 +01:00
12 changed files with 507 additions and 49 deletions

View File

@@ -7,6 +7,7 @@
"scripts": { "scripts": {
"build": "tsc -p tsconfig.build.json", "build": "tsc -p tsconfig.build.json",
"start": "node dist/main.js", "start": "node dist/main.js",
"print:commands-key": "node dist/cli/print-commands-sync-key.js",
"start:dev": "ts-node src/main.ts", "start:dev": "ts-node src/main.ts",
"lint": "eslint 'src/**/*.ts'", "lint": "eslint 'src/**/*.ts'",
"lint:fix": "eslint 'src/**/*.ts' --fix", "lint:fix": "eslint 'src/**/*.ts' --fix",

View File

@@ -5,8 +5,9 @@ import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js'; import { ChannelMember } from '../entities/channel-member.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { TurnService } from './turn.service.js'; import { TurnService } from './turn.service.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom'] as const; const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const;
type XType = (typeof X_TYPES)[number]; type XType = (typeof X_TYPES)[number];
type CreateChannelInput = { type CreateChannelInput = {
@@ -35,8 +36,34 @@ export class ChannelsService {
@InjectRepository(WakeMapping) @InjectRepository(WakeMapping)
private readonly wakeRepo: Repository<WakeMapping>, private readonly wakeRepo: Repository<WakeMapping>,
private readonly turnService: TurnService, private readonly turnService: TurnService,
// RealtimeGateway is provided by the global RealtimeModule. Used to
// push channel.joined / channel.left so connected clients (e.g. the
// OpenClaw fabric plugin) can sub/unsub socket.io rooms immediately
// instead of waiting for the polling fallback.
private readonly realtime: RealtimeGateway,
) {} ) {}
// Push a channel membership change to each affected user's socket-room.
// Best-effort: offline users see the new state on their next connect
// (the inbound runs an initial channel-list fetch on connect).
private notifyMembership(
kind: 'joined' | 'left',
channelId: string,
userIds: string[] | Set<string>,
extra: Record<string, unknown> = {},
): void {
const ids = userIds instanceof Set ? [...userIds] : userIds;
const payload = {
channelId,
...extra,
occurredAt: new Date().toISOString(),
};
for (const u of ids) {
if (!u) continue;
this.realtime.emitToUser(u, `channel.${kind}`, { ...payload, userId: u });
}
}
// Channels visible to a user within a guild: // Channels visible to a user within a guild:
// - every public channel of the guild (incl. ones created before the user // - every public channel of the guild (incl. ones created before the user
// joined the guild), OR // joined the guild), OR
@@ -93,6 +120,7 @@ export class ChannelsService {
if (channel.xType === 'discuss' || channel.xType === 'work') { if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberAdded(channelId, userId); await this.turnService.onMemberAdded(channelId, userId);
} }
this.notifyMembership('joined', channelId, [userId], { xType: channel.xType });
} }
return { status: 'ok', channelId, userId, member: true }; return { status: 'ok', channelId, userId, member: true };
} }
@@ -102,11 +130,14 @@ export class ChannelsService {
if (!channel) throw new NotFoundException('channel not found'); if (!channel) throw new NotFoundException('channel not found');
// remove every channel-scoped row that references this user // remove every channel-scoped row that references this user
await this.memberRepo.delete({ channelId, userId }); const deleted = await this.memberRepo.delete({ channelId, userId });
await this.wakeRepo.delete({ channelId, userId }); await this.wakeRepo.delete({ channelId, userId });
if (channel.xType === 'discuss' || channel.xType === 'work') { if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberRemoved(channelId, userId); await this.turnService.onMemberRemoved(channelId, userId);
} }
if ((deleted.affected ?? 0) > 0) {
this.notifyMembership('left', channelId, [userId], { xType: channel.xType });
}
return { status: 'ok', channelId, userId, member: false }; return { status: 'ok', channelId, userId, member: false };
} }
@@ -130,14 +161,19 @@ export class ChannelsService {
.map((x) => String(x ?? '').trim()) .map((x) => String(x ?? '').trim())
.filter(Boolean); .filter(Boolean);
// dm channels are always private (a 1:1 conversation); never public.
// dm is not unique — multiple dm channels between the same users are
// allowed (create() always makes a fresh one, no dedup).
const isPublic = xType === 'dm' ? false : Boolean(input.isPublic);
const channel = await this.channelRepo.save( const channel = await this.channelRepo.save(
this.channelRepo.create({ this.channelRepo.create({
guildId, guildId,
name, name,
xType, xType,
kind: input.kind === 'announcement' ? 'announcement' : 'text', kind: input.kind === 'announcement' ? 'announcement' : 'text',
isPrivate: !input.isPublic, isPrivate: !isPublic,
isPublic: Boolean(input.isPublic), isPublic,
lastSeq: 0, lastSeq: 0,
}), }),
); );
@@ -155,6 +191,12 @@ export class ChannelsService {
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })), [...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
); );
// Push channel.joined to every seeded member (creator + invitees +
// triage on-duty) so their connected sockets sub the new room
// immediately. Skips offline users — next connect's channel-list
// fetch covers them.
this.notifyMembership('joined', channel.id, memberIds, { xType });
// wake_mapping: triage -> the on-duty user; custom -> each listener // wake_mapping: triage -> the on-duty user; custom -> each listener
const wakeUserIds = new Set<string>(); const wakeUserIds = new Set<string>();
if (xType === 'triage') wakeUserIds.add(onDuty); if (xType === 'triage') wakeUserIds.add(onDuty);

39
src/cli/admin-refresh.ts Normal file
View File

@@ -0,0 +1,39 @@
// Operator convenience: force-refresh the in-memory Center admin cache
// without waiting for the 1-day TTL. Used after `center user set-admin`
// to make new admin visible immediately to triage delivery.
//
// Usage (inside the deployed container):
// docker exec fabric-backend-guild node dist/cli/admin-refresh.js
//
// Prints the (possibly null) result as JSON. Exit 0 always — a "no
// admin" outcome is a valid state, not an error.
import 'reflect-metadata';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '../app.module.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
async function main() {
const app = await NestFactory.createApplicationContext(AppModule, { logger: ['error', 'warn'] });
try {
const cache = app.get(AdminCacheService);
const before = cache.snapshot();
const after = await cache.get(true);
process.stdout.write(
JSON.stringify({
ok: true,
before,
after,
changed: JSON.stringify(before) !== JSON.stringify(after),
}) + '\n',
);
} finally {
await app.close();
}
}
void main().catch((error: unknown) => {
const message = error instanceof Error ? error.message : 'unknown error';
process.stderr.write(JSON.stringify({ ok: false, error: message }) + '\n');
process.exit(1);
});

View File

@@ -0,0 +1,37 @@
// Operator convenience (Guild C-2): print the commands-sync key that this
// guild process actually has in its environment, so it can be copied into
// the OpenClaw plugin's FABRIC_COMMANDS_SYNC_KEY.
//
// Usage (inside the deployed container — authoritative, reflects compose):
// docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js
// docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js --export
//
// Default: prints the raw value only (so KEY=$(... ) works).
// --export: prints `FABRIC_COMMANDS_SYNC_KEY=<value>` for pasting.
// Exit 1 (no stdout) when unset — guild is then in the weaker
// "any authenticated user" fallback for PUT /commands.
const args = new Set(process.argv.slice(2));
if (args.has('--help') || args.has('-h')) {
process.stderr.write(
'print-commands-sync-key: outputs FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY\n' +
' (no flag) print the raw key value\n' +
' --export print FABRIC_COMMANDS_SYNC_KEY=<value>\n',
);
process.exit(0);
}
const key = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
if (!key) {
process.stderr.write(
'FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY is not set — PUT /commands is in ' +
'the fallback mode (any authenticated user). Set it to harden (Guild C-2).\n',
);
process.exit(1);
}
process.stdout.write(
(args.has('--export') ? `FABRIC_COMMANDS_SYNC_KEY=${key}` : key) + '\n',
);

View File

@@ -1,29 +1,51 @@
import { import {
Body, Body,
Controller, Controller,
ForbiddenException,
Get, Get,
Headers,
Put, Put,
Req, Req,
UnauthorizedException, UnauthorizedException,
} from '@nestjs/common'; } from '@nestjs/common';
import { timingSafeEqual } from 'node:crypto';
import { CommandsService } from './commands.service.js'; import { CommandsService } from './commands.service.js';
import { SyncCommandsDto } from './dto.sync-commands.dto.js';
type AuthedRequest = { userId?: string }; type AuthedRequest = { userId?: string };
function safeEqual(a: string, b: string): boolean {
const ab = Buffer.from(a);
const bb = Buffer.from(b);
if (ab.length !== bb.length) return false;
return timingSafeEqual(ab, bb);
}
@Controller('commands') @Controller('commands')
export class CommandsController { export class CommandsController {
constructor(private readonly commands: CommandsService) {} constructor(private readonly commands: CommandsService) {}
// Plugin syncs the OpenClaw native-command catalog here (any authenticated // Guild C-2: catalog write is privileged. When
// agent/user; idempotent full replace). // FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY is configured (recommended in
// production), the caller must present a matching x-commands-sync-key
// header — this restricts writes to the OpenClaw plugin. When unset, we
// fall back to "any authenticated agent/user" (never weaker than before).
// The body is always strictly validated + size-capped via SyncCommandsDto.
@Put() @Put()
sync( sync(
@Req() req: AuthedRequest, @Req() req: AuthedRequest,
@Body() body: { commands?: unknown[] }, @Body() body: SyncCommandsDto,
@Headers('x-commands-sync-key') syncKey?: string,
) { ) {
if (!req.userId) throw new UnauthorizedException('missing user'); const configured = process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '';
const commands = Array.isArray(body?.commands) ? body.commands : []; if (configured) {
return this.commands.sync(commands); if (!syncKey || !safeEqual(syncKey, configured)) {
throw new ForbiddenException('invalid commands sync key');
}
} else if (!req.userId) {
throw new UnauthorizedException('missing user');
}
return this.commands.sync(body.commands as unknown[]);
} }
// Frontend reads the catalog to drive `/` autocomplete. // Frontend reads the catalog to drive `/` autocomplete.

View File

@@ -0,0 +1,102 @@
import {
ArrayMaxSize,
IsArray,
IsBoolean,
IsOptional,
IsString,
MaxLength,
ValidateNested,
} from 'class-validator';
import { Type } from 'class-transformer';
// Guild C-2: the slash-command catalog is guild-global and rendered by the
// frontend `/` autocomplete. Without a strict schema + caps a single
// authenticated caller could poison it or blow up the DB / clients.
// The global ValidationPipe runs with { whitelist, forbidNonWhitelisted },
// so any unknown field is rejected.
class CommandChoiceDto {
@IsString()
@MaxLength(200)
value!: string;
@IsString()
@MaxLength(200)
label!: string;
}
class CommandArgDto {
@IsString()
@MaxLength(100)
name!: string;
@IsOptional()
@IsString()
@MaxLength(500)
description?: string;
@IsOptional()
@IsString()
@MaxLength(40)
type?: string;
@IsOptional()
@IsBoolean()
required?: boolean;
@IsOptional()
@IsBoolean()
captureRemaining?: boolean;
@IsOptional()
@IsBoolean()
preferAutocomplete?: boolean;
// null when there are no choices (plugin sends explicit null).
@IsOptional()
@IsArray()
@ArrayMaxSize(100)
@ValidateNested({ each: true })
@Type(() => CommandChoiceDto)
choices?: CommandChoiceDto[] | null;
}
class CommandSpecDto {
@IsString()
@MaxLength(100)
name!: string;
@IsOptional()
@IsString()
@MaxLength(100)
nativeName?: string;
@IsOptional()
@IsString()
@MaxLength(500)
description?: string;
@IsOptional()
@IsBoolean()
acceptsArgs?: boolean;
@IsOptional()
@IsArray()
@ArrayMaxSize(50)
@ValidateNested({ each: true })
@Type(() => CommandArgDto)
args?: CommandArgDto[];
@IsOptional()
@IsString()
@MaxLength(20)
argsParsing?: string;
}
export class SyncCommandsDto {
@IsArray()
@ArrayMaxSize(200)
@ValidateNested({ each: true })
@Type(() => CommandSpecDto)
commands!: CommandSpecDto[];
}

View File

@@ -0,0 +1,73 @@
/**
* Center-scoped admin cache.
*
* Holds the at-most-one admin user (email + userId) fetched from Center.
* Used to decide who to deliver triage messages to as a silent observer
* (wake=false), regardless of on-duty / mention status.
*
* Refresh policy (per spec, 2026-05-22):
* • TTL = 1 day. Center admin changes are rare; agents tolerate a
* day's stale cache without surprises
* • on first lookup the cache lazy-fetches
* • cli `admin refresh` forces an out-of-band refresh without waiting
* for TTL expiry
*
* Failure mode: a Center fetch error is treated identically to "no
* admin" — guild keeps operating without an observer. The cache holds
* the failed-fetch decision for the same TTL so we don't hammer Center.
*/
import { Injectable, Logger } from '@nestjs/common';
import { fetchAdminEmail } from './center-auth.js';
const ADMIN_CACHE_TTL_MS = 24 * 60 * 60 * 1000;
export interface CachedAdmin {
email: string;
userId: string;
}
@Injectable()
export class AdminCacheService {
private readonly logger = new Logger(AdminCacheService.name);
private cached: CachedAdmin | null = null;
private cachedAt = 0;
private inflight: Promise<CachedAdmin | null> | null = null;
/**
* Return the cached admin, fetching from Center if the cache is empty
* or older than the TTL. Returns null if no admin is set.
*
* `force=true` bypasses the cache and refreshes immediately — used by
* the cli refresh command.
*/
async get(force = false): Promise<CachedAdmin | null> {
const fresh = Date.now() - this.cachedAt < ADMIN_CACHE_TTL_MS;
if (!force && this.cachedAt > 0 && fresh) {
return this.cached;
}
if (this.inflight) return this.inflight;
this.inflight = (async () => {
try {
const result = await fetchAdminEmail();
this.cached = result;
this.cachedAt = Date.now();
this.logger.log(
`admin cache refreshed: ${result ? `${result.email} (${result.userId})` : 'no admin set'}`,
);
return result;
} finally {
this.inflight = null;
}
})();
return this.inflight;
}
/** Snapshot of the cached admin (no fetch). Returns null if not yet
* populated. Used by the hot delivery path which doesn't want to
* block on a Center round-trip. */
snapshot(): CachedAdmin | null {
return this.cached;
}
}

View File

@@ -26,6 +26,31 @@ export async function introspectGuildToken(token: string): Promise<{ active: boo
}; };
} }
/**
* Fetch the single Center-scoped admin user (if any).
* Same x-api-key auth as introspect / resolve-names.
* Returns `null` when no admin is set OR the request fails (treated
* identically — the guild simply falls back to "no admin observer").
*/
export async function fetchAdminEmail(): Promise<{ email: string; userId: string } | null> {
const centerBaseUrl = process.env.FABRIC_BACKEND_GUILD_CENTER_BASE_URL;
const centerApiKey = process.env.FABRIC_BACKEND_GUILD_CENTER_API_KEY;
if (!centerBaseUrl || !centerApiKey) return null;
try {
const res = await fetch(`${centerBaseUrl}/api/auth/admin-email`, {
method: 'GET',
headers: { 'x-api-key': centerApiKey },
});
if (!res.ok) return null;
const data = (await res.json()) as { email?: string; userId?: string } | null;
if (!data || !data.email || !data.userId) return null;
return { email: data.email, userId: data.userId };
} catch {
return null;
}
}
// Resolve <@user.name:NAME> names to userIds within this guild node via // Resolve <@user.name:NAME> names to userIds within this guild node via
// Center. Unresolved names are simply absent from the returned map. // Center. Unresolved names are simply absent from the returned map.
export async function resolveUserNames(names: string[]): Promise<Record<string, string>> { export async function resolveUserNames(names: string[]): Promise<Record<string, string>> {

View File

@@ -16,9 +16,9 @@ export class Channel {
@Column({ @Column({
name: 'x_type', name: 'x_type',
type: 'enum', type: 'enum',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom'], enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
}) })
xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom'; xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
@Column({ type: 'varchar', length: 16, default: 'text' }) @Column({ type: 'varchar', length: 16, default: 'text' })
kind!: 'text' | 'announcement'; kind!: 'text' | 'announcement';

View File

@@ -3,6 +3,7 @@ import {
ConflictException, ConflictException,
Controller, Controller,
Delete, Delete,
ForbiddenException,
Get, Get,
Headers, Headers,
NotFoundException, NotFoundException,
@@ -10,14 +11,17 @@ import {
Patch, Patch,
Post, Post,
Query, Query,
Req,
} from '@nestjs/common'; } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { DataSource, Repository } from 'typeorm'; import { DataSource, Repository } from 'typeorm';
import { CreateMessageDto } from './dto.create-message.dto.js'; import { CreateMessageDto } from './dto.create-message.dto.js';
import { Channel } from '../entities/channel.entity.js'; import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js'; import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
import { parseSlashCommand } from '../channels/slash-commands.js'; import { parseSlashCommand } from '../channels/slash-commands.js';
import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js'; import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js';
import { resolveUserNames } from '../common/center-auth.js'; import { resolveUserNames } from '../common/center-auth.js';
@@ -36,6 +40,8 @@ export class MessagingController {
private readonly dataSource: DataSource, private readonly dataSource: DataSource,
@InjectRepository(Channel) @InjectRepository(Channel)
private readonly channelRepo: Repository<Channel>, private readonly channelRepo: Repository<Channel>,
@InjectRepository(ChannelMember)
private readonly memberRepo: Repository<ChannelMember>,
@InjectRepository(Message) @InjectRepository(Message)
private readonly messageRepo: Repository<Message>, private readonly messageRepo: Repository<Message>,
@InjectRepository(IdempotencyRecord) @InjectRepository(IdempotencyRecord)
@@ -45,6 +51,7 @@ export class MessagingController {
private readonly turn: TurnService, private readonly turn: TurnService,
private readonly events: EventsService, private readonly events: EventsService,
private readonly realtime: RealtimeGateway, private readonly realtime: RealtimeGateway,
private readonly adminCache: AdminCacheService,
) {} ) {}
private async getIdempotentResponse( private async getIdempotentResponse(
@@ -86,6 +93,19 @@ export class MessagingController {
}; };
} }
// Channel-participant gate (Guild C-1): public channels are readable/
// writable by any authenticated user; private channels require explicit
// channel_members membership. Returns the channel so callers can reuse it.
private async assertParticipant(channelId: string, userId: string): Promise<Channel> {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
if (channel.isPublic) return channel;
if (!userId) throw new ForbiddenException('not a channel member');
const member = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!member) throw new ForbiddenException('not a channel member');
return channel;
}
// Persists one message (allocates a seq under a channel row lock) and // Persists one message (allocates a seq under a channel row lock) and
// returns its view. Used for normal messages and for guild /ack messages. // returns its view. Used for normal messages and for guild /ack messages.
private async persistMessage( private async persistMessage(
@@ -136,20 +156,25 @@ export class MessagingController {
async create( async create(
@Param('id') channelId: string, @Param('id') channelId: string,
@Body() body: CreateMessageDto, @Body() body: CreateMessageDto,
@Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string, @Headers('idempotency-key') idempotencyKey?: string,
) { ) {
const scope = `POST:/channels/${channelId}/messages`; const scope = `POST:/channels/${channelId}/messages`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey); const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed; if (existed) return existed;
const channel = await this.channelRepo.findOne({ where: { id: channelId } }); // Guild C-1: caller must be a participant of the channel, and the
if (!channel) throw new NotFoundException('channel not found'); // author is always the authenticated user — body.authorUserId is
// ignored so a caller can never post as someone else.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
const channel = await this.assertParticipant(channelId, userId);
if (channel.closed) { if (channel.closed) {
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' }); throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
} }
const xType = channel.xType ?? 'general'; const xType = channel.xType ?? 'general';
const isRotating = xType === 'discuss' || xType === 'work'; const isRotating = xType === 'discuss' || xType === 'work';
const authorUserId = String(body.authorUserId ?? 'anonymous'); const authorUserId = userId;
// ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via // ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via
// Center before anything else persists/parses the content // Center before anything else persists/parses the content
@@ -202,16 +227,19 @@ export class MessagingController {
const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds); const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId); await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
} else { } else {
// general/report/triage/custom: wakeup from x_type + wake_mapping; // general/report/triage/custom: 3-state delivery
// general also honors the message's at-list // (wake / observer / skip) — see realtime.gateway.computeDelivery.
// Center-scoped admin (cached, 1d TTL) gets `observer` on triage.
const wakeRows = await this.wakeRepo.find({ where: { channelId } }); const wakeRows = await this.wakeRepo.find({ where: { channelId } });
const wakeUserIds = new Set(wakeRows.map((w) => w.userId)); const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId)); const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
const admin = await this.adminCache.get();
await this.realtime.emitMessageCreated(channelId, responseBody, { await this.realtime.emitMessageCreated(channelId, responseBody, {
xType, xType,
authorUserId, authorUserId,
wakeUserIds, wakeUserIds,
mentionUserIds, mentionUserIds,
adminUserId: admin?.userId ?? null,
}); });
} }
@@ -223,14 +251,23 @@ export class MessagingController {
@Param('id') channelId: string, @Param('id') channelId: string,
@Param('messageId') messageId: string, @Param('messageId') messageId: string,
@Body() body: { content?: string }, @Body() body: { content?: string },
@Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string, @Headers('idempotency-key') idempotencyKey?: string,
) { ) {
const scope = `PATCH:/channels/${channelId}/messages/${messageId}`; const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey); const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed; if (existed) return existed;
// Guild C-1: participant + author-ownership.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
await this.assertParticipant(channelId, userId);
const item = await this.messageRepo.findOne({ where: { channelId, messageId } }); const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
if (!item) return { status: 'not_found' }; if (!item) return { status: 'not_found' };
if (item.authorUserId !== userId) {
throw new ForbiddenException('not the message author');
}
const now = Date.now(); const now = Date.now();
const createdAt = new Date(item.createdAt).getTime(); const createdAt = new Date(item.createdAt).getTime();
@@ -259,14 +296,23 @@ export class MessagingController {
async remove( async remove(
@Param('id') channelId: string, @Param('id') channelId: string,
@Param('messageId') messageId: string, @Param('messageId') messageId: string,
@Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string, @Headers('idempotency-key') idempotencyKey?: string,
) { ) {
const scope = `DELETE:/channels/${channelId}/messages/${messageId}`; const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey); const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed; if (existed) return existed;
// Guild C-1: participant + author-ownership.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
await this.assertParticipant(channelId, userId);
const item = await this.messageRepo.findOne({ where: { channelId, messageId } }); const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
if (!item) return { status: 'not_found' }; if (!item) return { status: 'not_found' };
if (item.authorUserId !== userId) {
throw new ForbiddenException('not the message author');
}
item.isDeleted = true; item.isDeleted = true;
item.deletedAt = new Date(); item.deletedAt = new Date();
@@ -304,10 +350,14 @@ export class MessagingController {
@Get() @Get()
async listBySeq( async listBySeq(
@Param('id') channelId: string, @Param('id') channelId: string,
@Req() req: { userId?: string },
@Query('seq_from') seqFrom?: string, @Query('seq_from') seqFrom?: string,
@Query('seq_to') seqTo?: string, @Query('seq_to') seqTo?: string,
@Query('limit') limit?: string, @Query('limit') limit?: string,
) { ) {
// Guild C-1: only participants may read channel history.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
const from = seqFrom ? Number(seqFrom) : 1; const from = seqFrom ? Number(seqFrom) : 1;
const to = seqTo ? Number(seqTo) : Number.MAX_SAFE_INTEGER; const to = seqTo ? Number(seqTo) : Number.MAX_SAFE_INTEGER;
const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT); const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);
@@ -327,10 +377,7 @@ export class MessagingController {
}; };
} }
const channel = await this.channelRepo.findOne({ where: { id: channelId } }); const channel = await this.assertParticipant(channelId, userId);
if (!channel) {
throw new NotFoundException('channel not found');
}
const qb = this.messageRepo const qb = this.messageRepo
.createQueryBuilder('m') .createQueryBuilder('m')

View File

@@ -2,12 +2,16 @@ import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm'; import { TypeOrmModule } from '@nestjs/typeorm';
import { MessagingController } from './messaging.controller.js'; import { MessagingController } from './messaging.controller.js';
import { Channel } from '../entities/channel.entity.js'; import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js'; import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js'; import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js'; import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
@Module({ @Module({
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord, WakeMapping])], imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])],
controllers: [MessagingController], controllers: [MessagingController],
providers: [AdminCacheService],
exports: [AdminCacheService],
}) })
export class MessagingModule {} export class MessagingModule {}

View File

@@ -11,17 +11,72 @@ import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io'; import { Server, Socket } from 'socket.io';
import { introspectGuildToken } from '../common/center-auth.js'; import { introspectGuildToken } from '../common/center-auth.js';
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom'; type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
// Wakeup for non-rotating channels only (general/report/triage/custom). /**
// discuss/work go through TurnService + emitMessageTargeted, never here. * Per-recipient delivery decision for a non-rotating channel message.
// Precedence: *
// 1. the author never gets woken by their own message * • `wake` — push the event AND wake the recipient (model turn fires)
// 2. triage/custom: only wake users in the channel's wake_mapping * • `observer` — push the event with wakeup=false (silent; UI displays
// (mentions change nothing here) * but the openclaw plugin records-only without dispatch). Currently
// 3. general: if the message has an at-list, wake only the at'd users; * used for the Center admin observing triage traffic
// otherwise wake everyone * • `skip` — don't even emit the event to this recipient
// 4. report (and anything else): wake nobody *
* Wakeup-only channels (general/report/dm/custom) never return
* 'observer'; the legacy behaviour is preserved end-to-end.
*
* Precedence for triage (the only place 'skip' / 'observer' fire):
* 1. author never gets back their own message
* 2. wake_mapping (on-duty) → wake
* 3. mention → wake (NEW: was 'skip' before — see Fabric PR 'triage
* mention exception')
* 4. admin (Center-scoped, at most one) → observer
* 5. everyone else → skip (was 'deliver, wakeup=false' before)
*/
export type DeliveryDecision = 'wake' | 'observer' | 'skip';
export interface ComputeDeliveryArgs {
xType: XType;
recipientUserId: string;
authorUserId: string;
wakeUserIds: Set<string>;
mentionUserIds?: Set<string>;
/** Single Center-scoped admin userId, or null. */
adminUserId?: string | null;
}
export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision {
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId } = args;
if (recipientUserId === authorUserId) return 'skip';
switch (xType) {
case 'triage':
if (wakeUserIds.has(recipientUserId)) return 'wake';
if (mentionUserIds?.has(recipientUserId)) return 'wake';
if (adminUserId && recipientUserId === adminUserId) return 'observer';
return 'skip';
case 'general':
if (mentionUserIds && mentionUserIds.size > 0) {
return mentionUserIds.has(recipientUserId) ? 'wake' : 'observer';
}
return 'wake';
case 'custom':
// wake_mapping decides who wakes; everyone else still sees the
// message (observer) — preserves the legacy "deliver to all, wake
// some" contract for custom channels.
return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer';
case 'dm':
return 'wake';
default:
// report (and anything else): deliver as observer, no wake
return 'observer';
}
}
/**
* @deprecated Use computeDelivery (returns 3-state). Kept for any
* external callers; treats 'observer' and 'skip' both as `false`.
*/
export function computeWakeup(args: { export function computeWakeup(args: {
xType: XType; xType: XType;
recipientUserId: string; recipientUserId: string;
@@ -29,20 +84,7 @@ export function computeWakeup(args: {
wakeUserIds: Set<string>; wakeUserIds: Set<string>;
mentionUserIds?: Set<string>; mentionUserIds?: Set<string>;
}): boolean { }): boolean {
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args; return computeDelivery(args) === 'wake';
if (recipientUserId === authorUserId) return false;
switch (xType) {
case 'general':
if (mentionUserIds && mentionUserIds.size > 0) {
return mentionUserIds.has(recipientUserId);
}
return true;
case 'triage':
case 'custom':
return wakeUserIds.has(recipientUserId);
default:
return false;
}
} }
@WebSocketGateway({ @WebSocketGateway({
@@ -93,6 +135,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
const userId = result.user.id || this.userIdFromClient(client); const userId = result.user.id || this.userIdFromClient(client);
client.data.userId = userId; client.data.userId = userId;
this.onlineUsers.add(userId); this.onlineUsers.add(userId);
// Per-user room: lets server code emit user-scoped events (e.g.
// channel.joined when membership changes) without bookkeeping a
// userId→sockets map. All of this user's sockets receive the event.
client.join(`user:${userId}`);
this.server.emit('presence.online', { this.server.emit('presence.online', {
userId, userId,
onlineCount: this.onlineUsers.size, onlineCount: this.onlineUsers.size,
@@ -168,7 +214,18 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
this.server.to(`channel:${channelId}`).emit(event, data); this.server.to(`channel:${channelId}`).emit(event, data);
} }
// Emits message.created per-recipient so each carries its own `wakeup` flag. // Emit a user-scoped event to all sockets currently connected for `userId`
// (via the `user:<userId>` room joined in handleConnection). No-op for
// offline users — the next connect's initial channel-list fetch covers it.
emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
if (!userId) return;
this.server.to(`user:${userId}`).emit(event, data);
}
// Emits message.created per-recipient using the 3-state delivery
// decision (wake / observer / skip). Skipped recipients receive
// nothing — used by triage channels to keep non-on-duty / non-mention
// / non-admin users completely out of the loop.
async emitMessageCreated( async emitMessageCreated(
channelId: string, channelId: string,
data: Record<string, unknown>, data: Record<string, unknown>,
@@ -177,19 +234,28 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
authorUserId: string; authorUserId: string;
wakeUserIds: Set<string>; wakeUserIds: Set<string>;
mentionUserIds?: Set<string>; mentionUserIds?: Set<string>;
/** Single Center-scoped admin userId (or null). */
adminUserId?: string | null;
}, },
): Promise<void> { ): Promise<void> {
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets(); const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
for (const s of sockets) { for (const s of sockets) {
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`; const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
const wakeup = computeWakeup({ const decision = computeDelivery({
xType: ctx.xType, xType: ctx.xType,
recipientUserId, recipientUserId,
authorUserId: ctx.authorUserId, authorUserId: ctx.authorUserId,
wakeUserIds: ctx.wakeUserIds, wakeUserIds: ctx.wakeUserIds,
mentionUserIds: ctx.mentionUserIds, mentionUserIds: ctx.mentionUserIds,
adminUserId: ctx.adminUserId,
});
if (decision === 'skip') continue;
s.emit('message.created', {
...data,
channelId,
wakeup: decision === 'wake',
xType: ctx.xType,
}); });
s.emit('message.created', { ...data, channelId, wakeup });
} }
} }