3 Commits

Author SHA1 Message Date
b8e0e424fa fix(inbound): route fabric DM channels as peer.kind='direct' / ChatType='direct'
Inbound was hardcoding `peer: { kind: 'group' }` and `ChatType: 'group'`
for every fabric channel regardless of xType. As a result:

- sessionKey for a DM was `agent:<id>:fabric:group:<chan>` instead of
  `agent:<id>:fabric:direct:<chan>`
- ctx.ChatType='group' caused user-prompt metadata to render
  `is_group_chat: true` on a DM
- openclaw's `isDirectMessage()` check (ChatType==='direct') returned
  false, so DM-specific prompt and turn behavior never engaged

Caught by recruiter test in session 40c51de2: the model's thinking trace
acknowledged "fabric DM channel" (from the ClawPrompts chat-injector
hook) but the surrounding user-prompt metadata contradicted it with
`is_group_chat: true`, and the model reasoned its way out of running
`workflow_start`.

Fix factors a small helper `fabricPeerRoutingForXType` (and a cache-
backed `fabricPeerRoutingForChannel` for outbound) in channel.ts that
maps:
  - 'dm'  → { peerKind: 'direct', chatType: 'direct' }
  - rest  → { peerKind: 'group',  chatType: 'group' }   (no change)

Inbound uses m.xType directly (live, authoritative). Outbound has no
xType in its call signature, so it consults the channel-meta cache
populated by inbound (same `getChannelType` already exposed via
__fabric). Cache miss falls back to 'group' — the pre-fix default, no
regression. The proactive-DM-without-prior-inbound edge case still
routes that one outbound as 'group'; the next round agrees on 'direct'.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 14:26:42 +01:00
h z
81a10f2a1f Merge #5 feat(channel-meta): __fabric.getChannelType 2026-05-25 10:38:22 +00:00
c5429129d9 feat(channel-meta): expose globalThis.__fabric.getChannelType for narrow gating
Inbound `message.created` already carries `xType` (dm / triage / group /
broadcast / etc.) — record it in a per-channel cache so other plugins
can answer "is this channel a DM?" without poking the Center API.

New module src/channel-meta.ts:
  - in-memory Map<channelId, xType>
  - lazily loaded from ~/.openclaw/fabric-channel-meta.json on first
    access (so first-ever DM after a fresh gateway start still hits
    cache from the previous run)
  - debounced 250ms flush on dirty; force-flush on gateway_stop
  - recordChannelType(channelId, xType): called from inbound
  - getChannelType(channelId): null if unknown — caller MUST treat null
    as "don't know", NOT as "assume DM" (would re-introduce the false-
    positive on group channels we're trying to eliminate)

Wiring:
  - inbound.ts socket.on('message.created'): records xType BEFORE the
    self-author / dedup gates (channel type is observer-agnostic)
  - index.ts: installs globalThis.__fabric = { getChannelType } on
    registerFull(); flushes on gateway_stop

Consumer: ClawPrompts' fabric-chat-injector will start gating its prompt
injection on getChannelType(channelId) === 'dm' (companion PR on
ClawPrompts). Removes the phase-1 "any fabric channel" false-positive.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 11:28:36 +01:00
9 changed files with 359 additions and 12 deletions

24
dist/fabric/index.js vendored
View File

@@ -6,6 +6,7 @@
import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
@@ -43,6 +44,29 @@ export default defineChannelPluginEntry({
const client = new FabricClient(centerApiBase);
const identity = new IdentityRegistry(idFile);
registerFabricTools({ registerTool: (d) => api.registerTool(d), logger: api.logger }, client, identity);
// Cross-plugin API: globalThis.__fabric
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
// injection to DM-typed channels only. The channel-meta cache is
// populated lazily from inbound (message.created carries xType) and
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
// very first DM after a fresh gateway start hits cache from the
// previous run rather than firing the injector on the wrong type.
//
// null return = channel never seen (cache cold). Callers MUST NOT
// fall back to "assume DM" — fail closed on unknown.
{
const _G = globalThis;
_G['__fabric'] = { getChannelType };
// Flush channel-meta cache when the gateway shuts down so
// recently-recorded xType entries don't get lost.
api.on('gateway_stop', () => {
try {
flushChannelMeta();
}
catch { /* ignore */ }
});
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
}
api.on('gateway_start', () => {
const _G = globalThis;
if (_G._fabricInboundStarted)

105
dist/fabric/src/channel-meta.js vendored Normal file
View File

@@ -0,0 +1,105 @@
/**
* Channel-meta cache. Records (channelId → xType) for every fabric
* channel the gateway has seen at least one inbound message in.
*
* Populated lazily from inbound (`recordChannelType` is called for
* every `message.created` event with non-empty `xType`). Persisted to
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
* gateway restarts (so the very first DM after restart still gets the
* right xType without waiting for a fresh inbound).
*
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
* to xType==='dm' only.
*
* Failure mode: lookup misses (channel never seen / inbound dropped
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
* fall back to "assume DM" or you re-introduce the false-positive on
* group channels.
*/
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { homedir } from 'node:os';
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
let memory = new Map();
let loaded = false;
let dirty = false;
let flushTimer = null;
function load() {
if (loaded)
return;
loaded = true;
try {
if (!existsSync(CACHE_FILE))
return;
const raw = readFileSync(CACHE_FILE, 'utf8');
const parsed = JSON.parse(raw);
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
if (typeof k === 'string' && typeof v === 'string')
memory.set(k, v);
}
}
catch {
// ignore — start with empty cache on corruption
}
}
function scheduleFlush() {
if (flushTimer)
return;
// Debounce writes — many inbound messages may arrive in a burst.
// 250ms coalesces them; on gateway_stop the channel plugin can force
// a synchronous flush via flushChannelMeta().
flushTimer = setTimeout(() => {
flushTimer = null;
if (!dirty)
return;
dirty = false;
flushSync();
}, 250);
}
function flushSync() {
try {
const dir = dirname(CACHE_FILE);
if (!existsSync(dir))
mkdirSync(dir, { recursive: true });
const out = { channels: Object.fromEntries(memory) };
const tmp = CACHE_FILE + '.tmp';
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
renameSync(tmp, CACHE_FILE);
}
catch {
// swallow — cache is an optimization; loss-on-write is recoverable
}
}
/** Called by inbound on every message.created. xType empty → no-op. */
export function recordChannelType(channelId, xType) {
if (!channelId || !xType)
return;
load();
const existing = memory.get(channelId);
if (existing === xType)
return;
memory.set(channelId, xType);
dirty = true;
scheduleFlush();
}
/** Cross-plugin lookup. null when channel never seen / unknown. */
export function getChannelType(channelId) {
if (!channelId)
return null;
load();
return memory.get(channelId) ?? null;
}
/** Force-flush — called on plugin shutdown to make sure recently
* recorded entries hit disk before the gateway dies. */
export function flushChannelMeta() {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
if (dirty) {
dirty = false;
flushSync();
}
}
export const CHANNEL_META_PATH = CACHE_FILE;

View File

@@ -11,6 +11,15 @@
import { createChatChannelPlugin, createChannelPluginBase, buildChannelOutboundSessionRoute, } from 'openclaw/plugin-sdk/core';
import { FabricClient } from './fabric-client.js';
import { listFabricAccountIds, resolveFabricAccount, resolveDefaultFabricAccountId, } from './accounts.js';
import { getChannelType } from './channel-meta.js';
export function fabricPeerRoutingForXType(xType) {
if (xType === 'dm')
return { peerKind: 'direct', chatType: 'direct' };
return { peerKind: 'group', chatType: 'group' };
}
export function fabricPeerRoutingForChannel(channelId) {
return fabricPeerRoutingForXType(getChannelType(channelId));
}
// ---- target grammar: fabric:<channelId> ----
export function stripFabricTargetPrefix(raw) {
let s = (raw ?? '').trim();
@@ -38,13 +47,18 @@ export function resolveFabricOutboundSessionRoute(params) {
const id = stripFabricTargetPrefix(params.target);
if (!id)
return null;
// Consult the channel-meta cache populated by inbound — DM channels
// need peer.kind='direct' so the outbound session key matches the
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
// no regression on cold cache).
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
return buildChannelOutboundSessionRoute({
cfg: params.cfg,
agentId: params.agentId,
channel: 'fabric',
accountId: params.accountId,
peer: { kind: 'group', id },
chatType: 'group',
peer: { kind: peerKind, id },
chatType,
from: `fabric:channel:${id}`,
to: `fabric:${id}`,
});

View File

@@ -4,6 +4,8 @@ import { join } from 'node:path';
import { io } from 'socket.io-client';
import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-dispatch';
import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
export class FabricInbound {
core;
@@ -341,6 +343,13 @@ export class FabricInbound {
const channelId = m.channelId ?? '';
if (!channelId)
return;
// Record xType into the channel-meta cache before self-author
// / dedup gates — channel type doesn't depend on who sent the
// message, and recording it on observer-only triage messages
// is still useful (the next consumer asking
// __fabric.getChannelType wants the answer regardless of
// whether THIS message was delivered to an agent).
recordChannelType(channelId, m.xType);
if (m.authorUserId && m.authorUserId === selfUserId)
return;
const key = `${agentId}:${m.messageId}`;
@@ -425,11 +434,19 @@ export class FabricInbound {
const core = this.core;
const cfg = this.cfg;
try {
// Route by xType. DM channels need peer.kind='direct' so openclaw
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
// and ctx.ChatType='direct') rather than as a multi-party group.
// Without this, the agent's user-prompt metadata says
// 'is_group_chat: true' on a DM and downstream prompt logic
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg,
channel: 'fabric',
accountId: agentId,
peer: { kind: 'group', id: channelId },
peer: { kind: peerKind, id: channelId },
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
@@ -443,7 +460,7 @@ export class FabricInbound {
To: `fabric:${channelId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId ?? agentId,
ChatType: 'group',
ChatType: chatType,
ConversationLabel: `fabric:${guild.nodeId}`,
SenderId: m.authorUserId ?? 'fabric',
Provider: 'fabric',

View File

@@ -311,10 +311,12 @@ export function registerFabricTools(api, client, identity) {
api.registerTool((ctx) => ({
name: 'fabric-guild-list',
description: 'List guilds the calling agent is a member of. Returns ' +
'{nodeId, name, purpose, status} per row. `purpose` is a free-form ' +
"description of what each guild is for. Use this BEFORE " +
'fabric-channel-list when a workflow asks you to pick the ' +
'right guild by intent (no guild ids hardcoded into workflows).',
'{nodeId, name, purpose, status} per row. ' +
"`purpose` is a free-form description of what each guild is for " +
'pick the guild whose purpose matches your intent. Use this tool ' +
'BEFORE fabric-channel-list when a workflow asks you to pick the ' +
'right guild by intent (e.g. "find a guild whose purpose mentions ' +
'debate broadcasts" → then list its announce-type channels).',
parameters: {
type: 'object',
additionalProperties: false,

View File

@@ -7,6 +7,7 @@ import { defineChannelPluginEntry } from 'openclaw/plugin-sdk/core';
import type { OpenClawPluginApi } from 'openclaw/plugin-sdk/core';
import { fabricChannelPlugin } from './src/channel.js';
import { flushAllFabric } from './src/coalesce.js';
import { getChannelType, flushChannelMeta } from './src/channel-meta.js';
import { FabricInbound } from './src/inbound.js';
import { listEnabledFabricAccounts } from './src/accounts.js';
import { registerFabricTools } from './src/tools.js';
@@ -62,6 +63,27 @@ export default defineChannelPluginEntry({
identity,
);
// Cross-plugin API: globalThis.__fabric
// Consumed by ClawPrompts' fabric-chat-injector to narrow its prompt
// injection to DM-typed channels only. The channel-meta cache is
// populated lazily from inbound (message.created carries xType) and
// persisted to ~/.openclaw/fabric-channel-meta.json — so even the
// very first DM after a fresh gateway start hits cache from the
// previous run rather than firing the injector on the wrong type.
//
// null return = channel never seen (cache cold). Callers MUST NOT
// fall back to "assume DM" — fail closed on unknown.
{
const _G = globalThis as Record<string, unknown>;
_G['__fabric'] = { getChannelType };
// Flush channel-meta cache when the gateway shuts down so
// recently-recorded xType entries don't get lost.
api.on('gateway_stop', () => {
try { flushChannelMeta(); } catch { /* ignore */ }
});
api.logger.info('fabric: __fabric cross-plugin API installed (getChannelType)');
}
api.on('gateway_start', () => {
const _G = globalThis as Record<string, unknown>;
if (_G._fabricInboundStarted) return;

108
src/channel-meta.ts Normal file
View File

@@ -0,0 +1,108 @@
/**
* Channel-meta cache. Records (channelId → xType) for every fabric
* channel the gateway has seen at least one inbound message in.
*
* Populated lazily from inbound (`recordChannelType` is called for
* every `message.created` event with non-empty `xType`). Persisted to
* `~/.openclaw/fabric-channel-meta.json` so the cache survives
* gateway restarts (so the very first DM after restart still gets the
* right xType without waiting for a fresh inbound).
*
* Exposed cross-plugin via `globalThis.__fabric.getChannelType`. Used
* by ClawPrompts' fabric-chat-injector to narrow its prompt injection
* to xType==='dm' only.
*
* Failure mode: lookup misses (channel never seen / inbound dropped
* xType) return null. Callers MUST treat null as "unknown" — DO NOT
* fall back to "assume DM" or you re-introduce the false-positive on
* group channels.
*/
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { homedir } from 'node:os';
const CACHE_FILE = join(homedir(), '.openclaw', 'fabric-channel-meta.json');
interface ChannelMetaFile {
// channelId → xType ('dm' | 'triage' | 'group' | etc.)
channels: Record<string, string>;
}
let memory = new Map<string, string>();
let loaded = false;
let dirty = false;
let flushTimer: ReturnType<typeof setTimeout> | null = null;
function load(): void {
if (loaded) return;
loaded = true;
try {
if (!existsSync(CACHE_FILE)) return;
const raw = readFileSync(CACHE_FILE, 'utf8');
const parsed = JSON.parse(raw) as ChannelMetaFile;
for (const [k, v] of Object.entries(parsed.channels ?? {})) {
if (typeof k === 'string' && typeof v === 'string') memory.set(k, v);
}
} catch {
// ignore — start with empty cache on corruption
}
}
function scheduleFlush(): void {
if (flushTimer) return;
// Debounce writes — many inbound messages may arrive in a burst.
// 250ms coalesces them; on gateway_stop the channel plugin can force
// a synchronous flush via flushChannelMeta().
flushTimer = setTimeout(() => {
flushTimer = null;
if (!dirty) return;
dirty = false;
flushSync();
}, 250);
}
function flushSync(): void {
try {
const dir = dirname(CACHE_FILE);
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
const out: ChannelMetaFile = { channels: Object.fromEntries(memory) };
const tmp = CACHE_FILE + '.tmp';
writeFileSync(tmp, JSON.stringify(out, null, 2) + '\n', 'utf8');
renameSync(tmp, CACHE_FILE);
} catch {
// swallow — cache is an optimization; loss-on-write is recoverable
}
}
/** Called by inbound on every message.created. xType empty → no-op. */
export function recordChannelType(channelId: string, xType: string | undefined): void {
if (!channelId || !xType) return;
load();
const existing = memory.get(channelId);
if (existing === xType) return;
memory.set(channelId, xType);
dirty = true;
scheduleFlush();
}
/** Cross-plugin lookup. null when channel never seen / unknown. */
export function getChannelType(channelId: string): string | null {
if (!channelId) return null;
load();
return memory.get(channelId) ?? null;
}
/** Force-flush — called on plugin shutdown to make sure recently
* recorded entries hit disk before the gateway dies. */
export function flushChannelMeta(): void {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
if (dirty) {
dirty = false;
flushSync();
}
}
export const CHANNEL_META_PATH = CACHE_FILE;

View File

@@ -21,6 +21,39 @@ import {
resolveDefaultFabricAccountId,
type ResolvedFabricAccount,
} from './accounts.js';
import { getChannelType } from './channel-meta.js';
/**
* Map a Fabric channel xType to an openclaw routing peer.kind / ChatType.
*
* Fabric distinguishes channels by xType ('dm' | 'triage' | 'group' |
* 'broadcast' | 'announce' | ...). Openclaw's session router only knows
* 'direct' | 'group' | 'channel'. We collapse:
* - 'dm' → 'direct' (1:1 conversation; agent always speaks)
* - rest → 'group' (multi-party; turn-engine gates speech)
*
* Sessions are keyed by peer.kind, so inbound and outbound MUST agree —
* otherwise the agent's outbound message lands in a different session
* than the inbound that triggered it and conversation state splits.
*
* Outbound has no live xType (the agent target is just a channelId), so
* it consults the channel-meta cache populated by inbound. Cache miss
* (channel never observed) falls back to 'group' — same as the pre-fix
* behavior, no regression on cold cache. The proactive-DM-first-message
* edge case (agent DMs a channel before any inbound) still lands as
* 'group' on that one outbound; the next inbound + outbound pair will
* agree on 'direct'.
*/
export type FabricPeerRouting = { peerKind: 'direct' | 'group'; chatType: 'direct' | 'group' };
export function fabricPeerRoutingForXType(xType: string | null | undefined): FabricPeerRouting {
if (xType === 'dm') return { peerKind: 'direct', chatType: 'direct' };
return { peerKind: 'group', chatType: 'group' };
}
export function fabricPeerRoutingForChannel(channelId: string): FabricPeerRouting {
return fabricPeerRoutingForXType(getChannelType(channelId));
}
type AnyCfg = { channels?: { fabric?: unknown }; [k: string]: unknown };
@@ -45,13 +78,18 @@ export function looksLikeFabricTargetId(raw: string): boolean {
export function resolveFabricOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
const id = stripFabricTargetPrefix(params.target);
if (!id) return null;
// Consult the channel-meta cache populated by inbound — DM channels
// need peer.kind='direct' so the outbound session key matches the
// inbound one. Cache miss falls back to 'group' (the pre-fix default,
// no regression on cold cache).
const { peerKind, chatType } = fabricPeerRoutingForChannel(id);
return buildChannelOutboundSessionRoute({
cfg: params.cfg,
agentId: params.agentId,
channel: 'fabric',
accountId: params.accountId,
peer: { kind: 'group', id },
chatType: 'group',
peer: { kind: peerKind, id },
chatType,
from: `fabric:channel:${id}`,
to: `fabric:${id}`,
});

View File

@@ -6,6 +6,8 @@ import { dispatchInboundReplyWithBase } from 'openclaw/plugin-sdk/inbound-reply-
import type { FabricClient, FabricSession } from './fabric-client.js';
import type { IdentityRegistry } from './identity.js';
import { resolveCoalesce } from './accounts.js';
import { fabricPeerRoutingForXType } from './channel.js';
import { recordChannelType } from './channel-meta.js';
import { enqueueDelivery, flushFabricForChannel } from './coalesce.js';
// COMPAT NOTE (openclaw v2026.5.7): the inbound path mirrors how bundled
@@ -401,6 +403,13 @@ export class FabricInbound {
socket.on('message.created', (m: FabricMessage) => {
const channelId = m.channelId ?? '';
if (!channelId) return;
// Record xType into the channel-meta cache before self-author
// / dedup gates — channel type doesn't depend on who sent the
// message, and recording it on observer-only triage messages
// is still useful (the next consumer asking
// __fabric.getChannelType wants the answer regardless of
// whether THIS message was delivered to an agent).
recordChannelType(channelId, m.xType);
if (m.authorUserId && m.authorUserId === selfUserId) return;
const key = `${agentId}:${m.messageId}`;
if (this.seen.has(key)) return;
@@ -496,11 +505,19 @@ export class FabricInbound {
const core = this.core as Core & Record<string, unknown>;
const cfg = this.cfg as { session?: { store?: unknown } };
try {
// Route by xType. DM channels need peer.kind='direct' so openclaw
// treats them as 1:1 (sessionKey 'agent:<id>:fabric:direct:<chan>'
// and ctx.ChatType='direct') rather than as a multi-party group.
// Without this, the agent's user-prompt metadata says
// 'is_group_chat: true' on a DM and downstream prompt logic
// (commands-handlers `isDirectMessage` checks ChatType==='direct')
// misclassifies the turn.
const { peerKind, chatType } = fabricPeerRoutingForXType(m.xType);
const route = core.channel.routing.resolveAgentRoute({
cfg: this.cfg,
channel: 'fabric',
accountId: agentId,
peer: { kind: 'group', id: channelId },
peer: { kind: peerKind, id: channelId },
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
@@ -515,7 +532,7 @@ export class FabricInbound {
To: `fabric:${channelId}`,
SessionKey: route.sessionKey,
AccountId: route.accountId ?? agentId,
ChatType: 'group',
ChatType: chatType,
ConversationLabel: `fabric:${guild.nodeId}`,
SenderId: m.authorUserId ?? 'fabric',
Provider: 'fabric',