Compare commits
6 Commits
3e96de730a
...
feat/triag
| Author | SHA1 | Date | |
|---|---|---|---|
| 7cb046d785 | |||
| e635faea9c | |||
| 30069377e7 | |||
| b1f7467161 | |||
| 7e944a08f6 | |||
| e45ad91340 |
@@ -7,6 +7,7 @@
|
|||||||
"scripts": {
|
"scripts": {
|
||||||
"build": "tsc -p tsconfig.build.json",
|
"build": "tsc -p tsconfig.build.json",
|
||||||
"start": "node dist/main.js",
|
"start": "node dist/main.js",
|
||||||
|
"print:commands-key": "node dist/cli/print-commands-sync-key.js",
|
||||||
"start:dev": "ts-node src/main.ts",
|
"start:dev": "ts-node src/main.ts",
|
||||||
"lint": "eslint 'src/**/*.ts'",
|
"lint": "eslint 'src/**/*.ts'",
|
||||||
"lint:fix": "eslint 'src/**/*.ts' --fix",
|
"lint:fix": "eslint 'src/**/*.ts' --fix",
|
||||||
|
|||||||
@@ -5,8 +5,9 @@ import { Channel } from '../entities/channel.entity.js';
|
|||||||
import { ChannelMember } from '../entities/channel-member.entity.js';
|
import { ChannelMember } from '../entities/channel-member.entity.js';
|
||||||
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
||||||
import { TurnService } from './turn.service.js';
|
import { TurnService } from './turn.service.js';
|
||||||
|
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
|
||||||
|
|
||||||
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom'] as const;
|
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const;
|
||||||
type XType = (typeof X_TYPES)[number];
|
type XType = (typeof X_TYPES)[number];
|
||||||
|
|
||||||
type CreateChannelInput = {
|
type CreateChannelInput = {
|
||||||
@@ -35,8 +36,34 @@ export class ChannelsService {
|
|||||||
@InjectRepository(WakeMapping)
|
@InjectRepository(WakeMapping)
|
||||||
private readonly wakeRepo: Repository<WakeMapping>,
|
private readonly wakeRepo: Repository<WakeMapping>,
|
||||||
private readonly turnService: TurnService,
|
private readonly turnService: TurnService,
|
||||||
|
// RealtimeGateway is provided by the global RealtimeModule. Used to
|
||||||
|
// push channel.joined / channel.left so connected clients (e.g. the
|
||||||
|
// OpenClaw fabric plugin) can sub/unsub socket.io rooms immediately
|
||||||
|
// instead of waiting for the polling fallback.
|
||||||
|
private readonly realtime: RealtimeGateway,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
// Push a channel membership change to each affected user's socket-room.
|
||||||
|
// Best-effort: offline users see the new state on their next connect
|
||||||
|
// (the inbound runs an initial channel-list fetch on connect).
|
||||||
|
private notifyMembership(
|
||||||
|
kind: 'joined' | 'left',
|
||||||
|
channelId: string,
|
||||||
|
userIds: string[] | Set<string>,
|
||||||
|
extra: Record<string, unknown> = {},
|
||||||
|
): void {
|
||||||
|
const ids = userIds instanceof Set ? [...userIds] : userIds;
|
||||||
|
const payload = {
|
||||||
|
channelId,
|
||||||
|
...extra,
|
||||||
|
occurredAt: new Date().toISOString(),
|
||||||
|
};
|
||||||
|
for (const u of ids) {
|
||||||
|
if (!u) continue;
|
||||||
|
this.realtime.emitToUser(u, `channel.${kind}`, { ...payload, userId: u });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Channels visible to a user within a guild:
|
// Channels visible to a user within a guild:
|
||||||
// - every public channel of the guild (incl. ones created before the user
|
// - every public channel of the guild (incl. ones created before the user
|
||||||
// joined the guild), OR
|
// joined the guild), OR
|
||||||
@@ -93,6 +120,7 @@ export class ChannelsService {
|
|||||||
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
||||||
await this.turnService.onMemberAdded(channelId, userId);
|
await this.turnService.onMemberAdded(channelId, userId);
|
||||||
}
|
}
|
||||||
|
this.notifyMembership('joined', channelId, [userId], { xType: channel.xType });
|
||||||
}
|
}
|
||||||
return { status: 'ok', channelId, userId, member: true };
|
return { status: 'ok', channelId, userId, member: true };
|
||||||
}
|
}
|
||||||
@@ -102,11 +130,14 @@ export class ChannelsService {
|
|||||||
if (!channel) throw new NotFoundException('channel not found');
|
if (!channel) throw new NotFoundException('channel not found');
|
||||||
|
|
||||||
// remove every channel-scoped row that references this user
|
// remove every channel-scoped row that references this user
|
||||||
await this.memberRepo.delete({ channelId, userId });
|
const deleted = await this.memberRepo.delete({ channelId, userId });
|
||||||
await this.wakeRepo.delete({ channelId, userId });
|
await this.wakeRepo.delete({ channelId, userId });
|
||||||
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
if (channel.xType === 'discuss' || channel.xType === 'work') {
|
||||||
await this.turnService.onMemberRemoved(channelId, userId);
|
await this.turnService.onMemberRemoved(channelId, userId);
|
||||||
}
|
}
|
||||||
|
if ((deleted.affected ?? 0) > 0) {
|
||||||
|
this.notifyMembership('left', channelId, [userId], { xType: channel.xType });
|
||||||
|
}
|
||||||
return { status: 'ok', channelId, userId, member: false };
|
return { status: 'ok', channelId, userId, member: false };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,14 +161,19 @@ export class ChannelsService {
|
|||||||
.map((x) => String(x ?? '').trim())
|
.map((x) => String(x ?? '').trim())
|
||||||
.filter(Boolean);
|
.filter(Boolean);
|
||||||
|
|
||||||
|
// dm channels are always private (a 1:1 conversation); never public.
|
||||||
|
// dm is not unique — multiple dm channels between the same users are
|
||||||
|
// allowed (create() always makes a fresh one, no dedup).
|
||||||
|
const isPublic = xType === 'dm' ? false : Boolean(input.isPublic);
|
||||||
|
|
||||||
const channel = await this.channelRepo.save(
|
const channel = await this.channelRepo.save(
|
||||||
this.channelRepo.create({
|
this.channelRepo.create({
|
||||||
guildId,
|
guildId,
|
||||||
name,
|
name,
|
||||||
xType,
|
xType,
|
||||||
kind: input.kind === 'announcement' ? 'announcement' : 'text',
|
kind: input.kind === 'announcement' ? 'announcement' : 'text',
|
||||||
isPrivate: !input.isPublic,
|
isPrivate: !isPublic,
|
||||||
isPublic: Boolean(input.isPublic),
|
isPublic,
|
||||||
lastSeq: 0,
|
lastSeq: 0,
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
@@ -155,6 +191,12 @@ export class ChannelsService {
|
|||||||
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
|
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Push channel.joined to every seeded member (creator + invitees +
|
||||||
|
// triage on-duty) so their connected sockets sub the new room
|
||||||
|
// immediately. Skips offline users — next connect's channel-list
|
||||||
|
// fetch covers them.
|
||||||
|
this.notifyMembership('joined', channel.id, memberIds, { xType });
|
||||||
|
|
||||||
// wake_mapping: triage -> the on-duty user; custom -> each listener
|
// wake_mapping: triage -> the on-duty user; custom -> each listener
|
||||||
const wakeUserIds = new Set<string>();
|
const wakeUserIds = new Set<string>();
|
||||||
if (xType === 'triage') wakeUserIds.add(onDuty);
|
if (xType === 'triage') wakeUserIds.add(onDuty);
|
||||||
|
|||||||
39
src/cli/admin-refresh.ts
Normal file
39
src/cli/admin-refresh.ts
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
// Operator convenience: force-refresh the in-memory Center admin cache
|
||||||
|
// without waiting for the 1-day TTL. Used after `center user set-admin`
|
||||||
|
// to make new admin visible immediately to triage delivery.
|
||||||
|
//
|
||||||
|
// Usage (inside the deployed container):
|
||||||
|
// docker exec fabric-backend-guild node dist/cli/admin-refresh.js
|
||||||
|
//
|
||||||
|
// Prints the (possibly null) result as JSON. Exit 0 always — a "no
|
||||||
|
// admin" outcome is a valid state, not an error.
|
||||||
|
|
||||||
|
import 'reflect-metadata';
|
||||||
|
import { NestFactory } from '@nestjs/core';
|
||||||
|
import { AppModule } from '../app.module.js';
|
||||||
|
import { AdminCacheService } from '../common/admin-cache.service.js';
|
||||||
|
|
||||||
|
async function main() {
|
||||||
|
const app = await NestFactory.createApplicationContext(AppModule, { logger: ['error', 'warn'] });
|
||||||
|
try {
|
||||||
|
const cache = app.get(AdminCacheService);
|
||||||
|
const before = cache.snapshot();
|
||||||
|
const after = await cache.get(true);
|
||||||
|
process.stdout.write(
|
||||||
|
JSON.stringify({
|
||||||
|
ok: true,
|
||||||
|
before,
|
||||||
|
after,
|
||||||
|
changed: JSON.stringify(before) !== JSON.stringify(after),
|
||||||
|
}) + '\n',
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
await app.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void main().catch((error: unknown) => {
|
||||||
|
const message = error instanceof Error ? error.message : 'unknown error';
|
||||||
|
process.stderr.write(JSON.stringify({ ok: false, error: message }) + '\n');
|
||||||
|
process.exit(1);
|
||||||
|
});
|
||||||
37
src/cli/print-commands-sync-key.ts
Normal file
37
src/cli/print-commands-sync-key.ts
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
// Operator convenience (Guild C-2): print the commands-sync key that this
|
||||||
|
// guild process actually has in its environment, so it can be copied into
|
||||||
|
// the OpenClaw plugin's FABRIC_COMMANDS_SYNC_KEY.
|
||||||
|
//
|
||||||
|
// Usage (inside the deployed container — authoritative, reflects compose):
|
||||||
|
// docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js
|
||||||
|
// docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js --export
|
||||||
|
//
|
||||||
|
// Default: prints the raw value only (so KEY=$(... ) works).
|
||||||
|
// --export: prints `FABRIC_COMMANDS_SYNC_KEY=<value>` for pasting.
|
||||||
|
// Exit 1 (no stdout) when unset — guild is then in the weaker
|
||||||
|
// "any authenticated user" fallback for PUT /commands.
|
||||||
|
|
||||||
|
const args = new Set(process.argv.slice(2));
|
||||||
|
|
||||||
|
if (args.has('--help') || args.has('-h')) {
|
||||||
|
process.stderr.write(
|
||||||
|
'print-commands-sync-key: outputs FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY\n' +
|
||||||
|
' (no flag) print the raw key value\n' +
|
||||||
|
' --export print FABRIC_COMMANDS_SYNC_KEY=<value>\n',
|
||||||
|
);
|
||||||
|
process.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
const key = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
|
||||||
|
|
||||||
|
if (!key) {
|
||||||
|
process.stderr.write(
|
||||||
|
'FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY is not set — PUT /commands is in ' +
|
||||||
|
'the fallback mode (any authenticated user). Set it to harden (Guild C-2).\n',
|
||||||
|
);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
process.stdout.write(
|
||||||
|
(args.has('--export') ? `FABRIC_COMMANDS_SYNC_KEY=${key}` : key) + '\n',
|
||||||
|
);
|
||||||
@@ -1,29 +1,51 @@
|
|||||||
import {
|
import {
|
||||||
Body,
|
Body,
|
||||||
Controller,
|
Controller,
|
||||||
|
ForbiddenException,
|
||||||
Get,
|
Get,
|
||||||
|
Headers,
|
||||||
Put,
|
Put,
|
||||||
Req,
|
Req,
|
||||||
UnauthorizedException,
|
UnauthorizedException,
|
||||||
} from '@nestjs/common';
|
} from '@nestjs/common';
|
||||||
|
import { timingSafeEqual } from 'node:crypto';
|
||||||
import { CommandsService } from './commands.service.js';
|
import { CommandsService } from './commands.service.js';
|
||||||
|
import { SyncCommandsDto } from './dto.sync-commands.dto.js';
|
||||||
|
|
||||||
type AuthedRequest = { userId?: string };
|
type AuthedRequest = { userId?: string };
|
||||||
|
|
||||||
|
function safeEqual(a: string, b: string): boolean {
|
||||||
|
const ab = Buffer.from(a);
|
||||||
|
const bb = Buffer.from(b);
|
||||||
|
if (ab.length !== bb.length) return false;
|
||||||
|
return timingSafeEqual(ab, bb);
|
||||||
|
}
|
||||||
|
|
||||||
@Controller('commands')
|
@Controller('commands')
|
||||||
export class CommandsController {
|
export class CommandsController {
|
||||||
constructor(private readonly commands: CommandsService) {}
|
constructor(private readonly commands: CommandsService) {}
|
||||||
|
|
||||||
// Plugin syncs the OpenClaw native-command catalog here (any authenticated
|
// Guild C-2: catalog write is privileged. When
|
||||||
// agent/user; idempotent full replace).
|
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY is configured (recommended in
|
||||||
|
// production), the caller must present a matching x-commands-sync-key
|
||||||
|
// header — this restricts writes to the OpenClaw plugin. When unset, we
|
||||||
|
// fall back to "any authenticated agent/user" (never weaker than before).
|
||||||
|
// The body is always strictly validated + size-capped via SyncCommandsDto.
|
||||||
@Put()
|
@Put()
|
||||||
sync(
|
sync(
|
||||||
@Req() req: AuthedRequest,
|
@Req() req: AuthedRequest,
|
||||||
@Body() body: { commands?: unknown[] },
|
@Body() body: SyncCommandsDto,
|
||||||
|
@Headers('x-commands-sync-key') syncKey?: string,
|
||||||
) {
|
) {
|
||||||
if (!req.userId) throw new UnauthorizedException('missing user');
|
const configured = process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '';
|
||||||
const commands = Array.isArray(body?.commands) ? body.commands : [];
|
if (configured) {
|
||||||
return this.commands.sync(commands);
|
if (!syncKey || !safeEqual(syncKey, configured)) {
|
||||||
|
throw new ForbiddenException('invalid commands sync key');
|
||||||
|
}
|
||||||
|
} else if (!req.userId) {
|
||||||
|
throw new UnauthorizedException('missing user');
|
||||||
|
}
|
||||||
|
return this.commands.sync(body.commands as unknown[]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Frontend reads the catalog to drive `/` autocomplete.
|
// Frontend reads the catalog to drive `/` autocomplete.
|
||||||
|
|||||||
102
src/commands/dto.sync-commands.dto.ts
Normal file
102
src/commands/dto.sync-commands.dto.ts
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
import {
|
||||||
|
ArrayMaxSize,
|
||||||
|
IsArray,
|
||||||
|
IsBoolean,
|
||||||
|
IsOptional,
|
||||||
|
IsString,
|
||||||
|
MaxLength,
|
||||||
|
ValidateNested,
|
||||||
|
} from 'class-validator';
|
||||||
|
import { Type } from 'class-transformer';
|
||||||
|
|
||||||
|
// Guild C-2: the slash-command catalog is guild-global and rendered by the
|
||||||
|
// frontend `/` autocomplete. Without a strict schema + caps a single
|
||||||
|
// authenticated caller could poison it or blow up the DB / clients.
|
||||||
|
// The global ValidationPipe runs with { whitelist, forbidNonWhitelisted },
|
||||||
|
// so any unknown field is rejected.
|
||||||
|
|
||||||
|
class CommandChoiceDto {
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(200)
|
||||||
|
value!: string;
|
||||||
|
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(200)
|
||||||
|
label!: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
class CommandArgDto {
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(100)
|
||||||
|
name!: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(500)
|
||||||
|
description?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(40)
|
||||||
|
type?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsBoolean()
|
||||||
|
required?: boolean;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsBoolean()
|
||||||
|
captureRemaining?: boolean;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsBoolean()
|
||||||
|
preferAutocomplete?: boolean;
|
||||||
|
|
||||||
|
// null when there are no choices (plugin sends explicit null).
|
||||||
|
@IsOptional()
|
||||||
|
@IsArray()
|
||||||
|
@ArrayMaxSize(100)
|
||||||
|
@ValidateNested({ each: true })
|
||||||
|
@Type(() => CommandChoiceDto)
|
||||||
|
choices?: CommandChoiceDto[] | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
class CommandSpecDto {
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(100)
|
||||||
|
name!: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(100)
|
||||||
|
nativeName?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(500)
|
||||||
|
description?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsBoolean()
|
||||||
|
acceptsArgs?: boolean;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsArray()
|
||||||
|
@ArrayMaxSize(50)
|
||||||
|
@ValidateNested({ each: true })
|
||||||
|
@Type(() => CommandArgDto)
|
||||||
|
args?: CommandArgDto[];
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(20)
|
||||||
|
argsParsing?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class SyncCommandsDto {
|
||||||
|
@IsArray()
|
||||||
|
@ArrayMaxSize(200)
|
||||||
|
@ValidateNested({ each: true })
|
||||||
|
@Type(() => CommandSpecDto)
|
||||||
|
commands!: CommandSpecDto[];
|
||||||
|
}
|
||||||
73
src/common/admin-cache.service.ts
Normal file
73
src/common/admin-cache.service.ts
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
/**
|
||||||
|
* Center-scoped admin cache.
|
||||||
|
*
|
||||||
|
* Holds the at-most-one admin user (email + userId) fetched from Center.
|
||||||
|
* Used to decide who to deliver triage messages to as a silent observer
|
||||||
|
* (wake=false), regardless of on-duty / mention status.
|
||||||
|
*
|
||||||
|
* Refresh policy (per spec, 2026-05-22):
|
||||||
|
* • TTL = 1 day. Center admin changes are rare; agents tolerate a
|
||||||
|
* day's stale cache without surprises
|
||||||
|
* • on first lookup the cache lazy-fetches
|
||||||
|
* • cli `admin refresh` forces an out-of-band refresh without waiting
|
||||||
|
* for TTL expiry
|
||||||
|
*
|
||||||
|
* Failure mode: a Center fetch error is treated identically to "no
|
||||||
|
* admin" — guild keeps operating without an observer. The cache holds
|
||||||
|
* the failed-fetch decision for the same TTL so we don't hammer Center.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { Injectable, Logger } from '@nestjs/common';
|
||||||
|
import { fetchAdminEmail } from './center-auth.js';
|
||||||
|
|
||||||
|
const ADMIN_CACHE_TTL_MS = 24 * 60 * 60 * 1000;
|
||||||
|
|
||||||
|
export interface CachedAdmin {
|
||||||
|
email: string;
|
||||||
|
userId: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class AdminCacheService {
|
||||||
|
private readonly logger = new Logger(AdminCacheService.name);
|
||||||
|
private cached: CachedAdmin | null = null;
|
||||||
|
private cachedAt = 0;
|
||||||
|
private inflight: Promise<CachedAdmin | null> | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the cached admin, fetching from Center if the cache is empty
|
||||||
|
* or older than the TTL. Returns null if no admin is set.
|
||||||
|
*
|
||||||
|
* `force=true` bypasses the cache and refreshes immediately — used by
|
||||||
|
* the cli refresh command.
|
||||||
|
*/
|
||||||
|
async get(force = false): Promise<CachedAdmin | null> {
|
||||||
|
const fresh = Date.now() - this.cachedAt < ADMIN_CACHE_TTL_MS;
|
||||||
|
if (!force && this.cachedAt > 0 && fresh) {
|
||||||
|
return this.cached;
|
||||||
|
}
|
||||||
|
if (this.inflight) return this.inflight;
|
||||||
|
|
||||||
|
this.inflight = (async () => {
|
||||||
|
try {
|
||||||
|
const result = await fetchAdminEmail();
|
||||||
|
this.cached = result;
|
||||||
|
this.cachedAt = Date.now();
|
||||||
|
this.logger.log(
|
||||||
|
`admin cache refreshed: ${result ? `${result.email} (${result.userId})` : 'no admin set'}`,
|
||||||
|
);
|
||||||
|
return result;
|
||||||
|
} finally {
|
||||||
|
this.inflight = null;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
return this.inflight;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Snapshot of the cached admin (no fetch). Returns null if not yet
|
||||||
|
* populated. Used by the hot delivery path which doesn't want to
|
||||||
|
* block on a Center round-trip. */
|
||||||
|
snapshot(): CachedAdmin | null {
|
||||||
|
return this.cached;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -26,6 +26,31 @@ export async function introspectGuildToken(token: string): Promise<{ active: boo
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch the single Center-scoped admin user (if any).
|
||||||
|
* Same x-api-key auth as introspect / resolve-names.
|
||||||
|
* Returns `null` when no admin is set OR the request fails (treated
|
||||||
|
* identically — the guild simply falls back to "no admin observer").
|
||||||
|
*/
|
||||||
|
export async function fetchAdminEmail(): Promise<{ email: string; userId: string } | null> {
|
||||||
|
const centerBaseUrl = process.env.FABRIC_BACKEND_GUILD_CENTER_BASE_URL;
|
||||||
|
const centerApiKey = process.env.FABRIC_BACKEND_GUILD_CENTER_API_KEY;
|
||||||
|
if (!centerBaseUrl || !centerApiKey) return null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const res = await fetch(`${centerBaseUrl}/api/auth/admin-email`, {
|
||||||
|
method: 'GET',
|
||||||
|
headers: { 'x-api-key': centerApiKey },
|
||||||
|
});
|
||||||
|
if (!res.ok) return null;
|
||||||
|
const data = (await res.json()) as { email?: string; userId?: string } | null;
|
||||||
|
if (!data || !data.email || !data.userId) return null;
|
||||||
|
return { email: data.email, userId: data.userId };
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Resolve <@user.name:NAME> names to userIds within this guild node via
|
// Resolve <@user.name:NAME> names to userIds within this guild node via
|
||||||
// Center. Unresolved names are simply absent from the returned map.
|
// Center. Unresolved names are simply absent from the returned map.
|
||||||
export async function resolveUserNames(names: string[]): Promise<Record<string, string>> {
|
export async function resolveUserNames(names: string[]): Promise<Record<string, string>> {
|
||||||
|
|||||||
@@ -16,9 +16,9 @@ export class Channel {
|
|||||||
@Column({
|
@Column({
|
||||||
name: 'x_type',
|
name: 'x_type',
|
||||||
type: 'enum',
|
type: 'enum',
|
||||||
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom'],
|
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
|
||||||
})
|
})
|
||||||
xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom';
|
xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
|
||||||
|
|
||||||
@Column({ type: 'varchar', length: 16, default: 'text' })
|
@Column({ type: 'varchar', length: 16, default: 'text' })
|
||||||
kind!: 'text' | 'announcement';
|
kind!: 'text' | 'announcement';
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import {
|
|||||||
ConflictException,
|
ConflictException,
|
||||||
Controller,
|
Controller,
|
||||||
Delete,
|
Delete,
|
||||||
|
ForbiddenException,
|
||||||
Get,
|
Get,
|
||||||
Headers,
|
Headers,
|
||||||
NotFoundException,
|
NotFoundException,
|
||||||
@@ -10,14 +11,17 @@ import {
|
|||||||
Patch,
|
Patch,
|
||||||
Post,
|
Post,
|
||||||
Query,
|
Query,
|
||||||
|
Req,
|
||||||
} from '@nestjs/common';
|
} from '@nestjs/common';
|
||||||
import { InjectRepository } from '@nestjs/typeorm';
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
import { DataSource, Repository } from 'typeorm';
|
import { DataSource, Repository } from 'typeorm';
|
||||||
import { CreateMessageDto } from './dto.create-message.dto.js';
|
import { CreateMessageDto } from './dto.create-message.dto.js';
|
||||||
import { Channel } from '../entities/channel.entity.js';
|
import { Channel } from '../entities/channel.entity.js';
|
||||||
|
import { ChannelMember } from '../entities/channel-member.entity.js';
|
||||||
import { Message } from '../entities/message.entity.js';
|
import { Message } from '../entities/message.entity.js';
|
||||||
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
|
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
|
||||||
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
||||||
|
import { AdminCacheService } from '../common/admin-cache.service.js';
|
||||||
import { parseSlashCommand } from '../channels/slash-commands.js';
|
import { parseSlashCommand } from '../channels/slash-commands.js';
|
||||||
import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js';
|
import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js';
|
||||||
import { resolveUserNames } from '../common/center-auth.js';
|
import { resolveUserNames } from '../common/center-auth.js';
|
||||||
@@ -36,6 +40,8 @@ export class MessagingController {
|
|||||||
private readonly dataSource: DataSource,
|
private readonly dataSource: DataSource,
|
||||||
@InjectRepository(Channel)
|
@InjectRepository(Channel)
|
||||||
private readonly channelRepo: Repository<Channel>,
|
private readonly channelRepo: Repository<Channel>,
|
||||||
|
@InjectRepository(ChannelMember)
|
||||||
|
private readonly memberRepo: Repository<ChannelMember>,
|
||||||
@InjectRepository(Message)
|
@InjectRepository(Message)
|
||||||
private readonly messageRepo: Repository<Message>,
|
private readonly messageRepo: Repository<Message>,
|
||||||
@InjectRepository(IdempotencyRecord)
|
@InjectRepository(IdempotencyRecord)
|
||||||
@@ -45,6 +51,7 @@ export class MessagingController {
|
|||||||
private readonly turn: TurnService,
|
private readonly turn: TurnService,
|
||||||
private readonly events: EventsService,
|
private readonly events: EventsService,
|
||||||
private readonly realtime: RealtimeGateway,
|
private readonly realtime: RealtimeGateway,
|
||||||
|
private readonly adminCache: AdminCacheService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
private async getIdempotentResponse(
|
private async getIdempotentResponse(
|
||||||
@@ -86,6 +93,19 @@ export class MessagingController {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Channel-participant gate (Guild C-1): public channels are readable/
|
||||||
|
// writable by any authenticated user; private channels require explicit
|
||||||
|
// channel_members membership. Returns the channel so callers can reuse it.
|
||||||
|
private async assertParticipant(channelId: string, userId: string): Promise<Channel> {
|
||||||
|
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
|
||||||
|
if (!channel) throw new NotFoundException('channel not found');
|
||||||
|
if (channel.isPublic) return channel;
|
||||||
|
if (!userId) throw new ForbiddenException('not a channel member');
|
||||||
|
const member = await this.memberRepo.findOne({ where: { channelId, userId } });
|
||||||
|
if (!member) throw new ForbiddenException('not a channel member');
|
||||||
|
return channel;
|
||||||
|
}
|
||||||
|
|
||||||
// Persists one message (allocates a seq under a channel row lock) and
|
// Persists one message (allocates a seq under a channel row lock) and
|
||||||
// returns its view. Used for normal messages and for guild /ack messages.
|
// returns its view. Used for normal messages and for guild /ack messages.
|
||||||
private async persistMessage(
|
private async persistMessage(
|
||||||
@@ -136,20 +156,25 @@ export class MessagingController {
|
|||||||
async create(
|
async create(
|
||||||
@Param('id') channelId: string,
|
@Param('id') channelId: string,
|
||||||
@Body() body: CreateMessageDto,
|
@Body() body: CreateMessageDto,
|
||||||
|
@Req() req: { userId?: string },
|
||||||
@Headers('idempotency-key') idempotencyKey?: string,
|
@Headers('idempotency-key') idempotencyKey?: string,
|
||||||
) {
|
) {
|
||||||
const scope = `POST:/channels/${channelId}/messages`;
|
const scope = `POST:/channels/${channelId}/messages`;
|
||||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||||
if (existed) return existed;
|
if (existed) return existed;
|
||||||
|
|
||||||
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
|
// Guild C-1: caller must be a participant of the channel, and the
|
||||||
if (!channel) throw new NotFoundException('channel not found');
|
// author is always the authenticated user — body.authorUserId is
|
||||||
|
// ignored so a caller can never post as someone else.
|
||||||
|
const userId = String(req.userId ?? '');
|
||||||
|
if (!userId) throw new ForbiddenException('missing user');
|
||||||
|
const channel = await this.assertParticipant(channelId, userId);
|
||||||
if (channel.closed) {
|
if (channel.closed) {
|
||||||
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
|
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
|
||||||
}
|
}
|
||||||
const xType = channel.xType ?? 'general';
|
const xType = channel.xType ?? 'general';
|
||||||
const isRotating = xType === 'discuss' || xType === 'work';
|
const isRotating = xType === 'discuss' || xType === 'work';
|
||||||
const authorUserId = String(body.authorUserId ?? 'anonymous');
|
const authorUserId = userId;
|
||||||
|
|
||||||
// ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via
|
// ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via
|
||||||
// Center before anything else persists/parses the content
|
// Center before anything else persists/parses the content
|
||||||
@@ -202,16 +227,19 @@ export class MessagingController {
|
|||||||
const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
|
const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
|
||||||
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
|
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
|
||||||
} else {
|
} else {
|
||||||
// general/report/triage/custom: wakeup from x_type + wake_mapping;
|
// general/report/triage/custom: 3-state delivery
|
||||||
// general also honors the message's at-list
|
// (wake / observer / skip) — see realtime.gateway.computeDelivery.
|
||||||
|
// Center-scoped admin (cached, 1d TTL) gets `observer` on triage.
|
||||||
const wakeRows = await this.wakeRepo.find({ where: { channelId } });
|
const wakeRows = await this.wakeRepo.find({ where: { channelId } });
|
||||||
const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
|
const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
|
||||||
const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
|
const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
|
||||||
|
const admin = await this.adminCache.get();
|
||||||
await this.realtime.emitMessageCreated(channelId, responseBody, {
|
await this.realtime.emitMessageCreated(channelId, responseBody, {
|
||||||
xType,
|
xType,
|
||||||
authorUserId,
|
authorUserId,
|
||||||
wakeUserIds,
|
wakeUserIds,
|
||||||
mentionUserIds,
|
mentionUserIds,
|
||||||
|
adminUserId: admin?.userId ?? null,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -223,14 +251,23 @@ export class MessagingController {
|
|||||||
@Param('id') channelId: string,
|
@Param('id') channelId: string,
|
||||||
@Param('messageId') messageId: string,
|
@Param('messageId') messageId: string,
|
||||||
@Body() body: { content?: string },
|
@Body() body: { content?: string },
|
||||||
|
@Req() req: { userId?: string },
|
||||||
@Headers('idempotency-key') idempotencyKey?: string,
|
@Headers('idempotency-key') idempotencyKey?: string,
|
||||||
) {
|
) {
|
||||||
const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
|
const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
|
||||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||||
if (existed) return existed;
|
if (existed) return existed;
|
||||||
|
|
||||||
|
// Guild C-1: participant + author-ownership.
|
||||||
|
const userId = String(req.userId ?? '');
|
||||||
|
if (!userId) throw new ForbiddenException('missing user');
|
||||||
|
await this.assertParticipant(channelId, userId);
|
||||||
|
|
||||||
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
|
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
|
||||||
if (!item) return { status: 'not_found' };
|
if (!item) return { status: 'not_found' };
|
||||||
|
if (item.authorUserId !== userId) {
|
||||||
|
throw new ForbiddenException('not the message author');
|
||||||
|
}
|
||||||
|
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const createdAt = new Date(item.createdAt).getTime();
|
const createdAt = new Date(item.createdAt).getTime();
|
||||||
@@ -259,14 +296,23 @@ export class MessagingController {
|
|||||||
async remove(
|
async remove(
|
||||||
@Param('id') channelId: string,
|
@Param('id') channelId: string,
|
||||||
@Param('messageId') messageId: string,
|
@Param('messageId') messageId: string,
|
||||||
|
@Req() req: { userId?: string },
|
||||||
@Headers('idempotency-key') idempotencyKey?: string,
|
@Headers('idempotency-key') idempotencyKey?: string,
|
||||||
) {
|
) {
|
||||||
const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
|
const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
|
||||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||||
if (existed) return existed;
|
if (existed) return existed;
|
||||||
|
|
||||||
|
// Guild C-1: participant + author-ownership.
|
||||||
|
const userId = String(req.userId ?? '');
|
||||||
|
if (!userId) throw new ForbiddenException('missing user');
|
||||||
|
await this.assertParticipant(channelId, userId);
|
||||||
|
|
||||||
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
|
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
|
||||||
if (!item) return { status: 'not_found' };
|
if (!item) return { status: 'not_found' };
|
||||||
|
if (item.authorUserId !== userId) {
|
||||||
|
throw new ForbiddenException('not the message author');
|
||||||
|
}
|
||||||
|
|
||||||
item.isDeleted = true;
|
item.isDeleted = true;
|
||||||
item.deletedAt = new Date();
|
item.deletedAt = new Date();
|
||||||
@@ -304,10 +350,14 @@ export class MessagingController {
|
|||||||
@Get()
|
@Get()
|
||||||
async listBySeq(
|
async listBySeq(
|
||||||
@Param('id') channelId: string,
|
@Param('id') channelId: string,
|
||||||
|
@Req() req: { userId?: string },
|
||||||
@Query('seq_from') seqFrom?: string,
|
@Query('seq_from') seqFrom?: string,
|
||||||
@Query('seq_to') seqTo?: string,
|
@Query('seq_to') seqTo?: string,
|
||||||
@Query('limit') limit?: string,
|
@Query('limit') limit?: string,
|
||||||
) {
|
) {
|
||||||
|
// Guild C-1: only participants may read channel history.
|
||||||
|
const userId = String(req.userId ?? '');
|
||||||
|
if (!userId) throw new ForbiddenException('missing user');
|
||||||
const from = seqFrom ? Number(seqFrom) : 1;
|
const from = seqFrom ? Number(seqFrom) : 1;
|
||||||
const to = seqTo ? Number(seqTo) : Number.MAX_SAFE_INTEGER;
|
const to = seqTo ? Number(seqTo) : Number.MAX_SAFE_INTEGER;
|
||||||
const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);
|
const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);
|
||||||
@@ -327,10 +377,7 @@ export class MessagingController {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
|
const channel = await this.assertParticipant(channelId, userId);
|
||||||
if (!channel) {
|
|
||||||
throw new NotFoundException('channel not found');
|
|
||||||
}
|
|
||||||
|
|
||||||
const qb = this.messageRepo
|
const qb = this.messageRepo
|
||||||
.createQueryBuilder('m')
|
.createQueryBuilder('m')
|
||||||
|
|||||||
@@ -2,12 +2,16 @@ import { Module } from '@nestjs/common';
|
|||||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||||
import { MessagingController } from './messaging.controller.js';
|
import { MessagingController } from './messaging.controller.js';
|
||||||
import { Channel } from '../entities/channel.entity.js';
|
import { Channel } from '../entities/channel.entity.js';
|
||||||
|
import { ChannelMember } from '../entities/channel-member.entity.js';
|
||||||
import { Message } from '../entities/message.entity.js';
|
import { Message } from '../entities/message.entity.js';
|
||||||
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
|
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
|
||||||
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
||||||
|
import { AdminCacheService } from '../common/admin-cache.service.js';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord, WakeMapping])],
|
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])],
|
||||||
controllers: [MessagingController],
|
controllers: [MessagingController],
|
||||||
|
providers: [AdminCacheService],
|
||||||
|
exports: [AdminCacheService],
|
||||||
})
|
})
|
||||||
export class MessagingModule {}
|
export class MessagingModule {}
|
||||||
|
|||||||
@@ -11,17 +11,72 @@ import { Logger } from '@nestjs/common';
|
|||||||
import { Server, Socket } from 'socket.io';
|
import { Server, Socket } from 'socket.io';
|
||||||
import { introspectGuildToken } from '../common/center-auth.js';
|
import { introspectGuildToken } from '../common/center-auth.js';
|
||||||
|
|
||||||
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom';
|
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
|
||||||
|
|
||||||
// Wakeup for non-rotating channels only (general/report/triage/custom).
|
/**
|
||||||
// discuss/work go through TurnService + emitMessageTargeted, never here.
|
* Per-recipient delivery decision for a non-rotating channel message.
|
||||||
// Precedence:
|
*
|
||||||
// 1. the author never gets woken by their own message
|
* • `wake` — push the event AND wake the recipient (model turn fires)
|
||||||
// 2. triage/custom: only wake users in the channel's wake_mapping
|
* • `observer` — push the event with wakeup=false (silent; UI displays
|
||||||
// (mentions change nothing here)
|
* but the openclaw plugin records-only without dispatch). Currently
|
||||||
// 3. general: if the message has an at-list, wake only the at'd users;
|
* used for the Center admin observing triage traffic
|
||||||
// otherwise wake everyone
|
* • `skip` — don't even emit the event to this recipient
|
||||||
// 4. report (and anything else): wake nobody
|
*
|
||||||
|
* Wakeup-only channels (general/report/dm/custom) never return
|
||||||
|
* 'observer'; the legacy behaviour is preserved end-to-end.
|
||||||
|
*
|
||||||
|
* Precedence for triage (the only place 'skip' / 'observer' fire):
|
||||||
|
* 1. author never gets back their own message
|
||||||
|
* 2. wake_mapping (on-duty) → wake
|
||||||
|
* 3. mention → wake (NEW: was 'skip' before — see Fabric PR 'triage
|
||||||
|
* mention exception')
|
||||||
|
* 4. admin (Center-scoped, at most one) → observer
|
||||||
|
* 5. everyone else → skip (was 'deliver, wakeup=false' before)
|
||||||
|
*/
|
||||||
|
export type DeliveryDecision = 'wake' | 'observer' | 'skip';
|
||||||
|
|
||||||
|
export interface ComputeDeliveryArgs {
|
||||||
|
xType: XType;
|
||||||
|
recipientUserId: string;
|
||||||
|
authorUserId: string;
|
||||||
|
wakeUserIds: Set<string>;
|
||||||
|
mentionUserIds?: Set<string>;
|
||||||
|
/** Single Center-scoped admin userId, or null. */
|
||||||
|
adminUserId?: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision {
|
||||||
|
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId } = args;
|
||||||
|
if (recipientUserId === authorUserId) return 'skip';
|
||||||
|
|
||||||
|
switch (xType) {
|
||||||
|
case 'triage':
|
||||||
|
if (wakeUserIds.has(recipientUserId)) return 'wake';
|
||||||
|
if (mentionUserIds?.has(recipientUserId)) return 'wake';
|
||||||
|
if (adminUserId && recipientUserId === adminUserId) return 'observer';
|
||||||
|
return 'skip';
|
||||||
|
case 'general':
|
||||||
|
if (mentionUserIds && mentionUserIds.size > 0) {
|
||||||
|
return mentionUserIds.has(recipientUserId) ? 'wake' : 'observer';
|
||||||
|
}
|
||||||
|
return 'wake';
|
||||||
|
case 'custom':
|
||||||
|
// wake_mapping decides who wakes; everyone else still sees the
|
||||||
|
// message (observer) — preserves the legacy "deliver to all, wake
|
||||||
|
// some" contract for custom channels.
|
||||||
|
return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer';
|
||||||
|
case 'dm':
|
||||||
|
return 'wake';
|
||||||
|
default:
|
||||||
|
// report (and anything else): deliver as observer, no wake
|
||||||
|
return 'observer';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Use computeDelivery (returns 3-state). Kept for any
|
||||||
|
* external callers; treats 'observer' and 'skip' both as `false`.
|
||||||
|
*/
|
||||||
export function computeWakeup(args: {
|
export function computeWakeup(args: {
|
||||||
xType: XType;
|
xType: XType;
|
||||||
recipientUserId: string;
|
recipientUserId: string;
|
||||||
@@ -29,20 +84,7 @@ export function computeWakeup(args: {
|
|||||||
wakeUserIds: Set<string>;
|
wakeUserIds: Set<string>;
|
||||||
mentionUserIds?: Set<string>;
|
mentionUserIds?: Set<string>;
|
||||||
}): boolean {
|
}): boolean {
|
||||||
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds } = args;
|
return computeDelivery(args) === 'wake';
|
||||||
if (recipientUserId === authorUserId) return false;
|
|
||||||
switch (xType) {
|
|
||||||
case 'general':
|
|
||||||
if (mentionUserIds && mentionUserIds.size > 0) {
|
|
||||||
return mentionUserIds.has(recipientUserId);
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
case 'triage':
|
|
||||||
case 'custom':
|
|
||||||
return wakeUserIds.has(recipientUserId);
|
|
||||||
default:
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@WebSocketGateway({
|
@WebSocketGateway({
|
||||||
@@ -93,6 +135,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
|||||||
const userId = result.user.id || this.userIdFromClient(client);
|
const userId = result.user.id || this.userIdFromClient(client);
|
||||||
client.data.userId = userId;
|
client.data.userId = userId;
|
||||||
this.onlineUsers.add(userId);
|
this.onlineUsers.add(userId);
|
||||||
|
// Per-user room: lets server code emit user-scoped events (e.g.
|
||||||
|
// channel.joined when membership changes) without bookkeeping a
|
||||||
|
// userId→sockets map. All of this user's sockets receive the event.
|
||||||
|
client.join(`user:${userId}`);
|
||||||
this.server.emit('presence.online', {
|
this.server.emit('presence.online', {
|
||||||
userId,
|
userId,
|
||||||
onlineCount: this.onlineUsers.size,
|
onlineCount: this.onlineUsers.size,
|
||||||
@@ -168,7 +214,18 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
|||||||
this.server.to(`channel:${channelId}`).emit(event, data);
|
this.server.to(`channel:${channelId}`).emit(event, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Emits message.created per-recipient so each carries its own `wakeup` flag.
|
// Emit a user-scoped event to all sockets currently connected for `userId`
|
||||||
|
// (via the `user:<userId>` room joined in handleConnection). No-op for
|
||||||
|
// offline users — the next connect's initial channel-list fetch covers it.
|
||||||
|
emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
|
||||||
|
if (!userId) return;
|
||||||
|
this.server.to(`user:${userId}`).emit(event, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emits message.created per-recipient using the 3-state delivery
|
||||||
|
// decision (wake / observer / skip). Skipped recipients receive
|
||||||
|
// nothing — used by triage channels to keep non-on-duty / non-mention
|
||||||
|
// / non-admin users completely out of the loop.
|
||||||
async emitMessageCreated(
|
async emitMessageCreated(
|
||||||
channelId: string,
|
channelId: string,
|
||||||
data: Record<string, unknown>,
|
data: Record<string, unknown>,
|
||||||
@@ -177,19 +234,28 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
|
|||||||
authorUserId: string;
|
authorUserId: string;
|
||||||
wakeUserIds: Set<string>;
|
wakeUserIds: Set<string>;
|
||||||
mentionUserIds?: Set<string>;
|
mentionUserIds?: Set<string>;
|
||||||
|
/** Single Center-scoped admin userId (or null). */
|
||||||
|
adminUserId?: string | null;
|
||||||
},
|
},
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
|
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
|
||||||
for (const s of sockets) {
|
for (const s of sockets) {
|
||||||
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
|
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
|
||||||
const wakeup = computeWakeup({
|
const decision = computeDelivery({
|
||||||
xType: ctx.xType,
|
xType: ctx.xType,
|
||||||
recipientUserId,
|
recipientUserId,
|
||||||
authorUserId: ctx.authorUserId,
|
authorUserId: ctx.authorUserId,
|
||||||
wakeUserIds: ctx.wakeUserIds,
|
wakeUserIds: ctx.wakeUserIds,
|
||||||
mentionUserIds: ctx.mentionUserIds,
|
mentionUserIds: ctx.mentionUserIds,
|
||||||
|
adminUserId: ctx.adminUserId,
|
||||||
|
});
|
||||||
|
if (decision === 'skip') continue;
|
||||||
|
s.emit('message.created', {
|
||||||
|
...data,
|
||||||
|
channelId,
|
||||||
|
wakeup: decision === 'wake',
|
||||||
|
xType: ctx.xType,
|
||||||
});
|
});
|
||||||
s.emit('message.created', { ...data, channelId, wakeup });
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user