3 Commits

Author SHA1 Message Date
6fe06f55dd feat(plugin): wire PresenceSync into gateway_start lifecycle (Phase 1.5)
Completes the Phase 1 hand-off chain — HF status now actually reaches
Fabric.Backend.Guild and busy-discard on announce channels becomes
operational end-to-end.

inbound.ts:
- Add getPresenceAccounts() — returns per-agent {agentId, fabricUserId,
  guildBaseUrl, fabricApiKey} for every agent that successfully logged
  in. fabricUserId comes from session.user.id cached on the identity
  registry; guildBaseUrl from session.guilds[0].endpoint captured in a
  new private firstGuildEndpointByAgent map during connectAgent().
- Multi-guild presence is deferred; the first guild per agent is the
  push target. For sim/prod-v1 each agent is in one guild so this is a
  no-op simplification.

index.ts gateway_start:
- After inbound.start() resolves, instantiate PresenceSync, call
  setAccounts(inbound.getPresenceAccounts()), start().
- 5-min refresh timer re-harvests accounts (catches agents added via
  tool-based identity registration AFTER initial start — e.g.
  recruitment flow). setAccounts is idempotent.
- gateway_stop now clears the refresh timer and stops PresenceSync
  before stopping inbound.

End-to-end check (still need sim verification):
  HF plugin scheduler heartbeat -> globalThis.__hfAgentStatus
   -> PresenceSync tick (30s) -> PUT /agents/:uid/presence
   -> agent_presences row -> computeDelivery for xType=announce
   -> busy recipients skipped, idle recipients get observer delivery.

Type-check: only pre-existing openclaw/* runtime-resolved-by-jiti
errors remain; new presence wiring compiles clean.

See DIALECTIC-V2-DESIGN.md section 10 Phase 1 (deferred items now
landed).
2026-05-23 11:37:08 +01:00
a15dc880af feat(plugin): add presence-sync module (Phase 1 partial wire)
Drops the PresenceSync class file under src/. Reads each agents HF
status from globalThis.__hfAgentStatus (exposed by
HarborForge.OpenclawPlugin) every 30s and PUTs deltas to
Fabric.Backend.Guild PUT /agents/:userId/presence so the backend can
do busy-discard on announce channel deliveries.

Implementation:
- Diffs against in-memory lastStatus map per agentId; PUT only on
  change. No-op when __hfAgentStatus is undefined (HF plugin not
  loaded) — degrades gracefully, backend defaults presence to
  unknown which means no busy filtering.
- Per-account context: {agentId, fabricUserId, guildBaseUrl,
  fabricApiKey}. Uses x-api-key header so it goes through the
  existing ApiKeyGuard path on the backend.

NOT YET WIRED into index.ts gateway_start lifecycle. To finish
wiring, the registerFull block needs to:
  1. After FabricInbound.start() resolves, harvest each agents
     fabric user id (introspected by Center during session login —
     available on FabricSession.user.id).
  2. Build PresenceSyncAccount[] from those + the existing accounts
     list (which already has agentId + fabricApiKey + guildBaseUrl).
  3. presence = new PresenceSync(api.logger); presence.setAccounts(...);
     presence.start();
  4. presence.stop() on gateway_stop.

Reason for splitting: wiring needs the FabricInbound public API to
expose per-account session metadata, which is a small but separate
refactor. Module ships standalone now so the dependency direction is
clear and the wire-up patch is small.

See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md section 7 (resolved
push-model design).
2026-05-23 11:32:24 +01:00
5dcbd99c28 Merge pull request 'feat(tools): fabric-send-message + fabric-channel-list + fabric-message-history' (#4) from feat/fabric-send-and-discover-tools into main 2026-05-22 22:11:06 +00:00
3 changed files with 163 additions and 1 deletions

View File

@@ -13,11 +13,17 @@ import { registerFabricTools } from './src/tools.js';
import { FabricClient } from './src/fabric-client.js';
import { IdentityRegistry } from './src/identity.js';
import { syncFabricCommands } from './src/command-sync.js';
import { PresenceSync } from './src/presence-sync.js';
import path from 'node:path';
import os from 'node:os';
let runtimeRef: unknown = null;
let inbound: FabricInbound | null = null;
let presence: PresenceSync | null = null;
// Periodic re-harvest of presence accounts so newly-connected agents
// (registered through tool-based identity flow AFTER initial start)
// get picked up. Cleared on gateway_stop.
let presenceRefreshTimer: ReturnType<typeof setInterval> | null = null;
export { fabricChannelPlugin } from './src/channel.js';
@@ -82,7 +88,24 @@ export default defineChannelPluginEntry({
api.logger,
accounts,
);
void inbound.start();
// start() resolves once all accounts have attempted login; per-
// agent failures are logged but don't reject. Once it resolves we
// can harvest the presence accounts (those that DID log in have
// their fabricUserId + first guild endpoint populated).
void inbound.start().then(() => {
if (!inbound) return;
presence = new PresenceSync(api.logger);
presence.setAccounts(inbound.getPresenceAccounts());
presence.start();
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
// Re-harvest every 5 min: catches agents added via tool-based
// identity provisioning after gateway_start (recruitment flow).
// setAccounts is idempotent — duplicates collapse on agentId.
presenceRefreshTimer = setInterval(() => {
if (inbound && presence) presence.setAccounts(inbound.getPresenceAccounts());
}, 5 * 60_000);
});
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
void syncFabricCommands(client, cfg, accounts, api.logger);
});
@@ -93,6 +116,9 @@ export default defineChannelPluginEntry({
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
api.on('gateway_stop', () => {
void flushAllFabric();
if (presenceRefreshTimer) { clearInterval(presenceRefreshTimer); presenceRefreshTimer = null; }
presence?.stop();
presence = null;
inbound?.stop();
inbound = null;
});

View File

@@ -263,8 +263,52 @@ export class FabricInbound {
this.sockets = [];
}
/**
* Per-account metadata harvested during `start()` — used by
* PresenceSync to know where to push each agent's HF status.
*
* `fabricUserId` is filled from `session.user.id` after agent-login.
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
* guild presence push is a future concern; for sim/prod-v1 each agent
* is in one guild).
*
* Returns ONLY agents that successfully connected — failed-login
* agents have no fabricUserId yet and are excluded.
*/
getPresenceAccounts(): Array<{
agentId: string;
fabricUserId: string;
guildBaseUrl: string;
fabricApiKey: string;
}> {
const out: Array<{ agentId: string; fabricUserId: string; guildBaseUrl: string; fabricApiKey: string }> = [];
for (const entry of this.identity.list()) {
if (!entry.fabricUserId) continue;
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
if (!presenceGuildUrl) continue;
out.push({
agentId: entry.agentId,
fabricUserId: entry.fabricUserId,
guildBaseUrl: presenceGuildUrl,
fabricApiKey: entry.fabricApiKey,
});
}
return out;
}
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
// guild per agent (used as the presence-push target).
private firstGuildEndpointByAgent = new Map<string, string>();
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
const selfUserId = session.user.id;
// First-guild capture for presence-sync push target. session.guilds is
// already in priority order from Center; we take the first one with a
// valid endpoint and stop. Multi-guild presence is a future concern.
if (!this.firstGuildEndpointByAgent.has(agentId)) {
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
if (firstGuild) this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
}
for (const g of session.guilds) {
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
if (!tok) continue;

92
src/presence-sync.ts Normal file
View File

@@ -0,0 +1,92 @@
/**
* presence-sync — read each connected agent's HF status (via the
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
* `PUT /agents/:userId/presence` so the backend can apply busy-discard
* on `announce`-type channel deliveries.
*
* Push model: we only PUT when an agent's status actually changes
* (since the last push). The HF-side accessor has its own TTL cache
* to absorb the every-30s polling.
*
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
* is a no-op — Fabric backend defaults presence to 'unknown' which is
* treated as not-busy. Announce-channel delivery still works; busy
* filtering simply doesn't kick in.
*/
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
type Logger = { info: (m: string) => void; warn: (m: string) => void };
export interface PresenceSyncAccount {
agentId: string;
fabricUserId: string; // the agent's Fabric Center user id (UUID)
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
fabricApiKey: string; // existing per-account key
}
export class PresenceSync {
private timer: ReturnType<typeof setInterval> | null = null;
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
private readonly accounts = new Map<string, PresenceSyncAccount>();
constructor(private readonly logger: Logger) {}
setAccounts(accounts: PresenceSyncAccount[]): void {
this.accounts.clear();
for (const a of accounts) this.accounts.set(a.agentId, a);
}
start(intervalMs = 30_000): void {
if (this.timer) return;
this.timer = setInterval(() => {
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
}, intervalMs);
// run once immediately so initial state lands fast
void this.tick();
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
}
private async tick(): Promise<void> {
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
for (const [agentId, acct] of this.accounts) {
let status: HfStatus | undefined;
try {
status = await bridge.get(agentId);
} catch {
continue;
}
if (!status) continue;
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
try {
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
const res = await fetch(url, {
method: 'PUT',
headers: {
'content-type': 'application/json',
'x-api-key': acct.fabricApiKey,
},
body: JSON.stringify({ status, source: 'hf-plugin' }),
});
if (res.ok) {
this.lastStatus.set(agentId, status);
this.logger.info(`fabric: presence-sync ${agentId}${status}`);
} else {
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
}
} catch (err) {
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
}
}
}
}