Files
Fabric.Backend.Guild/src/messaging/messaging.controller.ts
hzhang ca20df7618 refactor(guild): drop system-key bypass + announce-only-system limit
Pairs with Dialectic.Backend@5cf4302 which removes the backend-driven
broadcaster that was the only consumer of the x-fabric-system-key
header path. Backend cleanup is complete on the consumer side; this
removes the producer-side surface.

Removed:
  - ApiKeyGuard: x-fabric-system-key bypass branch (sysExpected /
    sysProvided / req.isSystem flag) — only Bearer flow remains.
  - messaging.controller.create(): the entire 'if (req.isSystem)'
    branch including the SYSTEM_USER_ID='00000000-...-0000' sentinel
    persistence path.
  - messaging.controller.create(): the 'if (xType === announce) throw
    announce_system_only' gate. Announce channels are now ordinary
    channels — any participant can POST. Use case: agents post one-off
    recruitment broadcasts via fabric-send-message (e.g. dialectic
    'come participate in topic X' messages).
  - cli/gen-system-api-key.ts: deleted (was the generator for the env
    var that's no longer read).

Kept:
  - channel.purpose field + PATCH /api/channels/:id (member auth for
    setting purpose — agents use this to label channels for
    fabric-channel-list discoverability).
  - cli/print-commands-sync-key.ts (separate key, separate lifecycle).
  - GuildRole.isSystem flag (unrelated — system-role permission gate).
2026-05-23 23:49:47 +01:00

417 lines
15 KiB
TypeScript

import {
  BadRequestException,
  Body,
  ConflictException,
  Controller,
  Delete,
  ForbiddenException,
  Get,
  Headers,
  NotFoundException,
  Param,
  Patch,
  Post,
  Query,
  Req,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { DataSource, Repository } from 'typeorm';
import { CreateMessageDto } from './dto.create-message.dto.js';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
import { parseSlashCommand } from '../channels/slash-commands.js';
import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js';
import { resolveUserNames } from '../common/center-auth.js';
import { TurnService } from '../channels/turn.service.js';
import { EventsService } from '../events/events.service.js';
import { clampLimit, computeNextExpectedSeq } from './pagination.util.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
// How long after creation a message may still be edited (15 minutes, in ms).
// edit() compares the message age against this and answers
// `edit_window_expired` once it is exceeded.
const EDIT_WINDOW_MS = 15 * 60 * 1000;
// Default and upper bound handed to clampLimit() when listBySeq() resolves
// the `limit` query parameter.
const DEFAULT_PAGE_LIMIT = 50;
const MAX_PAGE_LIMIT = 200;
@Controller('channels/:id/messages')
export class MessagingController {
// Injected collaborators:
//  - dataSource: used to run the seq-allocating transaction in persistMessage.
//  - channelRepo / memberRepo: channel lookup and private-channel membership.
//  - messageRepo: message reads and edit/delete writes.
//  - idemRepo: stored responses keyed by (scope, idempotency key).
//  - wakeRepo: per-channel wake mappings read when delivering to
//    non-rotating channels.
//  - turn: speaker-rotation decisions for discuss/work channels.
//  - events / realtime: event emission and realtime delivery.
//  - adminCache: source of the admin user id passed to realtime delivery.
constructor(
private readonly dataSource: DataSource,
@InjectRepository(Channel)
private readonly channelRepo: Repository<Channel>,
@InjectRepository(ChannelMember)
private readonly memberRepo: Repository<ChannelMember>,
@InjectRepository(Message)
private readonly messageRepo: Repository<Message>,
@InjectRepository(IdempotencyRecord)
private readonly idemRepo: Repository<IdempotencyRecord>,
@InjectRepository(WakeMapping)
private readonly wakeRepo: Repository<WakeMapping>,
private readonly turn: TurnService,
private readonly events: EventsService,
private readonly realtime: RealtimeGateway,
private readonly adminCache: AdminCacheService,
) {}
// Fetches the response previously stored for (scope, idempotencyKey).
// Resolves to null when no key was supplied or no record carries a body.
private async getIdempotentResponse(
  scope: string,
  idempotencyKey?: string,
): Promise<Record<string, unknown> | null> {
  if (idempotencyKey) {
    const stored = await this.idemRepo.findOne({ where: { scope, idempotencyKey } });
    return stored?.responseBody ?? null;
  }
  return null;
}
// Stores responseBody under (scope, idempotencyKey) so a later request with
// the same key can be answered from the record. Does nothing without a key.
private async saveIdempotentResponse(
  scope: string,
  idempotencyKey: string | undefined,
  responseBody: Record<string, unknown>,
): Promise<void> {
  if (idempotencyKey) {
    const record = this.idemRepo.create({ scope, idempotencyKey, responseBody });
    await this.idemRepo.save(record);
  }
}
// Maps a Message entity to the plain object returned to clients and attached
// to events: timestamps become ISO strings (null when unset) and missing
// mentions/attachments become empty arrays.
private toView(m: Message) {
  const isoOrNull = (when: Date | null | undefined) => (when ? when.toISOString() : null);
  const { messageId, seq, content, authorUserId, replyToMessageId, isDeleted } = m;
  return {
    messageId,
    seq,
    content,
    authorUserId,
    replyToMessageId,
    mentions: m.mentions ?? [],
    attachments: m.attachments ?? [],
    createdAt: m.createdAt.toISOString(),
    editedAt: isoOrNull(m.editedAt),
    deletedAt: isoOrNull(m.deletedAt),
    isDeleted,
  };
}
// Channel-participant gate (Guild C-1). A public channel passes for any
// caller; a private channel requires a channel_members row for userId.
// Throws NotFoundException when the channel does not exist and
// ForbiddenException when the caller is not a member. Resolves to the loaded
// channel so callers can reuse it.
private async assertParticipant(channelId: string, userId: string): Promise<Channel> {
  const found = await this.channelRepo.findOne({ where: { id: channelId } });
  if (!found) throw new NotFoundException('channel not found');
  if (!found.isPublic) {
    // An empty userId can never match a membership row, so skip the lookup.
    const membership = userId
      ? await this.memberRepo.findOne({ where: { channelId, userId } })
      : null;
    if (!membership) throw new ForbiddenException('not a channel member');
  }
  return found;
}
// Stores one message inside a transaction. The channel row is read with a
// pessimistic write lock, its lastSeq is bumped and saved, and the new value
// becomes the message's seq. The message id is the client-supplied id when
// present, otherwise `m-<channelId>-<seq>`. Used for normal messages and for
// guild /ack messages. Throws NotFoundException when the channel is missing.
private async persistMessage(
  channelId: string,
  input: {
    authorUserId: string;
    content: string;
    clientMessageId?: string | null;
    replyToMessageId?: string | null;
    mentions?: string[];
    attachments?: Array<{ url: string; name?: string; mimeType?: string }>;
  },
): Promise<Message> {
  return this.dataSource.transaction(async (tx) => {
    const locked = await tx.findOne(Channel, {
      where: { id: channelId },
      lock: { mode: 'pessimistic_write' },
    });
    if (!locked) throw new NotFoundException('channel not found');

    // Allocate the next sequence number while the row lock is held.
    const seq = locked.lastSeq + 1;
    locked.lastSeq = seq;
    await tx.save(Channel, locked);

    const { authorUserId, content, clientMessageId, replyToMessageId, mentions, attachments } = input;
    const draft = tx.create(Message, {
      messageId: clientMessageId ?? `m-${channelId}-${seq}`,
      channelId,
      conversationId: null,
      authorUserId,
      seq,
      content,
      replyToMessageId: replyToMessageId ?? null,
      mentions: mentions ?? [],
      attachments: attachments ?? [],
      editedAt: null,
      deletedAt: null,
      isDeleted: false,
    });
    return tx.save(Message, draft);
  });
}
// Persists a '/ack' message authored by 'guild', emits it as a
// message.created event, then hands it to the realtime gateway with
// wakeupUserId as the targeted user (per the original note: the new current
// speaker, or null for nobody).
private async emitAck(channelId: string, wakeupUserId: string | null): Promise<void> {
  const guildActor = 'guild';
  const stored = await this.persistMessage(channelId, {
    authorUserId: guildActor,
    content: '/ack',
  });
  const view = this.toView(stored) as Record<string, unknown>;
  await this.events.emit({
    eventType: 'message.created',
    channelId,
    actorId: guildActor,
    data: view,
  });
  await this.realtime.emitMessageTargeted(channelId, view, wakeupUserId);
}
/**
 * POST /channels/:id/messages — posts a message, or runs a registered slash
 * command, as the authenticated user.
 *
 * Order of operations: authorize -> idempotent replay -> closed-channel
 * check -> name-mention translation -> slash-command interception ->
 * persist -> event emission -> realtime delivery.
 *
 * @param channelId       Channel id from the route.
 * @param body            Message payload. The author is always the
 *                        authenticated user; no author field of the body is
 *                        read, so a caller can never post as someone else.
 * @param req             Request carrying the authenticated `userId`.
 * @param idempotencyKey  Optional `idempotency-key` header. A repeated key
 *                        is answered from the stored response without
 *                        repeating any side effect.
 * @returns The stored message view, or `{ status: 'command', command }` when
 *          the content was a registered slash command (never delivered).
 * @throws ForbiddenException when the request has no user or the user is not
 *         a member of a private channel.
 * @throws NotFoundException when the channel does not exist.
 * @throws ConflictException when the channel is closed.
 */
@Post()
async create(
  @Param('id') channelId: string,
  @Body() body: CreateMessageDto,
  @Req() req: { userId?: string },
  @Headers('idempotency-key') idempotencyKey?: string,
) {
  // Guild C-1: caller must be a participant of the channel. Announce
  // channels are ordinary channels here — any participant can POST and
  // there is no server-side privileged path.
  const userId = String(req.userId ?? '');
  if (!userId) throw new ForbiddenException('missing user');
  const channel = await this.assertParticipant(channelId, userId);

  // Replay only AFTER authorization: the stored body is a full message
  // view, so answering before the participant check would hand private
  // channel content to anyone presenting a known key.
  const scope = `POST:/channels/${channelId}/messages`;
  const existed = await this.getIdempotentResponse(scope, idempotencyKey);
  if (existed) return existed;

  if (channel.closed) {
    throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
  }
  const xType = channel.xType ?? 'general';
  const isRotating = xType === 'discuss' || xType === 'work';
  const authorUserId = userId;

  // ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via
  // Center before anything else persists/parses the content
  let content = body.content ?? '';
  const names = extractNameMentions(content);
  if (names.length) {
    const map = await resolveUserNames(names);
    content = replaceNameMentions(content, map);
  }

  // ---- command interception: registered slash commands are never delivered
  const cmd = parseSlashCommand(content);
  if (cmd) {
    if (isRotating && cmd.name === 'no-reply') {
      const { ack } = await this.turn.onNoReply(channelId, authorUserId);
      if (ack) await this.emitAck(channelId, ack.wakeupUserId);
    } else if (isRotating && cmd.name === 'force-proceed') {
      const { ack } = await this.turn.onForceProceed(channelId);
      if (ack) await this.emitAck(channelId, ack.wakeupUserId);
    }
    // non-rotating channels (or no effect): swallowed, nothing delivered.
    // Record the outcome so a retry with the same key cannot advance the
    // turn (and emit /ack) a second time.
    const commandResponse: Record<string, unknown> = { status: 'command', command: cmd.name };
    await this.saveIdempotentResponse(scope, idempotencyKey, commandResponse);
    return commandResponse;
  }

  // ---- normal message
  const message = await this.persistMessage(channelId, {
    authorUserId,
    content,
    clientMessageId: body.clientMessageId,
    replyToMessageId: body.replyToMessageId,
    mentions: body.mentions,
    attachments: body.attachments,
  });
  const responseBody = this.toView(message) as Record<string, unknown>;
  await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);
  await this.events.emit({
    eventType: 'message.created',
    channelId,
    actorId: authorUserId,
    data: responseBody,
  });

  // mentions: <@id> outside backtick spans (post name-translation)
  const mentionIds = parseMentions(content);
  if (isRotating) {
    // discuss/work: rotation (incl. mention sub-frames) picks the target
    const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
    await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
  } else {
    // general/report/triage/custom: delivery is decided by the realtime
    // gateway (see realtime.gateway.computeDelivery) from the channel's wake
    // list, the mentioned users (author excluded) and the cached admin id.
    const wakeRows = await this.wakeRepo.find({ where: { channelId } });
    const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
    const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
    const admin = await this.adminCache.get();
    await this.realtime.emitMessageCreated(channelId, responseBody, {
      xType,
      authorUserId,
      wakeUserIds,
      mentionUserIds,
      adminUserId: admin?.userId ?? null,
    });
  }
  return responseBody;
}
/**
 * PATCH /channels/:id/messages/:messageId — replaces the content of the
 * caller's own message while it is still inside the edit window.
 *
 * @param channelId       Channel id from the route.
 * @param messageId       Message id from the route.
 * @param body            `{ content }`; when content is absent the existing
 *                        content is kept (editedAt is still stamped).
 * @param req             Request carrying the authenticated `userId`.
 * @param idempotencyKey  Optional `idempotency-key` header; a repeated key
 *                        is answered from the stored response.
 * @returns The updated message view, `{ status: 'not_found' }` when the
 *          message does not exist or has been deleted, or
 *          `{ status: 'edit_window_expired', messageId }` when the message
 *          is older than EDIT_WINDOW_MS.
 * @throws ForbiddenException when the request has no user, the user is not a
 *         channel participant, or the user is not the message author.
 * @throws NotFoundException when the channel does not exist.
 */
@Patch(':messageId')
async edit(
  @Param('id') channelId: string,
  @Param('messageId') messageId: string,
  @Body() body: { content?: string },
  @Req() req: { userId?: string },
  @Headers('idempotency-key') idempotencyKey?: string,
) {
  // Guild C-1: participant + author-ownership.
  const userId = String(req.userId ?? '');
  if (!userId) throw new ForbiddenException('missing user');
  await this.assertParticipant(channelId, userId);

  // Replay only AFTER authorization: the stored body is a full message
  // view and must not be served to callers who fail the participant check.
  const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
  const existed = await this.getIdempotentResponse(scope, idempotencyKey);
  if (existed) return existed;

  const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
  if (!item) return { status: 'not_found' };
  if (item.authorUserId !== userId) {
    throw new ForbiddenException('not the message author');
  }
  // A soft-deleted message is a tombstone: editing it would write fresh
  // content over '[deleted]' and broadcast it while isDeleted stays true.
  if (item.isDeleted) return { status: 'not_found' };

  const ageMs = Date.now() - new Date(item.createdAt).getTime();
  if (ageMs > EDIT_WINDOW_MS) {
    return { status: 'edit_window_expired', messageId };
  }

  item.content = body.content ?? item.content;
  item.editedAt = new Date();
  const saved = await this.messageRepo.save(item);
  const responseBody = this.toView(saved) as Record<string, unknown>;
  await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);
  await this.events.emit({
    eventType: 'message.updated',
    channelId,
    actorId: saved.authorUserId,
    data: responseBody,
  });
  this.realtime.emitChannelEvent(channelId, 'message.updated', responseBody);
  return responseBody;
}
/**
 * DELETE /channels/:id/messages/:messageId — soft-deletes the caller's own
 * message: the row is kept, flagged deleted, and its content, mentions and
 * attachments are scrubbed.
 *
 * @param channelId       Channel id from the route.
 * @param messageId       Message id from the route.
 * @param req             Request carrying the authenticated `userId`.
 * @param idempotencyKey  Optional `idempotency-key` header; a repeated key
 *                        is answered from the stored response.
 * @returns `{ status: 'deleted', mode: 'soft', messageId }`, or
 *          `{ status: 'not_found' }` when the message does not exist.
 * @throws ForbiddenException when the request has no user, the user is not a
 *         channel participant, or the user is not the message author.
 * @throws NotFoundException when the channel does not exist.
 */
@Delete(':messageId')
async remove(
  @Param('id') channelId: string,
  @Param('messageId') messageId: string,
  @Req() req: { userId?: string },
  @Headers('idempotency-key') idempotencyKey?: string,
) {
  // Guild C-1: participant + author-ownership.
  const userId = String(req.userId ?? '');
  if (!userId) throw new ForbiddenException('missing user');
  await this.assertParticipant(channelId, userId);

  // Replay only AFTER authorization so every caller, including one that
  // presents a known key, goes through the participant check first.
  const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
  const existed = await this.getIdempotentResponse(scope, idempotencyKey);
  if (existed) return existed;

  const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
  if (!item) return { status: 'not_found' };
  if (item.authorUserId !== userId) {
    throw new ForbiddenException('not the message author');
  }

  // Soft delete: keep the row (and its seq) but scrub everything readable.
  item.isDeleted = true;
  item.deletedAt = new Date();
  item.content = '[deleted]';
  item.mentions = [];
  item.attachments = [];
  await this.messageRepo.save(item);

  const responseBody = {
    status: 'deleted',
    mode: 'soft',
    messageId,
  } as Record<string, unknown>;
  await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);

  // The persisted event and the realtime broadcast carry the same payload.
  const deletedPayload = {
    messageId,
    seq: item.seq,
    deletedAt: item.deletedAt?.toISOString() ?? null,
  };
  await this.events.emit({
    eventType: 'message.deleted',
    channelId,
    actorId: item.authorUserId,
    data: deletedPayload,
  });
  this.realtime.emitChannelEvent(channelId, 'message.deleted', deletedPayload);
  return responseBody;
}
/**
 * GET /channels/:id/messages — returns channel history ordered by seq
 * ascending, restricted to the inclusive window [seq_from, seq_to].
 *
 * @param channelId  Channel id from the route.
 * @param req        Request carrying the authenticated `userId`.
 * @param seqFrom    `seq_from` query value; defaults to 1 when absent.
 * @param seqTo      `seq_to` query value; defaults to
 *                   Number.MAX_SAFE_INTEGER when absent.
 * @param limit      `limit` query value, resolved by clampLimit() with
 *                   DEFAULT_PAGE_LIMIT and MAX_PAGE_LIMIT.
 * @returns `{ items, page }` where page echoes the window and limit and
 *          reports `returned`, `hasMore`, `nextExpectedSeq` and the
 *          channel's `highestCommittedSeq` (its lastSeq).
 * @throws ForbiddenException when the request has no user or the user is not
 *         a member of a private channel.
 * @throws BadRequestException when seq_from or seq_to is not an integer.
 * @throws NotFoundException when the channel does not exist.
 */
@Get()
async listBySeq(
  @Param('id') channelId: string,
  @Req() req: { userId?: string },
  @Query('seq_from') seqFrom?: string,
  @Query('seq_to') seqTo?: string,
  @Query('limit') limit?: string,
) {
  // Guild C-1: only participants may read channel history.
  const userId = String(req.userId ?? '');
  if (!userId) throw new ForbiddenException('missing user');

  // Reject malformed bounds up front; otherwise Number('abc') === NaN would
  // be bound into the SQL comparison below.
  const parseSeq = (raw: string | undefined, name: string, fallback: number): number => {
    if (!raw) return fallback;
    const value = Number(raw);
    if (!Number.isSafeInteger(value)) {
      throw new BadRequestException(`${name} must be an integer`);
    }
    return value;
  };
  const from = parseSeq(seqFrom, 'seq_from', 1);
  const to = parseSeq(seqTo, 'seq_to', Number.MAX_SAFE_INTEGER);
  const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);

  // Authorize before answering anything, including the empty-window
  // shortcut, so the reply can carry the channel's real lastSeq.
  const channel = await this.assertParticipant(channelId, userId);

  if (from > to) {
    // Empty window: nothing can match, so skip the queries.
    return {
      items: [],
      page: {
        seqFrom: from,
        seqTo: to,
        limit: safeLimit,
        returned: 0,
        hasMore: false,
        nextExpectedSeq: from,
        highestCommittedSeq: channel.lastSeq,
      },
    };
  }

  const qb = this.messageRepo
    .createQueryBuilder('m')
    .where('m.channelId = :channelId', { channelId })
    .andWhere('m.seq >= :from', { from })
    .andWhere('m.seq <= :to', { to })
    .orderBy('m.seq', 'ASC');
  // Count the whole window first, then fetch one page of it.
  const total = await qb.getCount();
  const rows = await qb.limit(safeLimit).getMany();
  const items = rows.map((m) => this.toView(m));
  // Derived by computeNextExpectedSeq from `from` and the returned seqs.
  const nextExpectedSeq = computeNextExpectedSeq(
    from,
    rows.map((row) => row.seq),
  );
  return {
    items,
    page: {
      seqFrom: from,
      seqTo: to,
      limit: safeLimit,
      returned: items.length,
      // More rows exist in the window than fit on this page.
      hasMore: total > items.length,
      nextExpectedSeq,
      highestCommittedSeq: channel.lastSeq,
    },
  };
}
}