Phase 1 of DIALECTIC-V2 — adds Fabric infrastructure for system-broadcast channels with HF-status-aware delivery filtering.

New channel x_type 'announce':
- channels.entity.ts + channels.service.ts + realtime.gateway.ts: enum + union extended.
- computeDelivery() adds an 'announce' case: a recipient with presence='busy' → 'skip' (discarded silently); other presences → 'observer' (delivered, no wake). System-broadcast semantics — agents proactively check their announce inbox when they're ready and are not interrupted out of band.
- messaging.controller POST guard: announce-type channels reject posts that don't present an x-fabric-system-key header matching the FABRIC_BACKEND_GUILD_SYSTEM_API_KEY env. Empty env = no system caller is valid (closed by default).

New entity + module agent_presences:
- agent-presence.entity.ts: per-user (userId PK) status enum (idle/on_call/busy/exhausted/offline/unknown), source tag, updatedAt.
- agent-presence.service.ts: getStatus/getStatusMap (bulk, for delivery-time fanout) + setStatus (upsert).
- agent-presence.controller.ts: GET + PUT /agents/:userId/presence.
- agent-presence.module.ts: TypeORM forFeature + wired into AppModule.
- buildTypeOrmConfig() entities list extended.

RealtimeGateway wiring:
- New optional field on the gateway (typed loosely to avoid circular import). RealtimeModule.onModuleInit() assigns from the injected AgentPresenceService — degrades gracefully (no busy-discard, treat all as 'unknown') if presence wiring is ever removed.
- emitMessageCreated pre-loads presence per fanout only when xType is 'announce' (other xTypes bypass the lookup entirely).

Note: actual presence data writes come from Fabric.OpenclawPlugin's presence-sync loop (separate commit on that submodule); without it, all rows are 'unknown' and announce delivery falls through to the default observer behavior (no busy filtering). The system-only POST gate is independent and works immediately.

See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md sections 7 + 10 Phase 1.
430 lines
15 KiB
TypeScript
430 lines
15 KiB
TypeScript
import {
|
|
Body,
|
|
ConflictException,
|
|
Controller,
|
|
Delete,
|
|
ForbiddenException,
|
|
Get,
|
|
Headers,
|
|
NotFoundException,
|
|
Param,
|
|
Patch,
|
|
Post,
|
|
Query,
|
|
Req,
|
|
} from '@nestjs/common';
|
|
import { InjectRepository } from '@nestjs/typeorm';
|
|
import { DataSource, Repository } from 'typeorm';
|
|
import { CreateMessageDto } from './dto.create-message.dto.js';
|
|
import { Channel } from '../entities/channel.entity.js';
|
|
import { ChannelMember } from '../entities/channel-member.entity.js';
|
|
import { Message } from '../entities/message.entity.js';
|
|
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
|
|
import { WakeMapping } from '../entities/wake-mapping.entity.js';
|
|
import { AdminCacheService } from '../common/admin-cache.service.js';
|
|
import { parseSlashCommand } from '../channels/slash-commands.js';
|
|
import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js';
|
|
import { resolveUserNames } from '../common/center-auth.js';
|
|
import { TurnService } from '../channels/turn.service.js';
|
|
import { EventsService } from '../events/events.service.js';
|
|
import { clampLimit, computeNextExpectedSeq } from './pagination.util.js';
|
|
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
|
|
|
|
const MS_PER_MINUTE = 60_000;

/** Maximum age (ms) a message may have and still be edited by its author: 15 minutes. */
const EDIT_WINDOW_MS = 15 * MS_PER_MINUTE;

/** Page size handed to clampLimit as the default when listing messages. */
const DEFAULT_PAGE_LIMIT = 50;

/** Upper bound handed to clampLimit when listing messages. */
const MAX_PAGE_LIMIT = 200;
|
|
|
|
/**
 * REST endpoints for the messages of one channel (`/channels/:id/messages`):
 * create, edit, soft-delete and seq-ranged listing.
 *
 * Every handler first requires an authenticated `req.userId` and passes the
 * channel-participant gate. Only after that does it consult stored idempotent
 * responses or touch message data, so a cached response body is never replayed
 * to a caller who could not have read the channel.
 */
@Controller('channels/:id/messages')
export class MessagingController {
  constructor(
    private readonly dataSource: DataSource,
    @InjectRepository(Channel)
    private readonly channelRepo: Repository<Channel>,
    @InjectRepository(ChannelMember)
    private readonly memberRepo: Repository<ChannelMember>,
    @InjectRepository(Message)
    private readonly messageRepo: Repository<Message>,
    @InjectRepository(IdempotencyRecord)
    private readonly idemRepo: Repository<IdempotencyRecord>,
    @InjectRepository(WakeMapping)
    private readonly wakeRepo: Repository<WakeMapping>,
    private readonly turn: TurnService,
    private readonly events: EventsService,
    private readonly realtime: RealtimeGateway,
    private readonly adminCache: AdminCacheService,
  ) {}

  /**
   * Returns the authenticated caller's user id.
   *
   * @throws ForbiddenException when the request carries no user id.
   */
  private requireUserId(req: { userId?: string }): string {
    const userId = String(req.userId ?? '');
    if (!userId) throw new ForbiddenException('missing user');
    return userId;
  }

  /**
   * Parses a `seq_from` / `seq_to` query value.
   *
   * Missing, empty, non-numeric or non-integer values fall back to `fallback`
   * so that NaN or fractional bounds never reach the database query.
   */
  private parseSeqBound(raw: string | undefined, fallback: number): number {
    if (!raw) return fallback;
    const parsed = Number(raw);
    return Number.isSafeInteger(parsed) ? parsed : fallback;
  }

  /**
   * Looks up a previously stored response for (scope, idempotencyKey).
   *
   * @returns the stored response body, or null when no key was supplied or
   *          no record exists.
   */
  private async getIdempotentResponse(
    scope: string,
    idempotencyKey?: string,
  ): Promise<Record<string, unknown> | null> {
    if (!idempotencyKey) return null;
    const row = await this.idemRepo.findOne({ where: { scope, idempotencyKey } });
    return row?.responseBody ?? null;
  }

  /**
   * Stores `responseBody` under (scope, idempotencyKey) so a retry can replay
   * it. Does nothing when the caller supplied no idempotency key.
   */
  private async saveIdempotentResponse(
    scope: string,
    idempotencyKey: string | undefined,
    responseBody: Record<string, unknown>,
  ): Promise<void> {
    if (!idempotencyKey) return;
    const row = this.idemRepo.create({
      scope,
      idempotencyKey,
      responseBody,
    });
    await this.idemRepo.save(row);
  }

  /**
   * Maps a Message entity to its API view: dates become ISO-8601 strings (or
   * null), and missing mentions/attachments become empty arrays.
   */
  private toView(m: Message) {
    return {
      messageId: m.messageId,
      seq: m.seq,
      content: m.content,
      authorUserId: m.authorUserId,
      replyToMessageId: m.replyToMessageId,
      mentions: m.mentions ?? [],
      attachments: m.attachments ?? [],
      createdAt: m.createdAt.toISOString(),
      editedAt: m.editedAt ? m.editedAt.toISOString() : null,
      deletedAt: m.deletedAt ? m.deletedAt.toISOString() : null,
      isDeleted: m.isDeleted,
    };
  }

  /**
   * Channel-participant gate (Guild C-1): public channels are readable/
   * writable by any authenticated user; private channels require an explicit
   * channel_members row. Returns the channel so callers can reuse it.
   *
   * @throws NotFoundException when the channel does not exist.
   * @throws ForbiddenException when the channel is private and the user is
   *         not a member.
   */
  private async assertParticipant(channelId: string, userId: string): Promise<Channel> {
    const channel = await this.channelRepo.findOne({ where: { id: channelId } });
    if (!channel) throw new NotFoundException('channel not found');
    if (channel.isPublic) return channel;
    if (!userId) throw new ForbiddenException('not a channel member');
    const member = await this.memberRepo.findOne({ where: { channelId, userId } });
    if (!member) throw new ForbiddenException('not a channel member');
    return channel;
  }

  /**
   * Persists one message inside a transaction. The channel row is loaded with
   * a pessimistic write lock, its `lastSeq` is incremented, and the message is
   * saved with that seq. Used for normal messages and for guild /ack messages.
   *
   * The message id is the caller's `clientMessageId` when provided, otherwise
   * `m-<channelId>-<seq>`.
   *
   * @throws NotFoundException when the channel does not exist.
   */
  private async persistMessage(
    channelId: string,
    input: {
      authorUserId: string;
      content: string;
      clientMessageId?: string | null;
      replyToMessageId?: string | null;
      mentions?: string[];
      attachments?: Array<{ url: string; name?: string; mimeType?: string }>;
    },
  ): Promise<Message> {
    return this.dataSource.transaction(async (manager) => {
      const channel = await manager.findOne(Channel, {
        where: { id: channelId },
        lock: { mode: 'pessimistic_write' },
      });
      if (!channel) {
        throw new NotFoundException('channel not found');
      }
      const nextSeq = channel.lastSeq + 1;
      channel.lastSeq = nextSeq;
      await manager.save(Channel, channel);

      const messageId = input.clientMessageId ?? `m-${channelId}-${nextSeq}`;
      const row = manager.create(Message, {
        messageId,
        channelId,
        conversationId: null,
        authorUserId: input.authorUserId,
        seq: nextSeq,
        content: input.content,
        replyToMessageId: input.replyToMessageId ?? null,
        mentions: input.mentions ?? [],
        attachments: input.attachments ?? [],
        editedAt: null,
        deletedAt: null,
        isDeleted: false,
      });
      return manager.save(Message, row);
    });
  }

  /**
   * Persists a guild-authored `/ack` message, emits `message.created` for it
   * and delivers it via the realtime gateway. `wakeupUserId` is the only user
   * to be woken (null = nobody).
   */
  private async emitAck(channelId: string, wakeupUserId: string | null): Promise<void> {
    const ack = await this.persistMessage(channelId, { authorUserId: 'guild', content: '/ack' });
    const body: Record<string, unknown> = this.toView(ack);
    await this.events.emit({ eventType: 'message.created', channelId, actorId: 'guild', data: body });
    await this.realtime.emitMessageTargeted(channelId, body, wakeupUserId);
  }

  /**
   * POST /channels/:id/messages — posts a message or executes a slash command.
   *
   * The author is always the authenticated user; any author supplied in the
   * body is ignored so a caller can never post as someone else.
   *
   * @returns the created message view, `{ status: 'command', command }` for an
   *          intercepted slash command, or the stored response when the
   *          idempotency key was already used.
   * @throws ForbiddenException when unauthenticated, not a participant, or
   *         posting to an announce channel without the system key.
   * @throws NotFoundException when the channel does not exist.
   * @throws ConflictException when the channel is closed.
   */
  @Post()
  async create(
    @Param('id') channelId: string,
    @Body() body: CreateMessageDto,
    @Req() req: { userId?: string },
    @Headers('idempotency-key') idempotencyKey?: string,
    @Headers('x-fabric-system-key') systemKey?: string,
  ) {
    // Guild C-1: authenticate and authorise BEFORE replaying any stored
    // response, so a cached body is never handed to a non-participant.
    const userId = this.requireUserId(req);
    const channel = await this.assertParticipant(channelId, userId);

    const scope = `POST:/channels/${channelId}/messages`;
    const existed = await this.getIdempotentResponse(scope, idempotencyKey);
    if (existed) return existed;

    if (channel.closed) {
      throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
    }
    const xType = channel.xType ?? 'general';
    const isRotating = xType === 'discuss' || xType === 'work';

    // announce channels accept posts only from callers presenting the system
    // key: x-fabric-system-key must equal FABRIC_BACKEND_GUILD_SYSTEM_API_KEY.
    // This is an additional gate on top of the participant check above. An
    // unset or empty env var means no caller can ever pass (closed by
    // default). Rejected posts get a 403; nothing is logged here.
    // NOTE(review): `!==` is not a constant-time comparison — consider
    // crypto.timingSafeEqual over fixed-length digests if timing leaks matter.
    if (xType === 'announce') {
      const expected = process.env.FABRIC_BACKEND_GUILD_SYSTEM_API_KEY ?? '';
      if (!expected || systemKey !== expected) {
        throw new ForbiddenException({
          error: 'announce_system_only',
          message: 'announce-type channels accept system-signed posts only',
        });
      }
    }
    const authorUserId = userId;

    // ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via
    // Center before anything else persists/parses the content
    let content = body.content ?? '';
    const names = extractNameMentions(content);
    if (names.length) {
      const map = await resolveUserNames(names);
      content = replaceNameMentions(content, map);
    }

    // ---- command interception: registered slash commands are never delivered
    const cmd = parseSlashCommand(content);
    if (cmd) {
      if (isRotating && cmd.name === 'no-reply') {
        const { ack } = await this.turn.onNoReply(channelId, authorUserId);
        if (ack) await this.emitAck(channelId, ack.wakeupUserId);
      } else if (isRotating && cmd.name === 'force-proceed') {
        const { ack } = await this.turn.onForceProceed(channelId);
        if (ack) await this.emitAck(channelId, ack.wakeupUserId);
      }
      // non-rotating channels (or no effect): swallowed, nothing delivered.
      // Record the outcome so a retry with the same idempotency key does not
      // run the turn transition a second time.
      const commandResponse: Record<string, unknown> = { status: 'command', command: cmd.name };
      await this.saveIdempotentResponse(scope, idempotencyKey, commandResponse);
      return commandResponse;
    }

    // ---- normal message
    const message = await this.persistMessage(channelId, {
      authorUserId,
      content,
      clientMessageId: body.clientMessageId,
      replyToMessageId: body.replyToMessageId,
      mentions: body.mentions,
      attachments: body.attachments,
    });

    const responseBody: Record<string, unknown> = this.toView(message);
    await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);

    await this.events.emit({
      eventType: 'message.created',
      channelId,
      actorId: authorUserId,
      data: responseBody,
    });

    // mentions: <@id> outside backtick spans (post name-translation)
    const mentionIds = parseMentions(content);

    if (isRotating) {
      // discuss/work: rotation (incl. mention sub-frames) picks the target
      const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
      await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
    } else {
      // general/report/triage/custom: 3-state delivery
      // (wake / observer / skip) — see realtime.gateway.computeDelivery.
      // Center-scoped admin (cached, 1d TTL) gets `observer` on triage.
      const wakeRows = await this.wakeRepo.find({ where: { channelId } });
      const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
      // The author never counts as mentioning themselves.
      const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
      const admin = await this.adminCache.get();
      await this.realtime.emitMessageCreated(channelId, responseBody, {
        xType,
        authorUserId,
        wakeUserIds,
        mentionUserIds,
        adminUserId: admin?.userId ?? null,
      });
    }

    return responseBody;
  }

  /**
   * PATCH /channels/:id/messages/:messageId — edits a message's content.
   *
   * Only the author may edit, and only within EDIT_WINDOW_MS of creation.
   * Soft-deleted messages cannot be edited and are reported as `not_found`,
   * so a tombstone never regains content.
   *
   * @returns the updated message view, `{ status: 'not_found' }`, or
   *          `{ status: 'edit_window_expired', messageId }`.
   * @throws ForbiddenException when unauthenticated, not a participant, or not
   *         the message author.
   * @throws NotFoundException when the channel does not exist.
   */
  @Patch(':messageId')
  async edit(
    @Param('id') channelId: string,
    @Param('messageId') messageId: string,
    @Body() body: { content?: string },
    @Req() req: { userId?: string },
    @Headers('idempotency-key') idempotencyKey?: string,
  ) {
    // Guild C-1: participant check first, then idempotent replay.
    const userId = this.requireUserId(req);
    await this.assertParticipant(channelId, userId);

    const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
    const existed = await this.getIdempotentResponse(scope, idempotencyKey);
    if (existed) return existed;

    const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
    if (!item) return { status: 'not_found' };
    if (item.authorUserId !== userId) {
      throw new ForbiddenException('not the message author');
    }
    // A soft-deleted message keeps its row; editing it would put content back
    // on a tombstone while isDeleted stays true.
    if (item.isDeleted) return { status: 'not_found' };

    const ageMs = Date.now() - new Date(item.createdAt).getTime();
    if (ageMs > EDIT_WINDOW_MS) {
      return { status: 'edit_window_expired', messageId };
    }

    item.content = body.content ?? item.content;
    item.editedAt = new Date();
    const saved = await this.messageRepo.save(item);
    const responseBody: Record<string, unknown> = this.toView(saved);
    await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);

    await this.events.emit({
      eventType: 'message.updated',
      channelId,
      actorId: saved.authorUserId,
      data: responseBody,
    });
    this.realtime.emitChannelEvent(channelId, 'message.updated', responseBody);

    return responseBody;
  }

  /**
   * DELETE /channels/:id/messages/:messageId — soft-deletes a message.
   *
   * The row is kept; content becomes `[deleted]` and mentions/attachments are
   * cleared. Deleting an already-deleted message returns the same response
   * without rewriting `deletedAt` or emitting the events again.
   *
   * @returns `{ status: 'deleted', mode: 'soft', messageId }` or
   *          `{ status: 'not_found' }`.
   * @throws ForbiddenException when unauthenticated, not a participant, or not
   *         the message author.
   * @throws NotFoundException when the channel does not exist.
   */
  @Delete(':messageId')
  async remove(
    @Param('id') channelId: string,
    @Param('messageId') messageId: string,
    @Req() req: { userId?: string },
    @Headers('idempotency-key') idempotencyKey?: string,
  ) {
    // Guild C-1: participant check first, then idempotent replay.
    const userId = this.requireUserId(req);
    await this.assertParticipant(channelId, userId);

    const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
    const existed = await this.getIdempotentResponse(scope, idempotencyKey);
    if (existed) return existed;

    const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
    if (!item) return { status: 'not_found' };
    if (item.authorUserId !== userId) {
      throw new ForbiddenException('not the message author');
    }

    const responseBody: Record<string, unknown> = {
      status: 'deleted',
      mode: 'soft',
      messageId,
    };

    // Already tombstoned: keep the original deletedAt and do not notify twice.
    if (item.isDeleted) return responseBody;

    item.isDeleted = true;
    item.deletedAt = new Date();
    item.content = '[deleted]';
    item.mentions = [];
    item.attachments = [];
    await this.messageRepo.save(item);

    await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);

    // Same payload for the persisted event and the realtime broadcast.
    const deletedPayload = {
      messageId,
      seq: item.seq,
      deletedAt: item.deletedAt?.toISOString() ?? null,
    };
    await this.events.emit({
      eventType: 'message.deleted',
      channelId,
      actorId: item.authorUserId,
      data: deletedPayload,
    });
    this.realtime.emitChannelEvent(channelId, 'message.deleted', deletedPayload);

    return responseBody;
  }

  /**
   * GET /channels/:id/messages — lists messages with seq in
   * [seq_from, seq_to], ascending, capped by `limit`.
   *
   * `seq_from` defaults to 1 and `seq_to` to Number.MAX_SAFE_INTEGER; missing
   * or malformed values use those defaults. `limit` is clamped by clampLimit
   * with DEFAULT_PAGE_LIMIT / MAX_PAGE_LIMIT.
   *
   * @returns `{ items, page }`, where `page.highestCommittedSeq` is the
   *          channel's current `lastSeq`.
   * @throws ForbiddenException when unauthenticated or not a participant.
   * @throws NotFoundException when the channel does not exist.
   */
  @Get()
  async listBySeq(
    @Param('id') channelId: string,
    @Req() req: { userId?: string },
    @Query('seq_from') seqFrom?: string,
    @Query('seq_to') seqTo?: string,
    @Query('limit') limit?: string,
  ) {
    // Guild C-1: only participants may read channel history. The gate runs
    // before any shortcut so every response path is authorised.
    const userId = this.requireUserId(req);
    const channel = await this.assertParticipant(channelId, userId);

    const from = this.parseSeqBound(seqFrom, 1);
    const to = this.parseSeqBound(seqTo, Number.MAX_SAFE_INTEGER);
    const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);

    if (from > to) {
      // Empty range: nothing to query.
      return {
        items: [],
        page: {
          seqFrom: from,
          seqTo: to,
          limit: safeLimit,
          returned: 0,
          hasMore: false,
          nextExpectedSeq: from,
          highestCommittedSeq: channel.lastSeq,
        },
      };
    }

    const qb = this.messageRepo
      .createQueryBuilder('m')
      .where('m.channelId = :channelId', { channelId })
      .andWhere('m.seq >= :from', { from })
      .andWhere('m.seq <= :to', { to })
      .orderBy('m.seq', 'ASC');

    // Count the whole range before applying the limit so hasMore can be derived.
    const total = await qb.getCount();
    const rows = await qb.limit(safeLimit).getMany();
    const items = rows.map((m) => this.toView(m));

    const nextExpectedSeq = computeNextExpectedSeq(
      from,
      rows.map((row) => row.seq),
    );

    return {
      items,
      page: {
        seqFrom: from,
        seqTo: to,
        limit: safeLimit,
        returned: items.length,
        hasMore: total > items.length,
        nextExpectedSeq,
        highestCommittedSeq: channel.lastSeq,
      },
    };
  }
}
|