Compare commits
3 Commits
ca20df7618
...
340eed8aa3
| Author | SHA1 | Date | |
|---|---|---|---|
| 340eed8aa3 | |||
| 3f77c0e35d | |||
| 38b4665321 |
@@ -39,15 +39,23 @@ export class AgentPresenceService {
|
||||
* Upsert a user's presence. Source is a free-text tag for debugging
|
||||
* (e.g. "hf-plugin", "manual", "test"). PUT /agents/:id/presence
|
||||
* calls this; the plugin pushes only on diff so writes are sparse.
|
||||
*
|
||||
* Implementation note: the older findOne+save split was a read-modify-
|
||||
* write race — two concurrent first-time writes for the same userId
|
||||
* would both read no row, both INSERT, second hits unique-key dup
|
||||
* (`agent_presences.PRIMARY`) and 500s. Fabric.OpenclawPlugin's
|
||||
* presence-sync occasionally fires two PUTs for the same agent within
|
||||
* ~10 ms (tick overlap on its side — separate fix in the plugin),
|
||||
* which surfaced this race in prod.
|
||||
*
|
||||
* `repo.upsert(values, conflictPaths)` compiles to MySQL
|
||||
* `INSERT … ON DUPLICATE KEY UPDATE` and is atomic at the storage
|
||||
* engine level — no read needed, no race window. We synthesize the
|
||||
* returned entity from what we just wrote rather than round-tripping
|
||||
* a SELECT — the controller only reads {userId, status} off it.
|
||||
*/
|
||||
async setStatus(userId: string, status: PresenceStatus, source: string): Promise<AgentPresence> {
|
||||
const existing = await this.repo.findOne({ where: { userId } });
|
||||
if (existing) {
|
||||
existing.status = status;
|
||||
existing.source = source;
|
||||
return this.repo.save(existing);
|
||||
}
|
||||
const row = this.repo.create({ userId, status, source });
|
||||
return this.repo.save(row);
|
||||
await this.repo.upsert({ userId, status, source }, ['userId']);
|
||||
return this.repo.create({ userId, status, source });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,19 +8,12 @@ import {
|
||||
Req,
|
||||
UnauthorizedException,
|
||||
} from '@nestjs/common';
|
||||
import { timingSafeEqual } from 'node:crypto';
|
||||
import { CommandsService } from './commands.service.js';
|
||||
import { SyncCommandsDto } from './dto.sync-commands.dto.js';
|
||||
import { safeEqual } from '../common/safe-equal.js';
|
||||
|
||||
type AuthedRequest = { userId?: string };
|
||||
|
||||
function safeEqual(a: string, b: string): boolean {
|
||||
const ab = Buffer.from(a);
|
||||
const bb = Buffer.from(b);
|
||||
if (ab.length !== bb.length) return false;
|
||||
return timingSafeEqual(ab, bb);
|
||||
}
|
||||
|
||||
@Controller('commands')
|
||||
export class CommandsController {
|
||||
constructor(private readonly commands: CommandsService) {}
|
||||
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
UnauthorizedException,
|
||||
} from '@nestjs/common';
|
||||
import { introspectGuildToken } from './center-auth.js';
|
||||
import { safeEqual } from './safe-equal.js';
|
||||
|
||||
@Injectable()
|
||||
export class ApiKeyGuard implements CanActivate {
|
||||
@@ -21,6 +22,25 @@ export class ApiKeyGuard implements CanActivate {
|
||||
return true;
|
||||
}
|
||||
|
||||
// System-key bypass: when a caller presents x-fabric-system-key
|
||||
// matching FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY (intentionally the
|
||||
// same shared secret as x-commands-sync-key — both legitimate
|
||||
// consumers are Fabric.OpenclawPlugin), skip the Bearer requirement
|
||||
// and mark this as a system caller. Downstream handlers (e.g.
|
||||
// messaging.controller POST /channels/:id/messages) gate on
|
||||
// req.isSystem to take the system-author code path.
|
||||
//
|
||||
// Empty env → bypass disabled (no system caller ever valid; closed
|
||||
// by default). Header carries the secret as-is; we constant-time
|
||||
// compare against the env value.
|
||||
const sysExpected = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
|
||||
const sysHeader = req.headers['x-fabric-system-key'];
|
||||
const sysProvided = Array.isArray(sysHeader) ? sysHeader[0] : sysHeader;
|
||||
if (sysExpected && sysProvided && safeEqual(sysProvided, sysExpected)) {
|
||||
(req as { isSystem?: boolean }).isSystem = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
const auth = req.headers['authorization'];
|
||||
const authValue = Array.isArray(auth) ? auth[0] : auth;
|
||||
let token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';
|
||||
|
||||
25
src/common/safe-equal.spec.ts
Normal file
25
src/common/safe-equal.spec.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { safeEqual } from './safe-equal.js';
|
||||
|
||||
describe('safeEqual', () => {
|
||||
it('returns true for identical non-empty strings', () => {
|
||||
expect(safeEqual('abc123', 'abc123')).toBe(true);
|
||||
});
|
||||
|
||||
it('returns false for different strings of same length', () => {
|
||||
expect(safeEqual('abc123', 'abc124')).toBe(false);
|
||||
});
|
||||
|
||||
it('returns false for differing lengths', () => {
|
||||
expect(safeEqual('abc', 'abcd')).toBe(false);
|
||||
});
|
||||
|
||||
it('handles empty strings', () => {
|
||||
// both empty is technically equal — but downstream callers should
|
||||
// explicitly check for empty expected before invoking. We just
|
||||
// document the constant-time-comparison primitive's behavior.
|
||||
expect(safeEqual('', '')).toBe(true);
|
||||
expect(safeEqual('a', '')).toBe(false);
|
||||
expect(safeEqual('', 'a')).toBe(false);
|
||||
});
|
||||
});
|
||||
12
src/common/safe-equal.ts
Normal file
12
src/common/safe-equal.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
import { timingSafeEqual } from 'node:crypto';
|
||||
|
||||
// Constant-time string comparison. Returns false for length mismatch (the
|
||||
// length difference itself is observable, but the per-byte loop isn't).
|
||||
// Used for shared-secret header checks (commands-sync-key, system-key,
|
||||
// etc.) to keep timing-oracle attacks off the table.
|
||||
export function safeEqual(a: string, b: string): boolean {
|
||||
const ab = Buffer.from(a);
|
||||
const bb = Buffer.from(b);
|
||||
if (ab.length !== bb.length) return false;
|
||||
return timingSafeEqual(ab, bb);
|
||||
}
|
||||
@@ -56,4 +56,14 @@ export class CreateMessageDto {
|
||||
@IsString()
|
||||
@MaxLength(64)
|
||||
authorUserId?: string;
|
||||
|
||||
// System-author path only (x-fabric-system-key gated). When set, the
|
||||
// message is delivered via emitMessageTargeted so this single recipient
|
||||
// gets wakeup=true; everyone else in the channel sees wakeup=false. For
|
||||
// regular (user-bearer) posts this field is ignored. Used by
|
||||
// close-sub-discussion to precisely wake the host on callback.
|
||||
@IsOptional()
|
||||
@IsString()
|
||||
@MaxLength(64)
|
||||
wakeupUserId?: string;
|
||||
}
|
||||
|
||||
@@ -156,13 +156,70 @@ export class MessagingController {
|
||||
async create(
|
||||
@Param('id') channelId: string,
|
||||
@Body() body: CreateMessageDto,
|
||||
@Req() req: { userId?: string },
|
||||
@Req() req: { userId?: string; isSystem?: boolean },
|
||||
@Headers('idempotency-key') idempotencyKey?: string,
|
||||
) {
|
||||
const scope = `POST:/channels/${channelId}/messages`;
|
||||
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
|
||||
if (existed) return existed;
|
||||
|
||||
// System caller (ApiKeyGuard set isSystem from x-fabric-system-key):
|
||||
// skip the per-user participant check; resolve channel directly.
|
||||
// System posts are allowed into any non-closed channel — used by
|
||||
// Fabric.OpenclawPlugin to write `close-sub-discussion` callbacks
|
||||
// back to a parent channel that the host agent may not be currently
|
||||
// "in" from the backend's perspective, and to deliver guide-injected
|
||||
// system intros into sub-discussion channels without needing to log
|
||||
// in as a real user. Author is a sentinel UUID that no real user
|
||||
// ever has; `wakeupUserId` (optional) lets the caller precisely wake
|
||||
// one recipient (e.g. the host of a closing sub-discussion).
|
||||
if (req.isSystem) {
|
||||
const sysChannel = await this.channelRepo.findOne({ where: { id: channelId } });
|
||||
if (!sysChannel) throw new NotFoundException('channel not found');
|
||||
if (sysChannel.closed) {
|
||||
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
|
||||
}
|
||||
const SYSTEM_USER_ID = '00000000-0000-0000-0000-000000000000';
|
||||
let sysContent = body.content ?? '';
|
||||
const sysNames = extractNameMentions(sysContent);
|
||||
if (sysNames.length) {
|
||||
const nameMap = await resolveUserNames(sysNames);
|
||||
sysContent = replaceNameMentions(sysContent, nameMap);
|
||||
}
|
||||
const sysMessage = await this.persistMessage(channelId, {
|
||||
authorUserId: SYSTEM_USER_ID,
|
||||
content: sysContent,
|
||||
clientMessageId: body.clientMessageId,
|
||||
replyToMessageId: body.replyToMessageId,
|
||||
mentions: body.mentions,
|
||||
attachments: body.attachments,
|
||||
});
|
||||
const sysView = this.toView(sysMessage) as Record<string, unknown>;
|
||||
await this.saveIdempotentResponse(scope, idempotencyKey, sysView);
|
||||
await this.events.emit({
|
||||
eventType: 'message.created',
|
||||
channelId,
|
||||
actorId: SYSTEM_USER_ID,
|
||||
data: sysView,
|
||||
});
|
||||
// wakeupUserId set -> emitMessageTargeted wakes exactly that user
|
||||
// (everyone else gets the same message with wakeup=false).
|
||||
// wakeupUserId omitted/null -> emitMessageCreated routes via the
|
||||
// channel's xType-specific 3-state delivery with empty wakeSet, so
|
||||
// nobody is woken (the message lands in history only).
|
||||
const wakeupUserId = typeof body.wakeupUserId === 'string' ? body.wakeupUserId.trim() : '';
|
||||
if (wakeupUserId) {
|
||||
await this.realtime.emitMessageTargeted(channelId, sysView, wakeupUserId);
|
||||
} else {
|
||||
await this.realtime.emitMessageCreated(channelId, sysView, {
|
||||
xType: sysChannel.xType ?? 'general',
|
||||
authorUserId: SYSTEM_USER_ID,
|
||||
wakeUserIds: new Set<string>(),
|
||||
});
|
||||
}
|
||||
return sysView;
|
||||
}
|
||||
|
||||
// Guild C-1: caller must be a participant of the channel, and the
|
||||
// author is always the authenticated user — body.authorUserId is
|
||||
// ignored so a caller can never post as someone else.
|
||||
|
||||
Reference in New Issue
Block a user