8 Commits

Author SHA1 Message Date
30069377e7 feat(realtime): push channel.joined/left events to user-scoped rooms
Backend half of the plugin push-based channel sync (companion to
nav/Fabric.OpenclawPlugin#1 follow-up). Before this, the OpenClaw
fabric inbound had to poll `/api/channels?guildId=...` every 60s to
discover newly-joined channels (any DM another user just dragged the
agent into). Now the server tells the agent's socket directly so
sub/unsub is realtime.

Changes:
- realtime.gateway.ts:
  * handleConnection joins the socket into a `user:<userId>` room.
    All of a user's connected sockets now share that room.
  * New `emitToUser(userId, event, data)` helper that emits into
    that room. No-op for offline users (next connect resyncs via the
    plugin's initial channel-list fetch).
- channels.service.ts:
  * Inject RealtimeGateway (RealtimeModule is @Global, no module
    plumbing needed).
  * Private `notifyMembership(kind, channelId, userIds, extra)`
    helper that emits `channel.<kind>` (joined|left) with payload
    {channelId, userId, xType, occurredAt}.
  * create(): emit channel.joined to every seeded member (creator +
    explicit memberUserIds + triage on-duty).
  * joinChannel(): emit channel.joined to userId (only if the row was
    actually inserted, idempotent on existing membership).
  * leaveChannel(): emit channel.left to userId iff a row was
    actually deleted.

Event shape:
  {
    channelId: string,
    userId: string,
    xType?: string,
    occurredAt: ISO string,
  }

Client-side contract (fabric plugin):
  socket.on('channel.joined', m => socket.emit('join_channel', {channelId: m.channelId}))
  socket.on('channel.left',   m => socket.emit('leave_channel', {channelId: m.channelId}))

The plugin keeps its 60s polling resync as a safety net for missed
events (transient socket drops between emit and reconnect, partial
failures, etc).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:07:46 +01:00
b1f7467161 feat(guild): add 'dm' x-type (private 1:1, always-wake)
channel enum + X_TYPES + realtime XType gain 'dm'. dm channels are
forced private (never public) and non-unique (no dedup; create()
always makes a fresh one). computeWakeup: dm wakes every non-author
participant unconditionally (no rotation / no wake_mapping). The
message.created realtime payload now carries xType so the plugin can
treat dm specially.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 09:18:19 +01:00
7e944a08f6 feat(ops): CLI to print the commands-sync key (Guild C-2)
node dist/cli/print-commands-sync-key.js (npm run print:commands-key)
outputs FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY as the process sees it,
so an operator can docker-exec it on the deployed guild and copy the
value into the plugin's FABRIC_COMMANDS_SYNC_KEY. --export prints a
ready-to-paste assignment; exits 1 when unset (fallback mode).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:28:23 +01:00
e45ad91340 fix(security): close Critical IDOR/authz gaps (C-1/C-2)
C-1: messaging endpoints now enforce channel participation (public
     channels open; private require channel_members). authorUserId is
     forced to the authenticated user (no more author spoofing); edit/
     delete require message-author ownership; history read gated too.
C-2: PUT /commands body strictly validated + size-capped via
     SyncCommandsDto (kills catalog poisoning / DoS). Optional
     FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY restricts the write to the
     plugin when set; never weaker than before when unset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:47:08 +01:00
3e96de730a docs: slash-command registry section
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:15:04 +01:00
f54ed6abb5 feat(guild): slash-command registry (sync + list API)
Guild-global slash-command catalog (one row per node guild). The
OpenClaw plugin PUTs the native-command specs (same data Discord
registers as slash commands); the frontend GETs it for / autocomplete.

- GuildCommand entity (guild_id unique, commands json, updatedAt)
- PUT /api/commands  -> idempotent full replace (any authed agent/user)
- GET /api/commands  -> { commands, updatedAt } (authed)
- stored verbatim (NativeCommandSpec-shaped); execution path unchanged:
  a /<cmd> message is delivered as a normal message -> plugin ->
  OpenClaw command system (only /no-reply, /force-proceed stay
  server-intercepted).

Verified: PUT->{ok,count}, GET round-trips args/choices, no-auth->401.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:02:49 +01:00
8de5736a59 docs: rewrite README to match current architecture
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 12:53:23 +01:00
58badf328c feat(guild): file upload/retention + channel canvas
Files:
- StoredFile entity + FilesModule: multipart upload (configurable
  FABRIC_BACKEND_GUILD_FILE_MAX_BYTES, default 100MB; no type limit),
  authenticated download (Bearer or ?access_token=), hourly + on-boot
  retention sweep (FABRIC_BACKEND_GUILD_FILE_TTL_DAYS, default 7).
- ApiKeyGuard also accepts ?access_token= (browser <img>/<a>).

Canvas:
- ChannelCanvas entity (one active per channel) + CanvasModule:
  GET / PUT|POST (share-replace, caller becomes sharer) /
  PATCH (sharer-only in-place update, version++) / DELETE (sharer-only).
  Emits canvas.updated / canvas.removed to the channel room.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 20:17:02 +01:00
24 changed files with 978 additions and 31 deletions

View File

@@ -1,22 +1,71 @@
# Fabric.Backend.Guild
Guild Node service for Fabric.
A **guild node** for Fabric (NestJS, ES modules, MySQL/TypeORM,
socket.io). Default port `7002`, global prefix `/api`. Many independent
guild nodes can run; each registers with `Fabric.Backend.Center` and
introspects the user/guild tokens Center issues.
## Scope (MVP)
- Workspace/Guild/Channel/DM
- Message create/edit/delete/reply/@mention
- Per-channel/DM seq ordering + gap backfill API
- Webhook/Bot integration surface
- Guild-level audit logs
## Responsibilities
## Next
- API skeleton (NestJS)
- Chat domain models
- Seq allocator and range query endpoints
- **Guilds / channels / messaging** — per-channel `seq` ordering, edit
window, soft delete, reply, `<@id>` mentions (backtick-aware) plus
`<@user.name:NAME>``<@userId>` translation via Center.
- **Channel `x_type`** (required on create): `general`, `work`, `report`,
`discuss`, `triage`, `custom`. Plus `isPublic` and `closed` (closed →
history readable, posting returns `409`).
- **`wake_mapping`** — explicit wake list for `triage` (on-duty) and
`custom` (listeners) channels.
- **Per-recipient `wakeup`** — `message.created` is emitted per socket with
its own `wakeup` flag (author=false; general→all; report→none;
triage/custom→wake_mapping; discuss/work→the current speaker only). This
is **push-only metadata for the OpenClaw plugin**; UIs ignore it.
- **discuss/work turn engine** (`channel_turn_state`): speaking order and a
disjoint **bypass list** (bypass members aren't woken unless @-mentioned);
activation from idle, queue-jump, cross-round `/no-reply` pause,
`/force-proceed`, end-of-round shuffle, guild `/ack`, and a mention
sub-frame stack with a 5-level nesting cap (root + 4). `moveToBypass`
mid-rotation.
- **Files** — `POST /files` (multipart, configurable max size, default
100 MB), `GET /files/:id` (Bearer **or** `?access_token=` for browser
`<img>/<a>`), automatic retention sweep (default 7 days). Messages carry
`attachments[]`.
- **Channel canvas** — one pinned document per channel (`md`/`html`/`text`),
re-share replaces, only the original sharer may update/remove; emits
`canvas.updated` / `canvas.removed`.
- **Slash-command registry** — guild-global catalog: `PUT /api/commands`
(the OpenClaw plugin syncs OpenClaw's native-command specs here),
`GET /api/commands` (frontend `/` autocomplete). Stored verbatim;
execution is unchanged (a `/<cmd>` message flows normally to the plugin →
OpenClaw command system; only `/no-reply`,`/force-proceed` are
server-intercepted).
- **Realtime** — socket.io `/realtime`; `join_channel`/`leave_channel`,
`message.created/updated/deleted`, `canvas.*`, presence, typing.
## Required env (hard-checked at startup)
## Required env (startup hard checks)
- `FABRIC_BACKEND_GUILD_CENTER_BASE_URL`
- `FABRIC_BACKEND_GUILD_CENTER_API_KEY`
- `FABRIC_BACKEND_GUILD_NODE_ID`
If any of the above is missing, service startup fails immediately.
Missing any of these aborts startup.
## Other env
- `FABRIC_BACKEND_GUILD_PORT` (default 7002)
- `FABRIC_BACKEND_GUILD_DB_*`, `FABRIC_BACKEND_GUILD_DB_SYNC`
- `FABRIC_BACKEND_GUILD_FILE_DIR` (storage root),
`FABRIC_BACKEND_GUILD_FILE_MAX_BYTES` (default 100 MB),
`FABRIC_BACKEND_GUILD_FILE_TTL_DAYS` (default 7)
- `FABRIC_BACKEND_GUILD_CORS_ORIGINS` (empty = allow all; `null` origin —
`file://` desktop — is always allowed)
## Run
```bash
npm install
npm run build && npm start # or: npm run start:dev
```
Usually run via the root `docker-compose.local.yml` (`backend-guild1`
`test-guild1` :7002, `backend-guild2` `test-guild2` :7003). Schema is
auto-managed (`DB_SYNC`). ES modules (`NodeNext`).

View File

@@ -7,6 +7,7 @@
"scripts": {
"build": "tsc -p tsconfig.build.json",
"start": "node dist/main.js",
"print:commands-key": "node dist/cli/print-commands-sync-key.js",
"start:dev": "ts-node src/main.ts",
"lint": "eslint 'src/**/*.ts'",
"lint:fix": "eslint 'src/**/*.ts' --fix",

View File

@@ -13,6 +13,9 @@ import { MessagingModule } from './messaging/messaging.module.js';
import { EventsModule } from './events/events.module.js';
import { RealtimeModule } from './realtime/realtime.module.js';
import { MembersModule } from './members/members.module.js';
import { FilesModule } from './files/files.module.js';
import { CanvasModule } from './canvas/canvas.module.js';
import { CommandsModule } from './commands/commands.module.js';
@Module({
imports: [
@@ -24,6 +27,9 @@ import { MembersModule } from './members/members.module.js';
ChannelsModule,
MembersModule,
MessagingModule,
FilesModule,
CanvasModule,
CommandsModule,
],
controllers: [HealthController, MetricsController],
providers: [

View File

@@ -0,0 +1,58 @@
import {
Body,
Controller,
Delete,
Get,
Param,
Post,
Put,
Patch,
Req,
UnauthorizedException,
} from '@nestjs/common';
import { CanvasService } from './canvas.service.js';
type AuthedRequest = { userId?: string };
type CanvasBody = { title?: string; format?: string; source?: string };
@Controller('channels/:id/canvas')
export class CanvasController {
constructor(private readonly canvas: CanvasService) {}
private uid(req: AuthedRequest): string {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return userId;
}
@Get()
get(@Req() req: AuthedRequest, @Param('id') channelId: string) {
return this.canvas.get(channelId, this.uid(req));
}
// share / replace (caller becomes the sharer)
@Put()
@Post()
share(
@Req() req: AuthedRequest,
@Param('id') channelId: string,
@Body() body: CanvasBody,
) {
return this.canvas.share(channelId, this.uid(req), body ?? {});
}
// update in place (original sharer only)
@Patch()
update(
@Req() req: AuthedRequest,
@Param('id') channelId: string,
@Body() body: CanvasBody,
) {
return this.canvas.update(channelId, this.uid(req), body ?? {});
}
@Delete()
remove(@Req() req: AuthedRequest, @Param('id') channelId: string) {
return this.canvas.remove(channelId, this.uid(req));
}
}

View File

@@ -0,0 +1,14 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { ChannelCanvas } from '../entities/channel-canvas.entity.js';
import { CanvasController } from './canvas.controller.js';
import { CanvasService } from './canvas.service.js';
@Module({
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, ChannelCanvas])],
controllers: [CanvasController],
providers: [CanvasService],
})
export class CanvasModule {}

View File

@@ -0,0 +1,147 @@
import {
BadRequestException,
ForbiddenException,
Injectable,
NotFoundException,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import {
ChannelCanvas,
type CanvasFormat,
} from '../entities/channel-canvas.entity.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
const FORMATS: CanvasFormat[] = ['md', 'html', 'text'];
@Injectable()
export class CanvasService {
constructor(
@InjectRepository(Channel)
private readonly channelRepo: Repository<Channel>,
@InjectRepository(ChannelMember)
private readonly memberRepo: Repository<ChannelMember>,
@InjectRepository(ChannelCanvas)
private readonly canvasRepo: Repository<ChannelCanvas>,
private readonly realtime: RealtimeGateway,
) {}
private view(c: ChannelCanvas) {
return {
channelId: c.channelId,
sharerUserId: c.sharerUserId,
title: c.title,
format: c.format,
source: c.source,
version: c.version,
createdAt: c.createdAt.toISOString(),
updatedAt: c.updatedAt.toISOString(),
};
}
private async assertChannel(channelId: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
return channel;
}
private async assertParticipant(channelId: string, userId: string) {
const channel = await this.assertChannel(channelId);
if (channel.isPublic) return channel;
const member = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!member) throw new ForbiddenException('not a channel member');
return channel;
}
async get(channelId: string, userId: string) {
await this.assertParticipant(channelId, userId);
const c = await this.canvasRepo.findOne({ where: { channelId } });
return c ? this.view(c) : null;
}
private normalize(input: {
title?: string;
format?: string;
source?: string;
}) {
const title = String(input.title ?? '').trim().slice(0, 200) || 'Untitled';
const format = String(input.format ?? 'md') as CanvasFormat;
if (!FORMATS.includes(format)) {
throw new BadRequestException(`format must be one of: ${FORMATS.join(', ')}`);
}
const source = String(input.source ?? '');
return { title, format, source };
}
// Share / replace the channel's single active canvas (caller becomes sharer).
async share(
channelId: string,
userId: string,
input: { title?: string; format?: string; source?: string },
) {
await this.assertParticipant(channelId, userId);
const { title, format, source } = this.normalize(input);
let c = await this.canvasRepo.findOne({ where: { channelId } });
if (c) {
c.sharerUserId = userId;
c.title = title;
c.format = format;
c.source = source;
c.version = 1;
} else {
c = this.canvasRepo.create({
channelId,
sharerUserId: userId,
title,
format,
source,
version: 1,
});
}
c = await this.canvasRepo.save(c);
const v = this.view(c);
this.realtime.emitChannelEvent(channelId, 'canvas.updated', v);
return v;
}
// Update the existing canvas in place — only the original sharer.
async update(
channelId: string,
userId: string,
input: { title?: string; format?: string; source?: string },
) {
await this.assertParticipant(channelId, userId);
const c = await this.canvasRepo.findOne({ where: { channelId } });
if (!c) throw new NotFoundException('no canvas shared in this channel');
if (c.sharerUserId !== userId) {
throw new ForbiddenException('only the original sharer may update the canvas');
}
const { title, format, source } = this.normalize({
title: input.title ?? c.title,
format: input.format ?? c.format,
source: input.source ?? c.source,
});
c.title = title;
c.format = format;
c.source = source;
c.version += 1;
const saved = await this.canvasRepo.save(c);
const v = this.view(saved);
this.realtime.emitChannelEvent(channelId, 'canvas.updated', v);
return v;
}
async remove(channelId: string, userId: string) {
await this.assertParticipant(channelId, userId);
const c = await this.canvasRepo.findOne({ where: { channelId } });
if (!c) return { status: 'ok' };
if (c.sharerUserId !== userId) {
throw new ForbiddenException('only the original sharer may remove the canvas');
}
await this.canvasRepo.delete({ id: c.id });
this.realtime.emitChannelEvent(channelId, 'canvas.removed', { channelId });
return { status: 'ok' };
}
}

View File

@@ -5,8 +5,9 @@ import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { TurnService } from './turn.service.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom'] as const;
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'] as const;
type XType = (typeof X_TYPES)[number];
type CreateChannelInput = {
@@ -35,8 +36,34 @@ export class ChannelsService {
@InjectRepository(WakeMapping)
private readonly wakeRepo: Repository<WakeMapping>,
private readonly turnService: TurnService,
// RealtimeGateway is provided by the global RealtimeModule. Used to
// push channel.joined / channel.left so connected clients (e.g. the
// OpenClaw fabric plugin) can sub/unsub socket.io rooms immediately
// instead of waiting for the polling fallback.
private readonly realtime: RealtimeGateway,
) {}
// Push a channel membership change to each affected user's socket-room.
// Best-effort: offline users see the new state on their next connect
// (the inbound runs an initial channel-list fetch on connect).
private notifyMembership(
kind: 'joined' | 'left',
channelId: string,
userIds: string[] | Set<string>,
extra: Record<string, unknown> = {},
): void {
const ids = userIds instanceof Set ? [...userIds] : userIds;
const payload = {
channelId,
...extra,
occurredAt: new Date().toISOString(),
};
for (const u of ids) {
if (!u) continue;
this.realtime.emitToUser(u, `channel.${kind}`, { ...payload, userId: u });
}
}
// Channels visible to a user within a guild:
// - every public channel of the guild (incl. ones created before the user
// joined the guild), OR
@@ -93,6 +120,7 @@ export class ChannelsService {
if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberAdded(channelId, userId);
}
this.notifyMembership('joined', channelId, [userId], { xType: channel.xType });
}
return { status: 'ok', channelId, userId, member: true };
}
@@ -102,11 +130,14 @@ export class ChannelsService {
if (!channel) throw new NotFoundException('channel not found');
// remove every channel-scoped row that references this user
await this.memberRepo.delete({ channelId, userId });
const deleted = await this.memberRepo.delete({ channelId, userId });
await this.wakeRepo.delete({ channelId, userId });
if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberRemoved(channelId, userId);
}
if ((deleted.affected ?? 0) > 0) {
this.notifyMembership('left', channelId, [userId], { xType: channel.xType });
}
return { status: 'ok', channelId, userId, member: false };
}
@@ -130,14 +161,19 @@ export class ChannelsService {
.map((x) => String(x ?? '').trim())
.filter(Boolean);
// dm channels are always private (a 1:1 conversation); never public.
// dm is not unique — multiple dm channels between the same users are
// allowed (create() always makes a fresh one, no dedup).
const isPublic = xType === 'dm' ? false : Boolean(input.isPublic);
const channel = await this.channelRepo.save(
this.channelRepo.create({
guildId,
name,
xType,
kind: input.kind === 'announcement' ? 'announcement' : 'text',
isPrivate: !input.isPublic,
isPublic: Boolean(input.isPublic),
isPrivate: !isPublic,
isPublic,
lastSeq: 0,
}),
);
@@ -155,6 +191,12 @@ export class ChannelsService {
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
);
// Push channel.joined to every seeded member (creator + invitees +
// triage on-duty) so their connected sockets sub the new room
// immediately. Skips offline users — next connect's channel-list
// fetch covers them.
this.notifyMembership('joined', channel.id, memberIds, { xType });
// wake_mapping: triage -> the on-duty user; custom -> each listener
const wakeUserIds = new Set<string>();
if (xType === 'triage') wakeUserIds.add(onDuty);

View File

@@ -0,0 +1,37 @@
// Operator convenience (Guild C-2): print the commands-sync key that this
// guild process actually has in its environment, so it can be copied into
// the OpenClaw plugin's FABRIC_COMMANDS_SYNC_KEY.
//
// Usage (inside the deployed container — authoritative, reflects compose):
// docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js
// docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js --export
//
// Default: prints the raw value only (so KEY=$(... ) works).
// --export: prints `FABRIC_COMMANDS_SYNC_KEY=<value>` for pasting.
// Exit 1 (no stdout) when unset — guild is then in the weaker
// "any authenticated user" fallback for PUT /commands.
const args = new Set(process.argv.slice(2));
if (args.has('--help') || args.has('-h')) {
process.stderr.write(
'print-commands-sync-key: outputs FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY\n' +
' (no flag) print the raw key value\n' +
' --export print FABRIC_COMMANDS_SYNC_KEY=<value>\n',
);
process.exit(0);
}
const key = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
if (!key) {
process.stderr.write(
'FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY is not set — PUT /commands is in ' +
'the fallback mode (any authenticated user). Set it to harden (Guild C-2).\n',
);
process.exit(1);
}
process.stdout.write(
(args.has('--export') ? `FABRIC_COMMANDS_SYNC_KEY=${key}` : key) + '\n',
);

View File

@@ -0,0 +1,57 @@
import {
Body,
Controller,
ForbiddenException,
Get,
Headers,
Put,
Req,
UnauthorizedException,
} from '@nestjs/common';
import { timingSafeEqual } from 'node:crypto';
import { CommandsService } from './commands.service.js';
import { SyncCommandsDto } from './dto.sync-commands.dto.js';
type AuthedRequest = { userId?: string };
function safeEqual(a: string, b: string): boolean {
const ab = Buffer.from(a);
const bb = Buffer.from(b);
if (ab.length !== bb.length) return false;
return timingSafeEqual(ab, bb);
}
@Controller('commands')
export class CommandsController {
constructor(private readonly commands: CommandsService) {}
// Guild C-2: catalog write is privileged. When
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY is configured (recommended in
// production), the caller must present a matching x-commands-sync-key
// header — this restricts writes to the OpenClaw plugin. When unset, we
// fall back to "any authenticated agent/user" (never weaker than before).
// The body is always strictly validated + size-capped via SyncCommandsDto.
@Put()
sync(
@Req() req: AuthedRequest,
@Body() body: SyncCommandsDto,
@Headers('x-commands-sync-key') syncKey?: string,
) {
const configured = process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '';
if (configured) {
if (!syncKey || !safeEqual(syncKey, configured)) {
throw new ForbiddenException('invalid commands sync key');
}
} else if (!req.userId) {
throw new UnauthorizedException('missing user');
}
return this.commands.sync(body.commands as unknown[]);
}
// Frontend reads the catalog to drive `/` autocomplete.
@Get()
list(@Req() req: AuthedRequest) {
if (!req.userId) throw new UnauthorizedException('missing user');
return this.commands.list();
}
}

View File

@@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { GuildCommand } from '../entities/guild-command.entity.js';
import { CommandsController } from './commands.controller.js';
import { CommandsService } from './commands.service.js';
@Module({
imports: [TypeOrmModule.forFeature([GuildCommand])],
controllers: [CommandsController],
providers: [CommandsService],
})
export class CommandsModule {}

View File

@@ -0,0 +1,39 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { GuildCommand } from '../entities/guild-command.entity.js';
// This node's guild id (one guild per node).
function guildId(): string {
return process.env.FABRIC_BACKEND_GUILD_NODE_ID ?? 'guild';
}
@Injectable()
export class CommandsService {
constructor(
@InjectRepository(GuildCommand)
private readonly repo: Repository<GuildCommand>,
) {}
// Replace the whole guild-global slash-command catalog (idempotent;
// the plugin re-PUTs the full set on every gateway start).
async sync(commands: unknown[]): Promise<{ status: string; count: number }> {
const gid = guildId();
let row = await this.repo.findOne({ where: { guildId: gid } });
if (row) {
row.commands = commands;
} else {
row = this.repo.create({ guildId: gid, commands });
}
await this.repo.save(row);
return { status: 'ok', count: commands.length };
}
async list(): Promise<{ commands: unknown[]; updatedAt: string | null }> {
const row = await this.repo.findOne({ where: { guildId: guildId() } });
return {
commands: row?.commands ?? [],
updatedAt: row?.updatedAt ? row.updatedAt.toISOString() : null,
};
}
}

View File

@@ -0,0 +1,102 @@
import {
ArrayMaxSize,
IsArray,
IsBoolean,
IsOptional,
IsString,
MaxLength,
ValidateNested,
} from 'class-validator';
import { Type } from 'class-transformer';
// Guild C-2: the slash-command catalog is guild-global and rendered by the
// frontend `/` autocomplete. Without a strict schema + caps a single
// authenticated caller could poison it or blow up the DB / clients.
// The global ValidationPipe runs with { whitelist, forbidNonWhitelisted },
// so any unknown field is rejected.
class CommandChoiceDto {
@IsString()
@MaxLength(200)
value!: string;
@IsString()
@MaxLength(200)
label!: string;
}
class CommandArgDto {
@IsString()
@MaxLength(100)
name!: string;
@IsOptional()
@IsString()
@MaxLength(500)
description?: string;
@IsOptional()
@IsString()
@MaxLength(40)
type?: string;
@IsOptional()
@IsBoolean()
required?: boolean;
@IsOptional()
@IsBoolean()
captureRemaining?: boolean;
@IsOptional()
@IsBoolean()
preferAutocomplete?: boolean;
// null when there are no choices (plugin sends explicit null).
@IsOptional()
@IsArray()
@ArrayMaxSize(100)
@ValidateNested({ each: true })
@Type(() => CommandChoiceDto)
choices?: CommandChoiceDto[] | null;
}
class CommandSpecDto {
@IsString()
@MaxLength(100)
name!: string;
@IsOptional()
@IsString()
@MaxLength(100)
nativeName?: string;
@IsOptional()
@IsString()
@MaxLength(500)
description?: string;
@IsOptional()
@IsBoolean()
acceptsArgs?: boolean;
@IsOptional()
@IsArray()
@ArrayMaxSize(50)
@ValidateNested({ each: true })
@Type(() => CommandArgDto)
args?: CommandArgDto[];
@IsOptional()
@IsString()
@MaxLength(20)
argsParsing?: string;
}
export class SyncCommandsDto {
@IsArray()
@ArrayMaxSize(200)
@ValidateNested({ each: true })
@Type(() => CommandSpecDto)
commands!: CommandSpecDto[];
}

View File

@@ -9,7 +9,11 @@ import { introspectGuildToken } from './center-auth.js';
@Injectable()
export class ApiKeyGuard implements CanActivate {
async canActivate(context: ExecutionContext): Promise<boolean> {
const req = context.switchToHttp().getRequest<{ path?: string; headers: Record<string, string | string[] | undefined> }>();
const req = context.switchToHttp().getRequest<{
path?: string;
headers: Record<string, string | string[] | undefined>;
query?: Record<string, string | string[] | undefined>;
}>();
const path = req.path ?? '';
// allow health check without auth
@@ -19,7 +23,13 @@ export class ApiKeyGuard implements CanActivate {
const auth = req.headers['authorization'];
const authValue = Array.isArray(auth) ? auth[0] : auth;
const token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';
let token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';
// Browsers can't set Authorization on <img>/<a> (file downloads); accept
// the guild token via ?access_token= as a fallback. Still introspected.
if (!token) {
const qt = req.query?.['access_token'];
token = (Array.isArray(qt) ? qt[0] : qt) ?? '';
}
if (!token) throw new UnauthorizedException('missing bearer token');
const result = await introspectGuildToken(token);

View File

@@ -11,6 +11,9 @@ import { GuildRole } from './entities/guild-role.entity.js';
import { GuildMember } from './entities/guild-member.entity.js';
import { GuildMemberRole } from './entities/guild-member-role.entity.js';
import { IdempotencyRecord } from './entities/idempotency-record.entity.js';
import { StoredFile } from './entities/stored-file.entity.js';
import { ChannelCanvas } from './entities/channel-canvas.entity.js';
import { GuildCommand } from './entities/guild-command.entity.js';
export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
type: 'mysql',
@@ -32,6 +35,9 @@ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
GuildMember,
GuildMemberRole,
IdempotencyRecord,
StoredFile,
ChannelCanvas,
GuildCommand,
],
synchronize: (process.env.FABRIC_BACKEND_GUILD_DB_SYNC ?? 'true') === 'true',
logging: (process.env.FABRIC_BACKEND_GUILD_DB_LOGGING ?? 'false') === 'true',

View File

@@ -0,0 +1,46 @@
import {
Column,
CreateDateColumn,
Entity,
Index,
PrimaryGeneratedColumn,
UpdateDateColumn,
} from 'typeorm';
export type CanvasFormat = 'md' | 'html' | 'text';
// One active shared document per channel (ChatGPT-canvas-like). Re-sharing
// replaces it; only the original sharer may update it in place. Pinned in
// the channel UI, independent of the message scroll.
@Entity('channel_canvas')
export class ChannelCanvas {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index({ unique: true })
@Column({ name: 'channel_id', type: 'char', length: 36 })
channelId!: string;
// who shared it; only this user may PATCH/DELETE
@Column({ name: 'sharer_user_id', type: 'varchar', length: 64 })
sharerUserId!: string;
@Column({ type: 'varchar', length: 200 })
title!: string;
@Column({ type: 'varchar', length: 8 })
format!: CanvasFormat;
// raw document source (rendered client-side per format)
@Column({ type: 'mediumtext' })
source!: string;
@Column({ type: 'int', default: 1 })
version!: number;
@CreateDateColumn()
createdAt!: Date;
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -16,9 +16,9 @@ export class Channel {
@Column({
name: 'x_type',
type: 'enum',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom'],
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
})
xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom';
xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
@Column({ type: 'varchar', length: 16, default: 'text' })
kind!: 'text' | 'announcement';

View File

@@ -0,0 +1,25 @@
import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';
// Guild-global slash-command catalog. One row per guild (this node's
// FABRIC_BACKEND_GUILD_NODE_ID). The OpenClaw plugin PUTs the OpenClaw
// native-command specs here (the same data Discord registers as slash
// commands); the frontend GETs it to drive `/` autocomplete. The guild
// node stores the catalog opaquely — it does not interpret command bodies.
@Entity('guild_commands')
export class GuildCommand {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index({ unique: true })
@Column({ name: 'guild_id', type: 'varchar', length: 80 })
guildId!: string;
// NativeCommandSpec[]-shaped (name, nativeName, description, acceptsArgs,
// args[{name,description,type,required,choices:[{value,label}],
// captureRemaining,preferAutocomplete}], argsParsing). Stored verbatim.
@Column({ type: 'json' })
commands!: unknown[];
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -0,0 +1,43 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
// An uploaded or canvas-shared file held on the guild node. Retained for a
// configurable window (default 7 days) then purged by FilesService.
@Entity('stored_files')
export class StoredFile {
@PrimaryGeneratedColumn('uuid')
id!: string;
// public, URL-safe id used in /api/files/:fileId
@Index({ unique: true })
@Column({ name: 'file_id', type: 'varchar', length: 64 })
fileId!: string;
// owning channel (best-effort context; null = not channel-scoped)
@Index()
@Column({ name: 'channel_id', type: 'char', length: 36, nullable: true })
channelId!: string | null;
@Column({ name: 'uploader_user_id', type: 'varchar', length: 64 })
uploaderUserId!: string;
@Column({ name: 'original_name', type: 'varchar', length: 255 })
originalName!: string;
@Column({ name: 'mime_type', type: 'varchar', length: 150 })
mimeType!: string;
@Column({ name: 'size_bytes', type: 'bigint' })
sizeBytes!: number;
// path on disk relative to the storage root
@Column({ name: 'storage_path', type: 'varchar', length: 300 })
storagePath!: string;
@CreateDateColumn()
createdAt!: Date;
// hard-delete deadline; rows past this are purged with their blob
@Index()
@Column({ name: 'expires_at', type: 'datetime' })
expiresAt!: Date;
}

View File

@@ -0,0 +1,84 @@
import {
BadRequestException,
Controller,
Get,
Param,
Post,
Query,
Req,
Res,
UnauthorizedException,
UploadedFile,
UseInterceptors,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import type { Response } from 'express';
import { FilesService } from './files.service.js';
type AuthedRequest = { userId?: string };
type UploadedMulterFile = {
originalname: string;
mimetype: string;
size: number;
buffer: Buffer;
};
@Controller('files')
export class FilesController {
constructor(private readonly files: FilesService) {}
@Post()
@UseInterceptors(FileInterceptor('file'))
async upload(
@Req() req: AuthedRequest,
@UploadedFile() file: UploadedMulterFile | undefined,
@Query('channelId') channelId?: string,
) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
if (!file || !file.buffer?.length) throw new BadRequestException('no file');
if (this.files.maxBytes > 0 && file.size > this.files.maxBytes) {
throw new BadRequestException(
`file exceeds limit of ${this.files.maxBytes} bytes`,
);
}
const row = await this.files.store({
channelId: channelId ? String(channelId) : null,
uploaderUserId: userId,
originalName: file.originalname || 'file',
mimeType: file.mimetype,
buffer: file.buffer,
});
return {
fileId: row.fileId,
url: `/api/files/${row.fileId}`,
name: row.originalName,
mimeType: row.mimeType,
size: Number(row.sizeBytes),
expiresAt: row.expiresAt.toISOString(),
};
}
@Get(':fileId')
async download(
@Param('fileId') fileId: string,
@Res() res: Response,
): Promise<void> {
const row = await this.files.find(fileId);
if (!row) {
res.status(404).json({ error: 'file_not_found' });
return;
}
const blob = await this.files.readBlob(row);
const inline = /^(image|audio|video)\//.test(row.mimeType) || row.mimeType === 'application/pdf';
const safeName = row.originalName.replace(/["\r\n]/g, '_');
res.setHeader('Content-Type', row.mimeType);
res.setHeader('Content-Length', String(blob.length));
res.setHeader(
'Content-Disposition',
`${inline ? 'inline' : 'attachment'}; filename="${safeName}"`,
);
res.setHeader('Cache-Control', 'private, max-age=3600');
res.end(blob);
}
}

13
src/files/files.module.ts Normal file
View File

@@ -0,0 +1,13 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { StoredFile } from '../entities/stored-file.entity.js';
import { FilesController } from './files.controller.js';
import { FilesService } from './files.service.js';
@Module({
imports: [TypeOrmModule.forFeature([StoredFile])],
controllers: [FilesController],
providers: [FilesService],
exports: [FilesService],
})
export class FilesModule {}

View File

@@ -0,0 +1,98 @@
import { randomBytes } from 'node:crypto';
import { promises as fs } from 'node:fs';
import { join, resolve } from 'node:path';
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { LessThan, Repository } from 'typeorm';
import { StoredFile } from '../entities/stored-file.entity.js';
const DAY_MS = 24 * 60 * 60 * 1000;
const CLEANUP_INTERVAL_MS = 60 * 60 * 1000; // hourly
@Injectable()
export class FilesService implements OnModuleInit, OnModuleDestroy {
private readonly log = new Logger('FilesService');
private timer: NodeJS.Timeout | null = null;
// Storage root; the guild operator may relocate / resize this freely.
readonly dir = resolve(
process.env.FABRIC_BACKEND_GUILD_FILE_DIR ?? join(process.cwd(), '.data', 'files'),
);
// 0 / unset => no cap (default per product: 100MB, operator-configurable).
readonly maxBytes = Number(
process.env.FABRIC_BACKEND_GUILD_FILE_MAX_BYTES ?? 100 * 1024 * 1024,
);
readonly ttlDays = Number(process.env.FABRIC_BACKEND_GUILD_FILE_TTL_DAYS ?? 7);
constructor(
@InjectRepository(StoredFile)
private readonly repo: Repository<StoredFile>,
) {}
async onModuleInit(): Promise<void> {
await fs.mkdir(this.dir, { recursive: true });
this.log.log(
`files dir=${this.dir} maxBytes=${this.maxBytes} ttlDays=${this.ttlDays}`,
);
// sweep on boot, then hourly
void this.cleanup();
this.timer = setInterval(() => void this.cleanup(), CLEANUP_INTERVAL_MS);
this.timer.unref?.();
}
onModuleDestroy(): void {
if (this.timer) clearInterval(this.timer);
this.timer = null;
}
async store(input: {
channelId: string | null;
uploaderUserId: string;
originalName: string;
mimeType: string;
buffer: Buffer;
}): Promise<StoredFile> {
const fileId = randomBytes(18).toString('base64url');
const storagePath = fileId; // flat layout, opaque name
await fs.writeFile(join(this.dir, storagePath), input.buffer);
const row = this.repo.create({
fileId,
channelId: input.channelId,
uploaderUserId: input.uploaderUserId,
originalName: input.originalName.slice(0, 255),
mimeType: (input.mimeType || 'application/octet-stream').slice(0, 150),
sizeBytes: input.buffer.length,
storagePath,
expiresAt: new Date(Date.now() + this.ttlDays * DAY_MS),
});
return this.repo.save(row);
}
async find(fileId: string): Promise<StoredFile | null> {
const row = await this.repo.findOne({ where: { fileId } });
if (!row) return null;
if (row.expiresAt.getTime() <= Date.now()) return null; // treat as gone
return row;
}
async readBlob(row: StoredFile): Promise<Buffer> {
return fs.readFile(join(this.dir, row.storagePath));
}
// Purge every row past its retention deadline together with its blob.
async cleanup(): Promise<number> {
const expired = await this.repo.find({ where: { expiresAt: LessThan(new Date()) } });
let removed = 0;
for (const row of expired) {
try {
await fs.rm(join(this.dir, row.storagePath), { force: true });
} catch {
/* best effort: drop the row regardless */
}
await this.repo.delete({ id: row.id });
removed++;
}
if (removed) this.log.log(`retention sweep removed ${removed} expired file(s)`);
return removed;
}
}

View File

@@ -3,6 +3,7 @@ import {
ConflictException,
Controller,
Delete,
ForbiddenException,
Get,
Headers,
NotFoundException,
@@ -10,11 +11,13 @@ import {
Patch,
Post,
Query,
Req,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { DataSource, Repository } from 'typeorm';
import { CreateMessageDto } from './dto.create-message.dto.js';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js';
@@ -36,6 +39,8 @@ export class MessagingController {
private readonly dataSource: DataSource,
@InjectRepository(Channel)
private readonly channelRepo: Repository<Channel>,
@InjectRepository(ChannelMember)
private readonly memberRepo: Repository<ChannelMember>,
@InjectRepository(Message)
private readonly messageRepo: Repository<Message>,
@InjectRepository(IdempotencyRecord)
@@ -86,6 +91,19 @@ export class MessagingController {
};
}
// Channel-participant gate (Guild C-1): public channels are readable/
// writable by any authenticated user; private channels require explicit
// channel_members membership. Returns the channel so callers can reuse it.
private async assertParticipant(channelId: string, userId: string): Promise<Channel> {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
if (channel.isPublic) return channel;
if (!userId) throw new ForbiddenException('not a channel member');
const member = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!member) throw new ForbiddenException('not a channel member');
return channel;
}
// Persists one message (allocates a seq under a channel row lock) and
// returns its view. Used for normal messages and for guild /ack messages.
private async persistMessage(
@@ -136,20 +154,25 @@ export class MessagingController {
async create(
@Param('id') channelId: string,
@Body() body: CreateMessageDto,
@Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string,
) {
const scope = `POST:/channels/${channelId}/messages`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
// Guild C-1: caller must be a participant of the channel, and the
// author is always the authenticated user — body.authorUserId is
// ignored so a caller can never post as someone else.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
const channel = await this.assertParticipant(channelId, userId);
if (channel.closed) {
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
}
const xType = channel.xType ?? 'general';
const isRotating = xType === 'discuss' || xType === 'work';
const authorUserId = String(body.authorUserId ?? 'anonymous');
const authorUserId = userId;
// ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via
// Center before anything else persists/parses the content
@@ -223,14 +246,23 @@ export class MessagingController {
@Param('id') channelId: string,
@Param('messageId') messageId: string,
@Body() body: { content?: string },
@Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string,
) {
const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
// Guild C-1: participant + author-ownership.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
await this.assertParticipant(channelId, userId);
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
if (!item) return { status: 'not_found' };
if (item.authorUserId !== userId) {
throw new ForbiddenException('not the message author');
}
const now = Date.now();
const createdAt = new Date(item.createdAt).getTime();
@@ -259,14 +291,23 @@ export class MessagingController {
async remove(
@Param('id') channelId: string,
@Param('messageId') messageId: string,
@Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string,
) {
const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
// Guild C-1: participant + author-ownership.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
await this.assertParticipant(channelId, userId);
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
if (!item) return { status: 'not_found' };
if (item.authorUserId !== userId) {
throw new ForbiddenException('not the message author');
}
item.isDeleted = true;
item.deletedAt = new Date();
@@ -304,10 +345,14 @@ export class MessagingController {
@Get()
async listBySeq(
@Param('id') channelId: string,
@Req() req: { userId?: string },
@Query('seq_from') seqFrom?: string,
@Query('seq_to') seqTo?: string,
@Query('limit') limit?: string,
) {
// Guild C-1: only participants may read channel history.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
const from = seqFrom ? Number(seqFrom) : 1;
const to = seqTo ? Number(seqTo) : Number.MAX_SAFE_INTEGER;
const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);
@@ -327,10 +372,7 @@ export class MessagingController {
};
}
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) {
throw new NotFoundException('channel not found');
}
const channel = await this.assertParticipant(channelId, userId);
const qb = this.messageRepo
.createQueryBuilder('m')

View File

@@ -2,12 +2,13 @@ import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { MessagingController } from './messaging.controller.js';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js';
@Module({
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord, WakeMapping])],
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])],
controllers: [MessagingController],
})
export class MessagingModule {}

View File

@@ -11,7 +11,7 @@ import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { introspectGuildToken } from '../common/center-auth.js';
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom';
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm';
// Wakeup for non-rotating channels only (general/report/triage/custom).
// discuss/work go through TurnService + emitMessageTargeted, never here.
@@ -40,6 +40,9 @@ export function computeWakeup(args: {
case 'triage':
case 'custom':
return wakeUserIds.has(recipientUserId);
case 'dm':
// 1:1 conversation: every non-author participant is always woken.
return true;
default:
return false;
}
@@ -93,6 +96,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
const userId = result.user.id || this.userIdFromClient(client);
client.data.userId = userId;
this.onlineUsers.add(userId);
// Per-user room: lets server code emit user-scoped events (e.g.
// channel.joined when membership changes) without bookkeeping a
// userId→sockets map. All of this user's sockets receive the event.
client.join(`user:${userId}`);
this.server.emit('presence.online', {
userId,
onlineCount: this.onlineUsers.size,
@@ -168,6 +175,14 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
this.server.to(`channel:${channelId}`).emit(event, data);
}
// Emit a user-scoped event to all sockets currently connected for `userId`
// (via the `user:<userId>` room joined in handleConnection). No-op for
// offline users — the next connect's initial channel-list fetch covers it.
emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
if (!userId) return;
this.server.to(`user:${userId}`).emit(event, data);
}
// Emits message.created per-recipient so each carries its own `wakeup` flag.
async emitMessageCreated(
channelId: string,
@@ -189,7 +204,7 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
wakeUserIds: ctx.wakeUserIds,
mentionUserIds: ctx.mentionUserIds,
});
s.emit('message.created', { ...data, channelId, wakeup });
s.emit('message.created', { ...data, channelId, wakeup, xType: ctx.xType });
}
}