Compare commits
6 Commits
92945b777d
...
feat/triag
| Author | SHA1 | Date | |
|---|---|---|---|
| c5fd091f5a | |||
| c5a33c33ec | |||
| 28f5083679 | |||
| a060ff98a2 | |||
| b9a5456d57 | |||
| d1d5ad10ca |
91
dist/fabric/src/inbound.js
vendored
91
dist/fabric/src/inbound.js
vendored
@@ -14,6 +14,14 @@ export class FabricInbound {
|
||||
accounts;
|
||||
sockets = [];
|
||||
seen = new Set();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
// at connect time — any channel the agent joins later (e.g. a fresh DM
|
||||
// created by another user) is unreachable until the gateway restarts.
|
||||
channelSyncTimers = [];
|
||||
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
|
||||
// poll. 60s keeps the lag bounded without hammering the backend.
|
||||
static CHANNEL_SYNC_INTERVAL_MS = 60_000;
|
||||
// Guild access tokens are short-lived (~15 min). The socket survives via
|
||||
// socket.io reconnect, but the token captured at connect time goes stale,
|
||||
// so HTTP calls (attachment download, posting the reply) start 401ing.
|
||||
@@ -75,6 +83,9 @@ export class FabricInbound {
|
||||
}
|
||||
}
|
||||
stop() {
|
||||
for (const t of this.channelSyncTimers)
|
||||
clearInterval(t);
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets)
|
||||
s.disconnect();
|
||||
this.sockets = [];
|
||||
@@ -90,19 +101,83 @@ export class FabricInbound {
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
const joinAll = async () => {
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
// on `connect` seeds it; the periodic resync diffs against it so we
|
||||
// only emit `join_channel` for genuinely new channels (and
|
||||
// `leave_channel` for ones the agent is no longer in).
|
||||
const joined = new Set();
|
||||
const syncChannels = async (kind) => {
|
||||
let freshTok;
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${tok}` } });
|
||||
const channels = res.ok ? (await res.json()) : [];
|
||||
for (const c of channels)
|
||||
socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
|
||||
}
|
||||
catch {
|
||||
/* best effort */
|
||||
freshTok = tok;
|
||||
}
|
||||
const authTok = freshTok ?? tok;
|
||||
try {
|
||||
const res = await fetch(`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`, { headers: { authorization: `Bearer ${authTok}` } });
|
||||
if (!res.ok)
|
||||
return;
|
||||
const channels = (await res.json());
|
||||
const current = new Set(channels.map((c) => c.id));
|
||||
let added = 0;
|
||||
let removed = 0;
|
||||
for (const id of current) {
|
||||
if (!joined.has(id)) {
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
for (const id of [...joined]) {
|
||||
if (!current.has(id)) {
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (kind === 'initial') {
|
||||
this.log.info(`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`);
|
||||
}
|
||||
else if (added > 0 || removed > 0) {
|
||||
this.log.info(`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`);
|
||||
}
|
||||
}
|
||||
catch {
|
||||
/* best effort — next tick will retry */
|
||||
}
|
||||
};
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('connect', () => {
|
||||
// On every (re)connect the server forgets prior subscriptions, so
|
||||
// reset our local view and seed from a fresh fetch.
|
||||
joined.clear();
|
||||
void syncChannels('initial');
|
||||
});
|
||||
// Push-based membership events from the backend (companion to
|
||||
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
|
||||
// server tells us this user was added to / removed from a
|
||||
// channel, we sub/unsub the socket.io room immediately — no
|
||||
// 60s wait for the polling resync. Polling remains as a safety
|
||||
// net for missed events.
|
||||
socket.on('channel.joined', (evt) => {
|
||||
const id = evt?.channelId;
|
||||
if (!id || joined.has(id))
|
||||
return;
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||
});
|
||||
socket.on('channel.left', (evt) => {
|
||||
const id = evt?.channelId;
|
||||
if (!id || !joined.has(id))
|
||||
return;
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||
});
|
||||
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
socket.on('message.created', (m) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId)
|
||||
|
||||
262
src/inbound.ts
262
src/inbound.ts
@@ -52,6 +52,14 @@ type FabricMessage = {
|
||||
export class FabricInbound {
|
||||
private sockets: Socket[] = [];
|
||||
private seen = new Set<string>();
|
||||
// Timers that periodically re-sync channel membership per (agent, guild).
|
||||
// Without this, the agent's socket.io subscriptions are a snapshot taken
|
||||
// at connect time — any channel the agent joins later (e.g. a fresh DM
|
||||
// created by another user) is unreachable until the gateway restarts.
|
||||
private channelSyncTimers: NodeJS.Timeout[] = [];
|
||||
// Resync cadence. Backend doesn't push a `channel.joined` event, so we
|
||||
// poll. 60s keeps the lag bounded without hammering the backend.
|
||||
private static readonly CHANNEL_SYNC_INTERVAL_MS = 60_000;
|
||||
// Guild access tokens are short-lived (~15 min). The socket survives via
|
||||
// socket.io reconnect, but the token captured at connect time goes stale,
|
||||
// so HTTP calls (attachment download, posting the reply) start 401ing.
|
||||
@@ -59,6 +67,136 @@ export class FabricInbound {
|
||||
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
|
||||
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
|
||||
|
||||
// Per-channel serial work queue. Every inbound socket message for a
|
||||
// channel awaits the previous task for that same channel, so model
|
||||
// turns never interleave. Map key = channelId; value is the tail of
|
||||
// the chain (an in-flight promise the next task awaits).
|
||||
//
|
||||
// Why per-channel and not per-agent: a single agent may sit in
|
||||
// several triage / general channels; we want each channel to flow at
|
||||
// its own speed but the SAME channel's traffic to be strictly serial.
|
||||
// For dm and discuss the queue also serialises but those traditionally
|
||||
// had at-most-one-in-flight anyway via the turn engine.
|
||||
private channelChains = new Map<string, Promise<void>>();
|
||||
|
||||
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
|
||||
// agent/status round-trip off the hot path for back-to-back triage
|
||||
// messages. Short TTL because status flips are rare-but-meaningful.
|
||||
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
|
||||
private static readonly AGENT_STATUS_TTL_MS = 5_000;
|
||||
|
||||
// Triage messages that arrived while the on-duty agent wasn't on_call
|
||||
// — sit here until either (a) the agent becomes on_call and the next
|
||||
// triage arrival drains them, or (b) the gateway restarts (lost; ok
|
||||
// because the underlying Fabric messages are persisted and re-fetched
|
||||
// on agent reconnect's history sweep).
|
||||
private pendingTriageGated: Array<{
|
||||
agentId: string;
|
||||
g: { nodeId: string; endpoint: string };
|
||||
channelId: string;
|
||||
m: FabricMessage;
|
||||
session: FabricSession;
|
||||
}> = [];
|
||||
|
||||
// Schedule `task` to run after every previous task on the same
|
||||
// channel has completed. Returns the promise so callers can await
|
||||
// their own result if they need to; the chain itself is fire-and-
|
||||
// forget from the socket.on handler.
|
||||
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
|
||||
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
|
||||
const next = prev.then(task).catch((err) => {
|
||||
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
|
||||
});
|
||||
this.channelChains.set(channelId, next);
|
||||
// Best-effort cleanup so the Map doesn't grow without bound for
|
||||
// long-running gateways: drop the entry when the chain settles, but
|
||||
// only if it's still the latest reference (newer enqueue may have
|
||||
// overwritten it in the meantime).
|
||||
void next.finally(() => {
|
||||
if (this.channelChains.get(channelId) === next) {
|
||||
this.channelChains.delete(channelId);
|
||||
}
|
||||
});
|
||||
return next;
|
||||
}
|
||||
|
||||
// Hit HF backend to check whether `agentId` is currently on_call.
|
||||
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
|
||||
// on_call" — triage stays gated rather than risking a confused wake.
|
||||
private async checkAgentOnCall(agentId: string): Promise<boolean> {
|
||||
const cached = this.agentStatusCache.get(agentId);
|
||||
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
|
||||
return cached.onCall;
|
||||
}
|
||||
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
|
||||
// CLAW_IDENTIFIER resolution priority:
|
||||
// 1. HF_CLAW_IDENTIFIER env (operator override)
|
||||
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
|
||||
// plugin itself uses — keeps the two in sync without an extra
|
||||
// env per service unit)
|
||||
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
|
||||
// container hostname is `server.t2` but HF agent row has
|
||||
// `claw_identifier=sim-t2`; matching is mandatory for the HF
|
||||
// backend's _require_agent() check)
|
||||
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
|
||||
if (!claw) {
|
||||
try {
|
||||
// openclaw config shape (verified in sim):
|
||||
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
|
||||
const cfg = this.cfg as {
|
||||
plugins?: { entries?: Record<string, { config?: { identifier?: string } }> };
|
||||
};
|
||||
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
|
||||
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
|
||||
claw = fromCfg.trim();
|
||||
}
|
||||
} catch {
|
||||
/* fall through to hostname */
|
||||
}
|
||||
}
|
||||
if (!claw) {
|
||||
claw = (await import('os')).hostname();
|
||||
}
|
||||
let onCall = false;
|
||||
try {
|
||||
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
|
||||
const res = await fetch(url, {
|
||||
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
|
||||
});
|
||||
if (res.ok) {
|
||||
const data = (await res.json()) as { status?: string };
|
||||
onCall = (data.status ?? '').toLowerCase() === 'on_call';
|
||||
}
|
||||
} catch (err) {
|
||||
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
|
||||
}
|
||||
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
|
||||
return onCall;
|
||||
}
|
||||
|
||||
// FIFO drain of all triage-gated messages for `agentId` (called when
|
||||
// we just learned they're on_call). Each drained message is dispatched
|
||||
// through its own channel chain so per-channel serial order is kept.
|
||||
private async drainGatedFor(agentId: string): Promise<void> {
|
||||
const keep: typeof this.pendingTriageGated = [];
|
||||
const drain: typeof this.pendingTriageGated = [];
|
||||
for (const item of this.pendingTriageGated) {
|
||||
if (item.agentId === agentId) drain.push(item);
|
||||
else keep.push(item);
|
||||
}
|
||||
if (drain.length === 0) return;
|
||||
this.pendingTriageGated = keep;
|
||||
for (const item of drain) {
|
||||
this.log.info(
|
||||
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
|
||||
);
|
||||
// Re-enqueue via the per-channel chain so ordering is preserved.
|
||||
this.enqueueChannelTask(item.channelId, async () => {
|
||||
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Return a fresh guild access token for the agent, re-authenticating with
|
||||
// the agent's Fabric API key when the cached session is stale. Falls back
|
||||
// to the connect-time session token if re-login fails.
|
||||
@@ -119,6 +257,8 @@ export class FabricInbound {
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
for (const t of this.channelSyncTimers) clearInterval(t);
|
||||
this.channelSyncTimers = [];
|
||||
for (const s of this.sockets) s.disconnect();
|
||||
this.sockets = [];
|
||||
}
|
||||
@@ -133,20 +273,87 @@ export class FabricInbound {
|
||||
auth: { token: tok },
|
||||
autoConnect: false,
|
||||
});
|
||||
const joinAll = async () => {
|
||||
// Tracked socket.io rooms for this (agent, guild). The initial fetch
|
||||
// on `connect` seeds it; the periodic resync diffs against it so we
|
||||
// only emit `join_channel` for genuinely new channels (and
|
||||
// `leave_channel` for ones the agent is no longer in).
|
||||
const joined = new Set<string>();
|
||||
const syncChannels = async (kind: 'initial' | 'resync') => {
|
||||
let freshTok: string | undefined;
|
||||
try {
|
||||
freshTok = await this.freshGuildToken(agentId, g.nodeId, session);
|
||||
} catch {
|
||||
freshTok = tok;
|
||||
}
|
||||
const authTok = freshTok ?? tok;
|
||||
try {
|
||||
const res = await fetch(
|
||||
`${g.endpoint}/api/channels?guildId=${encodeURIComponent(g.nodeId)}`,
|
||||
{ headers: { authorization: `Bearer ${tok}` } },
|
||||
{ headers: { authorization: `Bearer ${authTok}` } },
|
||||
);
|
||||
const channels = res.ok ? ((await res.json()) as Array<{ id: string }>) : [];
|
||||
for (const c of channels) socket.emit('join_channel', { channelId: c.id });
|
||||
this.log.info(`fabric: agent ${agentId} joined ${channels.length} channel(s) on ${g.nodeId}`);
|
||||
if (!res.ok) return;
|
||||
const channels = (await res.json()) as Array<{ id: string }>;
|
||||
const current = new Set(channels.map((c) => c.id));
|
||||
let added = 0;
|
||||
let removed = 0;
|
||||
for (const id of current) {
|
||||
if (!joined.has(id)) {
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
for (const id of [...joined]) {
|
||||
if (!current.has(id)) {
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
removed++;
|
||||
}
|
||||
}
|
||||
if (kind === 'initial') {
|
||||
this.log.info(
|
||||
`fabric: agent ${agentId} joined ${current.size} channel(s) on ${g.nodeId}`,
|
||||
);
|
||||
} else if (added > 0 || removed > 0) {
|
||||
this.log.info(
|
||||
`fabric: agent ${agentId} channel resync on ${g.nodeId}: +${added} -${removed} (now ${joined.size})`,
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
/* best effort */
|
||||
/* best effort — next tick will retry */
|
||||
}
|
||||
};
|
||||
socket.on('connect', () => void joinAll());
|
||||
socket.on('connect', () => {
|
||||
// On every (re)connect the server forgets prior subscriptions, so
|
||||
// reset our local view and seed from a fresh fetch.
|
||||
joined.clear();
|
||||
void syncChannels('initial');
|
||||
});
|
||||
// Push-based membership events from the backend (companion to
|
||||
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
|
||||
// server tells us this user was added to / removed from a
|
||||
// channel, we sub/unsub the socket.io room immediately — no
|
||||
// 60s wait for the polling resync. Polling remains as a safety
|
||||
// net for missed events.
|
||||
socket.on('channel.joined', (evt: { channelId?: string }) => {
|
||||
const id = evt?.channelId;
|
||||
if (!id || joined.has(id)) return;
|
||||
socket.emit('join_channel', { channelId: id });
|
||||
joined.add(id);
|
||||
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||
});
|
||||
socket.on('channel.left', (evt: { channelId?: string }) => {
|
||||
const id = evt?.channelId;
|
||||
if (!id || !joined.has(id)) return;
|
||||
socket.emit('leave_channel', { channelId: id });
|
||||
joined.delete(id);
|
||||
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||
});
|
||||
const syncTimer = setInterval(
|
||||
() => void syncChannels('resync'),
|
||||
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
||||
);
|
||||
this.channelSyncTimers.push(syncTimer);
|
||||
socket.on('message.created', (m: FabricMessage) => {
|
||||
const channelId = m.channelId ?? '';
|
||||
if (!channelId) return;
|
||||
@@ -155,7 +362,33 @@ export class FabricInbound {
|
||||
if (this.seen.has(key)) return;
|
||||
this.seen.add(key);
|
||||
if (this.seen.size > 5000) this.seen.clear();
|
||||
void this.dispatch(agentId, g, channelId, m, session);
|
||||
// Per-channel serial queue. Prevents concurrent model turns for
|
||||
// the same channel — important for triage where a second wake
|
||||
// arriving mid-reply would interleave with the in-flight one.
|
||||
this.enqueueChannelTask(channelId, async () => {
|
||||
// Triage on_call gate: if the on-duty agent isn't currently
|
||||
// on_call per HF, don't dispatch yet — just sit on the
|
||||
// per-channel queue. Subsequent triage messages will recheck;
|
||||
// when the agent becomes on_call, the next arrival drains.
|
||||
//
|
||||
// Also handles: triage + wake=true must verify status before
|
||||
// committing to a model turn. Non-triage and triage observer
|
||||
// (wake=false) skip the gate.
|
||||
if (m.xType === 'triage' && m.wakeup === true) {
|
||||
const onCall = await this.checkAgentOnCall(agentId);
|
||||
if (!onCall) {
|
||||
this.log.info(
|
||||
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
|
||||
);
|
||||
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
|
||||
return;
|
||||
}
|
||||
// Drain any previously-gated messages (FIFO) before this one,
|
||||
// now that we know the agent is on_call.
|
||||
await this.drainGatedFor(agentId);
|
||||
}
|
||||
await this.dispatch(agentId, g, channelId, m, session);
|
||||
});
|
||||
});
|
||||
socket.connect();
|
||||
this.sockets.push(socket);
|
||||
@@ -260,6 +493,19 @@ export class FabricInbound {
|
||||
// any message that isn't the agent's own (already filtered above) is
|
||||
// always delivered to the model.
|
||||
if (m.xType !== 'dm' && m.wakeup !== true) {
|
||||
// Triage exception: non-wake messages (admin observer) MUST NOT
|
||||
// enter the agent's session at all. The next time the agent
|
||||
// wakes for a triage message, their context should contain only
|
||||
// their own past wakeups + their own outgoing messages — never
|
||||
// the observer-only chatter from other agents. For non-triage
|
||||
// channels keep the legacy "record-as-history" so a later wake
|
||||
// sees the full channel conversation.
|
||||
if (m.xType === 'triage') {
|
||||
this.log.info(
|
||||
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
||||
await core.channel.session.recordInboundSession({
|
||||
storePath,
|
||||
|
||||
Reference in New Issue
Block a user