5 Commits

Author SHA1 Message Date
a3acebf694 Merge feat/triage-wake-mapping-reassign: privileged triage wake_mapping endpoint 2026-06-10 17:32:50 +01:00
5a182827c8 feat(channels): PUT /channels/:id/wake-mapping to reassign triage on-duty
On-call handoff needs to move a triage channel's wake_mapping (who triage
arrivals wake) from the outgoing to the incoming agent. Until now wake rows
were only written at channel creation; the only post-create mutation was
leave() deleting your own row, and PATCH allows just `purpose`. So there was
no way for an agent to hand off the baton without an admin DB UPDATE.

Add a privileged PUT /channels/:id/wake-mapping {wakeUserIds:[]}:
- Gated on the system key (req.isSystem from x-fabric-system-key) — a normal
  agent's guild token gets 403. The handoff is driven by ClawSkills
  fabric-ctrl set-on-duty, which reads the shared key from secret-mgr's public
  scope so the agent never handles it.
- Restricted to xType=triage (other types derive wake differently); non-triage
  → 400.
- Atomic swap of all wake rows; ensures each new wake user is a channel member
  first (the realtime gateway only routes triage to members) and notifies them.

Verified on the sim guild backend: 403 without the key, 400 on a report
channel, reassign succeeds with the key, DB wake_mapping row flips, and the
set-on-duty script round-trips without ever echoing the secret.
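
The checks listed above can be read as a small decision table. A minimal sketch, assuming the order visible in this diff (system-key gate, body validation, then channel lookup and type check); `WakeMappingRequest` and `classifyWakeMappingRequest` are hypothetical names used only for illustration, not part of the codebase:

```typescript
// Hypothetical model of one PUT /channels/:id/wake-mapping call.
interface WakeMappingRequest {
  isSystem: boolean;            // true only when x-fabric-system-key matched
  wakeUserIds: unknown;         // raw body field, not yet validated
  channelXType: string | null;  // null models "channel not found"
}

// Returns the HTTP status the endpoint is expected to produce.
function classifyWakeMappingRequest(r: WakeMappingRequest): number {
  if (!r.isSystem) return 403; // normal guild token
  const ids = Array.isArray(r.wakeUserIds)
    ? r.wakeUserIds.map((u) => String(u ?? '').trim()).filter(Boolean)
    : [];
  if (ids.length === 0) return 400; // missing or empty wakeUserIds
  if (r.channelXType === null) return 404; // unknown channel
  if (r.channelXType !== 'triage') return 400; // e.g. a report channel
  return 200;
}
```

The 403 is decided before the body is inspected, so a caller without the system key learns nothing about whether the channel exists or what type it is.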

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-10 17:32:50 +01:00
340eed8aa3 feat(guild): restore system-key bypass + isSystem msg path
Resurrects the x-fabric-system-key bypass + isSystem branch on POST
/channels/:id/messages, dropped in ca20df7 when dialectic stopped
broadcasting topic lifecycle events to Fabric. Re-enabling now because
Fabric.OpenclawPlugin's close-sub-discussion needs to write a callback
into a parent channel as a system-authored message (not as the closing
host), with an optional precision wakeup so the recruitment workflow can
resume immediately after an interview sub-discussion closes.

Three coupled bits:

1. ApiKeyGuard pre-Bearer bypass: when x-fabric-system-key matches
   FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY, set req.isSystem=true and
   skip the Bearer check. Intentionally reuses the existing commands
   sync env — same shared secret, same consumer (the OpenclawPlugin
   reads channels.fabric.commandsSyncKey for both paths). One less env
   to rotate, one less secret to manage.

2. messaging.controller POST /channels/:id/messages adds an isSystem
   branch (runs before the participant gate):

   - Looks up the channel directly (not via assertParticipant).
   - Persists with sentinel author 00000000-0000-0000-0000-000000000000,
     same UUID the old impl used.
   - Translates <@user.name:NAME> mentions like the regular path.
   - When wakeupUserId is set, delivers via emitMessageTargeted so that
     exactly that one recipient receives wakeup=true; everyone else gets
     wakeup=false. When omitted, delivers via emitMessageCreated with an
     empty wakeUserIds set so nobody is woken — silent system log.

   Two intentional differences from the 985b06a original:
   - No xType=announce restriction. The original was limited to announce
     because that was Dialectic's only use case; we now need this on dm /
     general / discuss / etc. for the sub-discussion callback. Closed
     channels are still rejected (409) on both paths.
   - The wakeupUserId field is new — old impl only ever sent silent
     announces.

3. DTO carries wakeupUserId? optional string. Ignored on the regular
   user-bearer path; load-bearing on the system path.
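
The delivery rule in item 2 amounts to one boolean per recipient. A minimal sketch, assuming the gateway fans a message out to a list of member ids; `wakeFlagsFor` is a hypothetical helper and does not reproduce emitMessageTargeted or emitMessageCreated:

```typescript
// Computes the wakeup flag each channel member would receive for a
// system-authored message. With no wakeupUserId, nobody is woken.
function wakeFlagsFor(
  memberIds: string[],
  wakeupUserId?: string | null,
): Map<string, boolean> {
  const target = typeof wakeupUserId === 'string' ? wakeupUserId.trim() : '';
  const flags = new Map<string, boolean>();
  for (const id of memberIds) {
    flags.set(id, target !== '' && id === target);
  }
  return flags;
}
```

Under this model a wakeupUserId that is not a channel member wakes no one. Whether the real gateway behaves the same way is not visible in this diff.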

Shared helper: extracted commands.controller's private safeEqual into
src/common/safe-equal.ts so api-key.guard.ts can use the same
constant-time check. Vitest spec covers equal / unequal / length-mismatch /
empty cases. Existing unit tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 20:51:19 +01:00
h z
3f77c0e35d fix(agent-presence): upsert atomically — kill first-time-insert race (#3) 2026-05-26 02:06:20 +00:00
38b4665321 fix(agent-presence): upsert atomically — kill first-time-insert race
Previous setStatus() did read-modify-write:
  findOne → if-exists save / else create+save

Two concurrent first-time writes for the same userId both saw no row,
both INSERT'd, second hit unique-key (agent_presences.PRIMARY) and 500'd
with "Duplicate entry '<userId>' for key 'agent_presences.PRIMARY'" —
visible in prod (2026-05-25 23:23:35Z) when Fabric.OpenclawPlugin's
presence-sync emitted two PUTs ~10 ms apart for the same agent (its
tick-overlap is being fixed separately in nav/Fabric.OpenclawPlugin).

Replace with repo.upsert(values, ['userId']) — compiles to MySQL
`INSERT … ON DUPLICATE KEY UPDATE`, atomic at the storage engine,
no read needed, no race window. Synthesize the returned entity from
the values we just wrote rather than a SELECT round-trip; controller
only reads {userId, status} off it.

Sim verified with 5 parallel PUTs to a fresh userId: all 200, no
Duplicate errors in guild log (was: 1 × 200 + 4 × 500 with the
old code).
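
The race and its fix can be modeled without a database. A minimal in-memory sketch, assuming a table keyed by userId that rejects duplicate inserts; `PresenceTable`, its methods, and `racyFirstWrites` are hypothetical stand-ins, not TypeORM's API:

```typescript
type PresenceRow = { userId: string; status: string; source: string };

class PresenceTable {
  private rows = new Map<string, PresenceRow>();

  find(userId: string): PresenceRow | undefined {
    return this.rows.get(userId);
  }

  // Plain INSERT: fails on a duplicate primary key.
  insert(row: PresenceRow): void {
    if (this.rows.has(row.userId)) {
      throw new Error(`Duplicate entry '${row.userId}' for key 'PRIMARY'`);
    }
    this.rows.set(row.userId, { ...row });
  }

  // Models INSERT ... ON DUPLICATE KEY UPDATE: one step, no conflict error.
  upsert(row: PresenceRow): void {
    this.rows.set(row.userId, { ...row });
  }
}

// Old shape: both writers read before either writes, so both choose INSERT.
function racyFirstWrites(table: PresenceTable, a: PresenceRow, b: PresenceRow): string[] {
  const outcomes: string[] = [];
  const seenByA = table.find(a.userId);
  const seenByB = table.find(b.userId);
  const attempts: Array<[PresenceRow | undefined, PresenceRow]> = [
    [seenByA, a],
    [seenByB, b],
  ];
  for (const [seen, row] of attempts) {
    try {
      if (seen) table.upsert(row);
      else table.insert(row);
      outcomes.push('200');
    } catch (err) {
      outcomes.push('500');
    }
  }
  return outcomes;
}
```

With two first-time writes for the same userId, the interleaved read-then-insert path yields one success and one failure, while two `upsert` calls both succeed and the last write wins.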

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 02:25:07 +01:00
9 changed files with 209 additions and 20 deletions


@@ -39,15 +39,23 @@ export class AgentPresenceService {
* Upsert a user's presence. Source is a free-text tag for debugging
* (e.g. "hf-plugin", "manual", "test"). PUT /agents/:id/presence
* calls this; the plugin pushes only on diff so writes are sparse.
*
* Implementation note: the older findOne+save split was a read-modify-
* write race — two concurrent first-time writes for the same userId
* would both read no row, both INSERT, second hits unique-key dup
* (`agent_presences.PRIMARY`) and 500s. Fabric.OpenclawPlugin's
* presence-sync occasionally fires two PUTs for the same agent within
* ~10 ms (tick overlap on its side — separate fix in the plugin),
* which surfaced this race in prod.
*
* `repo.upsert(values, conflictPaths)` compiles to MySQL
* `INSERT … ON DUPLICATE KEY UPDATE` and is atomic at the storage
* engine level — no read needed, no race window. We synthesize the
* returned entity from what we just wrote rather than round-tripping
* a SELECT — the controller only reads {userId, status} off it.
*/
async setStatus(userId: string, status: PresenceStatus, source: string): Promise<AgentPresence> {
- const existing = await this.repo.findOne({ where: { userId } });
- if (existing) {
-   existing.status = status;
-   existing.source = source;
-   return this.repo.save(existing);
- }
- const row = this.repo.create({ userId, status, source });
- return this.repo.save(row);
+ await this.repo.upsert({ userId, status, source }, ['userId']);
+ return this.repo.create({ userId, status, source });
}
}


@@ -1,8 +1,9 @@
- import { BadRequestException, Body, Controller, Get, Param, Patch, Post, Query, Req, UnauthorizedException } from '@nestjs/common';
+ import { BadRequestException, Body, Controller, ForbiddenException, Get, Param, Patch, Post, Put, Query, Req, UnauthorizedException } from '@nestjs/common';
import { ChannelsService } from './channels.service.js';
- // ApiKeyGuard attaches the introspected Center user id onto the request.
- type AuthedRequest = { userId?: string };
+ // ApiKeyGuard attaches the introspected Center user id onto the request,
+ // and sets isSystem=true for system-key callers (x-fabric-system-key).
+ type AuthedRequest = { userId?: string; isSystem?: boolean };
@Controller('channels')
export class ChannelsController {
@@ -59,6 +60,29 @@ export class ChannelsController {
return this.channelsService.updatePurpose(channelId, userId, body.purpose);
}
// Reassign a triage channel's wake_mapping (who triage arrivals wake) —
// the on-call-handoff baton. PRIVILEGED: requires the system key
// (x-fabric-system-key → req.isSystem); a normal agent's guild token is
// rejected. Restricted to triage channels in the service. Body:
// { wakeUserIds: string[] } (the new on-duty user id(s)).
@Put(':id/wake-mapping')
setWakeMapping(
@Req() req: AuthedRequest,
@Param('id') channelId: string,
@Body() body: Record<string, unknown>,
) {
if (!req.isSystem) {
throw new ForbiddenException('wake_mapping reassignment requires the system key');
}
const wakeUserIds = Array.isArray(body.wakeUserIds)
? (body.wakeUserIds as unknown[]).map((u) => String(u ?? '').trim()).filter(Boolean)
: [];
if (!wakeUserIds.length) {
throw new BadRequestException('wakeUserIds (non-empty string[]) is required');
}
return this.channelsService.setTriageWakeMapping(channelId, wakeUserIds);
}
// Move an order member into the bypass list (discuss/work only).
@Post(':id/bypass')
bypass(
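
The body handling in `setWakeMapping`, combined with the de-duplication the service applies afterwards, can be sketched as a single pure function. This is an illustration only; `normalizeWakeUserIds` is a hypothetical name and the real code splits these steps across the controller and the service:

```typescript
// Turns an untrusted body field into a clean list of user ids:
// stringify, trim, drop empties, then de-duplicate preserving first-seen order.
function normalizeWakeUserIds(raw: unknown): string[] {
  if (!Array.isArray(raw)) return [];
  const cleaned = raw.map((u) => String(u ?? '').trim()).filter(Boolean);
  return Array.from(new Set(cleaned));
}
```

Because entries are stringified rather than type-checked, a non-string value such as `42` passes through as `'42'`. A stricter variant might reject non-string entries with a 400 instead.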


@@ -248,6 +248,46 @@ export class ChannelsService {
};
}
// Reassign a TRIAGE channel's wake_mapping — the set of users woken by
// triage arrivals (the on-call-handoff baton). Privileged: the caller is
// gated at the controller on the system key, not a per-user membership
// check, because handing the baton to the *incoming* agent is exactly an
// operation the outgoing agent's own token shouldn't be trusted to scope.
//
// Atomically replaces every wake row for the channel. New wake users are
// ensured to be channel members first (the realtime gateway only routes a
// triage message to members, so a non-member on-duty agent would never be
// woken). Restricted to xType=triage; other types derive wake differently
// (custom=listeners, discuss/work=rotation) and must not be poked here.
async setTriageWakeMapping(channelId: string, wakeUserIds: string[]) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
if (channel.xType !== 'triage') {
throw new BadRequestException('wake_mapping reassignment is only allowed for triage channels');
}
const ids = [...new Set(wakeUserIds.map((u) => String(u ?? '').trim()).filter(Boolean))];
if (!ids.length) throw new BadRequestException('wakeUserIds must contain at least one user id');
const newMembers: string[] = [];
for (const userId of ids) {
const existing = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!existing) {
await this.memberRepo.save(this.memberRepo.create({ channelId, userId }));
newMembers.push(userId);
}
}
// Atomic swap: drop all current wake rows, then insert the new set.
await this.wakeRepo.delete({ channelId });
await this.wakeRepo.save(ids.map((userId) => this.wakeRepo.create({ channelId, userId })));
if (newMembers.length) {
this.notifyMembership('joined', channelId, newMembers, { xType: channel.xType });
}
return { id: channel.id, name: channel.name, xType: channel.xType, wakeUserIds: ids };
}
// Move an order member into the bypass list (discuss/work only).
// Any channel member may do this.
async moveToBypass(channelId: string, actorUserId: string, targetUserId: string) {
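
In the hunk above, the delete and the save are two separate awaits, and nothing visible in this diff wraps them in a transaction, so a failure between them could leave the channel with no wake rows. A minimal in-memory sketch of an all-or-nothing swap, assuming a snapshot-and-restore model; `WakeStore` and `swapWakeRows` are hypothetical and do not reproduce TypeORM's transaction API:

```typescript
class WakeStore {
  private rows = new Map<string, Set<string>>(); // channelId -> wake user ids

  get(channelId: string): string[] {
    const current = this.rows.get(channelId);
    return current ? Array.from(current) : [];
  }

  // Replaces every wake row for the channel, or restores the previous set
  // if any insert fails. The optional failOn id simulates a failing insert.
  swapWakeRows(channelId: string, ids: string[], failOn?: string): void {
    const snapshot = this.rows.get(channelId);
    try {
      const next = new Set<string>();
      this.rows.set(channelId, next); // models "delete all current rows"
      for (const id of ids) {
        if (id === failOn) throw new Error(`insert failed for ${id}`);
        next.add(id);
      }
    } catch (err) {
      // Roll back to the rows that existed before the swap started.
      if (snapshot) this.rows.set(channelId, snapshot);
      else this.rows.delete(channelId);
      throw err;
    }
  }
}
```

In a real database the same effect would normally come from running both statements inside one transaction, which also hides the intermediate empty state from concurrent readers. The sketch only models the rollback half of that guarantee.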


@@ -8,19 +8,12 @@ import {
Req,
UnauthorizedException,
} from '@nestjs/common';
- import { timingSafeEqual } from 'node:crypto';
import { CommandsService } from './commands.service.js';
import { SyncCommandsDto } from './dto.sync-commands.dto.js';
+ import { safeEqual } from '../common/safe-equal.js';
type AuthedRequest = { userId?: string };
- function safeEqual(a: string, b: string): boolean {
-   const ab = Buffer.from(a);
-   const bb = Buffer.from(b);
-   if (ab.length !== bb.length) return false;
-   return timingSafeEqual(ab, bb);
- }
@Controller('commands')
export class CommandsController {
constructor(private readonly commands: CommandsService) {}


@@ -5,6 +5,7 @@ import {
UnauthorizedException,
} from '@nestjs/common';
import { introspectGuildToken } from './center-auth.js';
import { safeEqual } from './safe-equal.js';
@Injectable()
export class ApiKeyGuard implements CanActivate {
@@ -21,6 +22,25 @@ export class ApiKeyGuard implements CanActivate {
return true;
}
// System-key bypass: when a caller presents x-fabric-system-key
// matching FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY (intentionally the
// same shared secret as x-commands-sync-key — both legitimate
// consumers are Fabric.OpenclawPlugin), skip the Bearer requirement
// and mark this as a system caller. Downstream handlers (e.g.
// messaging.controller POST /channels/:id/messages) gate on
// req.isSystem to take the system-author code path.
//
// Empty env → bypass disabled (no system caller ever valid; closed
// by default). Header carries the secret as-is; we constant-time
// compare against the env value.
const sysExpected = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
const sysHeader = req.headers['x-fabric-system-key'];
const sysProvided = Array.isArray(sysHeader) ? sysHeader[0] : sysHeader;
if (sysExpected && sysProvided && safeEqual(sysProvided, sysExpected)) {
(req as { isSystem?: boolean }).isSystem = true;
return true;
}
const auth = req.headers['authorization'];
const authValue = Array.isArray(auth) ? auth[0] : auth;
let token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';
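
The bypass condition in this hunk can be isolated as a pure function, which makes the closed-by-default behavior easy to check. A minimal sketch, assuming Node's header shape (a string, an array of strings, or undefined); `isSystemCaller` is a hypothetical name, and the comparison is inlined rather than imported from safe-equal.ts so the block stands alone:

```typescript
import { timingSafeEqual } from 'node:crypto';

type HeaderValue = string | string[] | undefined;

// True only when a non-empty expected secret is configured and the first
// header value matches it byte for byte.
function isSystemCaller(header: HeaderValue, expectedRaw: string | undefined): boolean {
  const expected = (expectedRaw ?? '').trim();
  const provided = Array.isArray(header) ? header[0] : header;
  if (!expected || !provided) return false; // empty env disables the bypass
  const a = Buffer.from(provided);
  const b = Buffer.from(expected);
  return a.length === b.length && timingSafeEqual(a, b);
}
```

As in the guard, only the configured value is trimmed. A header sent with stray whitespace around the key does not match.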


@@ -0,0 +1,25 @@
import { describe, it, expect } from 'vitest';
import { safeEqual } from './safe-equal.js';
describe('safeEqual', () => {
it('returns true for identical non-empty strings', () => {
expect(safeEqual('abc123', 'abc123')).toBe(true);
});
it('returns false for different strings of same length', () => {
expect(safeEqual('abc123', 'abc124')).toBe(false);
});
it('returns false for differing lengths', () => {
expect(safeEqual('abc', 'abcd')).toBe(false);
});
it('handles empty strings', () => {
// both empty is technically equal — but downstream callers should
// explicitly check for empty expected before invoking. We just
// document the constant-time-comparison primitive's behavior.
expect(safeEqual('', '')).toBe(true);
expect(safeEqual('a', '')).toBe(false);
expect(safeEqual('', 'a')).toBe(false);
});
});

src/common/safe-equal.ts Normal file

@@ -0,0 +1,12 @@
import { timingSafeEqual } from 'node:crypto';
// Constant-time string comparison. Returns false for length mismatch (the
// length difference itself is observable, but the per-byte loop isn't).
// Used for shared-secret header checks (commands-sync-key, system-key,
// etc.) to keep timing-oracle attacks off the table.
export function safeEqual(a: string, b: string): boolean {
const ab = Buffer.from(a);
const bb = Buffer.from(b);
if (ab.length !== bb.length) return false;
return timingSafeEqual(ab, bb);
}
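
The comment above notes that a length mismatch is observable. One common variant, sketched here as an alternative and not as what this PR implements, hashes both inputs to fixed-length digests first so the comparison itself never branches on input length. `safeEqualHashed` is a hypothetical name:

```typescript
import { createHash, timingSafeEqual } from 'node:crypto';

// Compares SHA-256 digests of the two strings. Both digests are always
// 32 bytes, so timingSafeEqual never sees a length mismatch.
function safeEqualHashed(a: string, b: string): boolean {
  const da = createHash('sha256').update(a, 'utf8').digest();
  const db = createHash('sha256').update(b, 'utf8').digest();
  return timingSafeEqual(da, db);
}
```

The hashing step still takes time that grows with input size, so this narrows the length signal but does not remove it. For a fixed-length shared secret the simpler early return in `safeEqual` is usually considered acceptable.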


@@ -56,4 +56,14 @@ export class CreateMessageDto {
@IsString()
@MaxLength(64)
authorUserId?: string;
// System-author path only (x-fabric-system-key gated). When set, the
// message is delivered via emitMessageTargeted so this single recipient
// gets wakeup=true; everyone else in the channel sees wakeup=false. For
// regular (user-bearer) posts this field is ignored. Used by
// close-sub-discussion to precisely wake the host on callback.
@IsOptional()
@IsString()
@MaxLength(64)
wakeupUserId?: string;
}


@@ -156,13 +156,70 @@ export class MessagingController {
async create(
@Param('id') channelId: string,
@Body() body: CreateMessageDto,
- @Req() req: { userId?: string },
+ @Req() req: { userId?: string; isSystem?: boolean },
@Headers('idempotency-key') idempotencyKey?: string,
) {
const scope = `POST:/channels/${channelId}/messages`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
// System caller (ApiKeyGuard set isSystem from x-fabric-system-key):
// skip the per-user participant check; resolve channel directly.
// System posts are allowed into any non-closed channel — used by
// Fabric.OpenclawPlugin to write `close-sub-discussion` callbacks
// back to a parent channel that the host agent may not be currently
// "in" from the backend's perspective, and to deliver guide-injected
// system intros into sub-discussion channels without needing to log
// in as a real user. Author is a sentinel UUID that no real user
// ever has; `wakeupUserId` (optional) lets the caller precisely wake
// one recipient (e.g. the host of a closing sub-discussion).
if (req.isSystem) {
const sysChannel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!sysChannel) throw new NotFoundException('channel not found');
if (sysChannel.closed) {
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
}
const SYSTEM_USER_ID = '00000000-0000-0000-0000-000000000000';
let sysContent = body.content ?? '';
const sysNames = extractNameMentions(sysContent);
if (sysNames.length) {
const nameMap = await resolveUserNames(sysNames);
sysContent = replaceNameMentions(sysContent, nameMap);
}
const sysMessage = await this.persistMessage(channelId, {
authorUserId: SYSTEM_USER_ID,
content: sysContent,
clientMessageId: body.clientMessageId,
replyToMessageId: body.replyToMessageId,
mentions: body.mentions,
attachments: body.attachments,
});
const sysView = this.toView(sysMessage) as Record<string, unknown>;
await this.saveIdempotentResponse(scope, idempotencyKey, sysView);
await this.events.emit({
eventType: 'message.created',
channelId,
actorId: SYSTEM_USER_ID,
data: sysView,
});
// wakeupUserId set -> emitMessageTargeted wakes exactly that user
// (everyone else gets the same message with wakeup=false).
// wakeupUserId omitted/null -> emitMessageCreated routes via the
// channel's xType-specific 3-state delivery with empty wakeSet, so
// nobody is woken (the message lands in history only).
const wakeupUserId = typeof body.wakeupUserId === 'string' ? body.wakeupUserId.trim() : '';
if (wakeupUserId) {
await this.realtime.emitMessageTargeted(channelId, sysView, wakeupUserId);
} else {
await this.realtime.emitMessageCreated(channelId, sysView, {
xType: sysChannel.xType ?? 'general',
authorUserId: SYSTEM_USER_ID,
wakeUserIds: new Set<string>(),
});
}
return sysView;
}
// Guild C-1: caller must be a participant of the channel, and the
// author is always the authenticated user — body.authorUserId is
// ignored so a caller can never post as someone else.