Compare commits
3 Commits
feat/fabri
...
6fe06f55dd
| Author | SHA1 | Date | |
|---|---|---|---|
| 6fe06f55dd | |||
| a15dc880af | |||
| 5dcbd99c28 |
28
index.ts
28
index.ts
@@ -13,11 +13,17 @@ import { registerFabricTools } from './src/tools.js';
|
||||
import { FabricClient } from './src/fabric-client.js';
|
||||
import { IdentityRegistry } from './src/identity.js';
|
||||
import { syncFabricCommands } from './src/command-sync.js';
|
||||
import { PresenceSync } from './src/presence-sync.js';
|
||||
import path from 'node:path';
|
||||
import os from 'node:os';
|
||||
|
||||
let runtimeRef: unknown = null;
|
||||
let inbound: FabricInbound | null = null;
|
||||
let presence: PresenceSync | null = null;
|
||||
// Periodic re-harvest of presence accounts so newly-connected agents
|
||||
// (registered through tool-based identity flow AFTER initial start)
|
||||
// get picked up. Cleared on gateway_stop.
|
||||
let presenceRefreshTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
export { fabricChannelPlugin } from './src/channel.js';
|
||||
|
||||
@@ -82,7 +88,24 @@ export default defineChannelPluginEntry({
|
||||
api.logger,
|
||||
accounts,
|
||||
);
|
||||
void inbound.start();
|
||||
// start() resolves once all accounts have attempted login; per-
|
||||
// agent failures are logged but don't reject. Once it resolves we
|
||||
// can harvest the presence accounts (those that DID log in have
|
||||
// their fabricUserId + first guild endpoint populated).
|
||||
void inbound.start().then(() => {
|
||||
if (!inbound) return;
|
||||
presence = new PresenceSync(api.logger);
|
||||
presence.setAccounts(inbound.getPresenceAccounts());
|
||||
presence.start();
|
||||
api.logger.info(`fabric: presence-sync started for ${inbound.getPresenceAccounts().length} account(s)`);
|
||||
|
||||
// Re-harvest every 5 min: catches agents added via tool-based
|
||||
// identity provisioning after gateway_start (recruitment flow).
|
||||
// setAccounts is idempotent — duplicates collapse on agentId.
|
||||
presenceRefreshTimer = setInterval(() => {
|
||||
if (inbound && presence) presence.setAccounts(inbound.getPresenceAccounts());
|
||||
}, 5 * 60_000);
|
||||
});
|
||||
api.logger.info(`fabric: inbound started for ${accounts.length} account(s)`);
|
||||
void syncFabricCommands(client, cfg, accounts, api.logger);
|
||||
});
|
||||
@@ -93,6 +116,9 @@ export default defineChannelPluginEntry({
|
||||
// BEFORE deliver()). gateway_stop only flushes any leftover buffer.
|
||||
api.on('gateway_stop', () => {
|
||||
void flushAllFabric();
|
||||
if (presenceRefreshTimer) { clearInterval(presenceRefreshTimer); presenceRefreshTimer = null; }
|
||||
presence?.stop();
|
||||
presence = null;
|
||||
inbound?.stop();
|
||||
inbound = null;
|
||||
});
|
||||
|
||||
@@ -263,8 +263,52 @@ export class FabricInbound {
|
||||
this.sockets = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Per-account metadata harvested during `start()` — used by
|
||||
* PresenceSync to know where to push each agent's HF status.
|
||||
*
|
||||
* `fabricUserId` is filled from `session.user.id` after agent-login.
|
||||
* `guildBaseUrl` is the FIRST guild the agent is connected to (multi-
|
||||
* guild presence push is a future concern; for sim/prod-v1 each agent
|
||||
* is in one guild).
|
||||
*
|
||||
* Returns ONLY agents that successfully connected — failed-login
|
||||
* agents have no fabricUserId yet and are excluded.
|
||||
*/
|
||||
getPresenceAccounts(): Array<{
|
||||
agentId: string;
|
||||
fabricUserId: string;
|
||||
guildBaseUrl: string;
|
||||
fabricApiKey: string;
|
||||
}> {
|
||||
const out: Array<{ agentId: string; fabricUserId: string; guildBaseUrl: string; fabricApiKey: string }> = [];
|
||||
for (const entry of this.identity.list()) {
|
||||
if (!entry.fabricUserId) continue;
|
||||
const presenceGuildUrl = this.firstGuildEndpointByAgent.get(entry.agentId);
|
||||
if (!presenceGuildUrl) continue;
|
||||
out.push({
|
||||
agentId: entry.agentId,
|
||||
fabricUserId: entry.fabricUserId,
|
||||
guildBaseUrl: presenceGuildUrl,
|
||||
fabricApiKey: entry.fabricApiKey,
|
||||
});
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// Filled by connectAgent for each (agent, guild). Tracks ONLY the first
|
||||
// guild per agent (used as the presence-push target).
|
||||
private firstGuildEndpointByAgent = new Map<string, string>();
|
||||
|
||||
private async connectAgent(agentId: string, session: FabricSession): Promise<void> {
|
||||
const selfUserId = session.user.id;
|
||||
// First-guild capture for presence-sync push target. session.guilds is
|
||||
// already in priority order from Center; we take the first one with a
|
||||
// valid endpoint and stop. Multi-guild presence is a future concern.
|
||||
if (!this.firstGuildEndpointByAgent.has(agentId)) {
|
||||
const firstGuild = session.guilds.find((g) => typeof g.endpoint === 'string' && g.endpoint.length > 0);
|
||||
if (firstGuild) this.firstGuildEndpointByAgent.set(agentId, firstGuild.endpoint);
|
||||
}
|
||||
for (const g of session.guilds) {
|
||||
const tok = session.guildAccessTokens.find((t) => t.guildNodeId === g.nodeId)?.token;
|
||||
if (!tok) continue;
|
||||
|
||||
92
src/presence-sync.ts
Normal file
92
src/presence-sync.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
/**
|
||||
* presence-sync — read each connected agent's HF status (via the
|
||||
* cross-plugin `globalThis.__hfAgentStatus.get(agentId)` exposed by
|
||||
* HarborForge.OpenclawPlugin) and push diffs to Fabric.Backend.Guild
|
||||
* `PUT /agents/:userId/presence` so the backend can apply busy-discard
|
||||
* on `announce`-type channel deliveries.
|
||||
*
|
||||
* Push model: we only PUT when an agent's status actually changes
|
||||
* (since the last push). The HF-side accessor has its own TTL cache
|
||||
* to absorb the every-30s polling.
|
||||
*
|
||||
* If HF plugin isn't loaded (`__hfAgentStatus` undefined), the loop
|
||||
* is a no-op — Fabric backend defaults presence to 'unknown' which is
|
||||
* treated as not-busy. Announce-channel delivery still works; busy
|
||||
* filtering simply doesn't kick in.
|
||||
*/
|
||||
|
||||
type HfStatus = 'idle' | 'on_call' | 'busy' | 'exhausted' | 'offline';
|
||||
type Bridge = { get(agentId: string): Promise<HfStatus | undefined> };
|
||||
type Logger = { info: (m: string) => void; warn: (m: string) => void };
|
||||
|
||||
export interface PresenceSyncAccount {
|
||||
agentId: string;
|
||||
fabricUserId: string; // the agent's Fabric Center user id (UUID)
|
||||
guildBaseUrl: string; // e.g. https://fabric.hangman-lab.top/guild/<id>
|
||||
fabricApiKey: string; // existing per-account key
|
||||
}
|
||||
|
||||
export class PresenceSync {
|
||||
private timer: ReturnType<typeof setInterval> | null = null;
|
||||
private readonly lastStatus = new Map<string, HfStatus>(); // by agentId
|
||||
private readonly accounts = new Map<string, PresenceSyncAccount>();
|
||||
|
||||
constructor(private readonly logger: Logger) {}
|
||||
|
||||
setAccounts(accounts: PresenceSyncAccount[]): void {
|
||||
this.accounts.clear();
|
||||
for (const a of accounts) this.accounts.set(a.agentId, a);
|
||||
}
|
||||
|
||||
start(intervalMs = 30_000): void {
|
||||
if (this.timer) return;
|
||||
this.timer = setInterval(() => {
|
||||
this.tick().catch((err) => this.logger.warn(`fabric: presence-sync error: ${String(err)}`));
|
||||
}, intervalMs);
|
||||
// run once immediately so initial state lands fast
|
||||
void this.tick();
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
if (this.timer) {
|
||||
clearInterval(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
}
|
||||
|
||||
private async tick(): Promise<void> {
|
||||
const bridge = (globalThis as Record<string, unknown>)['__hfAgentStatus'] as Bridge | undefined;
|
||||
if (!bridge || typeof bridge.get !== 'function') return; // HF plugin not loaded — skip
|
||||
|
||||
for (const [agentId, acct] of this.accounts) {
|
||||
let status: HfStatus | undefined;
|
||||
try {
|
||||
status = await bridge.get(agentId);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!status) continue;
|
||||
if (this.lastStatus.get(agentId) === status) continue; // no change → no PUT
|
||||
|
||||
try {
|
||||
const url = `${acct.guildBaseUrl.replace(/\/$/, '')}/agents/${encodeURIComponent(acct.fabricUserId)}/presence`;
|
||||
const res = await fetch(url, {
|
||||
method: 'PUT',
|
||||
headers: {
|
||||
'content-type': 'application/json',
|
||||
'x-api-key': acct.fabricApiKey,
|
||||
},
|
||||
body: JSON.stringify({ status, source: 'hf-plugin' }),
|
||||
});
|
||||
if (res.ok) {
|
||||
this.lastStatus.set(agentId, status);
|
||||
this.logger.info(`fabric: presence-sync ${agentId} → ${status}`);
|
||||
} else {
|
||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} failed: ${res.status}`);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(`fabric: presence-sync PUT ${agentId} threw: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user