7 Commits

Author SHA1 Message Date
h z
73bfbd7594 Merge pull request 'feat(triage): per-channel serial queue + HF on_call gate + observer skip' (#3) from feat/triage-on-call-gate-and-queue into main 2026-05-22 21:59:23 +00:00
hanghang zhang
b7e9d278cd fix(triage): resolve claw_identifier via openclaw config (HF plugin's identifier)
os.hostname() fallback is wrong in sim where container hostname (server.t2)
doesn't match the HF agent row's claw_identifier (sim-t2). Add intermediate
fallback that reads openclaw config plugins.harbor-forge.identifier — the
same value the HF plugin uses for its outbound HF calls — keeping plugin
and HF agent state aligned without a per-service-unit HF_CLAW_IDENTIFIER
env override.

Priority:
  1. HF_CLAW_IDENTIFIER env (operator override)
  2. openclaw config plugins.harbor-forge.identifier (NEW)
  3. os.hostname() last-resort

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:49:31 +01:00
hanghang zhang
c68de029e4 feat(triage): per-channel serial queue + HF on_call gate + observer skip
Three behavioral changes to inbound message handling to support the
new triage flow:

## 1. Per-channel serial queue

Replaces `void this.dispatch(...)` (fire-and-forget) with a per-channel
chain so consecutive messages on the same channel are processed strictly
in order — no concurrent model turns for the same channel. Other
channels remain independent (parallelism preserved across channels).

Implementation: `Map<channelId, Promise>` where each new task awaits
the previous. The map entry self-cleans when the chain settles AND
no newer task has overwritten it.

## 2. HF on_call gate (triage + wake=true only)

Before dispatching a triage wake to the on-duty agent, hit HF
`GET /calendar/agent/status?agent_id=...`. If the agent isn't
currently on_call, the message is pushed to a per-agent gated queue
instead of dispatched — no model turn fires.

Status check is cached for 5s to amortise across rapid triage bursts.

When a subsequent triage message arrives and the agent IS on_call by
that point, the gated queue drains FIFO (re-enqueued through the same
per-channel chain so order is kept) before the new message dispatches.

Drained queue is in-memory only; on gateway restart the underlying
Fabric messages get re-fetched via the connect-time history sweep.

## 3. Triage observer skip (wake=false)

Triage messages that arrive with wakeup=false are admin observers — by
spec they MUST NOT enter the agent's session history. Skipped entirely
(no recordInboundSession call). The next time this agent legitimately
wakes for triage, their context contains only past wakeups + their own
outgoing messages — no observer-side chatter from other agents.

For NON-triage channels the legacy "record-as-history" stays — those
keep their full channel conversation available for later wakes.

## Env

- HF_API_BASE_URL  — defaults `https://monitor.hangman-lab.top`
- HF_CLAW_IDENTIFIER — defaults to `os.hostname()`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:17:39 +01:00
h z
2ec97eceda Merge pull request 'feat(inbound): listen for backend-pushed channel.joined/left events' (#2) from feat/inbound-listen-push-events into main 2026-05-21 07:12:51 +00:00
hanghang zhang
0f1fb27cf9 feat(inbound): listen for backend-pushed channel.joined/left events
Companion to nav/Fabric.Backend.Guild#<TBD> which adds the server-side
emitToUser broadcast on channel membership changes. Before, the inbound
only learned about new channels via the 60s polling resync (worst-case
60s lag). Now the backend tells us directly so sub/unsub is realtime.

socket.on('channel.joined', evt) → join the socket.io room for evt.channelId
                                    and add to the local 'joined' set.
socket.on('channel.left',   evt) → leave + remove from 'joined'.

Both events are idempotent (`if (joined.has(id))` / `if (!joined.has(id))`)
so duplicate emits from server are safe. Polling resync still runs every
60s as a safety net for transient socket drops between emit and
reconnect, partial server failures, etc.

When backend lacks this support (older deployments), nothing breaks —
the event simply never fires and polling carries the load as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 08:08:33 +01:00
h z
8224975119 Merge pull request 'fix: dynamically sync inbound channel subscriptions' (#1) from fix/inbound-dynamic-channel-sync into main 2026-05-21 06:56:49 +00:00
hanghang zhang
7d90ae4f2b fix: dynamically sync inbound channel subscriptions
The fabric inbound previously called `joinAll()` once on socket.io
`connect` — it fetched the agent's channel list via
`GET /api/channels?guildId=...` and emitted `join_channel` for each.
Any channel the agent joined *after* connect (e.g. a fresh DM created
by another user that includes this agent) was unreachable until the
gateway restarted: the socket was never subscribed to that room, so
backend `message.created` push events never arrived.

Backend doesn't emit a user-scoped `channel.joined` event we could
piggy-back on (only `message.created`), so the fix is to poll. Every
60s the agent's channel list is re-fetched and diffed against a local
`joined` set:
- new channel ids → `socket.emit('join_channel', {channelId})` + add
- ids in `joined` but absent from the fresh list → `leave_channel`
  emit + remove (best-effort; cleans subs if the agent is removed from
  a channel)

Re-uses `freshGuildToken()` so the resync fetch survives token
expiry (15-min TTL). Initial `connect` resets the local `joined`
set since the server forgets prior room subscriptions on reconnect.

Timers are tracked in `channelSyncTimers` and cleared in `stop()`
alongside socket disconnect.

Verified against prod server.t2 scenario: hzhang creates DM channel
including agent 'manager' → without this fix, manager only sees the
message after a gateway restart; with this fix, manager receives the
message within at most 60s (next resync tick).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 07:45:59 +01:00
3 changed files with 1 additions and 245 deletions

View File

@@ -14,12 +14,7 @@
"create-work-channel", "create-work-channel",
"create-report-channel", "create-report-channel",
"create-discussion-channel", "create-discussion-channel",
"discussion-complete", "discussion-complete"
"fabric-canvas",
"fabric-channel",
"fabric-send-message",
"fabric-channel-list",
"fabric-message-history"
] ]
}, },
"configSchema": { "configSchema": {

View File

@@ -190,76 +190,6 @@ export class FabricClient {
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> { removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token); return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
} }
// ---- channel discovery + message read (used by the agent-facing
// fabric-channel-list / fabric-message-history tools) ----
/**
* List channels in a guild visible to the calling user. Backend
* filters to public + channels the user is a member of.
*/
listChannels(
guildEndpoint: string,
guildToken: string,
guildNodeId: string,
): Promise<Array<{
id: string;
guildId: string;
name: string;
xType: string;
kind: string;
isPublic: boolean;
closed: boolean;
lastSeq: number;
createdAt: string;
}>> {
return this.req(
'GET',
`${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`,
guildToken,
);
}
/**
* Page through a channel's message history by `seq`.
*
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
* tail. Page metadata in the response describes what to ask next.
*/
listMessages(
guildEndpoint: string,
guildToken: string,
channelId: string,
opts: { seqFrom?: number; seqTo?: number; limit?: number } = {},
): Promise<{
items: Array<{
messageId: string;
seq: number;
content: string;
authorUserId: string;
createdAt: string;
editedAt: string | null;
deletedAt: string | null;
isDeleted: boolean;
}>;
page: {
seqFrom: number;
seqTo: number;
limit: number;
returned: number;
hasMore: boolean;
nextExpectedSeq: number;
highestCommittedSeq: number;
};
}> {
const qs = new URLSearchParams();
if (opts.seqFrom !== undefined) qs.set('seq_from', String(opts.seqFrom));
if (opts.seqTo !== undefined) qs.set('seq_to', String(opts.seqTo));
if (opts.limit !== undefined) qs.set('limit', String(opts.limit));
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
return this.req('GET', url, guildToken);
}
} }
export type CanvasFormat = 'md' | 'html' | 'text'; export type CanvasFormat = 'md' | 'html' | 'text';

View File

@@ -243,173 +243,4 @@ export function registerFabricTools(
} }
}, },
})); }));
// -----------------------------------------------------------------
// fabric-send-message: post a message into a specific channel.
//
// Unlike a normal channel reply (which goes back to whatever channel
// woke the agent), this lets the agent proactively initiate text into
// any channel they are a member of — e.g. ARD broadcasting daily
// workload to #agents-room, or triage agent following up on an
// already-routed task by commenting in #updates.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-send-message',
description:
'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
},
},
execute: async (p: { guildNodeId: string; channelId: string; content: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = (await client.postMessage(
guild.endpoint,
token,
p.channelId,
p.content,
session.user.id,
)) as { messageId?: string; seq?: number };
return { ok: true, messageId: res.messageId, seq: res.seq };
},
}));
// -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the
// agent is a member of. Returns id / name / xType per channel so the
// agent can pick a channelId for fabric-send-message etc.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel-list',
description:
'List channels visible to the calling agent in a guild. Optional ' +
'nameFilter does a case-insensitive substring match client-side. ' +
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId'],
properties: {
guildNodeId: { type: 'string' },
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
xType: {
type: 'string',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
description: 'optional filter by x_type',
},
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
},
},
execute: async (p: {
guildNodeId: string;
nameFilter?: string;
xType?: string;
includeClosed?: boolean;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const needle = (p.nameFilter ?? '').toLowerCase();
const filtered = all.filter((c) => {
if (!p.includeClosed && c.closed) return false;
if (p.xType && c.xType !== p.xType) return false;
if (needle && !c.name.toLowerCase().includes(needle)) return false;
return true;
});
return {
ok: true,
count: filtered.length,
channels: filtered.map((c) => ({
id: c.id,
name: c.name,
xType: c.xType,
isPublic: c.isPublic,
closed: c.closed,
lastSeq: c.lastSeq,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-message-history: read a channel's recent message history by
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
// the last `limit` messages (limit defaults to 20, max 200).
//
// Use cases: catch-up on a channel that was muted while the agent was
// gated; verify a previous message went through; lookup recent
// duplicates before opening a new task in triage.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-message-history',
description:
"Read a channel's recent message history. Omit seqFrom/seqTo to " +
'tail (last `limit` messages, default 20, max 200). Backend ' +
'requires the calling agent to be a channel participant.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
},
},
execute: async (p: {
guildNodeId: string;
channelId: string;
seqFrom?: number;
seqTo?: number;
limit?: number;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const limit = p.limit ?? 20;
// Tail mode: discover channel head via channel listing, then ask
// for [head-limit+1, head]. Avoids needing the agent to know seq.
let seqFrom = p.seqFrom;
let seqTo = p.seqTo;
if (seqFrom === undefined && seqTo === undefined) {
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const ch = channels.find((c) => c.id === p.channelId);
const head = ch?.lastSeq ?? 0;
seqFrom = Math.max(1, head - limit + 1);
seqTo = head;
}
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
seqFrom,
seqTo,
limit,
});
return {
ok: true,
page: res.page,
messages: res.items.map((m) => ({
messageId: m.messageId,
seq: m.seq,
authorUserId: m.authorUserId,
content: m.content,
createdAt: m.createdAt,
isDeleted: m.isDeleted,
})),
};
},
}));
} }