29 Commits

Author SHA1 Message Date
340eed8aa3 feat(guild): restore system-key bypass + isSystem msg path
Resurrects the x-fabric-system-key bypass + isSystem branch on POST
/channels/:id/messages, dropped in ca20df7 when dialectic stopped
broadcasting topic lifecycle events to Fabric. Re-enabling now because
Fabric.OpenclawPlugin's close-sub-discussion needs to write a callback
into a parent channel as a system-authored message (not as the closing
host), with an optional precision wakeup so the recruitment workflow can
resume immediately after an interview sub-discussion closes.

Three coupled bits:

1. ApiKeyGuard pre-Bearer bypass: when x-fabric-system-key matches
   FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY, set req.isSystem=true and
   skip the Bearer check. Intentionally reuses the existing commands
   sync env — same shared secret, same consumer (the OpenclawPlugin
   reads channels.fabric.commandsSyncKey for both paths). One less env
   to rotate, one less secret to manage.

2. messaging.controller POST /channels/:id/messages adds an isSystem
   branch (runs before the participant gate):

   - Looks up the channel directly (not via assertParticipant).
   - Persists with sentinel author 00000000-0000-0000-0000-000000000000,
     same UUID the old impl used.
   - Translates <@user.name:NAME> mentions like the regular path.
   - When wakeupUserId is set, delivers via emitMessageTargeted so that
     exactly that one recipient receives wakeup=true; everyone else gets
     wakeup=false. When omitted, delivers via emitMessageCreated with an
     empty wakeUserIds set so nobody is woken — silent system log.

   Two intentional differences from the 985b06a original:
   - No xType=announce restriction. The original was limited to announce
     because that was Dialectic's only use case; we now need this on dm /
     general / discuss / etc. for the sub-discussion callback. Closed
     channels are still rejected (409) on both paths.
   - The wakeupUserId field is new — old impl only ever sent silent
     announces.

3. DTO carries wakeupUserId? optional string. Ignored on the regular
   user-bearer path; load-bearing on the system path.

Shared helper: extracted commands.controller's private safeEqual into
src/common/safe-equal.ts so api-key.guard.ts can use the same constant-
time check. Vitest spec covers equal / inequal / length-mismatch / empty
cases. Existing unit tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 20:51:19 +01:00
h z
3f77c0e35d fix(agent-presence): upsert atomically — kill first-time-insert race (#3) 2026-05-26 02:06:20 +00:00
38b4665321 fix(agent-presence): upsert atomically — kill first-time-insert race
Previous setStatus() did read-modify-write:
  findOne → if-exists save / else create+save

Two concurrent first-time writes for the same userId both saw no row,
both INSERT'd, second hit unique-key (agent_presences.PRIMARY) and 500'd
with "Duplicate entry '<userId>' for key 'agent_presences.PRIMARY'" —
visible in prod (2026-05-25 23:23:35Z) when Fabric.OpenclawPlugin's
presence-sync emitted two PUTs ~10 ms apart for the same agent (its
tick-overlap is being fixed separately in nav/Fabric.OpenclawPlugin).

Replace with repo.upsert(values, ['userId']) — compiles to MySQL
`INSERT … ON DUPLICATE KEY UPDATE`, atomic at the storage engine,
no read needed, no race window. Synthesize the returned entity from
the values we just wrote rather than a SELECT round-trip; controller
only reads {userId, status} off it.

Sim verified with 5 parallel PUTs to a fresh userId: all 200, no
Duplicate errors in guild log (was: 1 × 200 + 4 × 500 with the
old code).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-26 02:25:07 +01:00
ca20df7618 refactor(guild): drop system-key bypass + announce-only-system limit
Pairs with Dialectic.Backend@5cf4302 which removes the backend-driven
broadcaster that was the only consumer of the x-fabric-system-key
header path. Backend cleanup is complete on the consumer side; this
removes the producer-side surface.

Removed:
  - ApiKeyGuard: x-fabric-system-key bypass branch (sysExpected /
    sysProvided / req.isSystem flag) — only Bearer flow remains.
  - messaging.controller.create(): the entire 'if (req.isSystem)'
    branch including the SYSTEM_USER_ID='00000000-...-0000' sentinel
    persistence path.
  - messaging.controller.create(): the 'if (xType === announce) throw
    announce_system_only' gate. Announce channels are now ordinary
    channels — any participant can POST. Use case: agents post one-off
    recruitment broadcasts via fabric-send-message (e.g. dialectic
    'come participate in topic X' messages).
  - cli/gen-system-api-key.ts: deleted (was the generator for the env
    that's no longer read).

Kept:
  - channel.purpose field + PATCH /api/channels/:id (member auth for
    setting purpose — agents use this to label channels for
    fabric-channel-list discoverability).
  - cli/print-commands-sync-key.ts (separate key, separate lifecycle).
  - GuildRole.isSystem flag (unrelated — system-role permission gate).
2026-05-23 23:49:47 +01:00
cb7b3bb5fe feat(channel-discovery): add purpose column + PATCH /api/channels/:id
Adds a free-form 'purpose' text field on Channel so agents (or anyone
creating a channel via API) can describe what the channel is for —
'debate broadcasts', 'security alerts', etc. — and other agents can
later find the right channel by intent rather than channel id.

Wire:
  - Channel.purpose (text, nullable; TypeORM synchronize auto-adds)
  - POST /api/channels accepts optional 'purpose' in body
  - GET /api/channels returns purpose on every row (already returns the
    full entity via {...c})
  - PATCH /api/channels/:id { purpose } — member-or-public auth (mirrors
    the close() rule). Today only 'purpose' is patchable; other fields
    would get their own typed branch.

Frontend create form continues to omit the field — purpose stays optional.
This pairs with Fabric.OpenclawPlugin's fabric-channel-set-purpose tool +
fabric-channel-list returning purpose, so agent workflows can say 'find
an announce channel about X' instead of pinning a UUID.
2026-05-23 19:22:00 +01:00
985b06a886 feat(guild): system-key bypass + announce-only system path + gen CLI
Three coupled changes that let Dialectic.Backend (and future system
broadcasters) post to announce channels without needing a Fabric user
bearer.

1. ApiKeyGuard: when x-fabric-system-key matches
   FABRIC_BACKEND_GUILD_SYSTEM_API_KEY env, skip the Bearer requirement
   and set req.isSystem=true. Pre-Bearer system bypass; no per-user
   session token needed. Empty env -> bypass disabled (closed by default).

2. messaging.controller POST /channels/:id/messages: when req.isSystem,
   skip assertParticipant + fetch channel directly. Enforce xType=announce
   (system key only writes to announce channels - never to regular chats).
   Persist with sentinel author 00000000-0000-0000-0000-000000000000.
   Emit message.created + realtime.emitMessageCreated with xType=announce
   so the Phase 1 busy-discard logic kicks in for recipients.

3. New cli: src/cli/gen-system-api-key.ts. Generates a random 32-byte
   hex key (same shape as agent + admin keys) and prints it. Does NOT
   store - operator pastes into compose env and restarts guild. Pattern
   mirrors the existing print-commands-sync-key.ts.

Removes the need for a FABRIC_BOT_BEARER_TOKEN concept entirely - the
system key alone is sufficient. announce-channel posts by regular
authenticated users (who happen to know channel id but no system key)
are now 403 announce_system_only.
2026-05-23 17:49:53 +01:00
80ee9082f3 feat(guild): announce channel type + agent-presence + busy-discard
Phase 1 of DIALECTIC-V2 — adds Fabric infrastructure for
system-broadcast channels with HF-status-aware delivery filtering.

New channel x_type 'announce':
- channels.entity.ts + channels.service.ts + realtime.gateway.ts
  enum + union extended.
- computeDelivery() adds an 'announce' case: recipient with
  presence='busy' → 'skip' (discarded silently); other presences →
  'observer' (delivered, no wake). System-broadcast semantics —
  agents proactively check their announce inbox when they're ready,
  not interrupted out of band.
- messaging.controller POST guard: announce-type channels reject
  posts that don't present x-fabric-system-key header matching
  FABRIC_BACKEND_GUILD_SYSTEM_API_KEY env. Empty env = no system
  caller is valid (closed-by-default).

New entity + module agent_presences:
- agent-presence.entity.ts: per-user (userId PK) status enum
  (idle/on_call/busy/exhausted/offline/unknown), source tag, updatedAt
- agent-presence.service.ts: getStatus/getStatusMap (bulk for
  delivery-time fanout) + setStatus (upsert)
- agent-presence.controller.ts: GET + PUT /agents/:userId/presence
- agent-presence.module.ts: TypeORM forFeature + wired into AppModule
- buildTypeOrmConfig() entities list extended

RealtimeGateway wiring:
- New optional  field on the gateway (typed loosely to avoid
  circular import). RealtimeModule.onModuleInit() assigns from the
  injected AgentPresenceService — degrades gracefully (no busy-discard,
  treat all as 'unknown') if presence wiring is ever removed.
- emitMessageCreated pre-loads presence per fanout only when xType is
  'announce' (other xTypes bypass the lookup entirely).

Note: actual presence data writes come from Fabric.OpenclawPlugin's
presence-sync loop (separate commit on that submodule); without it,
all rows are 'unknown' and announce delivery falls through to the
default observer behavior (no busy filtering). System-only POST gate
is independent and works immediately.

See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md sections 7 + 10 Phase 1.
2026-05-23 11:31:47 +01:00
801b562999 Merge pull request 'feat(triage): 3-state delivery + admin observer + admin cache' (#2) from feat/triage-3state-delivery into main 2026-05-22 21:59:19 +00:00
7cb046d785 feat(triage): 3-state delivery + admin observer + admin cache
Triage channels now compute a 3-state delivery decision per recipient
(wake / observer / skip) instead of the binary wakeup flag, and route
according to:

  1. author never gets back their own message            → skip
  2. wake_mapping member (on-duty)                       → wake
  3. mention (NEW: was 'skip' for triage before)         → wake
  4. Center-scoped admin (at most 1)                     → observer
  5. anyone else                                         → skip
                                                         (was 'deliver wake=false')

Skipping means the websocket emit is omitted entirely — the recipient's
openclaw plugin never sees the message and the agent's session stays
free of background noise. Observer means delivered with wakeup=false
(silent UI / no model dispatch on the plugin side).

## What this PR ships

### realtime/realtime.gateway.ts
- new `computeDelivery()` returns DeliveryDecision = 'wake'|'observer'|'skip'
- old `computeWakeup()` kept as a deprecated wrapper for callers that
  still want the boolean answer (treats observer + skip as false)
- `emitMessageCreated` accepts `adminUserId?: string|null` and now
  short-circuits on 'skip' (no socket emit at all)
- general kept its current behavior; custom kept its current behavior
  (members not in wake_mapping become observer instead of `wake=false`)
  — the user-visible bit is just that the response field is the same
  `wakeup: boolean`; the explicit 'skip' is new for triage

### common/center-auth.ts
- `fetchAdminEmail()` calls GET `${center}/auth/admin-email` with the
  existing x-api-key (same auth as introspect/resolve-names). Returns
  `{email, userId}` or `null` on either "no admin" or any error

### common/admin-cache.service.ts (NEW)
- `AdminCacheService` — in-memory cache, 1-day TTL, lazy refresh.
  `get(force=true)` bypasses TTL for cli-triggered refresh
- exposed by MessagingModule

### messaging/messaging.controller.ts
- non-rotating branch threads `adminUserId` into emitMessageCreated

### cli/admin-refresh.ts (NEW)
- `node dist/cli/admin-refresh.js` — force-refresh cache and print
  before/after JSON. Use after a Center `user set-admin` so triage
  delivery picks up the new admin without waiting for 24h TTL

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:14:05 +01:00
e635faea9c Merge pull request 'feat(realtime): push channel.joined/left events to user-scoped rooms' (#1) from feat/push-channel-membership-events into main 2026-05-21 07:12:51 +00:00
30069377e7 feat(realtime): push channel.joined/left events to user-scoped rooms
Backend half of the plugin push-based channel sync (companion to
nav/Fabric.OpenclawPlugin#1 follow-up). Before this, the OpenClaw
fabric inbound had to poll `/api/channels?guildId=...` every 60s to
discover newly-joined channels (any DM another user just dragged the
agent into). Now the server tells the agent's socket directly so
sub/unsub is realtime.

Changes:
- realtime.gateway.ts:
  * handleConnection joins the socket into a `user:<userId>` room.
    All of a user's connected sockets now share that room.
  * New `emitToUser(userId, event, data)` helper that emits into
    that room. No-op for offline users (next connect resyncs via the
    plugin's initial channel-list fetch).
- channels.service.ts:
  * Inject RealtimeGateway (RealtimeModule is @Global, no module
    plumbing needed).
  * Private `notifyMembership(kind, channelId, userIds, extra)`
    helper that emits `channel.<kind>` (joined|left) with payload
    {channelId, userId, xType, occurredAt}.
  * create(): emit channel.joined to every seeded member (creator +
    explicit memberUserIds + triage on-duty).
  * joinChannel(): emit channel.joined to userId (only if the row was
    actually inserted, idempotent on existing membership).
  * leaveChannel(): emit channel.left to userId iff a row was
    actually deleted.

Event shape:
  {
    channelId: string,
    userId: string,
    xType?: string,
    occurredAt: ISO string,
  }

Client-side contract (fabric plugin):
  socket.on('channel.joined', m => socket.emit('join_channel', {channelId: m.channelId}))
  socket.on('channel.left',   m => socket.emit('leave_channel', {channelId: m.channelId}))

The plugin keeps its 60s polling resync as a safety net for missed
events (transient socket drops between emit and reconnect, partial
failures, etc).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:07:46 +01:00
b1f7467161 feat(guild): add 'dm' x-type (private 1:1, always-wake)
channel enum + X_TYPES + realtime XType gain 'dm'. dm channels are
forced private (never public) and non-unique (no dedup; create()
always makes a fresh one). computeWakeup: dm wakes every non-author
participant unconditionally (no rotation / no wake_mapping). The
message.created realtime payload now carries xType so the plugin can
treat dm specially.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 09:18:19 +01:00
7e944a08f6 feat(ops): CLI to print the commands-sync key (Guild C-2)
node dist/cli/print-commands-sync-key.js (npm run print:commands-key)
outputs FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY as the process sees it,
so an operator can docker-exec it on the deployed guild and copy the
value into the plugin's FABRIC_COMMANDS_SYNC_KEY. --export prints a
ready-to-paste assignment; exits 1 when unset (fallback mode).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 18:28:23 +01:00
e45ad91340 fix(security): close Critical IDOR/authz gaps (C-1/C-2)
C-1: messaging endpoints now enforce channel participation (public
     channels open; private require channel_members). authorUserId is
     forced to the authenticated user (no more author spoofing); edit/
     delete require message-author ownership; history read gated too.
C-2: PUT /commands body strictly validated + size-capped via
     SyncCommandsDto (kills catalog poisoning / DoS). Optional
     FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY restricts the write to the
     plugin when set; never weaker than before when unset.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:47:08 +01:00
3e96de730a docs: slash-command registry section
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:15:04 +01:00
f54ed6abb5 feat(guild): slash-command registry (sync + list API)
Guild-global slash-command catalog (one row per node guild). The
OpenClaw plugin PUTs the native-command specs (same data Discord
registers as slash commands); the frontend GETs it for / autocomplete.

- GuildCommand entity (guild_id unique, commands json, updatedAt)
- PUT /api/commands  -> idempotent full replace (any authed agent/user)
- GET /api/commands  -> { commands, updatedAt } (authed)
- stored verbatim (NativeCommandSpec-shaped); execution path unchanged:
  a /<cmd> message is delivered as a normal message -> plugin ->
  OpenClaw command system (only /no-reply, /force-proceed stay
  server-intercepted).

Verified: PUT->{ok,count}, GET round-trips args/choices, no-auth->401.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:02:49 +01:00
8de5736a59 docs: rewrite README to match current architecture
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 12:53:23 +01:00
58badf328c feat(guild): file upload/retention + channel canvas
Files:
- StoredFile entity + FilesModule: multipart upload (configurable
  FABRIC_BACKEND_GUILD_FILE_MAX_BYTES, default 100MB; no type limit),
  authenticated download (Bearer or ?access_token=), hourly + on-boot
  retention sweep (FABRIC_BACKEND_GUILD_FILE_TTL_DAYS, default 7).
- ApiKeyGuard also accepts ?access_token= (browser <img>/<a>).

Canvas:
- ChannelCanvas entity (one active per channel) + CanvasModule:
  GET / PUT|POST (share-replace, caller becomes sharer) /
  PATCH (sharer-only in-place update, version++) / DELETE (sharer-only).
  Emits canvas.updated / canvas.removed to the channel room.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 20:17:02 +01:00
b3fcefb5ec feat(channels): bypass-list for discuss/work rotation
- channel_turn_state.bypass_user_ids: order and bypass form a disjoint
  partition of the channel members; bypass excluded from rotation.
- initForChannel(channelId, members, bypass=[]) computes order = members
  − bypass; create() passes bypassUserIds (∩ members) for discuss/work.
- pushFrame() enforces mention nesting cap: max 4 sub-frames (5 levels
  incl. root); overflow evicts the bottom-most (root->A..D + E => root->B..E).
- mention sites use pushFrame so bypass members are only transiently
  pulled in via @-mention, then return to bypass on pop.
- moveToBypass(): move an order member to bypass mid-rotation; if current
  speaker, successor takes over. onMemberRemoved also strips bypass.
- POST /channels/:id/bypass; GET :id/members now returns {userId,bypass}.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 19:26:18 +01:00
8c41d23a9c refactor: migrate to ES modules
package.json type=module, tsconfig module/moduleResolution=NodeNext,
target es2022, explicit .js on all relative imports. Center: jsonwebtoken
& bcryptjs switched to default imports (ESM/CJS interop). Verified:
builds, boots, full auth + plugin round-trip work under ESM.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 18:47:36 +01:00
9670da400e feat(guild): closed channel (discussion-complete support)
Channel.closed; POST /channels/:id/close (member-only); message/command
posts on closed channel -> 409 {error:channel_closed}; GET history still
allowed; listForUser carries closed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 16:52:43 +01:00
22fd834ed0 feat(guild): translate <@user.name:NAME> -> <@userId>
Before persist/parse, resolve <@user.name:NAME> (outside backticks) via
Center and rewrite to <@userId>; unresolved tokens left as-is. Translated
ids then flow into the existing mention/wakeup/sub-frame logic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 15:47:01 +01:00
02b7c72e70 feat(guild): <@id> mention mechanism
- parse <@user-id> outside backtick spans
- general: message with an at-list wakes only the at'd users (else all)
- report/triage/custom: mentions change nothing
- discuss/work: mention by current speaker pushes a sub-rotation frame
  (atList = mentions - sender, intersected with channel members); single
  linear pass (real/no-reply/force-proceed), then pop back to the saved
  parent pointer (resumes at the pusher); nested frames supported

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 15:27:35 +01:00
182cfb3c41 feat(guild): GET /channels/:id/members
List explicit channel members (userIds) for the split members sidebar.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 15:00:24 +01:00
6b993522cf feat(guild): wake_mapping, per-recipient wakeup, discuss/work turn engine, channel join/leave
- wake_mapping table; triage onDuty (auto-added member) / custom listeners
- per-recipient wakeup metadata on message.created (one message-id; added
  only at push). Rules: author=false; triage/custom=wake_mapping only;
  general=all; report=none
- discuss/work rotation: channel_turn_state (order/currentSpeaker/round
  events/cross-round no-reply streak); null activation, queue-jump,
  /no-reply pass, all-/no-reply pause, end-of-round shuffle (trailing
  no-reply run to tail, head shuffled, first != last normal speaker)
- slash-command registry (/no-reply, /force-proceed); registered commands
  intercepted and never delivered; guild-authored /ack persisted
- POST /channels/:id/join|leave; leave cleans channel_members, wake_mapping
  and turn-state order

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 14:51:09 +01:00
605d3ac092 feat(guild): required channel x_type enum
Channel.x_type enum(general|work|report|discuss|triage|custom); required
and validated on channel creation (400 if missing/invalid).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 09:35:36 +01:00
774dff11ba feat(guild): channel membership + public visibility
- new channel_members table; creator always added, plus selected members
- Channel.isPublic (default false): public channels visible to all guild
  members; non-public only to explicit members
- GET /channels filters to channels visible to the requesting user

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 09:09:41 +01:00
nav
78d2179e8c fix(guild): validate channel create payload and return 400 2026-05-14 16:49:56 +00:00
nav
9ad6ccaa3d feat(guild): enable CORS and add members listing API 2026-05-14 16:46:30 +00:00
60 changed files with 2836 additions and 123 deletions

View File

@@ -1,22 +1,71 @@
# Fabric.Backend.Guild
Guild Node service for Fabric.
A **guild node** for Fabric (NestJS, ES modules, MySQL/TypeORM,
socket.io). Default port `7002`, global prefix `/api`. Many independent
guild nodes can run; each registers with `Fabric.Backend.Center` and
introspects the user/guild tokens Center issues.
## Scope (MVP)
- Workspace/Guild/Channel/DM
- Message create/edit/delete/reply/@mention
- Per-channel/DM seq ordering + gap backfill API
- Webhook/Bot integration surface
- Guild-level audit logs
## Responsibilities
## Next
- API skeleton (NestJS)
- Chat domain models
- Seq allocator and range query endpoints
- **Guilds / channels / messaging** — per-channel `seq` ordering, edit
window, soft delete, reply, `<@id>` mentions (backtick-aware) plus
`<@user.name:NAME>``<@userId>` translation via Center.
- **Channel `x_type`** (required on create): `general`, `work`, `report`,
`discuss`, `triage`, `custom`. Plus `isPublic` and `closed` (closed →
history readable, posting returns `409`).
- **`wake_mapping`** — explicit wake list for `triage` (on-duty) and
`custom` (listeners) channels.
- **Per-recipient `wakeup`** — `message.created` is emitted per socket with
its own `wakeup` flag (author=false; general→all; report→none;
triage/custom→wake_mapping; discuss/work→the current speaker only). This
is **push-only metadata for the OpenClaw plugin**; UIs ignore it.
- **discuss/work turn engine** (`channel_turn_state`): speaking order and a
disjoint **bypass list** (bypass members aren't woken unless @-mentioned);
activation from idle, queue-jump, cross-round `/no-reply` pause,
`/force-proceed`, end-of-round shuffle, guild `/ack`, and a mention
sub-frame stack with a 5-level nesting cap (root + 4). `moveToBypass`
mid-rotation.
- **Files** — `POST /files` (multipart, configurable max size, default
100 MB), `GET /files/:id` (Bearer **or** `?access_token=` for browser
`<img>/<a>`), automatic retention sweep (default 7 days). Messages carry
`attachments[]`.
- **Channel canvas** — one pinned document per channel (`md`/`html`/`text`),
re-share replaces, only the original sharer may update/remove; emits
`canvas.updated` / `canvas.removed`.
- **Slash-command registry** — guild-global catalog: `PUT /api/commands`
(the OpenClaw plugin syncs OpenClaw's native-command specs here),
`GET /api/commands` (frontend `/` autocomplete). Stored verbatim;
execution is unchanged (a `/<cmd>` message flows normally to the plugin →
OpenClaw command system; only `/no-reply`,`/force-proceed` are
server-intercepted).
- **Realtime** — socket.io `/realtime`; `join_channel`/`leave_channel`,
`message.created/updated/deleted`, `canvas.*`, presence, typing.
## Required env (hard-checked at startup)
## Required env (startup hard checks)
- `FABRIC_BACKEND_GUILD_CENTER_BASE_URL`
- `FABRIC_BACKEND_GUILD_CENTER_API_KEY`
- `FABRIC_BACKEND_GUILD_NODE_ID`
If any of the above is missing, service startup fails immediately.
Missing any of these aborts startup.
## Other env
- `FABRIC_BACKEND_GUILD_PORT` (default 7002)
- `FABRIC_BACKEND_GUILD_DB_*`, `FABRIC_BACKEND_GUILD_DB_SYNC`
- `FABRIC_BACKEND_GUILD_FILE_DIR` (storage root),
`FABRIC_BACKEND_GUILD_FILE_MAX_BYTES` (default 100 MB),
`FABRIC_BACKEND_GUILD_FILE_TTL_DAYS` (default 7)
- `FABRIC_BACKEND_GUILD_CORS_ORIGINS` (empty = allow all; `null` origin —
`file://` desktop — is always allowed)
## Run
```bash
npm install
npm run build && npm start # or: npm run start:dev
```
Usually run via the root `docker-compose.local.yml` (`backend-guild1`
`test-guild1` :7002, `backend-guild2` `test-guild2` :7003). Schema is
auto-managed (`DB_SYNC`). ES modules (`NodeNext`).

View File

@@ -2,10 +2,12 @@
"name": "fabric-backend-guild",
"version": "0.1.0",
"private": true,
"type": "module",
"description": "Fabric Guild Node service",
"scripts": {
"build": "tsc -p tsconfig.build.json",
"start": "node dist/main.js",
"print:commands-key": "node dist/cli/print-commands-sync-key.js",
"start:dev": "ts-node src/main.ts",
"lint": "eslint 'src/**/*.ts'",
"lint:fix": "eslint 'src/**/*.ts' --fix",

View File

@@ -0,0 +1,42 @@
import { BadRequestException, Body, Controller, Get, Param, Put } from '@nestjs/common';
import { AgentPresenceService, PresenceStatus } from './agent-presence.service.js';
const VALID: PresenceStatus[] = ['idle', 'on_call', 'busy', 'exhausted', 'offline', 'unknown'];
interface PutBody {
status?: string;
source?: string;
}
@Controller('agents/:userId/presence')
export class AgentPresenceController {
constructor(private readonly svc: AgentPresenceService) {}
/**
* Read a user's current presence cache row.
* Auth: ApiKeyGuard (global). Any introspected center user can read.
*/
@Get()
async get(@Param('userId') userId: string): Promise<{ userId: string; status: PresenceStatus }> {
const status = await this.svc.getStatus(userId);
return { userId, status };
}
/**
* Push a presence update. Called by Fabric.OpenclawPlugin's
* `presence-sync` loop on each delta. Auth: ApiKeyGuard (global) +
* the plugin uses its center-introspected api key.
*
* `source` is a debug tag describing who pushed (e.g. 'hf-plugin',
* 'manual'). Stored verbatim for trail.
*/
@Put()
async put(@Param('userId') userId: string, @Body() body: PutBody): Promise<{ userId: string; status: PresenceStatus }> {
if (!body?.status || !VALID.includes(body.status as PresenceStatus)) {
throw new BadRequestException(`status must be one of ${VALID.join('|')}`);
}
const source = (body.source ?? 'unknown').slice(0, 64);
const row = await this.svc.setStatus(userId, body.status as PresenceStatus, source);
return { userId: row.userId, status: row.status };
}
}

View File

@@ -0,0 +1,13 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AgentPresence } from '../entities/agent-presence.entity.js';
import { AgentPresenceController } from './agent-presence.controller.js';
import { AgentPresenceService } from './agent-presence.service.js';
@Module({
imports: [TypeOrmModule.forFeature([AgentPresence])],
controllers: [AgentPresenceController],
providers: [AgentPresenceService],
exports: [AgentPresenceService],
})
export class AgentPresenceModule {}

View File

@@ -0,0 +1,61 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { AgentPresence } from '../entities/agent-presence.entity.js';
export type PresenceStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown';
@Injectable()
export class AgentPresenceService {
constructor(
@InjectRepository(AgentPresence)
private readonly repo: Repository<AgentPresence>,
) {}
/**
* Get a user's current presence. Returns 'unknown' if no row.
* Used by `RealtimeGateway` per-recipient when xType === 'announce'.
*/
async getStatus(userId: string): Promise<PresenceStatus> {
if (!userId) return 'unknown';
const row = await this.repo.findOne({ where: { userId } });
return row?.status ?? 'unknown';
}
/** Bulk variant for delivery-time lookups across many recipients in one trip. */
async getStatusMap(userIds: string[]): Promise<Map<string, PresenceStatus>> {
const out = new Map<string, PresenceStatus>();
for (const id of userIds) out.set(id, 'unknown');
if (userIds.length === 0) return out;
const rows = await this.repo
.createQueryBuilder('p')
.where('p.userId IN (:...ids)', { ids: userIds })
.getMany();
for (const r of rows) out.set(r.userId, r.status);
return out;
}
/**
* Upsert a user's presence. Source is a free-text tag for debugging
* (e.g. "hf-plugin", "manual", "test"). PUT /agents/:id/presence
* calls this; the plugin pushes only on diff so writes are sparse.
*
* Implementation note: the older findOne+save split was a read-modify-
* write race — two concurrent first-time writes for the same userId
* would both read no row, both INSERT, second hits unique-key dup
* (`agent_presences.PRIMARY`) and 500s. Fabric.OpenclawPlugin's
* presence-sync occasionally fires two PUTs for the same agent within
* ~10 ms (tick overlap on its side — separate fix in the plugin),
* which surfaced this race in prod.
*
* `repo.upsert(values, conflictPaths)` compiles to MySQL
* `INSERT … ON DUPLICATE KEY UPDATE` and is atomic at the storage
* engine level — no read needed, no race window. We synthesize the
* returned entity from what we just wrote rather than round-tripping
* a SELECT — the controller only reads {userId, status} off it.
*/
async setStatus(userId: string, status: PresenceStatus, source: string): Promise<AgentPresence> {
await this.repo.upsert({ userId, status, source }, ['userId']);
return this.repo.create({ userId, status, source });
}
}

View File

@@ -1,16 +1,22 @@
import { Module } from '@nestjs/common';
import { APP_GUARD } from '@nestjs/core';
import { TypeOrmModule } from '@nestjs/typeorm';
import { buildTypeOrmConfig } from './database.config';
import { HealthController } from './common/health.controller';
import { MetricsController } from './common/metrics.controller';
import { MetricsService } from './common/metrics.service';
import { ApiKeyGuard } from './common/api-key.guard';
import { GuildsModule } from './guilds/guilds.module';
import { ChannelsModule } from './channels/channels.module';
import { MessagingModule } from './messaging/messaging.module';
import { EventsModule } from './events/events.module';
import { RealtimeModule } from './realtime/realtime.module';
import { buildTypeOrmConfig } from './database.config.js';
import { HealthController } from './common/health.controller.js';
import { MetricsController } from './common/metrics.controller.js';
import { MetricsService } from './common/metrics.service.js';
import { ApiKeyGuard } from './common/api-key.guard.js';
import { GuildsModule } from './guilds/guilds.module.js';
import { ChannelsModule } from './channels/channels.module.js';
import { TurnModule } from './channels/turn.module.js';
import { MessagingModule } from './messaging/messaging.module.js';
import { EventsModule } from './events/events.module.js';
import { RealtimeModule } from './realtime/realtime.module.js';
import { MembersModule } from './members/members.module.js';
import { FilesModule } from './files/files.module.js';
import { CanvasModule } from './canvas/canvas.module.js';
import { CommandsModule } from './commands/commands.module.js';
import { AgentPresenceModule } from './agents/agent-presence.module.js';
@Module({
imports: [
@@ -18,8 +24,14 @@ import { RealtimeModule } from './realtime/realtime.module';
EventsModule,
RealtimeModule,
GuildsModule,
TurnModule,
ChannelsModule,
MembersModule,
MessagingModule,
FilesModule,
CanvasModule,
CommandsModule,
AgentPresenceModule,
],
controllers: [HealthController, MetricsController],
providers: [

View File

@@ -0,0 +1,58 @@
import {
Body,
Controller,
Delete,
Get,
Param,
Post,
Put,
Patch,
Req,
UnauthorizedException,
} from '@nestjs/common';
import { CanvasService } from './canvas.service.js';
type AuthedRequest = { userId?: string };
type CanvasBody = { title?: string; format?: string; source?: string };
@Controller('channels/:id/canvas')
export class CanvasController {
constructor(private readonly canvas: CanvasService) {}
private uid(req: AuthedRequest): string {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return userId;
}
@Get()
get(@Req() req: AuthedRequest, @Param('id') channelId: string) {
return this.canvas.get(channelId, this.uid(req));
}
// share / replace (caller becomes the sharer)
@Put()
@Post()
share(
@Req() req: AuthedRequest,
@Param('id') channelId: string,
@Body() body: CanvasBody,
) {
return this.canvas.share(channelId, this.uid(req), body ?? {});
}
// update in place (original sharer only)
@Patch()
update(
@Req() req: AuthedRequest,
@Param('id') channelId: string,
@Body() body: CanvasBody,
) {
return this.canvas.update(channelId, this.uid(req), body ?? {});
}
@Delete()
remove(@Req() req: AuthedRequest, @Param('id') channelId: string) {
return this.canvas.remove(channelId, this.uid(req));
}
}

View File

@@ -0,0 +1,14 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { ChannelCanvas } from '../entities/channel-canvas.entity.js';
import { CanvasController } from './canvas.controller.js';
import { CanvasService } from './canvas.service.js';
@Module({
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, ChannelCanvas])],
controllers: [CanvasController],
providers: [CanvasService],
})
export class CanvasModule {}

View File

@@ -0,0 +1,147 @@
import {
BadRequestException,
ForbiddenException,
Injectable,
NotFoundException,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import {
ChannelCanvas,
type CanvasFormat,
} from '../entities/channel-canvas.entity.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
const FORMATS: CanvasFormat[] = ['md', 'html', 'text'];
@Injectable()
export class CanvasService {
constructor(
@InjectRepository(Channel)
private readonly channelRepo: Repository<Channel>,
@InjectRepository(ChannelMember)
private readonly memberRepo: Repository<ChannelMember>,
@InjectRepository(ChannelCanvas)
private readonly canvasRepo: Repository<ChannelCanvas>,
private readonly realtime: RealtimeGateway,
) {}
private view(c: ChannelCanvas) {
return {
channelId: c.channelId,
sharerUserId: c.sharerUserId,
title: c.title,
format: c.format,
source: c.source,
version: c.version,
createdAt: c.createdAt.toISOString(),
updatedAt: c.updatedAt.toISOString(),
};
}
private async assertChannel(channelId: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
return channel;
}
private async assertParticipant(channelId: string, userId: string) {
const channel = await this.assertChannel(channelId);
if (channel.isPublic) return channel;
const member = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!member) throw new ForbiddenException('not a channel member');
return channel;
}
async get(channelId: string, userId: string) {
await this.assertParticipant(channelId, userId);
const c = await this.canvasRepo.findOne({ where: { channelId } });
return c ? this.view(c) : null;
}
private normalize(input: {
title?: string;
format?: string;
source?: string;
}) {
const title = String(input.title ?? '').trim().slice(0, 200) || 'Untitled';
const format = String(input.format ?? 'md') as CanvasFormat;
if (!FORMATS.includes(format)) {
throw new BadRequestException(`format must be one of: ${FORMATS.join(', ')}`);
}
const source = String(input.source ?? '');
return { title, format, source };
}
// Share / replace the channel's single active canvas (caller becomes sharer).
async share(
channelId: string,
userId: string,
input: { title?: string; format?: string; source?: string },
) {
await this.assertParticipant(channelId, userId);
const { title, format, source } = this.normalize(input);
let c = await this.canvasRepo.findOne({ where: { channelId } });
if (c) {
c.sharerUserId = userId;
c.title = title;
c.format = format;
c.source = source;
c.version = 1;
} else {
c = this.canvasRepo.create({
channelId,
sharerUserId: userId,
title,
format,
source,
version: 1,
});
}
c = await this.canvasRepo.save(c);
const v = this.view(c);
this.realtime.emitChannelEvent(channelId, 'canvas.updated', v);
return v;
}
// Update the existing canvas in place — only the original sharer.
async update(
channelId: string,
userId: string,
input: { title?: string; format?: string; source?: string },
) {
await this.assertParticipant(channelId, userId);
const c = await this.canvasRepo.findOne({ where: { channelId } });
if (!c) throw new NotFoundException('no canvas shared in this channel');
if (c.sharerUserId !== userId) {
throw new ForbiddenException('only the original sharer may update the canvas');
}
const { title, format, source } = this.normalize({
title: input.title ?? c.title,
format: input.format ?? c.format,
source: input.source ?? c.source,
});
c.title = title;
c.format = format;
c.source = source;
c.version += 1;
const saved = await this.canvasRepo.save(c);
const v = this.view(saved);
this.realtime.emitChannelEvent(channelId, 'canvas.updated', v);
return v;
}
async remove(channelId: string, userId: string) {
await this.assertParticipant(channelId, userId);
const c = await this.canvasRepo.findOne({ where: { channelId } });
if (!c) return { status: 'ok' };
if (c.sharerUserId !== userId) {
throw new ForbiddenException('only the original sharer may remove the canvas');
}
await this.canvasRepo.delete({ id: c.id });
this.realtime.emitChannelEvent(channelId, 'canvas.removed', { channelId });
return { status: 'ok' };
}
}

View File

@@ -1,18 +1,105 @@
import { Body, Controller, Get, Post, Query } from '@nestjs/common';
import { ChannelsService } from './channels.service';
import { BadRequestException, Body, Controller, Get, Param, Patch, Post, Query, Req, UnauthorizedException } from '@nestjs/common';
import { ChannelsService } from './channels.service.js';
// ApiKeyGuard attaches the introspected Center user id onto the request.
type AuthedRequest = { userId?: string };
@Controller('channels')
export class ChannelsController {
constructor(private readonly channelsService: ChannelsService) {}
@Get()
list(@Query('guildId') guildId?: string) {
if (!guildId) return this.channelsService.listAll();
return this.channelsService.listByGuild(guildId);
list(@Req() req: AuthedRequest, @Query('guildId') guildId?: string) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.listForUser(String(guildId ?? ''), userId);
}
@Post()
create(@Body() body: Record<string, unknown>) {
return this.channelsService.create(body);
create(@Req() req: AuthedRequest, @Body() body: Record<string, unknown>) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.create(
{
guildId: body.guildId as string | undefined,
name: body.name as string | undefined,
kind: body.kind as string | undefined,
xType: body.xType as string | undefined,
isPublic: Boolean(body.isPublic),
memberUserIds: Array.isArray(body.memberUserIds) ? (body.memberUserIds as string[]) : [],
onDuty: body.onDuty as string | undefined,
listeners: Array.isArray(body.listeners) ? (body.listeners as string[]) : [],
bypassUserIds: Array.isArray(body.bypassUserIds)
? (body.bypassUserIds as string[])
: [],
purpose: body.purpose as string | undefined,
},
userId,
);
}
// Patch a channel's free-form purpose. Body: { purpose: string }. Pass
// empty string to clear. Auth: channel member (or anyone for public
// channels, mirroring close()). Frontend doesn't call this today —
// intended for agent-side use (fabric-channel-set-purpose tool).
@Patch(':id')
patch(
@Req() req: AuthedRequest,
@Param('id') channelId: string,
@Body() body: Record<string, unknown>,
) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
// Only `purpose` is patchable today. Future patchable fields would
// get their own typed branch; we explicitly NOT allow {} no-op patches
// because that signals a caller bug.
if (typeof body.purpose !== 'string') {
throw new BadRequestException('purpose (string) is required');
}
return this.channelsService.updatePurpose(channelId, userId, body.purpose);
}
// Move an order member into the bypass list (discuss/work only).
@Post(':id/bypass')
bypass(
@Req() req: AuthedRequest,
@Param('id') channelId: string,
@Body() body: Record<string, unknown>,
) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.moveToBypass(
channelId,
userId,
String(body.userId ?? ''),
);
}
@Get(':id/members')
members(@Req() req: AuthedRequest, @Param('id') channelId: string) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.channelMembers(channelId);
}
@Post(':id/join')
join(@Req() req: AuthedRequest, @Param('id') channelId: string) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.joinChannel(channelId, userId);
}
@Post(':id/leave')
leave(@Req() req: AuthedRequest, @Param('id') channelId: string) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.leaveChannel(channelId, userId);
}
@Post(':id/close')
close(@Req() req: AuthedRequest, @Param('id') channelId: string) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
return this.channelsService.closeChannel(channelId, userId);
}
}

View File

@@ -1,11 +1,13 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ChannelsController } from './channels.controller';
import { Channel } from '../entities/channel.entity';
import { ChannelsService } from './channels.service';
import { ChannelsController } from './channels.controller.js';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { ChannelsService } from './channels.service.js';
@Module({
imports: [TypeOrmModule.forFeature([Channel])],
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, WakeMapping])],
controllers: [ChannelsController],
providers: [ChannelsService],
exports: [ChannelsService],

View File

@@ -1,38 +1,266 @@
import { Injectable } from '@nestjs/common';
import { BadRequestException, ForbiddenException, Injectable, NotFoundException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Channel } from '../entities/channel.entity';
import { In, Repository } from 'typeorm';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { TurnService } from './turn.service.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
const X_TYPES = ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm', 'announce'] as const;
type XType = (typeof X_TYPES)[number];
type CreateChannelInput = {
guildId?: string;
name?: string;
kind?: string;
xType?: string;
isPublic?: boolean;
memberUserIds?: string[];
// required when xType === 'triage': the on-duty user for this channel
onDuty?: string;
// optional when xType === 'custom': users to wake on this channel
listeners?: string[];
// discuss/work only: members excluded from rotation (no wakeup unless
// @-mentioned). order and bypass partition the members disjointly.
bypassUserIds?: string[];
// Free-form description of what this channel is for. Optional; agents
// typically fill it when creating, members can later edit via
// PATCH /api/channels/:id.
purpose?: string;
};
@Injectable()
export class ChannelsService {
constructor(
@InjectRepository(Channel)
private readonly channelRepo: Repository<Channel>,
@InjectRepository(ChannelMember)
private readonly memberRepo: Repository<ChannelMember>,
@InjectRepository(WakeMapping)
private readonly wakeRepo: Repository<WakeMapping>,
private readonly turnService: TurnService,
// RealtimeGateway is provided by the global RealtimeModule. Used to
// push channel.joined / channel.left so connected clients (e.g. the
// OpenClaw fabric plugin) can sub/unsub socket.io rooms immediately
// instead of waiting for the polling fallback.
private readonly realtime: RealtimeGateway,
) {}
listByGuild(guildId: string) {
return this.channelRepo.find({
where: { guildId },
order: { createdAt: 'ASC' },
take: 200,
});
// Push a channel membership change to each affected user's socket-room.
// Best-effort: offline users see the new state on their next connect
// (the inbound runs an initial channel-list fetch on connect).
private notifyMembership(
kind: 'joined' | 'left',
channelId: string,
userIds: string[] | Set<string>,
extra: Record<string, unknown> = {},
): void {
const ids = userIds instanceof Set ? [...userIds] : userIds;
const payload = {
channelId,
...extra,
occurredAt: new Date().toISOString(),
};
for (const u of ids) {
if (!u) continue;
this.realtime.emitToUser(u, `channel.${kind}`, { ...payload, userId: u });
}
}
listAll() {
return this.channelRepo.find({
// Channels visible to a user within a guild:
// - every public channel of the guild (incl. ones created before the user
// joined the guild), OR
// - a non-public channel the user is an explicit member of.
async listForUser(guildId: string, userId: string) {
const all = await this.channelRepo.find({
where: guildId ? { guildId } : {},
order: { createdAt: 'ASC' },
take: 200,
take: 500,
});
if (!all.length) return [];
const memberRows = await this.memberRepo.find({
where: { userId, channelId: In(all.map((c) => c.id)) },
});
const memberChannelIds = new Set(memberRows.map((m) => m.channelId));
return all
.filter((c) => c.isPublic || memberChannelIds.has(c.id))
.map((c) => ({ ...c, isMember: memberChannelIds.has(c.id) }));
}
create(input: Partial<Channel>) {
const channel = this.channelRepo.create({
guildId: String(input.guildId ?? ''),
name: String(input.name ?? ''),
kind: input.kind === 'announcement' ? 'announcement' : 'text',
isPrivate: Boolean(input.isPrivate),
lastSeq: 0,
async channelMembers(channelId: string): Promise<{ userId: string; bypass: boolean }[]> {
const rows = await this.memberRepo.find({
where: { channelId },
order: { createdAt: 'ASC' },
});
return this.channelRepo.save(channel);
const bypass = new Set(await this.turnService.getBypassUserIds(channelId));
return rows.map((r) => ({ userId: r.userId, bypass: bypass.has(r.userId) }));
}
async closeChannel(channelId: string, userId: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
const member = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!member && !channel.isPublic) {
throw new ForbiddenException('not a channel member');
}
channel.closed = true;
await this.channelRepo.save(channel);
return { status: 'ok', channelId, closed: true };
}
async joinChannel(channelId: string, userId: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
const existing = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!existing) {
if (!channel.isPublic) {
throw new ForbiddenException('cannot join a non-public channel');
}
await this.memberRepo.save(this.memberRepo.create({ channelId, userId }));
if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberAdded(channelId, userId);
}
this.notifyMembership('joined', channelId, [userId], { xType: channel.xType });
}
return { status: 'ok', channelId, userId, member: true };
}
async leaveChannel(channelId: string, userId: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
// remove every channel-scoped row that references this user
const deleted = await this.memberRepo.delete({ channelId, userId });
await this.wakeRepo.delete({ channelId, userId });
if (channel.xType === 'discuss' || channel.xType === 'work') {
await this.turnService.onMemberRemoved(channelId, userId);
}
if ((deleted.affected ?? 0) > 0) {
this.notifyMembership('left', channelId, [userId], { xType: channel.xType });
}
return { status: 'ok', channelId, userId, member: false };
}
async create(input: CreateChannelInput, creatorUserId: string) {
const guildId = String(input.guildId ?? '').trim();
const name = String(input.name ?? '').trim();
const xType = String(input.xType ?? '').trim() as XType;
if (!guildId) throw new BadRequestException('guildId is required');
if (!name) throw new BadRequestException('name is required');
if (!creatorUserId) throw new BadRequestException('creator is required');
if (!input.xType) throw new BadRequestException('xType is required');
if (!X_TYPES.includes(xType)) {
throw new BadRequestException(`xType must be one of: ${X_TYPES.join(', ')}`);
}
const onDuty = String(input.onDuty ?? '').trim();
if (xType === 'triage' && !onDuty) {
throw new BadRequestException('onDuty is required for triage channels');
}
const listeners = (input.listeners ?? [])
.map((x) => String(x ?? '').trim())
.filter(Boolean);
// dm channels are always private (a 1:1 conversation); never public.
// dm is not unique — multiple dm channels between the same users are
// allowed (create() always makes a fresh one, no dedup).
const isPublic = xType === 'dm' ? false : Boolean(input.isPublic);
const purposeRaw = String(input.purpose ?? '').trim();
const purpose = purposeRaw === '' ? null : purposeRaw;
const channel = await this.channelRepo.save(
this.channelRepo.create({
guildId,
name,
xType,
kind: input.kind === 'announcement' ? 'announcement' : 'text',
isPrivate: !isPublic,
isPublic,
purpose,
lastSeq: 0,
}),
);
// creator is always a member; merge in any explicitly selected members
const memberIds = new Set<string>([creatorUserId]);
for (const id of input.memberUserIds ?? []) {
const trimmed = String(id ?? '').trim();
if (trimmed) memberIds.add(trimmed);
}
// triage: the on-duty user is auto-added to the channel if not already in it
if (xType === 'triage') memberIds.add(onDuty);
await this.memberRepo.save(
[...memberIds].map((userId) => this.memberRepo.create({ channelId: channel.id, userId })),
);
// Push channel.joined to every seeded member (creator + invitees +
// triage on-duty) so their connected sockets sub the new room
// immediately. Skips offline users — next connect's channel-list
// fetch covers them.
this.notifyMembership('joined', channel.id, memberIds, { xType });
// wake_mapping: triage -> the on-duty user; custom -> each listener
const wakeUserIds = new Set<string>();
if (xType === 'triage') wakeUserIds.add(onDuty);
if (xType === 'custom') listeners.forEach((l) => wakeUserIds.add(l));
if (wakeUserIds.size) {
await this.wakeRepo.save(
[...wakeUserIds].map((userId) => this.wakeRepo.create({ channelId: channel.id, userId })),
);
}
// discuss/work: initialize rotation state (order = members sorted by id,
// currentSpeaker = null until someone proactively speaks)
if (xType === 'discuss' || xType === 'work') {
const bypass = (input.bypassUserIds ?? [])
.map((x) => String(x ?? '').trim())
.filter((x) => x && memberIds.has(x));
await this.turnService.initForChannel(channel.id, [...memberIds], bypass);
}
return channel;
}
// Update a channel's free-form purpose. Any channel member may do this
// (or any guild user if the channel is public, mirroring closeChannel's
// member-or-public rule). Pass an empty string to clear.
async updatePurpose(channelId: string, actorUserId: string, purpose: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
const member = await this.memberRepo.findOne({ where: { channelId, userId: actorUserId } });
if (!member && !channel.isPublic) {
throw new ForbiddenException('not a channel member');
}
const trimmed = String(purpose ?? '').trim();
channel.purpose = trimmed === '' ? null : trimmed;
const saved = await this.channelRepo.save(channel);
return {
id: saved.id,
name: saved.name,
xType: saved.xType,
purpose: saved.purpose,
};
}
// Move an order member into the bypass list (discuss/work only).
// Any channel member may do this.
async moveToBypass(channelId: string, actorUserId: string, targetUserId: string) {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
if (channel.xType !== 'discuss' && channel.xType !== 'work') {
throw new BadRequestException('bypass only applies to discuss/work channels');
}
const actor = await this.memberRepo.findOne({ where: { channelId, userId: actorUserId } });
if (!actor && !channel.isPublic) throw new ForbiddenException('not a channel member');
const target = await this.memberRepo.findOne({ where: { channelId, userId: targetUserId } });
if (!target) throw new BadRequestException('target is not a channel member');
await this.turnService.moveToBypass(channelId, targetUserId);
return { status: 'ok', channelId, userId: targetUserId, bypass: true };
}
}

80
src/channels/mentions.ts Normal file
View File

@@ -0,0 +1,80 @@
// Split content into parts, tagging which are inside backtick spans so we
// only touch mentions in non-code regions. Splitting keeps backtick runs.
function codeAwareParts(content: string): { text: string; code: boolean }[] {
const raw = content.split(/(`+)/);
const parts: { text: string; code: boolean }[] = [];
let inCode = false;
for (const seg of raw) {
if (/^`+$/.test(seg)) {
parts.push({ text: seg, code: true });
inCode = !inCode;
} else {
parts.push({ text: seg, code: inCode });
}
}
return parts;
}
const NAME_MENTION_RE = /<@user\.name:([^>]+)>/g;
// Names referenced via <@user.name:NAME> outside backtick spans.
export function extractNameMentions(content: string): string[] {
if (typeof content !== 'string' || !content) return [];
const out: string[] = [];
const seen = new Set<string>();
for (const p of codeAwareParts(content)) {
if (p.code) continue;
let m: RegExpExecArray | null;
NAME_MENTION_RE.lastIndex = 0;
while ((m = NAME_MENTION_RE.exec(p.text)) !== null) {
const name = m[1].trim();
if (name && !seen.has(name)) {
seen.add(name);
out.push(name);
}
}
}
return out;
}
// Replace <@user.name:NAME> with <@userId> for resolved names (outside
// backticks only); unresolved tokens are left untouched.
export function replaceNameMentions(content: string, resolved: Record<string, string>): string {
if (typeof content !== 'string' || !content) return content;
return codeAwareParts(content)
.map((p) =>
p.code
? p.text
: p.text.replace(NAME_MENTION_RE, (full, name: string) => {
const id = resolved[String(name).trim()];
return id ? `<@${id}>` : full;
}),
)
.join('');
}
// Parse <@user-id> mentions from message content. A mention does NOT count
// when it sits inside a backtick span (single ` or triple ``` — any backtick
// run toggles a code region). Returns unique ids in first-seen order.
export function parseMentions(content: string): string[] {
if (typeof content !== 'string' || !content) return [];
// strip backtick-delimited regions: split on runs of backticks; odd
// segments (between an opening and closing run) are code -> dropped.
const segments = content.split(/`+/);
let outside = '';
for (let i = 0; i < segments.length; i += 2) outside += segments[i] + ' ';
const ids: string[] = [];
const seen = new Set<string>();
const re = /<@([^>`\s]+)>/g;
let m: RegExpExecArray | null;
while ((m = re.exec(outside)) !== null) {
const id = m[1];
if (!seen.has(id)) {
seen.add(id);
ids.push(id);
}
}
return ids;
}

View File

@@ -0,0 +1,32 @@
// Registry of known slash commands. Only a message whose content matches a
// REGISTERED command name is treated as a command (intercepted, never
// delivered). Anything else that merely starts with '/' (e.g. /etc/passwd)
// is delivered as a normal message.
export const SLASH_COMMANDS = ['no-reply', 'force-proceed'] as const;
export type SlashCommandName = (typeof SLASH_COMMANDS)[number];
export type ParsedCommand = {
name: SlashCommandName;
opts: string[];
};
// Matches: /<name> optionally followed by whitespace + opts. The name must be
// a registered command (case-sensitive, exact) for it to count as a command.
export function parseSlashCommand(content: string): ParsedCommand | null {
if (typeof content !== 'string') return null;
const trimmed = content.trim();
if (!trimmed.startsWith('/')) return null;
const m = /^\/(\S+)(?:\s+([\s\S]*))?$/.exec(trimmed);
if (!m) return null;
const name = m[1];
if (!(SLASH_COMMANDS as readonly string[]).includes(name)) return null;
const opts = (m[2] ?? '').trim();
return {
name: name as SlashCommandName,
opts: opts ? opts.split(/\s+/) : [],
};
}

View File

@@ -0,0 +1,52 @@
import { RoundEvent } from '../entities/channel-turn-state.entity.js';
export type ShuffleResult = { paused: true } | { paused: false; newOrder: string[] };
function shuffleInPlace<T>(arr: T[]): T[] {
for (let i = arr.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[arr[i], arr[j]] = [arr[j], arr[i]];
}
return arr;
}
// End-of-round shuffle.
// - tail = the *last contiguous run* of /no-reply in the round's turn events
// (anchor = first of that run; kept in event order)
// - head = every current member not in tail, shuffled randomly
// - constraint: head[0] !== D, where D = the last member who delivered a
// normal message in the round
// - if the constraint is unsatisfiable (head empty, or head === [D]) the
// rotation pauses instead of shuffling (per spec B.3)
export function computeShuffle(roundEvents: RoundEvent[], currentMembers: string[]): ShuffleResult {
const memberSet = new Set(currentMembers);
// trailing contiguous /no-reply run, in event order, limited to current members
const tail: string[] = [];
for (let i = roundEvents.length - 1; i >= 0; i--) {
if (roundEvents[i].a !== 'noreply') break;
if (memberSet.has(roundEvents[i].u)) tail.unshift(roundEvents[i].u);
}
const tailSet = new Set(tail);
// D = last normal speaker in the round (the one right before the trailing run)
let d: string | null = null;
for (let i = roundEvents.length - 1; i >= 0; i--) {
if (roundEvents[i].a === 'normal' && memberSet.has(roundEvents[i].u)) {
d = roundEvents[i].u;
break;
}
}
const head = currentMembers.filter((u) => !tailSet.has(u));
if (head.length === 0) return { paused: true };
if (head.length === 1 && head[0] === d) return { paused: true };
shuffleInPlace(head);
if (head[0] === d) {
// length >= 2 here (the [d] singleton case returned paused above)
[head[0], head[1]] = [head[1], head[0]];
}
return { paused: false, newOrder: [...head, ...tail] };
}

View File

@@ -0,0 +1,9 @@
import { Global, Module } from '@nestjs/common';
import { TurnService } from './turn.service.js';
@Global()
@Module({
providers: [TurnService],
exports: [TurnService],
})
export class TurnModule {}

View File

@@ -0,0 +1,406 @@
import { Injectable } from '@nestjs/common';
import { DataSource, EntityManager } from 'typeorm';
import { ChannelTurnState, TurnFrame } from '../entities/channel-turn-state.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { computeShuffle } from './turn-shuffle.js';
// wakeupUserId: the single user who should receive wakeup=true on the
// resulting push (null = nobody / paused). For commands, `ack` present means
// the controller must emit a guild-authored /ack message.
export type TurnDecision = { wakeupUserId: string | null };
export type CommandDecision = { ack: TurnDecision | null };
@Injectable()
export class TurnService {
constructor(private readonly dataSource: DataSource) {}
private async loadLocked(
manager: EntityManager,
channelId: string,
): Promise<ChannelTurnState | null> {
const state = await manager
.createQueryBuilder(ChannelTurnState, 's')
.setLock('pessimistic_write')
.where('s.channelId = :channelId', { channelId })
.getOne();
if (state && !Array.isArray(state.frames)) state.frames = [];
return state;
}
private async ensureState(
manager: EntityManager,
channelId: string,
): Promise<ChannelTurnState> {
let state = await this.loadLocked(manager, channelId);
if (state) return state;
const members = await manager.find(ChannelMember, { where: { channelId } });
const order = members.map((m) => m.userId).sort();
state = manager.create(ChannelTurnState, {
channelId,
orderUserIds: order,
currentSpeaker: null,
roundEvents: [],
norepStreak: [],
lastNormalSpeaker: null,
frames: [],
bypassUserIds: [],
});
return manager.save(ChannelTurnState, state);
}
private frames(state: ChannelTurnState): TurnFrame[] {
if (!Array.isArray(state.frames)) state.frames = [];
return state.frames;
}
private bypass(state: ChannelTurnState): string[] {
if (!Array.isArray(state.bypassUserIds)) state.bypassUserIds = [];
return state.bypassUserIds;
}
// Push a mention sub-frame, enforcing the nesting cap. Max 4 sub-frames
// (5 levels incl. root); a 5th push evicts the bottom-most sub-frame
// (the one directly above root) and shifts the rest down:
// root->A->B->C->D + E => root->B->C->D->E
private pushFrame(state: ChannelTurnState, order: string[]): void {
const fr = this.frames(state);
while (fr.length >= 4) fr.shift();
fr.push({ order, idx: 0 });
}
// effective current speaker = top sub-frame's pointer, else root speaker
private effectiveCurrent(state: ChannelTurnState): string | null {
const fr = this.frames(state);
while (fr.length) {
const top = fr[fr.length - 1];
if (!top.order.length) {
fr.pop();
continue;
}
const idx = Math.min(top.idx, top.order.length - 1);
return top.order[idx];
}
return state.currentSpeaker;
}
// advance / pop the active sub-frame; returns the new effective speaker.
// A single linear pass: acting at the last index pops the frame.
private advanceSubFrame(state: ChannelTurnState): string | null {
const fr = this.frames(state);
const top = fr[fr.length - 1];
if (top.idx >= top.order.length - 1) {
fr.pop();
} else {
top.idx += 1;
}
return this.effectiveCurrent(state);
}
async initForChannel(
channelId: string,
memberUserIds: string[],
bypassUserIds: string[] = [],
): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const existing = await manager.findOne(ChannelTurnState, { where: { channelId } });
const members = [...new Set(memberUserIds)];
const bypassSet = new Set(bypassUserIds.filter((u) => members.includes(u)));
// order and bypass are a disjoint partition of members
const order = members.filter((u) => !bypassSet.has(u)).sort();
const base = {
orderUserIds: order,
currentSpeaker: null,
roundEvents: [] as ChannelTurnState['roundEvents'],
norepStreak: [] as string[],
lastNormalSpeaker: null,
frames: [] as TurnFrame[],
bypassUserIds: [...bypassSet],
};
if (existing) {
Object.assign(existing, base);
await manager.save(ChannelTurnState, existing);
return;
}
await manager.save(ChannelTurnState, manager.create(ChannelTurnState, { channelId, ...base }));
});
}
// Read-only: userIds currently in the bypass list (no rotation wakeup
// unless @-mentioned). Empty if no turn state / not discuss-work.
async getBypassUserIds(channelId: string): Promise<string[]> {
const state = await this.dataSource
.getRepository(ChannelTurnState)
.findOne({ where: { channelId } });
return state && Array.isArray(state.bypassUserIds) ? state.bypassUserIds : [];
}
async onMemberAdded(channelId: string, userId: string): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
const inBypass = this.bypass(state).includes(userId);
if (!state.orderUserIds.includes(userId) && !inBypass) {
state.orderUserIds = [...state.orderUserIds, userId];
await manager.save(ChannelTurnState, state);
}
});
}
// Move an order member into the bypass list (any channel member may do
// this). If they are the current speaker, the next one takes over.
async moveToBypass(channelId: string, userId: string): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
const order = state.orderUserIds;
const idx = order.indexOf(userId);
if (idx === -1) return; // not in rotation (already bypass / unknown)
if (state.currentSpeaker === userId) {
const next = order.length > 1 ? order[(idx + 1) % order.length] : null;
state.currentSpeaker = next === userId ? null : next;
}
state.orderUserIds = order.filter((u) => u !== userId);
if (!state.orderUserIds.length) state.currentSpeaker = null;
state.norepStreak = state.norepStreak.filter((u) => u !== userId);
// remove from active sub-frames (re-enters only via a future mention)
state.frames = this.frames(state)
.map((f) => ({ order: f.order.filter((u) => u !== userId), idx: f.idx }))
.filter((f) => f.order.length > 0)
.map((f) => ({ order: f.order, idx: Math.min(f.idx, f.order.length - 1) }));
const bp = this.bypass(state);
if (!bp.includes(userId)) bp.push(userId);
await manager.save(ChannelTurnState, state);
});
}
async onMemberRemoved(channelId: string, userId: string): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const state = await this.loadLocked(manager, channelId);
if (!state) return;
const order = state.orderUserIds;
const idx = order.indexOf(userId);
if (idx !== -1) {
let nextCurrent = state.currentSpeaker;
if (state.currentSpeaker === userId) {
nextCurrent = order.length > 1 ? order[(idx + 1) % order.length] : null;
if (nextCurrent === userId) nextCurrent = null;
}
state.orderUserIds = order.filter((u) => u !== userId);
state.currentSpeaker = state.orderUserIds.length ? nextCurrent : null;
}
state.norepStreak = state.norepStreak.filter((u) => u !== userId);
state.bypassUserIds = this.bypass(state).filter((u) => u !== userId);
// strip the leaver from every sub-frame; drop emptied frames; clamp idx
const fr = this.frames(state)
.map((f) => ({ order: f.order.filter((u) => u !== userId), idx: f.idx }))
.filter((f) => f.order.length > 0)
.map((f) => ({ order: f.order, idx: Math.min(f.idx, f.order.length - 1) }));
state.frames = fr;
await manager.save(ChannelTurnState, state);
});
}
// A normal (non-command) message in a discuss/work channel.
// mentionIds = raw parsed mentions; the at-list is (mentions - author)
// intersected with channel members.
async onNormalMessage(
channelId: string,
authorUserId: string,
mentionIds: string[] = [],
): Promise<TurnDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
state.norepStreak = [];
const memberRows = await manager.find(ChannelMember, { where: { channelId } });
const memberSet = new Set(memberRows.map((m) => m.userId));
const atList = [...new Set(mentionIds)].filter(
(id) => id !== authorUserId && memberSet.has(id),
);
const fr = this.frames(state);
// ---- a sub-frame is active
if (fr.length) {
const top = fr[fr.length - 1];
const cur = top.order[Math.min(top.idx, top.order.length - 1)];
if (authorUserId === cur) {
if (atList.length) {
this.pushFrame(state, atList);
await manager.save(ChannelTurnState, state);
return { wakeupUserId: atList[0] };
}
const next = this.advanceSubFrame(state);
await manager.save(ChannelTurnState, state);
return { wakeupUserId: next };
}
// queue-jump within the sub-frame: delivered, no advance, no push
await manager.save(ChannelTurnState, state);
return { wakeupUserId: null };
}
// ---- root rotation active
const order = state.orderUserIds;
const n = order.length;
if (n <= 1) {
state.currentSpeaker = null;
await manager.save(ChannelTurnState, state);
return { wakeupUserId: null };
}
if (state.currentSpeaker === null) {
// activation: mover to front, rotation starts at order[1]
const newOrder = [authorUserId, ...order.filter((u) => u !== authorUserId)];
state.orderUserIds = newOrder;
state.currentSpeaker = newOrder[1];
state.roundEvents = [{ u: authorUserId, a: 'normal' }];
state.lastNormalSpeaker = authorUserId;
await manager.save(ChannelTurnState, state);
return { wakeupUserId: newOrder[1] };
}
if (authorUserId === state.currentSpeaker) {
// current speaker mentioning -> push a sub-frame; root pointer (this
// speaker) is left as-is and resumes after the sub-frame pops
if (atList.length) {
this.pushFrame(state, atList);
await manager.save(ChannelTurnState, state);
return { wakeupUserId: atList[0] };
}
const idx = order.indexOf(authorUserId);
const isLast = idx === n - 1;
state.roundEvents = [...state.roundEvents, { u: authorUserId, a: 'normal' }];
state.lastNormalSpeaker = authorUserId;
if (isLast) {
const res = computeShuffle(state.roundEvents, order);
if (res.paused) {
state.currentSpeaker = null;
state.roundEvents = [];
await manager.save(ChannelTurnState, state);
return { wakeupUserId: null };
}
state.orderUserIds = res.newOrder;
state.roundEvents = [];
state.lastNormalSpeaker = null;
state.currentSpeaker = res.newOrder[0];
await manager.save(ChannelTurnState, state);
return { wakeupUserId: res.newOrder[0] };
}
const succ = order[idx + 1];
state.currentSpeaker = succ;
await manager.save(ChannelTurnState, state);
return { wakeupUserId: succ };
}
// queue-jump normal message: no advance, nobody woken
await manager.save(ChannelTurnState, state);
return { wakeupUserId: null };
});
}
async onNoReply(channelId: string, senderUserId: string): Promise<CommandDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
const fr = this.frames(state);
// sub-frame: /no-reply counts as "acted"; advance/pop, no shuffle/pause
if (fr.length) {
const top = fr[fr.length - 1];
const cur = top.order[Math.min(top.idx, top.order.length - 1)];
if (senderUserId !== cur) return { ack: null };
const next = this.advanceSubFrame(state);
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: next } };
}
const order = state.orderUserIds;
const n = order.length;
if (n <= 1 || state.currentSpeaker === null || senderUserId !== state.currentSpeaker) {
return { ack: null };
}
const idx = order.indexOf(senderUserId);
const isLast = idx === n - 1;
state.roundEvents = [...state.roundEvents, { u: senderUserId, a: 'noreply' }];
if (!state.norepStreak.includes(senderUserId)) {
state.norepStreak = [...state.norepStreak, senderUserId];
}
const allCovered = order.every((u) => state.norepStreak.includes(u));
if (allCovered) {
state.currentSpeaker = null;
state.roundEvents = [];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: null } };
}
if (isLast) {
const res = computeShuffle(state.roundEvents, order);
if (res.paused) {
state.currentSpeaker = null;
state.roundEvents = [];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: null } };
}
state.orderUserIds = res.newOrder;
state.roundEvents = [];
state.lastNormalSpeaker = null;
state.currentSpeaker = res.newOrder[0];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: res.newOrder[0] } };
}
const succ = order[idx + 1];
state.currentSpeaker = succ;
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: succ } };
});
}
async onForceProceed(channelId: string): Promise<CommandDecision> {
return this.dataSource.transaction(async (manager) => {
const state = await this.ensureState(manager, channelId);
const fr = this.frames(state);
if (fr.length) {
const top = fr[fr.length - 1];
if (!top.order.length) return { ack: null };
const next = this.advanceSubFrame(state);
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: next } };
}
const order = state.orderUserIds;
const n = order.length;
if (n <= 1 || state.currentSpeaker === null) return { ack: null };
const idx = order.indexOf(state.currentSpeaker);
const isLast = idx === n - 1;
if (isLast) {
const res = computeShuffle(state.roundEvents, order);
if (res.paused) {
state.currentSpeaker = null;
state.roundEvents = [];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: null } };
}
state.orderUserIds = res.newOrder;
state.roundEvents = [];
state.lastNormalSpeaker = null;
state.currentSpeaker = res.newOrder[0];
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: res.newOrder[0] } };
}
const succ = order[idx + 1];
state.currentSpeaker = succ;
await manager.save(ChannelTurnState, state);
return { ack: { wakeupUserId: succ } };
});
}
}

39
src/cli/admin-refresh.ts Normal file
View File

@@ -0,0 +1,39 @@
// Operator convenience: force-refresh the in-memory Center admin cache
// without waiting for the 1-day TTL. Used after `center user set-admin`
// to make new admin visible immediately to triage delivery.
//
// Usage (inside the deployed container):
// docker exec fabric-backend-guild node dist/cli/admin-refresh.js
//
// Prints the (possibly null) result as JSON. Exit 0 always — a "no
// admin" outcome is a valid state, not an error.
import 'reflect-metadata';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '../app.module.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
async function main() {
const app = await NestFactory.createApplicationContext(AppModule, { logger: ['error', 'warn'] });
try {
const cache = app.get(AdminCacheService);
const before = cache.snapshot();
const after = await cache.get(true);
process.stdout.write(
JSON.stringify({
ok: true,
before,
after,
changed: JSON.stringify(before) !== JSON.stringify(after),
}) + '\n',
);
} finally {
await app.close();
}
}
void main().catch((error: unknown) => {
const message = error instanceof Error ? error.message : 'unknown error';
process.stderr.write(JSON.stringify({ ok: false, error: message }) + '\n');
process.exit(1);
});

View File

@@ -0,0 +1,37 @@
// Operator convenience (Guild C-2): print the commands-sync key that this
// guild process actually has in its environment, so it can be copied into
// the OpenClaw plugin's FABRIC_COMMANDS_SYNC_KEY.
//
// Usage (inside the deployed container — authoritative, reflects compose):
// docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js
// docker exec fabric-backend-guild node dist/cli/print-commands-sync-key.js --export
//
// Default: prints the raw value only (so KEY=$(... ) works).
// --export: prints `FABRIC_COMMANDS_SYNC_KEY=<value>` for pasting.
// Exit 1 (no stdout) when unset — guild is then in the weaker
// "any authenticated user" fallback for PUT /commands.
const args = new Set(process.argv.slice(2));
if (args.has('--help') || args.has('-h')) {
process.stderr.write(
'print-commands-sync-key: outputs FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY\n' +
' (no flag) print the raw key value\n' +
' --export print FABRIC_COMMANDS_SYNC_KEY=<value>\n',
);
process.exit(0);
}
const key = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
if (!key) {
process.stderr.write(
'FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY is not set — PUT /commands is in ' +
'the fallback mode (any authenticated user). Set it to harden (Guild C-2).\n',
);
process.exit(1);
}
process.stdout.write(
(args.has('--export') ? `FABRIC_COMMANDS_SYNC_KEY=${key}` : key) + '\n',
);

View File

@@ -0,0 +1,50 @@
import {
Body,
Controller,
ForbiddenException,
Get,
Headers,
Put,
Req,
UnauthorizedException,
} from '@nestjs/common';
import { CommandsService } from './commands.service.js';
import { SyncCommandsDto } from './dto.sync-commands.dto.js';
import { safeEqual } from '../common/safe-equal.js';
type AuthedRequest = { userId?: string };
@Controller('commands')
export class CommandsController {
constructor(private readonly commands: CommandsService) {}
// Guild C-2: catalog write is privileged. When
// FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY is configured (recommended in
// production), the caller must present a matching x-commands-sync-key
// header — this restricts writes to the OpenClaw plugin. When unset, we
// fall back to "any authenticated agent/user" (never weaker than before).
// The body is always strictly validated + size-capped via SyncCommandsDto.
@Put()
sync(
@Req() req: AuthedRequest,
@Body() body: SyncCommandsDto,
@Headers('x-commands-sync-key') syncKey?: string,
) {
const configured = process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '';
if (configured) {
if (!syncKey || !safeEqual(syncKey, configured)) {
throw new ForbiddenException('invalid commands sync key');
}
} else if (!req.userId) {
throw new UnauthorizedException('missing user');
}
return this.commands.sync(body.commands as unknown[]);
}
// Frontend reads the catalog to drive `/` autocomplete.
@Get()
list(@Req() req: AuthedRequest) {
if (!req.userId) throw new UnauthorizedException('missing user');
return this.commands.list();
}
}

View File

@@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { GuildCommand } from '../entities/guild-command.entity.js';
import { CommandsController } from './commands.controller.js';
import { CommandsService } from './commands.service.js';
@Module({
imports: [TypeOrmModule.forFeature([GuildCommand])],
controllers: [CommandsController],
providers: [CommandsService],
})
export class CommandsModule {}

View File

@@ -0,0 +1,39 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { GuildCommand } from '../entities/guild-command.entity.js';
// This node's guild id (one guild per node).
function guildId(): string {
return process.env.FABRIC_BACKEND_GUILD_NODE_ID ?? 'guild';
}
@Injectable()
export class CommandsService {
constructor(
@InjectRepository(GuildCommand)
private readonly repo: Repository<GuildCommand>,
) {}
// Replace the whole guild-global slash-command catalog (idempotent;
// the plugin re-PUTs the full set on every gateway start).
async sync(commands: unknown[]): Promise<{ status: string; count: number }> {
const gid = guildId();
let row = await this.repo.findOne({ where: { guildId: gid } });
if (row) {
row.commands = commands;
} else {
row = this.repo.create({ guildId: gid, commands });
}
await this.repo.save(row);
return { status: 'ok', count: commands.length };
}
async list(): Promise<{ commands: unknown[]; updatedAt: string | null }> {
const row = await this.repo.findOne({ where: { guildId: guildId() } });
return {
commands: row?.commands ?? [],
updatedAt: row?.updatedAt ? row.updatedAt.toISOString() : null,
};
}
}

View File

@@ -0,0 +1,102 @@
import {
ArrayMaxSize,
IsArray,
IsBoolean,
IsOptional,
IsString,
MaxLength,
ValidateNested,
} from 'class-validator';
import { Type } from 'class-transformer';
// Guild C-2: the slash-command catalog is guild-global and rendered by the
// frontend `/` autocomplete. Without a strict schema + caps a single
// authenticated caller could poison it or blow up the DB / clients.
// The global ValidationPipe runs with { whitelist, forbidNonWhitelisted },
// so any unknown field is rejected.
class CommandChoiceDto {
@IsString()
@MaxLength(200)
value!: string;
@IsString()
@MaxLength(200)
label!: string;
}
class CommandArgDto {
@IsString()
@MaxLength(100)
name!: string;
@IsOptional()
@IsString()
@MaxLength(500)
description?: string;
@IsOptional()
@IsString()
@MaxLength(40)
type?: string;
@IsOptional()
@IsBoolean()
required?: boolean;
@IsOptional()
@IsBoolean()
captureRemaining?: boolean;
@IsOptional()
@IsBoolean()
preferAutocomplete?: boolean;
// null when there are no choices (plugin sends explicit null).
@IsOptional()
@IsArray()
@ArrayMaxSize(100)
@ValidateNested({ each: true })
@Type(() => CommandChoiceDto)
choices?: CommandChoiceDto[] | null;
}
class CommandSpecDto {
@IsString()
@MaxLength(100)
name!: string;
@IsOptional()
@IsString()
@MaxLength(100)
nativeName?: string;
@IsOptional()
@IsString()
@MaxLength(500)
description?: string;
@IsOptional()
@IsBoolean()
acceptsArgs?: boolean;
@IsOptional()
@IsArray()
@ArrayMaxSize(50)
@ValidateNested({ each: true })
@Type(() => CommandArgDto)
args?: CommandArgDto[];
@IsOptional()
@IsString()
@MaxLength(20)
argsParsing?: string;
}
export class SyncCommandsDto {
@IsArray()
@ArrayMaxSize(200)
@ValidateNested({ each: true })
@Type(() => CommandSpecDto)
commands!: CommandSpecDto[];
}

View File

@@ -0,0 +1,73 @@
/**
* Center-scoped admin cache.
*
* Holds the at-most-one admin user (email + userId) fetched from Center.
* Used to decide who to deliver triage messages to as a silent observer
* (wake=false), regardless of on-duty / mention status.
*
* Refresh policy (per spec, 2026-05-22):
* • TTL = 1 day. Center admin changes are rare; agents tolerate a
* day's stale cache without surprises
* • on first lookup the cache lazy-fetches
* • cli `admin refresh` forces an out-of-band refresh without waiting
* for TTL expiry
*
* Failure mode: a Center fetch error is treated identically to "no
* admin" — guild keeps operating without an observer. The cache holds
* the failed-fetch decision for the same TTL so we don't hammer Center.
*/
import { Injectable, Logger } from '@nestjs/common';
import { fetchAdminEmail } from './center-auth.js';
const ADMIN_CACHE_TTL_MS = 24 * 60 * 60 * 1000;
export interface CachedAdmin {
email: string;
userId: string;
}
@Injectable()
export class AdminCacheService {
private readonly logger = new Logger(AdminCacheService.name);
private cached: CachedAdmin | null = null;
private cachedAt = 0;
private inflight: Promise<CachedAdmin | null> | null = null;
/**
* Return the cached admin, fetching from Center if the cache is empty
* or older than the TTL. Returns null if no admin is set.
*
* `force=true` bypasses the cache and refreshes immediately — used by
* the cli refresh command.
*/
async get(force = false): Promise<CachedAdmin | null> {
const fresh = Date.now() - this.cachedAt < ADMIN_CACHE_TTL_MS;
if (!force && this.cachedAt > 0 && fresh) {
return this.cached;
}
if (this.inflight) return this.inflight;
this.inflight = (async () => {
try {
const result = await fetchAdminEmail();
this.cached = result;
this.cachedAt = Date.now();
this.logger.log(
`admin cache refreshed: ${result ? `${result.email} (${result.userId})` : 'no admin set'}`,
);
return result;
} finally {
this.inflight = null;
}
})();
return this.inflight;
}
/** Snapshot of the cached admin (no fetch). Returns null if not yet
* populated. Used by the hot delivery path which doesn't want to
* block on a Center round-trip. */
snapshot(): CachedAdmin | null {
return this.cached;
}
}

View File

@@ -4,12 +4,17 @@ import {
Injectable,
UnauthorizedException,
} from '@nestjs/common';
import { introspectGuildToken } from './center-auth';
import { introspectGuildToken } from './center-auth.js';
import { safeEqual } from './safe-equal.js';
@Injectable()
export class ApiKeyGuard implements CanActivate {
async canActivate(context: ExecutionContext): Promise<boolean> {
const req = context.switchToHttp().getRequest<{ path?: string; headers: Record<string, string | string[] | undefined> }>();
const req = context.switchToHttp().getRequest<{
path?: string;
headers: Record<string, string | string[] | undefined>;
query?: Record<string, string | string[] | undefined>;
}>();
const path = req.path ?? '';
// allow health check without auth
@@ -17,9 +22,34 @@ export class ApiKeyGuard implements CanActivate {
return true;
}
// System-key bypass: when a caller presents x-fabric-system-key
// matching FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY (intentionally the
// same shared secret as x-commands-sync-key — both legitimate
// consumers are Fabric.OpenclawPlugin), skip the Bearer requirement
// and mark this as a system caller. Downstream handlers (e.g.
// messaging.controller POST /channels/:id/messages) gate on
// req.isSystem to take the system-author code path.
//
// Empty env → bypass disabled (no system caller ever valid; closed
// by default). Header carries the secret as-is; we constant-time
// compare against the env value.
const sysExpected = (process.env.FABRIC_BACKEND_GUILD_COMMANDS_SYNC_KEY ?? '').trim();
const sysHeader = req.headers['x-fabric-system-key'];
const sysProvided = Array.isArray(sysHeader) ? sysHeader[0] : sysHeader;
if (sysExpected && sysProvided && safeEqual(sysProvided, sysExpected)) {
(req as { isSystem?: boolean }).isSystem = true;
return true;
}
const auth = req.headers['authorization'];
const authValue = Array.isArray(auth) ? auth[0] : auth;
const token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';
let token = authValue?.startsWith('Bearer ') ? authValue.slice(7) : '';
// Browsers can't set Authorization on <img>/<a> (file downloads); accept
// the guild token via ?access_token= as a fallback. Still introspected.
if (!token) {
const qt = req.query?.['access_token'];
token = (Array.isArray(qt) ? qt[0] : qt) ?? '';
}
if (!token) throw new UnauthorizedException('missing bearer token');
const result = await introspectGuildToken(token);

View File

@@ -25,3 +25,50 @@ export async function introspectGuildToken(token: string): Promise<{ active: boo
user: data.user,
};
}
/**
* Fetch the single Center-scoped admin user (if any).
* Same x-api-key auth as introspect / resolve-names.
* Returns `null` when no admin is set OR the request fails (treated
* identically — the guild simply falls back to "no admin observer").
*/
export async function fetchAdminEmail(): Promise<{ email: string; userId: string } | null> {
const centerBaseUrl = process.env.FABRIC_BACKEND_GUILD_CENTER_BASE_URL;
const centerApiKey = process.env.FABRIC_BACKEND_GUILD_CENTER_API_KEY;
if (!centerBaseUrl || !centerApiKey) return null;
try {
const res = await fetch(`${centerBaseUrl}/api/auth/admin-email`, {
method: 'GET',
headers: { 'x-api-key': centerApiKey },
});
if (!res.ok) return null;
const data = (await res.json()) as { email?: string; userId?: string } | null;
if (!data || !data.email || !data.userId) return null;
return { email: data.email, userId: data.userId };
} catch {
return null;
}
}
// Resolve <@user.name:NAME> names to userIds within this guild node via
// Center. Unresolved names are simply absent from the returned map.
export async function resolveUserNames(names: string[]): Promise<Record<string, string>> {
const centerBaseUrl = process.env.FABRIC_BACKEND_GUILD_CENTER_BASE_URL;
const guildNodeId = process.env.FABRIC_BACKEND_GUILD_NODE_ID;
const centerApiKey = process.env.FABRIC_BACKEND_GUILD_CENTER_API_KEY;
if (!centerBaseUrl || !guildNodeId || !centerApiKey || !names.length) return {};
try {
const res = await fetch(`${centerBaseUrl}/api/auth/resolve-names`, {
method: 'POST',
headers: { 'content-type': 'application/json', 'x-api-key': centerApiKey },
body: JSON.stringify({ guildNodeId, names }),
});
if (!res.ok) return {};
const data = (await res.json()) as { resolved?: Record<string, string> };
return data.resolved ?? {};
} catch {
return {};
}
}

View File

@@ -1,5 +1,5 @@
import { Controller, Get } from '@nestjs/common';
import { MetricsService } from './metrics.service';
import { MetricsService } from './metrics.service.js';
@Controller('metrics')
export class MetricsController {

View File

@@ -1,6 +1,6 @@
import { randomUUID } from 'crypto';
import { NextFunction, Request, Response } from 'express';
import { MetricsService } from './metrics.service';
import { MetricsService } from './metrics.service.js';
type ReqWithId = Request & { requestId?: string };

View File

@@ -0,0 +1,25 @@
import { describe, it, expect } from 'vitest';
import { safeEqual } from './safe-equal.js';
describe('safeEqual', () => {
it('returns true for identical non-empty strings', () => {
expect(safeEqual('abc123', 'abc123')).toBe(true);
});
it('returns false for different strings of same length', () => {
expect(safeEqual('abc123', 'abc124')).toBe(false);
});
it('returns false for differing lengths', () => {
expect(safeEqual('abc', 'abcd')).toBe(false);
});
it('handles empty strings', () => {
// both empty is technically equal — but downstream callers should
// explicitly check for empty expected before invoking. We just
// document the constant-time-comparison primitive's behavior.
expect(safeEqual('', '')).toBe(true);
expect(safeEqual('a', '')).toBe(false);
expect(safeEqual('', 'a')).toBe(false);
});
});

12
src/common/safe-equal.ts Normal file
View File

@@ -0,0 +1,12 @@
import { timingSafeEqual } from 'node:crypto';
// Constant-time string comparison. Returns false for length mismatch (the
// length difference itself is observable, but the per-byte loop isn't).
// Used for shared-secret header checks (commands-sync-key, system-key,
// etc.) to keep timing-oracle attacks off the table.
export function safeEqual(a: string, b: string): boolean {
const ab = Buffer.from(a);
const bb = Buffer.from(b);
if (ab.length !== bb.length) return false;
return timingSafeEqual(ab, bb);
}

View File

@@ -1,6 +1,6 @@
import 'reflect-metadata';
import { DataSource, DataSourceOptions } from 'typeorm';
import { buildTypeOrmConfig } from './database.config';
import { buildTypeOrmConfig } from './database.config.js';
const cfg = buildTypeOrmConfig();

View File

@@ -1,13 +1,20 @@
import { TypeOrmModuleOptions } from '@nestjs/typeorm';
import { Guild } from './entities/guild.entity';
import { Channel } from './entities/channel.entity';
import { Message } from './entities/message.entity';
import { DmConversation } from './entities/dm-conversation.entity';
import { DmParticipant } from './entities/dm-participant.entity';
import { GuildRole } from './entities/guild-role.entity';
import { GuildMember } from './entities/guild-member.entity';
import { GuildMemberRole } from './entities/guild-member-role.entity';
import { IdempotencyRecord } from './entities/idempotency-record.entity';
import { Guild } from './entities/guild.entity.js';
import { Channel } from './entities/channel.entity.js';
import { ChannelMember } from './entities/channel-member.entity.js';
import { WakeMapping } from './entities/wake-mapping.entity.js';
import { ChannelTurnState } from './entities/channel-turn-state.entity.js';
import { Message } from './entities/message.entity.js';
import { DmConversation } from './entities/dm-conversation.entity.js';
import { DmParticipant } from './entities/dm-participant.entity.js';
import { GuildRole } from './entities/guild-role.entity.js';
import { GuildMember } from './entities/guild-member.entity.js';
import { GuildMemberRole } from './entities/guild-member-role.entity.js';
import { IdempotencyRecord } from './entities/idempotency-record.entity.js';
import { StoredFile } from './entities/stored-file.entity.js';
import { ChannelCanvas } from './entities/channel-canvas.entity.js';
import { GuildCommand } from './entities/guild-command.entity.js';
import { AgentPresence } from './entities/agent-presence.entity.js';
export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
type: 'mysql',
@@ -19,6 +26,9 @@ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
entities: [
Guild,
Channel,
ChannelMember,
WakeMapping,
ChannelTurnState,
Message,
DmConversation,
DmParticipant,
@@ -26,6 +36,10 @@ export const buildTypeOrmConfig = (): TypeOrmModuleOptions => ({
GuildMember,
GuildMemberRole,
IdempotencyRecord,
StoredFile,
ChannelCanvas,
GuildCommand,
AgentPresence,
],
synchronize: (process.env.FABRIC_BACKEND_GUILD_DB_SYNC ?? 'true') === 'true',
logging: (process.env.FABRIC_BACKEND_GUILD_DB_LOGGING ?? 'false') === 'true',

View File

@@ -0,0 +1,35 @@
import { Column, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm';
/**
* Per-user (typically agent) presence cache.
*
* Populated by Fabric.OpenclawPlugin's presence-sync loop: every ~30s
* it reads each connected agent's HF status from the cross-plugin
* `globalThis.__hfAgentStatus.get(agentId)` (exposed by
* HarborForge.OpenclawPlugin) and pushes diffs via
* `PUT /agents/:userId/presence`.
*
* Used by `RealtimeGateway.computeDelivery` for `announce`-type
* channels to skip delivery to recipients whose status is `busy`.
* Defaults to `unknown` if no row exists (treated as not-busy).
*/
@Entity('agent_presences')
export class AgentPresence {
// Same id as the Fabric Center user id (UUID v4 string, char(36)).
@PrimaryColumn({ type: 'char', length: 36 })
userId!: string;
@Column({
type: 'enum',
enum: ['idle', 'on_call', 'busy', 'exhausted', 'offline', 'unknown'],
default: 'unknown',
})
status!: 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown';
/** Free-text source tag for debugging ("hf-plugin", "manual", etc.). */
@Column({ type: 'varchar', length: 64, default: 'unknown' })
source!: string;
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -0,0 +1,46 @@
import {
Column,
CreateDateColumn,
Entity,
Index,
PrimaryGeneratedColumn,
UpdateDateColumn,
} from 'typeorm';
export type CanvasFormat = 'md' | 'html' | 'text';
// One active shared document per channel (ChatGPT-canvas-like). Re-sharing
// replaces it; only the original sharer may update it in place. Pinned in
// the channel UI, independent of the message scroll.
@Entity('channel_canvas')
export class ChannelCanvas {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index({ unique: true })
@Column({ name: 'channel_id', type: 'char', length: 36 })
channelId!: string;
// who shared it; only this user may PATCH/DELETE
@Column({ name: 'sharer_user_id', type: 'varchar', length: 64 })
sharerUserId!: string;
@Column({ type: 'varchar', length: 200 })
title!: string;
@Column({ type: 'varchar', length: 8 })
format!: CanvasFormat;
// raw document source (rendered client-side per format)
@Column({ type: 'mediumtext' })
source!: string;
@Column({ type: 'int', default: 1 })
version!: number;
@CreateDateColumn()
createdAt!: Date;
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -0,0 +1,19 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
@Entity('channel_members')
@Index(['channelId', 'userId'], { unique: true })
@Index(['userId'])
export class ChannelMember {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index()
@Column({ type: 'char', length: 36 })
channelId!: string;
@Column({ type: 'varchar', length: 64 })
userId!: string;
@CreateDateColumn()
createdAt!: Date;
}

View File

@@ -0,0 +1,59 @@
import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';
export type RoundEvent = { u: string; a: 'normal' | 'noreply' };
// A mention sub-rotation frame pushed on top of the root rotation.
// currentSpeaker of an active sub-frame = order[idx]. Single linear pass:
// after the member at the last index acts, the frame pops.
export type TurnFrame = { order: string[]; idx: number };
// Per-channel rotation state for discuss/work x_type channels.
// All mutations must be serialized per channel (pessimistic row lock).
@Entity('channel_turn_state')
export class ChannelTurnState {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index({ unique: true })
@Column({ name: 'channel_id', type: 'char', length: 36 })
channelId!: string;
// speaking order; userIds. order and bypass are a DISJOINT partition of
// the channel's members.
@Column({ name: 'order_user_ids', type: 'json' })
orderUserIds!: string[];
// members excluded from rotation: never woken by normal rotation, only when
// @-mentioned (then transiently pulled into a sub-frame; back to bypass on
// pop). discuss/work only.
@Column({ name: 'bypass_user_ids', type: 'json', nullable: true })
bypassUserIds!: string[] | null;
// null = paused (created, or all-members-consecutively-/no-reply)
@Column({ name: 'current_speaker', type: 'varchar', length: 64, nullable: true })
currentSpeaker!: string | null;
// ordered turn actions of the *current* round (skipped/queue-jumps excluded);
// used to compute the shuffle tail/anchor
@Column({ name: 'round_events', type: 'json' })
roundEvents!: RoundEvent[];
// distinct userIds that consecutively replied /no-reply via rotation since
// the last normal message; persists ACROSS rounds; reset by any normal msg.
// When it covers every current member -> pause.
@Column({ name: 'norep_streak', type: 'json' })
norepStreak!: string[];
// last member who delivered a normal message in the current round (D)
@Column({ name: 'last_normal_speaker', type: 'varchar', length: 64, nullable: true })
lastNormalSpeaker!: string | null;
// mention sub-rotation stack on top of the root rotation. Empty = root
// active. Top of stack is the active frame; the root rotation
// (order/currentSpeaker/round/streak) is paused while it is non-empty.
@Column({ name: 'frames', type: 'json', nullable: true })
frames!: TurnFrame[] | null;
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -13,12 +13,38 @@ export class Channel {
@Column({ type: 'varchar', length: 120 })
name!: string;
@Column({
name: 'x_type',
type: 'enum',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm', 'announce'],
})
xType!: 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm' | 'announce';
@Column({ type: 'varchar', length: 16, default: 'text' })
kind!: 'text' | 'announcement';
// Free-form description of what this channel is for — what topics get
// posted, who participates, why it exists. Surfaced via GET /api/channels
// so agents can pick a channel by intent ("which announce channel is for
// debate broadcasts?") without channel id hard-coded into workflows.
// Any channel member can set it via PATCH /api/channels/:id (writes
// require membership the same way moveToBypass / close do). The frontend
// create form does NOT post this today — purpose stays optional.
@Column({ type: 'text', nullable: true })
purpose!: string | null;
@Column({ type: 'boolean', default: false })
isPrivate!: boolean;
// public channels are visible to every guild member (including those who
// join after the channel was created); default off (unchecked)
@Column({ type: 'boolean', default: false })
isPublic!: boolean;
// closed (e.g. discussion-complete): history readable, new posts rejected
@Column({ type: 'boolean', default: false })
closed!: boolean;
@Index()
@Column({ default: 0 })
lastSeq!: number;

View File

@@ -0,0 +1,25 @@
import { Column, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm';
// Guild-global slash-command catalog. One row per guild (this node's
// FABRIC_BACKEND_GUILD_NODE_ID). The OpenClaw plugin PUTs the OpenClaw
// native-command specs here (the same data Discord registers as slash
// commands); the frontend GETs it to drive `/` autocomplete. The guild
// node stores the catalog opaquely — it does not interpret command bodies.
@Entity('guild_commands')
export class GuildCommand {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index({ unique: true })
@Column({ name: 'guild_id', type: 'varchar', length: 80 })
guildId!: string;
// NativeCommandSpec[]-shaped (name, nativeName, description, acceptsArgs,
// args[{name,description,type,required,choices:[{value,label}],
// captureRemaining,preferAutocomplete}], argsParsing). Stored verbatim.
@Column({ type: 'json' })
commands!: unknown[];
@UpdateDateColumn()
updatedAt!: Date;
}

View File

@@ -0,0 +1,43 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
// An uploaded or canvas-shared file held on the guild node. Retained for a
// configurable window (default 7 days) then purged by FilesService.
@Entity('stored_files')
export class StoredFile {
@PrimaryGeneratedColumn('uuid')
id!: string;
// public, URL-safe id used in /api/files/:fileId
@Index({ unique: true })
@Column({ name: 'file_id', type: 'varchar', length: 64 })
fileId!: string;
// owning channel (best-effort context; null = not channel-scoped)
@Index()
@Column({ name: 'channel_id', type: 'char', length: 36, nullable: true })
channelId!: string | null;
@Column({ name: 'uploader_user_id', type: 'varchar', length: 64 })
uploaderUserId!: string;
@Column({ name: 'original_name', type: 'varchar', length: 255 })
originalName!: string;
@Column({ name: 'mime_type', type: 'varchar', length: 150 })
mimeType!: string;
@Column({ name: 'size_bytes', type: 'bigint' })
sizeBytes!: number;
// path on disk relative to the storage root
@Column({ name: 'storage_path', type: 'varchar', length: 300 })
storagePath!: string;
@CreateDateColumn()
createdAt!: Date;
// hard-delete deadline; rows past this are purged with their blob
@Index()
@Column({ name: 'expires_at', type: 'datetime' })
expiresAt!: Date;
}

View File

@@ -0,0 +1,19 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
@Entity('wake_mapping')
@Index(['channelId', 'userId'], { unique: true })
@Index(['userId'])
export class WakeMapping {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Index()
@Column({ name: 'channel_id', type: 'char', length: 36 })
channelId!: string;
@Column({ name: 'user_id', type: 'varchar', length: 64 })
userId!: string;
@CreateDateColumn()
createdAt!: Date;
}

View File

@@ -1,5 +1,5 @@
import { Global, Module } from '@nestjs/common';
import { EventsService } from './events.service';
import { EventsService } from './events.service.js';
@Global()
@Module({

View File

@@ -1,6 +1,6 @@
import { Injectable, Logger } from '@nestjs/common';
import { createHmac, randomUUID } from 'crypto';
import { FabricEventEnvelope } from './event-envelope';
import { FabricEventEnvelope } from './event-envelope.js';
type RetryTask = {
envelope: FabricEventEnvelope;

View File

@@ -0,0 +1,84 @@
import {
BadRequestException,
Controller,
Get,
Param,
Post,
Query,
Req,
Res,
UnauthorizedException,
UploadedFile,
UseInterceptors,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import type { Response } from 'express';
import { FilesService } from './files.service.js';
type AuthedRequest = { userId?: string };
type UploadedMulterFile = {
originalname: string;
mimetype: string;
size: number;
buffer: Buffer;
};
@Controller('files')
export class FilesController {
constructor(private readonly files: FilesService) {}
@Post()
@UseInterceptors(FileInterceptor('file'))
async upload(
@Req() req: AuthedRequest,
@UploadedFile() file: UploadedMulterFile | undefined,
@Query('channelId') channelId?: string,
) {
const userId = req.userId ?? '';
if (!userId) throw new UnauthorizedException('missing user');
if (!file || !file.buffer?.length) throw new BadRequestException('no file');
if (this.files.maxBytes > 0 && file.size > this.files.maxBytes) {
throw new BadRequestException(
`file exceeds limit of ${this.files.maxBytes} bytes`,
);
}
const row = await this.files.store({
channelId: channelId ? String(channelId) : null,
uploaderUserId: userId,
originalName: file.originalname || 'file',
mimeType: file.mimetype,
buffer: file.buffer,
});
return {
fileId: row.fileId,
url: `/api/files/${row.fileId}`,
name: row.originalName,
mimeType: row.mimeType,
size: Number(row.sizeBytes),
expiresAt: row.expiresAt.toISOString(),
};
}
@Get(':fileId')
async download(
@Param('fileId') fileId: string,
@Res() res: Response,
): Promise<void> {
const row = await this.files.find(fileId);
if (!row) {
res.status(404).json({ error: 'file_not_found' });
return;
}
const blob = await this.files.readBlob(row);
const inline = /^(image|audio|video)\//.test(row.mimeType) || row.mimeType === 'application/pdf';
const safeName = row.originalName.replace(/["\r\n]/g, '_');
res.setHeader('Content-Type', row.mimeType);
res.setHeader('Content-Length', String(blob.length));
res.setHeader(
'Content-Disposition',
`${inline ? 'inline' : 'attachment'}; filename="${safeName}"`,
);
res.setHeader('Cache-Control', 'private, max-age=3600');
res.end(blob);
}
}

13
src/files/files.module.ts Normal file
View File

@@ -0,0 +1,13 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { StoredFile } from '../entities/stored-file.entity.js';
import { FilesController } from './files.controller.js';
import { FilesService } from './files.service.js';
@Module({
imports: [TypeOrmModule.forFeature([StoredFile])],
controllers: [FilesController],
providers: [FilesService],
exports: [FilesService],
})
export class FilesModule {}

View File

@@ -0,0 +1,98 @@
import { randomBytes } from 'node:crypto';
import { promises as fs } from 'node:fs';
import { join, resolve } from 'node:path';
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { LessThan, Repository } from 'typeorm';
import { StoredFile } from '../entities/stored-file.entity.js';
const DAY_MS = 24 * 60 * 60 * 1000;
const CLEANUP_INTERVAL_MS = 60 * 60 * 1000; // hourly
@Injectable()
export class FilesService implements OnModuleInit, OnModuleDestroy {
private readonly log = new Logger('FilesService');
private timer: NodeJS.Timeout | null = null;
// Storage root; the guild operator may relocate / resize this freely.
readonly dir = resolve(
process.env.FABRIC_BACKEND_GUILD_FILE_DIR ?? join(process.cwd(), '.data', 'files'),
);
// 0 / unset => no cap (default per product: 100MB, operator-configurable).
readonly maxBytes = Number(
process.env.FABRIC_BACKEND_GUILD_FILE_MAX_BYTES ?? 100 * 1024 * 1024,
);
readonly ttlDays = Number(process.env.FABRIC_BACKEND_GUILD_FILE_TTL_DAYS ?? 7);
constructor(
@InjectRepository(StoredFile)
private readonly repo: Repository<StoredFile>,
) {}
async onModuleInit(): Promise<void> {
await fs.mkdir(this.dir, { recursive: true });
this.log.log(
`files dir=${this.dir} maxBytes=${this.maxBytes} ttlDays=${this.ttlDays}`,
);
// sweep on boot, then hourly
void this.cleanup();
this.timer = setInterval(() => void this.cleanup(), CLEANUP_INTERVAL_MS);
this.timer.unref?.();
}
onModuleDestroy(): void {
if (this.timer) clearInterval(this.timer);
this.timer = null;
}
async store(input: {
channelId: string | null;
uploaderUserId: string;
originalName: string;
mimeType: string;
buffer: Buffer;
}): Promise<StoredFile> {
const fileId = randomBytes(18).toString('base64url');
const storagePath = fileId; // flat layout, opaque name
await fs.writeFile(join(this.dir, storagePath), input.buffer);
const row = this.repo.create({
fileId,
channelId: input.channelId,
uploaderUserId: input.uploaderUserId,
originalName: input.originalName.slice(0, 255),
mimeType: (input.mimeType || 'application/octet-stream').slice(0, 150),
sizeBytes: input.buffer.length,
storagePath,
expiresAt: new Date(Date.now() + this.ttlDays * DAY_MS),
});
return this.repo.save(row);
}
async find(fileId: string): Promise<StoredFile | null> {
const row = await this.repo.findOne({ where: { fileId } });
if (!row) return null;
if (row.expiresAt.getTime() <= Date.now()) return null; // treat as gone
return row;
}
async readBlob(row: StoredFile): Promise<Buffer> {
return fs.readFile(join(this.dir, row.storagePath));
}
// Purge every row past its retention deadline together with its blob.
async cleanup(): Promise<number> {
const expired = await this.repo.find({ where: { expiresAt: LessThan(new Date()) } });
let removed = 0;
for (const row of expired) {
try {
await fs.rm(join(this.dir, row.storagePath), { force: true });
} catch {
/* best effort: drop the row regardless */
}
await this.repo.delete({ id: row.id });
removed++;
}
if (removed) this.log.log(`retention sweep removed ${removed} expired file(s)`);
return removed;
}
}

View File

@@ -1,5 +1,5 @@
import { Body, Controller, Get, Post } from '@nestjs/common';
import { GuildsService } from './guilds.service';
import { GuildsService } from './guilds.service.js';
@Controller('guilds')
export class GuildsController {

View File

@@ -1,8 +1,8 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { GuildsController } from './guilds.controller';
import { Guild } from '../entities/guild.entity';
import { GuildsService } from './guilds.service';
import { GuildsController } from './guilds.controller.js';
import { Guild } from '../entities/guild.entity.js';
import { GuildsService } from './guilds.service.js';
@Module({
imports: [TypeOrmModule.forFeature([Guild])],

View File

@@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Guild } from '../entities/guild.entity';
import { Guild } from '../entities/guild.entity.js';
@Injectable()
export class GuildsService {

View File

@@ -3,7 +3,7 @@ import { Test } from '@nestjs/testing';
import request from 'supertest';
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import { DataSource } from 'typeorm';
import { Channel } from './entities/channel.entity';
import { Channel } from './entities/channel.entity.js';
process.env.DB_HOST = '127.0.0.1';
process.env.DB_PORT = '3308';
@@ -18,7 +18,7 @@ describe('guild integration (mysql + api)', () => {
let dataSource: DataSource;
beforeAll(async () => {
const { AppModule } = await import('./app.module');
const { AppModule } = await import('./app.module.js');
const moduleRef = await Test.createTestingModule({
imports: [AppModule],
}).compile();

View File

@@ -2,9 +2,9 @@ import 'reflect-metadata';
import { ValidationPipe } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
import { AppModule } from './app.module';
import { createRequestContextMiddleware } from './common/request-context.middleware';
import { MetricsService } from './common/metrics.service';
import { AppModule } from './app.module.js';
import { createRequestContextMiddleware } from './common/request-context.middleware.js';
import { MetricsService } from './common/metrics.service.js';
function requireEnv(name: string): string {
const value = process.env[name];
@@ -24,6 +24,23 @@ async function bootstrap() {
validateEnv();
const app = await NestFactory.create(AppModule);
const corsOrigins = (process.env.FABRIC_BACKEND_GUILD_CORS_ORIGINS ?? '')
.split(',')
.map((x) => x.trim())
.filter(Boolean);
app.enableCors({
origin: (origin, callback) => {
if (!origin) return callback(null, true);
if (origin === 'null') return callback(null, true);
if (!corsOrigins.length) return callback(null, true);
if (corsOrigins.includes(origin)) return callback(null, true);
return callback(new Error('CORS origin not allowed'), false);
},
methods: ['GET', 'POST', 'PATCH', 'PUT', 'DELETE', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization', 'x-client-name', 'x-request-id', 'x-api-key'],
credentials: false,
});
app.setGlobalPrefix('api');
const metrics = app.get(MetricsService);
app.use(createRequestContextMiddleware('guild', metrics));

View File

@@ -0,0 +1,13 @@
import { Controller, Get, Query } from '@nestjs/common';
import { MembersService } from './members.service.js';
@Controller('members')
export class MembersController {
constructor(private readonly membersService: MembersService) {}
@Get()
list(@Query('guildId') guildId?: string) {
return this.membersService.list(guildId);
}
}

View File

@@ -0,0 +1,13 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { GuildMember } from '../entities/guild-member.entity.js';
import { MembersController } from './members.controller.js';
import { MembersService } from './members.service.js';
@Module({
imports: [TypeOrmModule.forFeature([GuildMember])],
controllers: [MembersController],
providers: [MembersService],
})
export class MembersModule {}

View File

@@ -0,0 +1,20 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { GuildMember } from '../entities/guild-member.entity.js';
@Injectable()
export class MembersService {
constructor(
@InjectRepository(GuildMember)
private readonly memberRepo: Repository<GuildMember>,
) {}
list(guildId?: string) {
if (guildId) {
return this.memberRepo.find({ where: { guildId, status: 'active' }, order: { createdAt: 'ASC' }, take: 500 });
}
return this.memberRepo.find({ where: { status: 'active' }, order: { createdAt: 'ASC' }, take: 500 });
}
}

View File

@@ -56,4 +56,14 @@ export class CreateMessageDto {
@IsString()
@MaxLength(64)
authorUserId?: string;
// System-author path only (x-fabric-system-key gated). When set, the
// message is delivered via emitMessageTargeted so this single recipient
// gets wakeup=true; everyone else in the channel sees wakeup=false. For
// regular (user-bearer) posts this field is ignored. Used by
// close-sub-discussion to precisely wake the host on callback.
@IsOptional()
@IsString()
@MaxLength(64)
wakeupUserId?: string;
}

View File

@@ -1,7 +1,9 @@
import {
Body,
ConflictException,
Controller,
Delete,
ForbiddenException,
Get,
Headers,
NotFoundException,
@@ -9,16 +11,24 @@ import {
Patch,
Post,
Query,
Req,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { DataSource, Repository } from 'typeorm';
import { CreateMessageDto } from './dto.create-message.dto';
import { Channel } from '../entities/channel.entity';
import { Message } from '../entities/message.entity';
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
import { EventsService } from '../events/events.service';
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
import { RealtimeGateway } from '../realtime/realtime.gateway';
import { CreateMessageDto } from './dto.create-message.dto.js';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
import { parseSlashCommand } from '../channels/slash-commands.js';
import { parseMentions, extractNameMentions, replaceNameMentions } from '../channels/mentions.js';
import { resolveUserNames } from '../common/center-auth.js';
import { TurnService } from '../channels/turn.service.js';
import { EventsService } from '../events/events.service.js';
import { clampLimit, computeNextExpectedSeq } from './pagination.util.js';
import { RealtimeGateway } from '../realtime/realtime.gateway.js';
const EDIT_WINDOW_MS = 15 * 60 * 1000;
const DEFAULT_PAGE_LIMIT = 50;
@@ -30,12 +40,18 @@ export class MessagingController {
private readonly dataSource: DataSource,
@InjectRepository(Channel)
private readonly channelRepo: Repository<Channel>,
@InjectRepository(ChannelMember)
private readonly memberRepo: Repository<ChannelMember>,
@InjectRepository(Message)
private readonly messageRepo: Repository<Message>,
@InjectRepository(IdempotencyRecord)
private readonly idemRepo: Repository<IdempotencyRecord>,
@InjectRepository(WakeMapping)
private readonly wakeRepo: Repository<WakeMapping>,
private readonly turn: TurnService,
private readonly events: EventsService,
private readonly realtime: RealtimeGateway,
private readonly adminCache: AdminCacheService,
) {}
private async getIdempotentResponse(
@@ -77,17 +93,26 @@ export class MessagingController {
};
}
@Post()
async create(
@Param('id') channelId: string,
@Body() body: CreateMessageDto,
@Headers('idempotency-key') idempotencyKey?: string,
) {
const scope = `POST:/channels/${channelId}/messages`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
// Channel-participant gate (Guild C-1): public channels are readable/
// writable by any authenticated user; private channels require explicit
// channel_members membership. Returns the channel so callers can reuse it.
private async assertParticipant(channelId: string, userId: string): Promise<Channel> {
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) throw new NotFoundException('channel not found');
if (channel.isPublic) return channel;
if (!userId) throw new ForbiddenException('not a channel member');
const member = await this.memberRepo.findOne({ where: { channelId, userId } });
if (!member) throw new ForbiddenException('not a channel member');
return channel;
}
const message = await this.dataSource.transaction(async (manager) => {
// Persists one message (allocates a seq under a channel row lock) and
// returns its view. Used for normal messages and for guild /ack messages.
private async persistMessage(
channelId: string,
input: { authorUserId: string; content: string; clientMessageId?: string | null; replyToMessageId?: string | null; mentions?: string[]; attachments?: Array<{ url: string; name?: string; mimeType?: string }> },
): Promise<Message> {
return this.dataSource.transaction(async (manager) => {
const channel = await manager.findOne(Channel, {
where: { id: channelId },
lock: { mode: 'pessimistic_write' },
@@ -95,28 +120,156 @@ export class MessagingController {
if (!channel) {
throw new NotFoundException('channel not found');
}
const nextSeq = channel.lastSeq + 1;
channel.lastSeq = nextSeq;
await manager.save(Channel, channel);
const messageId = body.clientMessageId ?? `m-${channelId}-${nextSeq}`;
const messageId = input.clientMessageId ?? `m-${channelId}-${nextSeq}`;
const row = manager.create(Message, {
messageId,
channelId,
conversationId: null,
authorUserId: body.authorUserId ?? 'anonymous',
authorUserId: input.authorUserId,
seq: nextSeq,
content: body.content,
replyToMessageId: body.replyToMessageId ?? null,
mentions: body.mentions ?? [],
attachments: body.attachments ?? [],
content: input.content,
replyToMessageId: input.replyToMessageId ?? null,
mentions: input.mentions ?? [],
attachments: input.attachments ?? [],
editedAt: null,
deletedAt: null,
isDeleted: false,
});
return manager.save(Message, row);
});
}
// Emits a guild-authored /ack message to the channel; wakeup=true only for
// the new current speaker (null = nobody). One message-id; persisted.
private async emitAck(channelId: string, wakeupUserId: string | null): Promise<void> {
const ack = await this.persistMessage(channelId, { authorUserId: 'guild', content: '/ack' });
const body = this.toView(ack) as Record<string, unknown>;
await this.events.emit({ eventType: 'message.created', channelId, actorId: 'guild', data: body });
await this.realtime.emitMessageTargeted(channelId, body, wakeupUserId);
}
@Post()
async create(
@Param('id') channelId: string,
@Body() body: CreateMessageDto,
@Req() req: { userId?: string; isSystem?: boolean },
@Headers('idempotency-key') idempotencyKey?: string,
) {
const scope = `POST:/channels/${channelId}/messages`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
// System caller (ApiKeyGuard set isSystem from x-fabric-system-key):
// skip the per-user participant check; resolve channel directly.
// System posts are allowed into any non-closed channel — used by
// Fabric.OpenclawPlugin to write `close-sub-discussion` callbacks
// back to a parent channel that the host agent may not be currently
// "in" from the backend's perspective, and to deliver guide-injected
// system intros into sub-discussion channels without needing to log
// in as a real user. Author is a sentinel UUID that no real user
// ever has; `wakeupUserId` (optional) lets the caller precisely wake
// one recipient (e.g. the host of a closing sub-discussion).
if (req.isSystem) {
const sysChannel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!sysChannel) throw new NotFoundException('channel not found');
if (sysChannel.closed) {
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
}
const SYSTEM_USER_ID = '00000000-0000-0000-0000-000000000000';
let sysContent = body.content ?? '';
const sysNames = extractNameMentions(sysContent);
if (sysNames.length) {
const nameMap = await resolveUserNames(sysNames);
sysContent = replaceNameMentions(sysContent, nameMap);
}
const sysMessage = await this.persistMessage(channelId, {
authorUserId: SYSTEM_USER_ID,
content: sysContent,
clientMessageId: body.clientMessageId,
replyToMessageId: body.replyToMessageId,
mentions: body.mentions,
attachments: body.attachments,
});
const sysView = this.toView(sysMessage) as Record<string, unknown>;
await this.saveIdempotentResponse(scope, idempotencyKey, sysView);
await this.events.emit({
eventType: 'message.created',
channelId,
actorId: SYSTEM_USER_ID,
data: sysView,
});
// wakeupUserId set -> emitMessageTargeted wakes exactly that user
// (everyone else gets the same message with wakeup=false).
// wakeupUserId omitted/null -> emitMessageCreated routes via the
// channel's xType-specific 3-state delivery with empty wakeSet, so
// nobody is woken (the message lands in history only).
const wakeupUserId = typeof body.wakeupUserId === 'string' ? body.wakeupUserId.trim() : '';
if (wakeupUserId) {
await this.realtime.emitMessageTargeted(channelId, sysView, wakeupUserId);
} else {
await this.realtime.emitMessageCreated(channelId, sysView, {
xType: sysChannel.xType ?? 'general',
authorUserId: SYSTEM_USER_ID,
wakeUserIds: new Set<string>(),
});
}
return sysView;
}
// Guild C-1: caller must be a participant of the channel, and the
// author is always the authenticated user — body.authorUserId is
// ignored so a caller can never post as someone else.
//
// announce channels: any participant can POST. Use case is one-off
// recruitment / broadcast messages posted by the agent that just
// created the originating topic (e.g. dialectic invites). No
// server-side privileged path — author is always a real user.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
const channel = await this.assertParticipant(channelId, userId);
if (channel.closed) {
throw new ConflictException({ error: 'channel_closed', message: 'channel is closed' });
}
const xType = channel.xType ?? 'general';
const isRotating = xType === 'discuss' || xType === 'work';
const authorUserId = userId;
// ---- translate <@user.name:NAME> -> <@userId> (outside backticks) via
// Center before anything else persists/parses the content
let content = body.content ?? '';
const names = extractNameMentions(content);
if (names.length) {
const map = await resolveUserNames(names);
content = replaceNameMentions(content, map);
}
// ---- command interception: registered slash commands are never delivered
const cmd = parseSlashCommand(content);
if (cmd) {
if (isRotating && cmd.name === 'no-reply') {
const { ack } = await this.turn.onNoReply(channelId, authorUserId);
if (ack) await this.emitAck(channelId, ack.wakeupUserId);
} else if (isRotating && cmd.name === 'force-proceed') {
const { ack } = await this.turn.onForceProceed(channelId);
if (ack) await this.emitAck(channelId, ack.wakeupUserId);
}
// non-rotating channels (or no effect): swallowed, nothing delivered
return { status: 'command', command: cmd.name };
}
// ---- normal message
const message = await this.persistMessage(channelId, {
authorUserId,
content,
clientMessageId: body.clientMessageId,
replyToMessageId: body.replyToMessageId,
mentions: body.mentions,
attachments: body.attachments,
});
const responseBody = this.toView(message) as Record<string, unknown>;
await this.saveIdempotentResponse(scope, idempotencyKey, responseBody);
@@ -124,10 +277,33 @@ export class MessagingController {
await this.events.emit({
eventType: 'message.created',
channelId,
actorId: body.authorUserId ?? 'anonymous',
actorId: authorUserId,
data: responseBody,
});
this.realtime.emitChannelEvent(channelId, 'message.created', responseBody);
// mentions: <@id> outside backtick spans (post name-translation)
const mentionIds = parseMentions(content);
if (isRotating) {
// discuss/work: rotation (incl. mention sub-frames) picks the target
const decision = await this.turn.onNormalMessage(channelId, authorUserId, mentionIds);
await this.realtime.emitMessageTargeted(channelId, responseBody, decision.wakeupUserId);
} else {
// general/report/triage/custom: 3-state delivery
// (wake / observer / skip) — see realtime.gateway.computeDelivery.
// Center-scoped admin (cached, 1d TTL) gets `observer` on triage.
const wakeRows = await this.wakeRepo.find({ where: { channelId } });
const wakeUserIds = new Set(wakeRows.map((w) => w.userId));
const mentionUserIds = new Set(mentionIds.filter((id) => id !== authorUserId));
const admin = await this.adminCache.get();
await this.realtime.emitMessageCreated(channelId, responseBody, {
xType,
authorUserId,
wakeUserIds,
mentionUserIds,
adminUserId: admin?.userId ?? null,
});
}
return responseBody;
}
@@ -137,14 +313,23 @@ export class MessagingController {
@Param('id') channelId: string,
@Param('messageId') messageId: string,
@Body() body: { content?: string },
@Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string,
) {
const scope = `PATCH:/channels/${channelId}/messages/${messageId}`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
// Guild C-1: participant + author-ownership.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
await this.assertParticipant(channelId, userId);
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
if (!item) return { status: 'not_found' };
if (item.authorUserId !== userId) {
throw new ForbiddenException('not the message author');
}
const now = Date.now();
const createdAt = new Date(item.createdAt).getTime();
@@ -173,14 +358,23 @@ export class MessagingController {
async remove(
@Param('id') channelId: string,
@Param('messageId') messageId: string,
@Req() req: { userId?: string },
@Headers('idempotency-key') idempotencyKey?: string,
) {
const scope = `DELETE:/channels/${channelId}/messages/${messageId}`;
const existed = await this.getIdempotentResponse(scope, idempotencyKey);
if (existed) return existed;
// Guild C-1: participant + author-ownership.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
await this.assertParticipant(channelId, userId);
const item = await this.messageRepo.findOne({ where: { channelId, messageId } });
if (!item) return { status: 'not_found' };
if (item.authorUserId !== userId) {
throw new ForbiddenException('not the message author');
}
item.isDeleted = true;
item.deletedAt = new Date();
@@ -218,10 +412,14 @@ export class MessagingController {
@Get()
async listBySeq(
@Param('id') channelId: string,
@Req() req: { userId?: string },
@Query('seq_from') seqFrom?: string,
@Query('seq_to') seqTo?: string,
@Query('limit') limit?: string,
) {
// Guild C-1: only participants may read channel history.
const userId = String(req.userId ?? '');
if (!userId) throw new ForbiddenException('missing user');
const from = seqFrom ? Number(seqFrom) : 1;
const to = seqTo ? Number(seqTo) : Number.MAX_SAFE_INTEGER;
const safeLimit = clampLimit(limit, DEFAULT_PAGE_LIMIT, MAX_PAGE_LIMIT);
@@ -241,10 +439,7 @@ export class MessagingController {
};
}
const channel = await this.channelRepo.findOne({ where: { id: channelId } });
if (!channel) {
throw new NotFoundException('channel not found');
}
const channel = await this.assertParticipant(channelId, userId);
const qb = this.messageRepo
.createQueryBuilder('m')

View File

@@ -1,12 +1,17 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { MessagingController } from './messaging.controller';
import { Channel } from '../entities/channel.entity';
import { Message } from '../entities/message.entity';
import { IdempotencyRecord } from '../entities/idempotency-record.entity';
import { MessagingController } from './messaging.controller.js';
import { Channel } from '../entities/channel.entity.js';
import { ChannelMember } from '../entities/channel-member.entity.js';
import { Message } from '../entities/message.entity.js';
import { IdempotencyRecord } from '../entities/idempotency-record.entity.js';
import { WakeMapping } from '../entities/wake-mapping.entity.js';
import { AdminCacheService } from '../common/admin-cache.service.js';
@Module({
imports: [TypeOrmModule.forFeature([Channel, Message, IdempotencyRecord])],
imports: [TypeOrmModule.forFeature([Channel, ChannelMember, Message, IdempotencyRecord, WakeMapping])],
controllers: [MessagingController],
providers: [AdminCacheService],
exports: [AdminCacheService],
})
export class MessagingModule {}

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from 'vitest';
import { clampLimit, computeNextExpectedSeq } from './pagination.util';
import { clampLimit, computeNextExpectedSeq } from './pagination.util.js';
describe('pagination utils', () => {
it('clamps limit safely', () => {

View File

@@ -9,7 +9,105 @@ import {
} from '@nestjs/websockets';
import { Logger } from '@nestjs/common';
import { Server, Socket } from 'socket.io';
import { introspectGuildToken } from '../common/center-auth';
import { introspectGuildToken } from '../common/center-auth.js';
type XType = 'general' | 'work' | 'report' | 'discuss' | 'triage' | 'custom' | 'dm' | 'announce';
/**
* Cross-presence info needed by `announce`-type delivery: a recipient
* with hf-side status === 'busy' has the message discarded silently
* (don't enter their session, no UI emit). Other statuses + non-announce
* channels are unaffected. Presence is sourced from the
* `agent_presences` table populated by Fabric.OpenclawPlugin's
* presence-sync loop (which reads from HF plugin's `__hfAgentStatus`).
*/
export type PresenceStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline' | 'unknown';
/**
* Per-recipient delivery decision for a non-rotating channel message.
*
* • `wake` — push the event AND wake the recipient (model turn fires)
* • `observer` — push the event with wakeup=false (silent; UI displays
* but the openclaw plugin records-only without dispatch). Currently
* used for the Center admin observing triage traffic
* • `skip` — don't even emit the event to this recipient
*
* Wakeup-only channels (general/report/dm/custom) never return
* 'observer'; the legacy behaviour is preserved end-to-end.
*
* Precedence for triage (the only place 'skip' / 'observer' fire):
* 1. author never gets back their own message
* 2. wake_mapping (on-duty) → wake
* 3. mention → wake (NEW: was 'skip' before — see Fabric PR 'triage
* mention exception')
* 4. admin (Center-scoped, at most one) → observer
* 5. everyone else → skip (was 'deliver, wakeup=false' before)
*/
export type DeliveryDecision = 'wake' | 'observer' | 'skip';
export interface ComputeDeliveryArgs {
xType: XType;
recipientUserId: string;
authorUserId: string;
wakeUserIds: Set<string>;
mentionUserIds?: Set<string>;
/** Single Center-scoped admin userId, or null. */
adminUserId?: string | null;
/** Recipient's current presence; only consulted for `announce` xType. Defaults to 'unknown' (treated as not-busy). */
recipientPresence?: PresenceStatus;
}
export function computeDelivery(args: ComputeDeliveryArgs): DeliveryDecision {
const { xType, recipientUserId, authorUserId, wakeUserIds, mentionUserIds, adminUserId, recipientPresence } = args;
if (recipientUserId === authorUserId) return 'skip';
switch (xType) {
case 'triage':
if (wakeUserIds.has(recipientUserId)) return 'wake';
if (mentionUserIds?.has(recipientUserId)) return 'wake';
if (adminUserId && recipientUserId === adminUserId) return 'observer';
return 'skip';
case 'general':
if (mentionUserIds && mentionUserIds.size > 0) {
return mentionUserIds.has(recipientUserId) ? 'wake' : 'observer';
}
return 'wake';
case 'custom':
// wake_mapping decides who wakes; everyone else still sees the
// message (observer) — preserves the legacy "deliver to all, wake
// some" contract for custom channels.
return wakeUserIds.has(recipientUserId) ? 'wake' : 'observer';
case 'dm':
return 'wake';
case 'announce':
// System-broadcast channels (e.g. Dialectic topic announcements).
// Recipients with HF status === 'busy' have the message discarded
// silently — busy agents should not be distracted by signup pings
// they can't act on. All other presences (idle/on_call/exhausted/
// offline/unknown) get the message as 'observer' (no wake): the
// channel itself is browsable; agents proactively decide what to
// do with announcements when they next look at their inbox.
if (recipientPresence === 'busy') return 'skip';
return 'observer';
default:
// report (and anything else): deliver as observer, no wake
return 'observer';
}
}
/**
* @deprecated Use computeDelivery (returns 3-state). Kept for any
* external callers; treats 'observer' and 'skip' both as `false`.
*/
export function computeWakeup(args: {
xType: XType;
recipientUserId: string;
authorUserId: string;
wakeUserIds: Set<string>;
mentionUserIds?: Set<string>;
}): boolean {
return computeDelivery(args) === 'wake';
}
@WebSocketGateway({
namespace: '/realtime',
@@ -24,6 +122,12 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
private readonly logger = new Logger(RealtimeGateway.name);
private readonly onlineUsers = new Set<string>();
// Optional: injected at module wiring time. Used by emitMessageCreated
// to pre-load recipient presence for announce-type channels.
// Typed loosely to avoid a circular import between realtime and agents
// modules; the actual interface lives in agents/agent-presence.service.
presence?: { getStatusMap(ids: string[]): Promise<Map<string, PresenceStatus>> };
private userIdFromClient(client: Socket): string {
const authUser = client.handshake.auth?.userId;
const headerUser = client.handshake.headers['x-user-id'];
@@ -59,6 +163,10 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
const userId = result.user.id || this.userIdFromClient(client);
client.data.userId = userId;
this.onlineUsers.add(userId);
// Per-user room: lets server code emit user-scoped events (e.g.
// channel.joined when membership changes) without bookkeeping a
// userId→sockets map. All of this user's sockets receive the event.
client.join(`user:${userId}`);
this.server.emit('presence.online', {
userId,
onlineCount: this.onlineUsers.size,
@@ -133,4 +241,78 @@ export class RealtimeGateway implements OnGatewayConnection, OnGatewayDisconnect
emitChannelEvent(channelId: string, event: string, data: Record<string, unknown>): void {
this.server.to(`channel:${channelId}`).emit(event, data);
}
// Emit a user-scoped event to all sockets currently connected for `userId`
// (via the `user:<userId>` room joined in handleConnection). No-op for
// offline users — the next connect's initial channel-list fetch covers it.
emitToUser(userId: string, event: string, data: Record<string, unknown>): void {
if (!userId) return;
this.server.to(`user:${userId}`).emit(event, data);
}
// Emits message.created per-recipient using the 3-state delivery
// decision (wake / observer / skip). Skipped recipients receive
// nothing — used by triage channels to keep non-on-duty / non-mention
// / non-admin users completely out of the loop, and by announce
// channels to suppress delivery to recipients whose presence is busy.
async emitMessageCreated(
channelId: string,
data: Record<string, unknown>,
ctx: {
xType: XType;
authorUserId: string;
wakeUserIds: Set<string>;
mentionUserIds?: Set<string>;
/** Single Center-scoped admin userId (or null). */
adminUserId?: string | null;
},
): Promise<void> {
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
// For announce-type channels, pre-load presence for all recipients
// in one query so the per-recipient loop doesn't fan out to N round
// trips. For other xTypes, presence is irrelevant — skip the lookup.
let presenceMap: Map<string, PresenceStatus> | undefined;
if (ctx.xType === 'announce' && this.presence) {
const recipientIds = sockets
.map((s) => (typeof s.data.userId === 'string' ? (s.data.userId as string) : ''))
.filter((id) => id && !id.startsWith('anon:'));
presenceMap = await this.presence.getStatusMap(recipientIds);
}
for (const s of sockets) {
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
const decision = computeDelivery({
xType: ctx.xType,
recipientUserId,
authorUserId: ctx.authorUserId,
wakeUserIds: ctx.wakeUserIds,
mentionUserIds: ctx.mentionUserIds,
adminUserId: ctx.adminUserId,
recipientPresence: presenceMap?.get(recipientUserId) ?? 'unknown',
});
if (decision === 'skip') continue;
s.emit('message.created', {
...data,
channelId,
wakeup: decision === 'wake',
xType: ctx.xType,
});
}
}
// discuss/work + /ack: exactly one recipient (the new current speaker) gets
// wakeup=true; everyone else false. One message-id; metadata at push only.
async emitMessageTargeted(
channelId: string,
data: Record<string, unknown>,
wakeupUserId: string | null,
): Promise<void> {
const sockets = await this.server.in(`channel:${channelId}`).fetchSockets();
for (const s of sockets) {
const recipientUserId = typeof s.data.userId === 'string' ? s.data.userId : `anon:${s.id}`;
const wakeup = wakeupUserId !== null && recipientUserId === wakeupUserId;
s.emit('message.created', { ...data, channelId, wakeup });
}
}
}

View File

@@ -1,9 +1,25 @@
import { Global, Module } from '@nestjs/common';
import { RealtimeGateway } from './realtime.gateway';
import { Global, Module, OnModuleInit } from '@nestjs/common';
import { RealtimeGateway } from './realtime.gateway.js';
import { AgentPresenceModule } from '../agents/agent-presence.module.js';
import { AgentPresenceService } from '../agents/agent-presence.service.js';
@Global()
@Module({
imports: [AgentPresenceModule],
providers: [RealtimeGateway],
exports: [RealtimeGateway],
})
export class RealtimeModule {}
export class RealtimeModule implements OnModuleInit {
// Wire presence into the gateway at startup. Using assignment (vs
// constructor injection) keeps the gateway free of the agents-module
// import — no risk of circular dependency, and announce-channel
// delivery degrades gracefully (presence stays undefined → 'unknown'
// status → no busy-discard) if AgentPresenceModule is ever removed.
constructor(
private readonly gateway: RealtimeGateway,
private readonly presence: AgentPresenceService,
) {}
onModuleInit(): void {
this.gateway.presence = this.presence;
}
}

View File

@@ -1,7 +1,8 @@
{
"compilerOptions": {
"module": "commonjs",
"target": "es2020",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"target": "es2022",
"strict": true,
"esModuleInterop": true,
"experimentalDecorators": true,