8 Commits

Author SHA1 Message Date
6fe06f55dd feat(plugin): wire PresenceSync into gateway_start lifecycle (Phase 1.5)
Completes the Phase 1 hand-off chain — HF status now actually reaches
Fabric.Backend.Guild and busy-discard on announce channels becomes
operational end-to-end.

inbound.ts:
- Add getPresenceAccounts() — returns per-agent {agentId, fabricUserId,
  guildBaseUrl, fabricApiKey} for every agent that successfully logged
  in. fabricUserId comes from session.user.id cached on the identity
  registry; guildBaseUrl from session.guilds[0].endpoint captured in a
  new private firstGuildEndpointByAgent map during connectAgent().
- Multi-guild presence is deferred; the first guild per agent is the
  push target. For sim/prod-v1 each agent is in one guild so this is a
  no-op simplification.

index.ts gateway_start:
- After inbound.start() resolves, instantiate PresenceSync, call
  setAccounts(inbound.getPresenceAccounts()), start().
- 5-min refresh timer re-harvests accounts (catches agents added via
  tool-based identity registration AFTER initial start — e.g.
  recruitment flow). setAccounts is idempotent.
- gateway_stop now clears the refresh timer and stops PresenceSync
  before stopping inbound.

End-to-end check (still need sim verification):
  HF plugin scheduler heartbeat -> globalThis.__hfAgentStatus
   -> PresenceSync tick (30s) -> PUT /agents/:uid/presence
   -> agent_presences row -> computeDelivery for xType=announce
   -> busy recipients skipped, idle recipients get observer delivery.

Type-check: only pre-existing openclaw/* runtime-resolved-by-jiti
errors remain; new presence wiring compiles clean.

See DIALECTIC-V2-DESIGN.md section 10 Phase 1 (deferred items now
landed).
2026-05-23 11:37:08 +01:00
a15dc880af feat(plugin): add presence-sync module (Phase 1 partial wire)
Drops the PresenceSync class file under src/. Reads each agents HF
status from globalThis.__hfAgentStatus (exposed by
HarborForge.OpenclawPlugin) every 30s and PUTs deltas to
Fabric.Backend.Guild PUT /agents/:userId/presence so the backend can
do busy-discard on announce channel deliveries.

Implementation:
- Diffs against in-memory lastStatus map per agentId; PUT only on
  change. No-op when __hfAgentStatus is undefined (HF plugin not
  loaded) — degrades gracefully, backend defaults presence to
  unknown which means no busy filtering.
- Per-account context: {agentId, fabricUserId, guildBaseUrl,
  fabricApiKey}. Uses x-api-key header so it goes through the
  existing ApiKeyGuard path on the backend.

NOT YET WIRED into index.ts gateway_start lifecycle. To finish
wiring, the registerFull block needs to:
  1. After FabricInbound.start() resolves, harvest each agents
     fabric user id (introspected by Center during session login —
     available on FabricSession.user.id).
  2. Build PresenceSyncAccount[] from those + the existing accounts
     list (which already has agentId + fabricApiKey + guildBaseUrl).
  3. presence = new PresenceSync(api.logger); presence.setAccounts(...);
     presence.start();
  4. presence.stop() on gateway_stop.

Reason for splitting: wiring needs the FabricInbound public API to
expose per-account session metadata, which is a small but separate
refactor. Module ships standalone now so the dependency direction is
clear and the wire-up patch is small.

See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md section 7 (resolved
push-model design).
2026-05-23 11:32:24 +01:00
5dcbd99c28 Merge pull request 'feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history' (#4) from feat/fabric-send-and-discover-tools into main 2026-05-22 22:11:06 +00:00
cd36d1b9e2 feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history
Plugin previously had no way for an agent to send text into a specific
channel proactively — outbound went only through the channel-reply
path (responds to the channel that woke the agent). discussion-complete
internally called client.postMessage but only for the close-time
summary, no general-purpose surface.

Three new tools (+ declare existing fabric-canvas / fabric-channel that
were registered but missing from contracts.tools so agents couldn't
see them per the openclaw plugin contract):

  * fabric-send-message {guildNodeId, channelId, content}
      → {ok, messageId, seq}
    Author = calling agent. Use for ARD broadcasts, follow-ups in a
    different channel, etc.

  * fabric-channel-list {guildNodeId, nameFilter?, xType?, includeClosed?}
      → {ok, count, channels[]}
    Backend filters to public + member channels; nameFilter is client-
    side case-insensitive substring; xType / includeClosed apply post-
    fetch. Returns id/name/xType/lastSeq so callers can pipe into the
    other tools.

  * fabric-message-history {guildNodeId, channelId, seqFrom?, seqTo?, limit?}
      → {ok, page, messages[]}
    Tail-by-default: omit seqFrom/seqTo and the tool fetches the
    channel head from listChannels then asks for [head-limit+1, head].
    Limit default 20, max 200. Backend rejects non-participants.

Plus 3 supporting client methods (listChannels, listMessages — both
GET via existing req helper).

contracts.tools updated to declare these 5 (3 new + 2 previously-
silent ones). Verified earlier in sim restart logs: openclaw warned
'plugin tool is undeclared (fabric): fabric-canvas / fabric-channel'
so agents couldn't use them despite registerTool firing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 23:11:01 +01:00
9c910f082b Merge pull request 'feat(triage): per-channel serial queue + HF on_call gate + observer skip' (#3) from feat/triage-on-call-gate-and-queue into main 2026-05-22 21:59:23 +00:00
c5fd091f5a fix(triage): resolve claw_identifier via openclaw config (HF plugin's identifier)
os.hostname() fallback is wrong in sim where container hostname (server.t2)
doesn't match the HF agent row's claw_identifier (sim-t2). Add intermediate
fallback that reads openclaw config plugins.harbor-forge.identifier — the
same value the HF plugin uses for its outbound HF calls — keeping plugin
and HF agent state aligned without a per-service-unit HF_CLAW_IDENTIFIER
env override.

Priority:
  1. HF_CLAW_IDENTIFIER env (operator override)
  2. openclaw config plugins.harbor-forge.identifier (NEW)
  3. os.hostname() last-resort

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:49:31 +01:00
c5a33c33ec feat(triage): per-channel serial queue + HF on_call gate + observer skip
Three behavioral changes to inbound message handling to support the
new triage flow:

## 1. Per-channel serial queue

Replaces `void this.dispatch(...)` (fire-and-forget) with a per-channel
chain so consecutive messages on the same channel are processed strictly
in order — no concurrent model turns for the same channel. Other
channels remain independent (parallelism preserved across channels).

Implementation: `Map<channelId, Promise>` where each new task awaits
the previous. The map entry self-cleans when the chain settles AND
no newer task has overwritten it.

## 2. HF on_call gate (triage + wake=true only)

Before dispatching a triage wake to the on-duty agent, hit HF
`GET /calendar/agent/status?agent_id=...`. If the agent isn't
currently on_call, the message is pushed to a per-agent gated queue
instead of dispatched — no model turn fires.

Status check is cached for 5s to amortise across rapid triage bursts.

When a subsequent triage message arrives and the agent IS on_call by
that point, the gated queue drains FIFO (re-enqueued through the same
per-channel chain so order is kept) before the new message dispatches.

Drained queue is in-memory only; on gateway restart the underlying
Fabric messages get re-fetched via the connect-time history sweep.

## 3. Triage observer skip (wake=false)

Triage messages that arrive with wakeup=false are admin observers — by
spec they MUST NOT enter the agent's session history. Skipped entirely
(no recordInboundSession call). The next time this agent legitimately
wakes for triage, their context contains only past wakeups + their own
outgoing messages — no observer-side chatter from other agents.

For NON-triage channels the legacy "record-as-history" stays — those
keep their full channel conversation available for later wakes.

## Env

- HF_API_BASE_URL  — defaults `https://monitor.hangman-lab.top`
- HF_CLAW_IDENTIFIER — defaults to `os.hostname()`

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 22:17:39 +01:00
28f5083679 Merge pull request 'feat(inbound): listen for backend-pushed channel.joined/left events' (#2) from feat/inbound-listen-push-events into main 2026-05-21 07:12:51 +00:00
6 changed files with 578 additions and 3 deletions

View File

@@ -13,11 +13,17 @@ import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import { PresenceSync } from './src/presence-sync.js';
import path from 'node:path';
import os from 'node:os';
let runtimeRef: unknown = null;
let inbound: FabricInbound | null = null;
let presence: PresenceSync | null = null;
// Periodic re-harvest of presence accounts so newly-connected agents
// (registered through tool-based identity flow AFTER initial start)
// get picked up. Cleared on gateway_stop.
let presenceRefreshTimer: ReturnType<typeof setInterval> | null = null;
export { fabricChannelPlugin } from './src/channel.js';
@@ -82,7 +88,24 @@ export default defineChannelPluginEntry({
api.logger,
accounts,
);
void inbound.start();
// start() resolves once all accounts have attempted login; per-
// agent failures are logged but don't reject. Once it resolves we
// can harvest the presence accounts (those that DID log in have
// their fabricUserId + first guild endpoint populated).
void inbound.start().then(() => {
if (!inbound) return;
presence = new PresenceSync(api.logger);
presence.setAccounts(inbound.getPresenceAccounts());
presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
// Re-harvest every 5 min: catches agents added via tool-based
// identity provisioning after gateway_start (recruitment flow).
// setAccounts is idempotent — duplicates collapse on agentId.
presenceRefreshTimer = setInterval(() => {
if (inbound && presence) presence.setAccounts(inbound.getPresenceAccounts());
}, 5 * 60_000);
});
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
@@ -93,6 +116,9 @@ export default defineChannelPluginEntry({
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
if (presenceRefreshTimer) { clearInterval(presenceRefreshTimer); presenceRefreshTimer = null; }
presence?.stop();
presence = null;
inbound?.stop();
inbound = null;
});

View File

@@ -14,7 +14,12 @@
"create-work-channel",
"create-report-channel",
"create-discussion-channel",
"discussion-complete"
"discussion-complete",
"fabric-canvas",
"fabric-channel",
"fabric-send-message",
"fabric-channel-list",
"fabric-message-history"
]
},
"configSchema": {

View File

@@ -190,6 +190,76 @@ export class FabricClient {
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
}
// ---- channel discovery + message read (used by the agent-facing
// fabric-channel-list / fabric-message-history tools) ----
/**
* List channels in a guild visible to the calling user. Backend
* filters to public + channels the user is a member of.
*/
listChannels(
guildEndpoint: string,
guildToken: string,
guildNodeId: string,
): Promise<Array<{
id: string;
guildId: string;
name: string;
xType: string;
kind: string;
isPublic: boolean;
closed: boolean;
lastSeq: number;
createdAt: string;
}>> {
return this.req(
'GET',
`${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`,
guildToken,
);
}
/**
* Page through a channel's message history by `seq`.
*
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
* tail. Page metadata in the response describes what to ask next.
*/
listMessages(
guildEndpoint: string,
guildToken: string,
channelId: string,
opts: { seqFrom?: number; seqTo?: number; limit?: number } = {},
): Promise<{
items: Array<{
messageId: string;
seq: number;
content: string;
authorUserId: string;
createdAt: string;
editedAt: string | null;
deletedAt: string | null;
isDeleted: boolean;
}>;
page: {
seqFrom: number;
seqTo: number;
limit: number;
returned: number;
hasMore: boolean;
nextExpectedSeq: number;
highestCommittedSeq: number;
};
}> {
const qs = new URLSearchParams();
if (opts.seqFrom !== undefined) qs.set('seq_from', String(opts.seqFrom));
if (opts.seqTo !== undefined) qs.set('seq_to', String(opts.seqTo));
if (opts.limit !== undefined) qs.set('limit', String(opts.limit));
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
return this.req('GET', url, guildToken);
}
}
export type CanvasFormat = 'md' | 'html' | 'text';

View File

@@ -67,6 +67,136 @@ export class FabricInbound {
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
// Per-channel serial work queue. Every inbound socket message for a
// channel awaits the previous task for that same channel, so model
// turns never interleave. Map key = channelId; value is the tail of
// the chain (an in-flight promise the next task awaits).
//
// Why per-channel and not per-agent: a single agent may sit in
// several triage / general channels; we want each channel to flow at
// its own speed but the SAME channel's traffic to be strictly serial.
// For dm and discuss the queue also serialises but those traditionally
// had at-most-one-in-flight anyway via the turn engine.
private channelChains = new Map<string, Promise<void>>();
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
// agent/status round-trip off the hot path for back-to-back triage
// messages. Short TTL because status flips are rare-but-meaningful.
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
private static readonly AGENT_STATUS_TTL_MS = 5_000;
// Triage messages that arrived while the on-duty agent wasn't on_call
// — sit here until either (a) the agent becomes on_call and the next
// triage arrival drains them, or (b) the gateway restarts (lost; ok
// because the underlying Fabric messages are persisted and re-fetched
// on agent reconnect's history sweep).
private pendingTriageGated: Array<{
agentId: string;
g: { nodeId: string; endpoint: string };
channelId: string;
m: FabricMessage;
session: FabricSession;
}> = [];
// Schedule `task` to run after every previous task on the same
// channel has completed. Returns the promise so callers can await
// their own result if they need to; the chain itself is fire-and-
// forget from the socket.on handler.
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
const next = prev.then(task).catch((err) => {
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
});
this.channelChains.set(channelId, next);
// Best-effort cleanup so the Map doesn't grow without bound for
// long-running gateways: drop the entry when the chain settles, but
// only if it's still the latest reference (newer enqueue may have
// overwritten it in the meantime).
void next.finally(() => {
if (this.channelChains.get(channelId) === next) {
this.channelChains.delete(channelId);
}
});
return next;
}
// Hit HF backend to check whether `agentId` is currently on_call.
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
// on_call" — triage stays gated rather than risking a confused wake.
private async checkAgentOnCall(agentId: string): Promise<boolean> {
const cached = this.agentStatusCache.get(agentId);
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
return cached.onCall;
}
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
// CLAW_IDENTIFIER resolution priority:
// 1. HF_CLAW_IDENTIFIER env (operator override)
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
// plugin itself uses — keeps the two in sync without an extra
// env per service unit)
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
// container hostname is `server.t2` but HF agent row has
// `claw_identifier=sim-t2`; matching is mandatory for the HF
// backend's _require_agent() check)
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
if (!claw) {
try {
// openclaw config shape (verified in sim):
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
const cfg = this.cfg as {
plugins?: { entries?: Record<string, { config?: { identifier?: string } }> };
};
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
claw = fromCfg.trim();
}
} catch {
/* fall through to hostname */
}
}
if (!claw) {
claw = (await import('os')).hostname();
}
let onCall = false;
try {
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
const res = await fetch(url, {
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
});
if (res.ok) {
const data = (await res.json()) as { status?: string };
onCall = (data.status ?? '').toLowerCase() === 'on_call';
}
} catch (err) {
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
}
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
return onCall;
}
// FIFO drain of all triage-gated messages for `agentId` (called when
// we just learned they're on_call). Each drained message is dispatched
// through its own channel chain so per-channel serial order is kept.
private async drainGatedFor(agentId: string): Promise<void> {
const keep: typeof this.pendingTriageGated = [];
const drain: typeof this.pendingTriageGated = [];
for (const item of this.pendingTriageGated) {
if (item.agentId === agentId) drain.push(item);
else keep.push(item);
}
if (drain.length === 0) return;
this.pendingTriageGated = keep;
for (const item of drain) {
this.log.info(
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
);
// Re-enqueue via the per-channel chain so ordering is preserved.
this.enqueueChannelTask(item.channelId, async () => {
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
});
}
}
// Return a fresh guild access token for the agent, re-authenticating with
// the agent's Fabric API key when the cached session is stale. Falls back
// to the connect-time session token if re-login fails.
@@ -133,8 +263,52 @@ export class FabricInbound {
this.sockets = [];
}
/**
* Per-account metadata harvested during `start()` — used by
* PresenceSync to know where to push each agent's HF status.
*
* `fabricUserId` is filled from `session.user.id` after agent-login.
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
* guild presence push is a future concern; for sim/prod-v1 each agent
* is in one guild).
*
* Returns ONLY agents that successfully connected — failed-login
* agents have no fabricUserId yet and are excluded.
*/
getPresenceAccounts(): Array<{
agentId: string;
fabricUserId: string;
guildBaseUrl: string;
fabricApiKey: string;
}> {
const out: Array<{ agentId: string; fabricUserId: string; guildBaseUrl: string; fabricApiKey: string }> = [];
for (const entry of this.identity.list()) {
if (!entry.fabricUserId) continue;
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
if (!presenceGuildUrl) continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuildUrl,
fabricApiKey: entry.fabricApiKey,
});
}
return out;
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target).
private firstGuildEndpointByAgent = new Map<string, string>();
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildEndpointByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild) this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
}
for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) continue;
@@ -232,7 +406,33 @@ export class FabricInbound {
if (this.seen.has(key)) return;
this.seen.add(key);
if (this.seen.size > 5000) this.seen.clear();
void this.dispatch(agentId, g, channelId, m, session);
// Per-channel serial queue. Prevents concurrent model turns for
// the same channel — important for triage where a second wake
// arriving mid-reply would interleave with the in-flight one.
this.enqueueChannelTask(channelId, async () => {
// Triage on_call gate: if the on-duty agent isn't currently
// on_call per HF, don't dispatch yet — just sit on the
// per-channel queue. Subsequent triage messages will recheck;
// when the agent becomes on_call, the next arrival drains.
//
// Also handles: triage + wake=true must verify status before
// committing to a model turn. Non-triage and triage observer
// (wake=false) skip the gate.
if (m.xType === 'triage' && m.wakeup === true) {
const onCall = await this.checkAgentOnCall(agentId);
if (!onCall) {
this.log.info(
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
);
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
return;
}
// Drain any previously-gated messages (FIFO) before this one,
// now that we know the agent is on_call.
await this.drainGatedFor(agentId);
}
await this.dispatch(agentId, g, channelId, m, session);
});
});
socket.connect();
this.sockets.push(socket);
@@ -337,6 +537,19 @@ export class FabricInbound {
// any message that isn't the agent's own (already filtered above) is
// always delivered to the model.
if (m.xType !== 'dm' && m.wakeup !== true) {
// Triage exception: non-wake messages (admin observer) MUST NOT
// enter the agent's session at all. The next time the agent
// wakes for a triage message, their context should contain only
// their own past wakeups + their own outgoing messages — never
// the observer-only chatter from other agents. For non-triage
// channels keep the legacy "record-as-history" so a later wake
// sees the full channel conversation.
if (m.xType === 'triage') {
this.log.info(
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
);
return;
}
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
await core.channel.session.recordInboundSession({
storePath,

92
src/presence-sync.ts Normal file
View File

@@ -0,0 +1,92 @@
/**
* presence-sync — read each connected agent's HF status (via the
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
* `PUT /agents/:userId/presence` so the backend can apply busy-discard
* on `announce`-type channel deliveries.
*
* Push model: we only PUT when an agent's status actually changes
* (since the last push). The HF-side accessor has its own TTL cache
* to absorb the every-30s polling.
*
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
* is a no-op — Fabric backend defaults presence to 'unknown' which is
* treated as not-busy. Announce-channel delivery still works; busy
* filtering simply doesn't kick in.
*/
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
type Logger = { info: (m: string) => void; warn: (m: string) => void };
export interface PresenceSyncAccount {
agentId: string;
fabricUserId: string; // the agent's Fabric Center user id (UUID)
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
fabricApiKey: string; // existing per-account key
}
export class PresenceSync {
private timer: ReturnType<typeof setInterval> | null = null;
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
private readonly accounts = new Map<string, PresenceSyncAccount>();
constructor(private readonly logger: Logger) {}
setAccounts(accounts: PresenceSyncAccount[]): void {
this.accounts.clear();
for (const a of accounts) this.accounts.set(a.agentId, a);
}
start(intervalMs = 30_000): void {
if (this.timer) return;
this.timer = setInterval(() => {
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
}, intervalMs);
// run once immediately so initial state lands fast
void this.tick();
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
private async tick(): Promise<void> {
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
for (const [agentId, acct] of this.accounts) {
let status: HfStatus | undefined;
try {
status = await bridge.get(agentId);
} catch {
continue;
}
if (!status) continue;
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
try {
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
'x-api-key': acct.fabricApiKey,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
if (res.ok) {
this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId}${status}`);
} else {
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
} catch (err) {
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
}
}
}
}

View File

@@ -243,4 +243,173 @@ export function registerFabricTools(
}
},
}));
// -----------------------------------------------------------------
// fabric-send-message: post a message into a specific channel.
//
// Unlike a normal channel reply (which goes back to whatever channel
// woke the agent), this lets the agent proactively initiate text into
// any channel they are a member of — e.g. ARD broadcasting daily
// workload to #agents-room, or triage agent following up on an
// already-routed task by commenting in #updates.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-send-message',
description:
'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId', 'content'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
},
},
execute: async (p: { guildNodeId: string; channelId: string; content: string }) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
const res = (await client.postMessage(
guild.endpoint,
token,
p.channelId,
p.content,
session.user.id,
)) as { messageId?: string; seq?: number };
return { ok: true, messageId: res.messageId, seq: res.seq };
},
}));
// -----------------------------------------------------------------
// fabric-channel-list: enumerate channels the calling agent can see
// in a given guild. Backend filters to public channels + channels the
// agent is a member of. Returns id / name / xType per channel so the
// agent can pick a channelId for fabric-send-message etc.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-channel-list',
description:
'List channels visible to the calling agent in a guild. Optional ' +
'nameFilter does a case-insensitive substring match client-side. ' +
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId'],
properties: {
guildNodeId: { type: 'string' },
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
xType: {
type: 'string',
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
description: 'optional filter by x_type',
},
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
},
},
execute: async (p: {
guildNodeId: string;
nameFilter?: string;
xType?: string;
includeClosed?: boolean;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const needle = (p.nameFilter ?? '').toLowerCase();
const filtered = all.filter((c) => {
if (!p.includeClosed && c.closed) return false;
if (p.xType && c.xType !== p.xType) return false;
if (needle && !c.name.toLowerCase().includes(needle)) return false;
return true;
});
return {
ok: true,
count: filtered.length,
channels: filtered.map((c) => ({
id: c.id,
name: c.name,
xType: c.xType,
isPublic: c.isPublic,
closed: c.closed,
lastSeq: c.lastSeq,
})),
};
},
}));
// -----------------------------------------------------------------
// fabric-message-history: read a channel's recent message history by
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
// the last `limit` messages (limit defaults to 20, max 200).
//
// Use cases: catch-up on a channel that was muted while the agent was
// gated; verify a previous message went through; lookup recent
// duplicates before opening a new task in triage.
// -----------------------------------------------------------------
api.registerTool((ctx: Ctx) => ({
name: 'fabric-message-history',
description:
"Read a channel's recent message history. Omit seqFrom/seqTo to " +
'tail (last `limit` messages, default 20, max 200). Backend ' +
'requires the calling agent to be a channel participant.',
parameters: {
type: 'object',
additionalProperties: false,
required: ['guildNodeId', 'channelId'],
properties: {
guildNodeId: { type: 'string' },
channelId: { type: 'string' },
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
},
},
execute: async (p: {
guildNodeId: string;
channelId: string;
seqFrom?: number;
seqTo?: number;
limit?: number;
}) => {
const agentId = ctx.agentId;
if (!agentId) return { ok: false, error: 'no agent context' };
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
const limit = p.limit ?? 20;
// Tail mode: discover channel head via channel listing, then ask
// for [head-limit+1, head]. Avoids needing the agent to know seq.
let seqFrom = p.seqFrom;
let seqTo = p.seqTo;
if (seqFrom === undefined && seqTo === undefined) {
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
const ch = channels.find((c) => c.id === p.channelId);
const head = ch?.lastSeq ?? 0;
seqFrom = Math.max(1, head - limit + 1);
seqTo = head;
}
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
seqFrom,
seqTo,
limit,
});
return {
ok: true,
page: res.page,
messages: res.items.map((m) => ({
messageId: m.messageId,
seq: m.seq,
authorUserId: m.authorUserId,
content: m.content,
createdAt: m.createdAt,
isDeleted: m.isDeleted,
})),
};
},
}));
}