Compare commits
5 Commits
ca20df7618
...
a3acebf694
| Author | SHA1 | Date | |
|---|---|---|---|
| a3acebf694 | |||
| 5a182827c8 | |||
| 340eed8aa3 | |||
| 3f77c0e35d | |||
| 38b4665321 |
@@ -39,15 +39,23 @@ export class AgentPresenceService {
|
|||||||
* Upsert a user's presence. Source is a free-text tag for debugging
|
* Upsert a user's presence. Source is a free-text tag for debugging
|
||||||
* (e.g. "hf-plugin", "manual", "test"). PUT /agents/:id/presence
|
* (e.g. "hf-plugin", "manual", "test"). PUT /agents/:id/presence
|
||||||
* calls this; the plugin pushes only on diff so writes are sparse.
|
* calls this; the plugin pushes only on diff so writes are sparse.
|
||||||
|
*
|
||||||
|
* Implementation note: the older findOne+save split was a read-modify-
|
||||||
|
* write race — two concurrent first-time writes for the same userId
|
||||||
|
* would both read no row, both INSERT, second hits unique-key dup
|
||||||
|
* (`agent_presences.PRIMARY`) and 500s. Fabric.OpenclawPlugin's
|
||||||
|
* presence-sync occasionally fires two PUTs for the same agent within
|
||||||
|
* ~10 ms (tick overlap on its side — separate fix in the plugin),
|
||||||
|
* which surfaced this race in prod.
|
||||||
|
*
|
||||||
|
* `repo.upsert(values, conflictPaths)` compiles to MySQL
|
||||||
|
* `INSERT … ON DUPLICATE KEY UPDATE` and is atomic at the storage
|
||||||
|
* engine level — no read needed, no race window. We synthesize the
|
||||||
|
* returned entity from what we just wrote rather than round-tripping
|
||||||
|
* a SELECT — the controller only reads {userId, status} off it.
|
||||||
*/
|
*/
|
||||||
async setStatus(userId: string, status: PresenceStatus, source: string): Promise<AgentPresence> {
|
async setStatus(userId: string, status: PresenceStatus, source: string): Promise<AgentPresence> {
|
||||||
const existing = await this.repo.findOne({ where: { userId } });
|
await this.repo.upsert({ userId, status, source }, ['userId']);
|
||||||
if (existing) {
|
return this.repo.create({ userId, status, source });
|
||||||
existing.status = status;
|
|
||||||
existing.source = source;
|
|
||||||
return this.repo.save(existing);
|
|
||||||
}
|
|
||||||
const row = this.repo.create({ userId, status, source });
|
|
||||||
return this.repo.save(row);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
import { BadRequestException, Body, Controller, Get, Param, Patch, Post, Query, Req, UnauthorizedException } from '@nestjs/common';
|
import { BadRequestException, Body, Controller, ForbiddenException, Get, Param, Patch, Post, Put, Query, Req, UnauthorizedException } from '@nestjs/common';
|
||||||
import { ChannelsService } from './channels.service.js';
|
import { ChannelsService } from './channels.service.js';
|
||||||
|
|
||||||
// ApiKeyGuard attaches the introspected Center user id onto the request.
|
// ApiKeyGuard attaches the introspected Center user id onto the request,
|
||||||
type AuthedRequest = { userId?: string };
|
// and sets isSystem=true for system-key callers (x-fabric-system-key).
|
||||||
|
type AuthedRequest = { userId?: string; isSystem?: boolean };
|
||||||
|
|
||||||
@Controller('channels')
|
@Controller('channels')
|
||||||
export class ChannelsController {
|
export class ChannelsController {
|
||||||
@@ -59,6 +60,29 @@ export class ChannelsController {
|
|||||||
return this.channelsService.updatePurpose(channelId, userId, body.purpose);
|
return this.channelsService.updatePurpose(channelId, userId, body.purpose);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reassign a triage channel's wake_mapping (who triage arrivals wake) —
|
||||||
|
// the on-call-handoff baton. PRIVILEGED: requires the system key
|
||||||
|
// (x-fabric-system-key → req.isSystem); a normal agent's guild token is
|
||||||
|
// rejected. Restricted to triage channels in the service. Body:
|
||||||
|
// { wakeUserIds: string[] } (the new on-duty user id(s)).
|
||||||
|
@Put(':id/wake-mapping')
|
||||||
|
setWakeMapping(
|
||||||
|
@Req() req: AuthedRequest,
|
||||||
|
@Param('id') channelId: string,
|
||||||
|
@Body() body: Record<string, unknown>,
|
||||||
|
) {
|
||||||
|
if (!req.isSystem) {
|
||||||
|
throw new ForbiddenException('wake_mapping reassignment requires the system key');
|
||||||
|
}
|
||||||
|
const wakeUserIds = Array.isArray(body.wakeUserIds)
|
||||||
|
? (body.wakeUserIds as unknown[]).map((u) => String(u ?? '').trim()).filter(Boolean)
|
||||||
|
: [];
|
||||||
|
if (!wakeUserIds.length) {
|
||||||
|
throw new BadRequestException('wakeUserIds (non-empty string[]) is required');
|
||||||
|
}
|
||||||
|
return this.channelsService.setTriageWakeMapping(channelId, wakeUserIds);
|
||||||
|
}
|
||||||
|
|
||||||
// Move an order member into the bypass list (discuss/work only).
|
// Move an order member into the bypass list (discuss/work only).
|
||||||
@Post(':id/bypass')
|
@Post(':id/bypass')
|
||||||
bypass(
|
bypass(
|
||||||
|
|||||||
@@ -248,6 +248,46 @@ export class ChannelsService {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reassign a TRIAGE channel's wake_mapping — the set of users woken by
|
||||||
|
// triage arrivals (the on-call-handoff baton). Privileged: the caller is
|
||||||
|
// gated at the controller on the system key, not a per-user membership
|
||||||
|
// check, because handing the baton to the *incoming* agent is exactly an
|
||||||
|
// operation the outgoing agent's own token shouldn't be trusted to scope.
|
||||||
|
//
|
||||||
|
// Atomically replaces every wake row for the channel. New wake users are
|
||||||
|
// ensured to be channel members first (the realtime gateway only routes a
|
||||||
|
// triage message to members, so a non-member on-duty agent would never be
|
||||||
|
// woken). Restricted to xType=triage; other types derive wake differently
|
||||||
|
// (custom=listeners, discuss/work=rotation) and must not be poked here.
|
||||||
|
async setTriageWakeMapping(channelId: string, wakeUserIds: string[]) {
|
||||||
|
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
|
||||||
|
if (!channel) throw new NotFoundException('channel not found');
|
||||||
|
if (channel.xType !== 'triage') {
|
||||||
|
throw new BadRequestException('wake_mapping reassignment is only allowed for triage channels');
|
||||||
|
}
|
||||||
|
const ids = [...new Set(wakeUserIds.map((u) => String(u ?? '').trim()).filter(Boolean))];
|
||||||
|
if (!ids.length) throw new BadRequestException('wakeUserIds must contain at least one user id');
|
||||||
|
|
||||||
|
const newMembers: string[] = [];
|
||||||
|
for (const userId of ids) {
|
||||||
|
const existing = await this.memberRepo.findOne({ where: { channelId, userId } });
|
||||||
|
if (!existing) {
|
||||||
|
await this.memberRepo.save(this.memberRepo.create({ channelId, userId }));
|
||||||
|
newMembers.push(userId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Atomic swap: drop all current wake rows, then insert the new set.
|
||||||
|
await this.wakeRepo.delete({ channelId });
|
||||||
|
await this.wakeRepo.save(ids.map((userId) => this.wakeRepo.create({ channelId, userId })));
|
||||||
|
|
||||||
|
if (newMembers.length) {
|
||||||
|
this.notifyMembership('joined', channelId, newMembers, { xType: channel.xType });
|
||||||
|
}
|
||||||
|
|
||||||
|
return { id: channel.id, name: channel.name, xType: channel.xType, wakeUserIds: ids };
|
||||||
|
}
|
||||||
|
|
||||||
// Move an order member into the bypass list (discuss/work only).
|
// Move an order member into the bypass list (discuss/work only).
|
||||||
// Any channel member may do this.
|
// Any channel member may do this.
|
||||||
async moveToBypass(channelId: string, actorUserId: string, targetUserId: string) {
|
async moveToBypass(channelId: string, actorUserId: string, targetUserId: string) {
|
||||||
|
|||||||
@@ -8,19 +8,12 @@ import {
|
|||||||
Req,
|
Req,
|
||||||
UnauthorizedException,
|
UnauthorizedException,
|
||||||
} from '@nestjs/common';
|
} from '@nestjs/common';
|
||||||
import { timingSafeEqual } from 'node:crypto';
|
|
||||||
import { CommandsService } from './commands.service.js';
|
import { CommandsService } from './commands.service.js';
|
||||||
import { SyncCommandsDto } from './dto.sync-commands.dto.js';
|
import { SyncCommandsDto } from './dto.sync-commands.dto.js';
|
||||||
|
import { safeEqual } from '../common/safe-equal.js';
|
||||||
|
|
||||||
type AuthedRequest = { userId?: string };
|
type AuthedRequest = { userId?: string };
|
||||||
|
|
||||||
function safeEqual(a: string, b: string): boolean {
|
|
||||||
const ab = Buffer.from(a);
|
|
||||||
const bb = Buffer.from(b);
|
|
||||||
if (ab.length !== bb.length) return false;
|
|
||||||
return timingSafeEqual(ab, bb);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Controller('commands')
|
@Controller('commands')
|
||||||
export class CommandsController {
|
export class CommandsController {
|
||||||
constructor(private readonly commands: CommandsService) {}
|
constructor(private readonly commands: CommandsService) {}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import {
|
|||||||
UnauthorizedException,
|
UnauthorizedException,
|
||||||
} from '@nestjs/common';
|
} from '@nestjs/common';
|
||||||
import { introspectGuildToken } from './center-auth.js';
|
import { introspectGuildToken } from './center-auth.js';
|
||||||
|
import { safeEqual } from './safe-equal.js';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class ApiKeyGuard implements CanActivate {
|
export class ApiKeyGuard implements CanActivate {
|
||||||
@@ -21,6 +22,25 @@ export class ApiKeyGuard implements CanActivate {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// System-key bypass: when a caller presents x-fabric-system-key
|
||||||
|
// matching FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY (intentionally the
|
||||||
|
// same shared secret as x-commands-sync-key — both legitimate
|
||||||
|
// consumers are Fabric.OpenclawPlugin), skip the Bearer requirement
|
||||||
|
// and mark this as a system caller. Downstream handlers (e.g.
|
||||||
|
// messaging.controller POST /channels/:id/messages) gate on
|
||||||
|
// req.isSystem to take the system-author code path.
|
||||||
|
//
|
||||||
|
// Empty env → bypass disabled (no system caller ever valid; closed
|
||||||
|
// by default). Header carries the secret as-is; we constant-time
|
||||||
|
// compare against the env value.
|
||||||
|
const sysExpected = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
|
||||||
|
const sysHeader = req.headers['x-fabric-system-key'];
|
||||||
|
const sysProvided = Array.isArray(sysHeader) ? sysHeader[0] : sysHeader;
|
||||||
|
if (sysExpected && sysProvided && safeEqual(sysProvided, sysExpected)) {
|
||||||
|
(req as { isSystem?: boolean }).isSystem = true;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
const auth = req.headers['authorization'];
|
const auth = req.headers['authorization'];
|
||||||
const authValue = Array.isArray(auth) ? auth[0] : auth;
|
const authValue = Array.isArray(auth) ? auth[0] : auth;
|
||||||
let token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';
|
let token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';
|
||||||
|
|||||||
25
src/common/safe-equal.spec.ts
Normal file
25
src/common/safe-equal.spec.ts
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
import { describe, it, expect } from 'vitest';
|
||||||
|
import { safeEqual } from './safe-equal.js';
|
||||||
|
|
||||||
|
describe('safeEqual', () => {
|
||||||
|
it('returns true for identical non-empty strings', () => {
|
||||||
|
expect(safeEqual('abc123', 'abc123')).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns false for different strings of same length', () => {
|
||||||
|
expect(safeEqual('abc123', 'abc124')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('returns false for differing lengths', () => {
|
||||||
|
expect(safeEqual('abc', 'abcd')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('handles empty strings', () => {
|
||||||
|
// both empty is technically equal — but downstream callers should
|
||||||
|
// explicitly check for empty expected before invoking. We just
|
||||||
|
// document the constant-time-comparison primitive's behavior.
|
||||||
|
expect(safeEqual('', '')).toBe(true);
|
||||||
|
expect(safeEqual('a', '')).toBe(false);
|
||||||
|
expect(safeEqual('', 'a')).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
12
src/common/safe-equal.ts
Normal file
12
src/common/safe-equal.ts
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
import { timingSafeEqual } from 'node:crypto';
|
||||||
|
|
||||||
|
// Constant-time string comparison. Returns false for length mismatch (the
|
||||||
|
// length difference itself is observable, but the per-byte loop isn't).
|
||||||
|
// Used for shared-secret header checks (commands-sync-key, system-key,
|
||||||
|
// etc.) to keep timing-oracle attacks off the table.
|
||||||
|
export function safeEqual(a: string, b: string): boolean {
|
||||||
|
const ab = Buffer.from(a);
|
||||||
|
const bb = Buffer.from(b);
|
||||||
|
if (ab.length !== bb.length) return false;
|
||||||
|
return timingSafeEqual(ab, bb);
|
||||||
|
}
|
||||||
@@ -56,4 +56,14 @@ export class CreateMessageDto {
|
|||||||
@IsString()
|
@IsString()
|
||||||
@MaxLength(64)
|
@MaxLength(64)
|
||||||
authorUserId?: string;
|
authorUserId?: string;
|
||||||
|
|
||||||
|
// System-author path only (x-fabric-system-key gated). When set, the
|
||||||
|
// message is delivered via emitMessageTargeted so this single recipient
|
||||||
|
// gets wakeup=true; everyone else in the channel sees wakeup=false. For
|
||||||
|
// regular (user-bearer) posts this field is ignored. Used by
|
||||||
|
// close-sub-discussion to precisely wake the host on callback.
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
@MaxLength(64)
|
||||||
|
wakeupUserId?: string;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -156,13 +156,70 @@ export class MessagingController {
|
|||||||
async create(
|
async create(
|
||||||
@Param('id') channelId: string,
|
@Param('id') channelId: string,
|
||||||
@Body() body: CreateMessageDto,
|
@Body() body: CreateMessageDto,
|
||||||
@Req() req: { userId?: string },
|
@Req() req: { userId?: string; isSystem?: boolean },
|
||||||
@Headers('idempotency-key') idempotencyKey?: string,
|
@Headers('idempotency-key') idempotencyKey?: string,
|
||||||
) {
|
) {
|
||||||
const scope = `POST:/channels/${channelId}/messages`;
|
const scope = `POST:/channels/${channelId}/messages`;
|
||||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||||
if (existed) return existed;
|
if (existed) return existed;
|
||||||
|
|
||||||
|
// System caller (ApiKeyGuard set isSystem from x-fabric-system-key):
|
||||||
|
// skip the per-user participant check; resolve channel directly.
|
||||||
|
// System posts are allowed into any non-closed channel — used by
|
||||||
|
// Fabric.OpenclawPlugin to write `close-sub-discussion` callbacks
|
||||||
|
// back to a parent channel that the host agent may not be currently
|
||||||
|
// "in" from the backend's perspective, and to deliver guide-injected
|
||||||
|
// system intros into sub-discussion channels without needing to log
|
||||||
|
// in as a real user. Author is a sentinel UUID that no real user
|
||||||
|
// ever has; `wakeupUserId` (optional) lets the caller precisely wake
|
||||||
|
// one recipient (e.g. the host of a closing sub-discussion).
|
||||||
|
if (req.isSystem) {
|
||||||
|
const sysChannel = await this.channelRepo.findOne({ where: { id: channelId } });
|
||||||
|
if (!sysChannel) throw new NotFoundException('channel not found');
|
||||||
|
if (sysChannel.closed) {
|
||||||
|
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
|
||||||
|
}
|
||||||
|
const SYSTEM_USER_ID = '00000000-0000-0000-0000-000000000000';
|
||||||
|
let sysContent = body.content ?? '';
|
||||||
|
const sysNames = extractNameMentions(sysContent);
|
||||||
|
if (sysNames.length) {
|
||||||
|
const nameMap = await resolveUserNames(sysNames);
|
||||||
|
sysContent = replaceNameMentions(sysContent, nameMap);
|
||||||
|
}
|
||||||
|
const sysMessage = await this.persistMessage(channelId, {
|
||||||
|
authorUserId: SYSTEM_USER_ID,
|
||||||
|
content: sysContent,
|
||||||
|
clientMessageId: body.clientMessageId,
|
||||||
|
replyToMessageId: body.replyToMessageId,
|
||||||
|
mentions: body.mentions,
|
||||||
|
attachments: body.attachments,
|
||||||
|
});
|
||||||
|
const sysView = this.toView(sysMessage) as Record<string, unknown>;
|
||||||
|
await this.saveIdempotentResponse(scope, idempotencyKey, sysView);
|
||||||
|
await this.events.emit({
|
||||||
|
eventType: 'message.created',
|
||||||
|
channelId,
|
||||||
|
actorId: SYSTEM_USER_ID,
|
||||||
|
data: sysView,
|
||||||
|
});
|
||||||
|
// wakeupUserId set -> emitMessageTargeted wakes exactly that user
|
||||||
|
// (everyone else gets the same message with wakeup=false).
|
||||||
|
// wakeupUserId omitted/null -> emitMessageCreated routes via the
|
||||||
|
// channel's xType-specific 3-state delivery with empty wakeSet, so
|
||||||
|
// nobody is woken (the message lands in history only).
|
||||||
|
const wakeupUserId = typeof body.wakeupUserId === 'string' ? body.wakeupUserId.trim() : '';
|
||||||
|
if (wakeupUserId) {
|
||||||
|
await this.realtime.emitMessageTargeted(channelId, sysView, wakeupUserId);
|
||||||
|
} else {
|
||||||
|
await this.realtime.emitMessageCreated(channelId, sysView, {
|
||||||
|
xType: sysChannel.xType ?? 'general',
|
||||||
|
authorUserId: SYSTEM_USER_ID,
|
||||||
|
wakeUserIds: new Set<string>(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return sysView;
|
||||||
|
}
|
||||||
|
|
||||||
// Guild C-1: caller must be a participant of the channel, and the
|
// Guild C-1: caller must be a participant of the channel, and the
|
||||||
// author is always the authenticated user — body.authorUserId is
|
// author is always the authenticated user — body.authorUserId is
|
||||||
// ignored so a caller can never post as someone else.
|
// ignored so a caller can never post as someone else.
|
||||||
|
|||||||
Reference in New Issue
Block a user