Compare commits
7 Commits
fix/inboun
...
feat/fabri
| Author | SHA1 | Date | |
|---|---|---|---|
| cd36d1b9e2 | |||
| 9c910f082b | |||
| c5fd091f5a | |||
| c5a33c33ec | |||
| 28f5083679 | |||
| a060ff98a2 | |||
| b9a5456d57 |
22
dist/fabric/src/inbound.js
vendored
22
dist/fabric/src/inbound.js
vendored
@@ -154,6 +154,28 @@ export class FabricInbound {
|
|||||||
joined.clear();
|
joined.clear();
|
||||||
void syncChannels('initial');
|
void syncChannels('initial');
|
||||||
});
|
});
|
||||||
|
// Push-based membership events from the backend (companion to
|
||||||
|
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
|
||||||
|
// server tells us this user was added to / removed from a
|
||||||
|
// channel, we sub/unsub the socket.io room immediately — no
|
||||||
|
// 60s wait for the polling resync. Polling remains as a safety
|
||||||
|
// net for missed events.
|
||||||
|
socket.on('channel.joined', (evt) => {
|
||||||
|
const id = evt?.channelId;
|
||||||
|
if (!id || joined.has(id))
|
||||||
|
return;
|
||||||
|
socket.emit('join_channel', { channelId: id });
|
||||||
|
joined.add(id);
|
||||||
|
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||||
|
});
|
||||||
|
socket.on('channel.left', (evt) => {
|
||||||
|
const id = evt?.channelId;
|
||||||
|
if (!id || !joined.has(id))
|
||||||
|
return;
|
||||||
|
socket.emit('leave_channel', { channelId: id });
|
||||||
|
joined.delete(id);
|
||||||
|
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||||
|
});
|
||||||
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
|
const syncTimer = setInterval(() => void syncChannels('resync'), FabricInbound.CHANNEL_SYNC_INTERVAL_MS);
|
||||||
this.channelSyncTimers.push(syncTimer);
|
this.channelSyncTimers.push(syncTimer);
|
||||||
socket.on('message.created', (m) => {
|
socket.on('message.created', (m) => {
|
||||||
|
|||||||
@@ -14,7 +14,12 @@
|
|||||||
"create-work-channel",
|
"create-work-channel",
|
||||||
"create-report-channel",
|
"create-report-channel",
|
||||||
"create-discussion-channel",
|
"create-discussion-channel",
|
||||||
"discussion-complete"
|
"discussion-complete",
|
||||||
|
"fabric-canvas",
|
||||||
|
"fabric-channel",
|
||||||
|
"fabric-send-message",
|
||||||
|
"fabric-channel-list",
|
||||||
|
"fabric-message-history"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"configSchema": {
|
"configSchema": {
|
||||||
|
|||||||
@@ -190,6 +190,76 @@ export class FabricClient {
|
|||||||
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
|
removeCanvas(endpoint: string, token: string, channelId: string): Promise<unknown> {
|
||||||
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
|
return this.req('DELETE', this.canvasUrl(endpoint, channelId), token);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- channel discovery + message read (used by the agent-facing
|
||||||
|
// fabric-channel-list / fabric-message-history tools) ----
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List channels in a guild visible to the calling user. Backend
|
||||||
|
* filters to public + channels the user is a member of.
|
||||||
|
*/
|
||||||
|
listChannels(
|
||||||
|
guildEndpoint: string,
|
||||||
|
guildToken: string,
|
||||||
|
guildNodeId: string,
|
||||||
|
): Promise<Array<{
|
||||||
|
id: string;
|
||||||
|
guildId: string;
|
||||||
|
name: string;
|
||||||
|
xType: string;
|
||||||
|
kind: string;
|
||||||
|
isPublic: boolean;
|
||||||
|
closed: boolean;
|
||||||
|
lastSeq: number;
|
||||||
|
createdAt: string;
|
||||||
|
}>> {
|
||||||
|
return this.req(
|
||||||
|
'GET',
|
||||||
|
`${guildEndpoint}/api/channels?guildId=${encodeURIComponent(guildNodeId)}`,
|
||||||
|
guildToken,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Page through a channel's message history by `seq`.
|
||||||
|
*
|
||||||
|
* Backend defaults: 50 / call, max 200. The `seq` field starts at 1
|
||||||
|
* per channel; pass `seqFrom=channel.lastSeq - N + 1` to get the
|
||||||
|
* tail. Page metadata in the response describes what to ask next.
|
||||||
|
*/
|
||||||
|
listMessages(
|
||||||
|
guildEndpoint: string,
|
||||||
|
guildToken: string,
|
||||||
|
channelId: string,
|
||||||
|
opts: { seqFrom?: number; seqTo?: number; limit?: number } = {},
|
||||||
|
): Promise<{
|
||||||
|
items: Array<{
|
||||||
|
messageId: string;
|
||||||
|
seq: number;
|
||||||
|
content: string;
|
||||||
|
authorUserId: string;
|
||||||
|
createdAt: string;
|
||||||
|
editedAt: string | null;
|
||||||
|
deletedAt: string | null;
|
||||||
|
isDeleted: boolean;
|
||||||
|
}>;
|
||||||
|
page: {
|
||||||
|
seqFrom: number;
|
||||||
|
seqTo: number;
|
||||||
|
limit: number;
|
||||||
|
returned: number;
|
||||||
|
hasMore: boolean;
|
||||||
|
nextExpectedSeq: number;
|
||||||
|
highestCommittedSeq: number;
|
||||||
|
};
|
||||||
|
}> {
|
||||||
|
const qs = new URLSearchParams();
|
||||||
|
if (opts.seqFrom !== undefined) qs.set('seq_from', String(opts.seqFrom));
|
||||||
|
if (opts.seqTo !== undefined) qs.set('seq_to', String(opts.seqTo));
|
||||||
|
if (opts.limit !== undefined) qs.set('limit', String(opts.limit));
|
||||||
|
const url = `${guildEndpoint}/api/channels/${channelId}/messages` + (qs.toString() ? `?${qs}` : '');
|
||||||
|
return this.req('GET', url, guildToken);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export type CanvasFormat = 'md' | 'html' | 'text';
|
export type CanvasFormat = 'md' | 'html' | 'text';
|
||||||
|
|||||||
191
src/inbound.ts
191
src/inbound.ts
@@ -67,6 +67,136 @@ export class FabricInbound {
|
|||||||
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
|
private tokenCache = new Map<string, { session: FabricSession; at: number }>();
|
||||||
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
|
private static readonly TOKEN_TTL_MS = 8 * 60 * 1000;
|
||||||
|
|
||||||
|
// Per-channel serial work queue. Every inbound socket message for a
|
||||||
|
// channel awaits the previous task for that same channel, so model
|
||||||
|
// turns never interleave. Map key = channelId; value is the tail of
|
||||||
|
// the chain (an in-flight promise the next task awaits).
|
||||||
|
//
|
||||||
|
// Why per-channel and not per-agent: a single agent may sit in
|
||||||
|
// several triage / general channels; we want each channel to flow at
|
||||||
|
// its own speed but the SAME channel's traffic to be strictly serial.
|
||||||
|
// For dm and discuss the queue also serialises but those traditionally
|
||||||
|
// had at-most-one-in-flight anyway via the turn engine.
|
||||||
|
private channelChains = new Map<string, Promise<void>>();
|
||||||
|
|
||||||
|
// Agent.status snapshot cache (5s TTL) — keeps the HF /calendar/
|
||||||
|
// agent/status round-trip off the hot path for back-to-back triage
|
||||||
|
// messages. Short TTL because status flips are rare-but-meaningful.
|
||||||
|
private agentStatusCache = new Map<string, { onCall: boolean; at: number }>();
|
||||||
|
private static readonly AGENT_STATUS_TTL_MS = 5_000;
|
||||||
|
|
||||||
|
// Triage messages that arrived while the on-duty agent wasn't on_call
|
||||||
|
// — sit here until either (a) the agent becomes on_call and the next
|
||||||
|
// triage arrival drains them, or (b) the gateway restarts (lost; ok
|
||||||
|
// because the underlying Fabric messages are persisted and re-fetched
|
||||||
|
// on agent reconnect's history sweep).
|
||||||
|
private pendingTriageGated: Array<{
|
||||||
|
agentId: string;
|
||||||
|
g: { nodeId: string; endpoint: string };
|
||||||
|
channelId: string;
|
||||||
|
m: FabricMessage;
|
||||||
|
session: FabricSession;
|
||||||
|
}> = [];
|
||||||
|
|
||||||
|
// Schedule `task` to run after every previous task on the same
|
||||||
|
// channel has completed. Returns the promise so callers can await
|
||||||
|
// their own result if they need to; the chain itself is fire-and-
|
||||||
|
// forget from the socket.on handler.
|
||||||
|
private enqueueChannelTask(channelId: string, task: () => Promise<void>): Promise<void> {
|
||||||
|
const prev = this.channelChains.get(channelId) ?? Promise.resolve();
|
||||||
|
const next = prev.then(task).catch((err) => {
|
||||||
|
this.log.warn(`fabric: per-channel task failed channel=${channelId}: ${String(err)}`);
|
||||||
|
});
|
||||||
|
this.channelChains.set(channelId, next);
|
||||||
|
// Best-effort cleanup so the Map doesn't grow without bound for
|
||||||
|
// long-running gateways: drop the entry when the chain settles, but
|
||||||
|
// only if it's still the latest reference (newer enqueue may have
|
||||||
|
// overwritten it in the meantime).
|
||||||
|
void next.finally(() => {
|
||||||
|
if (this.channelChains.get(channelId) === next) {
|
||||||
|
this.channelChains.delete(channelId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hit HF backend to check whether `agentId` is currently on_call.
|
||||||
|
// Cached for 5s. Failures (network, 404, etc.) are treated as "not
|
||||||
|
// on_call" — triage stays gated rather than risking a confused wake.
|
||||||
|
private async checkAgentOnCall(agentId: string): Promise<boolean> {
|
||||||
|
const cached = this.agentStatusCache.get(agentId);
|
||||||
|
if (cached && Date.now() - cached.at < FabricInbound.AGENT_STATUS_TTL_MS) {
|
||||||
|
return cached.onCall;
|
||||||
|
}
|
||||||
|
const base = (process.env.HF_API_BASE_URL ?? '').trim() || 'https://monitor.hangman-lab.top';
|
||||||
|
// CLAW_IDENTIFIER resolution priority:
|
||||||
|
// 1. HF_CLAW_IDENTIFIER env (operator override)
|
||||||
|
// 2. openclaw config `plugins.harbor-forge.identifier` (what the HF
|
||||||
|
// plugin itself uses — keeps the two in sync without an extra
|
||||||
|
// env per service unit)
|
||||||
|
// 3. os.hostname() last-resort fallback (often wrong: e.g. sim
|
||||||
|
// container hostname is `server.t2` but HF agent row has
|
||||||
|
// `claw_identifier=sim-t2`; matching is mandatory for the HF
|
||||||
|
// backend's _require_agent() check)
|
||||||
|
let claw = (process.env.HF_CLAW_IDENTIFIER ?? '').trim();
|
||||||
|
if (!claw) {
|
||||||
|
try {
|
||||||
|
// openclaw config shape (verified in sim):
|
||||||
|
// { plugins: { entries: { 'harbor-forge': { config: { identifier } } } } }
|
||||||
|
const cfg = this.cfg as {
|
||||||
|
plugins?: { entries?: Record<string, { config?: { identifier?: string } }> };
|
||||||
|
};
|
||||||
|
const fromCfg = cfg?.plugins?.entries?.['harbor-forge']?.config?.identifier;
|
||||||
|
if (fromCfg && typeof fromCfg === 'string' && fromCfg.trim()) {
|
||||||
|
claw = fromCfg.trim();
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
/* fall through to hostname */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!claw) {
|
||||||
|
claw = (await import('os')).hostname();
|
||||||
|
}
|
||||||
|
let onCall = false;
|
||||||
|
try {
|
||||||
|
const url = `${base.replace(/\/$/, '')}/calendar/agent/status?agent_id=${encodeURIComponent(agentId)}`;
|
||||||
|
const res = await fetch(url, {
|
||||||
|
headers: { 'X-Agent-ID': agentId, 'X-Claw-Identifier': claw },
|
||||||
|
});
|
||||||
|
if (res.ok) {
|
||||||
|
const data = (await res.json()) as { status?: string };
|
||||||
|
onCall = (data.status ?? '').toLowerCase() === 'on_call';
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.log.warn(`fabric: HF status check failed agent=${agentId}: ${String(err)}`);
|
||||||
|
}
|
||||||
|
this.agentStatusCache.set(agentId, { onCall, at: Date.now() });
|
||||||
|
return onCall;
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIFO drain of all triage-gated messages for `agentId` (called when
|
||||||
|
// we just learned they're on_call). Each drained message is dispatched
|
||||||
|
// through its own channel chain so per-channel serial order is kept.
|
||||||
|
private async drainGatedFor(agentId: string): Promise<void> {
|
||||||
|
const keep: typeof this.pendingTriageGated = [];
|
||||||
|
const drain: typeof this.pendingTriageGated = [];
|
||||||
|
for (const item of this.pendingTriageGated) {
|
||||||
|
if (item.agentId === agentId) drain.push(item);
|
||||||
|
else keep.push(item);
|
||||||
|
}
|
||||||
|
if (drain.length === 0) return;
|
||||||
|
this.pendingTriageGated = keep;
|
||||||
|
for (const item of drain) {
|
||||||
|
this.log.info(
|
||||||
|
`fabric: triage drain agent=${item.agentId} channel=${item.channelId} msg=${item.m.messageId}`,
|
||||||
|
);
|
||||||
|
// Re-enqueue via the per-channel chain so ordering is preserved.
|
||||||
|
this.enqueueChannelTask(item.channelId, async () => {
|
||||||
|
await this.dispatch(item.agentId, item.g, item.channelId, item.m, item.session);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Return a fresh guild access token for the agent, re-authenticating with
|
// Return a fresh guild access token for the agent, re-authenticating with
|
||||||
// the agent's Fabric API key when the cached session is stale. Falls back
|
// the agent's Fabric API key when the cached session is stale. Falls back
|
||||||
// to the connect-time session token if re-login fails.
|
// to the connect-time session token if re-login fails.
|
||||||
@@ -199,6 +329,26 @@ export class FabricInbound {
|
|||||||
joined.clear();
|
joined.clear();
|
||||||
void syncChannels('initial');
|
void syncChannels('initial');
|
||||||
});
|
});
|
||||||
|
// Push-based membership events from the backend (companion to
|
||||||
|
// Fabric.Backend.Guild's RealtimeGateway.emitToUser). When the
|
||||||
|
// server tells us this user was added to / removed from a
|
||||||
|
// channel, we sub/unsub the socket.io room immediately — no
|
||||||
|
// 60s wait for the polling resync. Polling remains as a safety
|
||||||
|
// net for missed events.
|
||||||
|
socket.on('channel.joined', (evt: { channelId?: string }) => {
|
||||||
|
const id = evt?.channelId;
|
||||||
|
if (!id || joined.has(id)) return;
|
||||||
|
socket.emit('join_channel', { channelId: id });
|
||||||
|
joined.add(id);
|
||||||
|
this.log.info(`fabric: agent ${agentId} channel.joined push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||||
|
});
|
||||||
|
socket.on('channel.left', (evt: { channelId?: string }) => {
|
||||||
|
const id = evt?.channelId;
|
||||||
|
if (!id || !joined.has(id)) return;
|
||||||
|
socket.emit('leave_channel', { channelId: id });
|
||||||
|
joined.delete(id);
|
||||||
|
this.log.info(`fabric: agent ${agentId} channel.left push on ${g.nodeId}: ${id} (now ${joined.size})`);
|
||||||
|
});
|
||||||
const syncTimer = setInterval(
|
const syncTimer = setInterval(
|
||||||
() => void syncChannels('resync'),
|
() => void syncChannels('resync'),
|
||||||
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
FabricInbound.CHANNEL_SYNC_INTERVAL_MS,
|
||||||
@@ -212,7 +362,33 @@ export class FabricInbound {
|
|||||||
if (this.seen.has(key)) return;
|
if (this.seen.has(key)) return;
|
||||||
this.seen.add(key);
|
this.seen.add(key);
|
||||||
if (this.seen.size > 5000) this.seen.clear();
|
if (this.seen.size > 5000) this.seen.clear();
|
||||||
void this.dispatch(agentId, g, channelId, m, session);
|
// Per-channel serial queue. Prevents concurrent model turns for
|
||||||
|
// the same channel — important for triage where a second wake
|
||||||
|
// arriving mid-reply would interleave with the in-flight one.
|
||||||
|
this.enqueueChannelTask(channelId, async () => {
|
||||||
|
// Triage on_call gate: if the on-duty agent isn't currently
|
||||||
|
// on_call per HF, don't dispatch yet — just sit on the
|
||||||
|
// per-channel queue. Subsequent triage messages will recheck;
|
||||||
|
// when the agent becomes on_call, the next arrival drains.
|
||||||
|
//
|
||||||
|
// Also handles: triage + wake=true must verify status before
|
||||||
|
// committing to a model turn. Non-triage and triage observer
|
||||||
|
// (wake=false) skip the gate.
|
||||||
|
if (m.xType === 'triage' && m.wakeup === true) {
|
||||||
|
const onCall = await this.checkAgentOnCall(agentId);
|
||||||
|
if (!onCall) {
|
||||||
|
this.log.info(
|
||||||
|
`fabric: triage wake gated (agent=${agentId} not on_call) — re-queue msg=${m.messageId}`,
|
||||||
|
);
|
||||||
|
this.pendingTriageGated.push({ agentId, g, channelId, m, session });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Drain any previously-gated messages (FIFO) before this one,
|
||||||
|
// now that we know the agent is on_call.
|
||||||
|
await this.drainGatedFor(agentId);
|
||||||
|
}
|
||||||
|
await this.dispatch(agentId, g, channelId, m, session);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
socket.connect();
|
socket.connect();
|
||||||
this.sockets.push(socket);
|
this.sockets.push(socket);
|
||||||
@@ -317,6 +493,19 @@ export class FabricInbound {
|
|||||||
// any message that isn't the agent's own (already filtered above) is
|
// any message that isn't the agent's own (already filtered above) is
|
||||||
// always delivered to the model.
|
// always delivered to the model.
|
||||||
if (m.xType !== 'dm' && m.wakeup !== true) {
|
if (m.xType !== 'dm' && m.wakeup !== true) {
|
||||||
|
// Triage exception: non-wake messages (admin observer) MUST NOT
|
||||||
|
// enter the agent's session at all. The next time the agent
|
||||||
|
// wakes for a triage message, their context should contain only
|
||||||
|
// their own past wakeups + their own outgoing messages — never
|
||||||
|
// the observer-only chatter from other agents. For non-triage
|
||||||
|
// channels keep the legacy "record-as-history" so a later wake
|
||||||
|
// sees the full channel conversation.
|
||||||
|
if (m.xType === 'triage') {
|
||||||
|
this.log.info(
|
||||||
|
`fabric: triage observer skip agent=${agentId} channel=${channelId} msg=${m.messageId}`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
const ctxPayload = core.channel.reply.finalizeInboundContext(baseCtx);
|
||||||
await core.channel.session.recordInboundSession({
|
await core.channel.session.recordInboundSession({
|
||||||
storePath,
|
storePath,
|
||||||
|
|||||||
169
src/tools.ts
169
src/tools.ts
@@ -243,4 +243,173 @@ export function registerFabricTools(
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------
|
||||||
|
// fabric-send-message: post a message into a specific channel.
|
||||||
|
//
|
||||||
|
// Unlike a normal channel reply (which goes back to whatever channel
|
||||||
|
// woke the agent), this lets the agent proactively initiate text into
|
||||||
|
// any channel they are a member of — e.g. ARD broadcasting daily
|
||||||
|
// workload to #agents-room, or triage agent following up on an
|
||||||
|
// already-routed task by commenting in #updates.
|
||||||
|
// -----------------------------------------------------------------
|
||||||
|
api.registerTool((ctx: Ctx) => ({
|
||||||
|
name: 'fabric-send-message',
|
||||||
|
description:
|
||||||
|
'Send a text message into a specific Fabric channel. Author is the calling agent. ' +
|
||||||
|
'Requires guildNodeId + channelId + content. Returns {ok, messageId, seq}.',
|
||||||
|
parameters: {
|
||||||
|
type: 'object',
|
||||||
|
additionalProperties: false,
|
||||||
|
required: ['guildNodeId', 'channelId', 'content'],
|
||||||
|
properties: {
|
||||||
|
guildNodeId: { type: 'string' },
|
||||||
|
channelId: { type: 'string' },
|
||||||
|
content: { type: 'string', description: 'Message body (markdown supported by the renderer).' },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
execute: async (p: { guildNodeId: string; channelId: string; content: string }) => {
|
||||||
|
const agentId = ctx.agentId;
|
||||||
|
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||||
|
const { session, guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||||
|
const res = (await client.postMessage(
|
||||||
|
guild.endpoint,
|
||||||
|
token,
|
||||||
|
p.channelId,
|
||||||
|
p.content,
|
||||||
|
session.user.id,
|
||||||
|
)) as { messageId?: string; seq?: number };
|
||||||
|
return { ok: true, messageId: res.messageId, seq: res.seq };
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------
|
||||||
|
// fabric-channel-list: enumerate channels the calling agent can see
|
||||||
|
// in a given guild. Backend filters to public channels + channels the
|
||||||
|
// agent is a member of. Returns id / name / xType per channel so the
|
||||||
|
// agent can pick a channelId for fabric-send-message etc.
|
||||||
|
// -----------------------------------------------------------------
|
||||||
|
api.registerTool((ctx: Ctx) => ({
|
||||||
|
name: 'fabric-channel-list',
|
||||||
|
description:
|
||||||
|
'List channels visible to the calling agent in a guild. Optional ' +
|
||||||
|
'nameFilter does a case-insensitive substring match client-side. ' +
|
||||||
|
'Use this to find a channelId before fabric-send-message / fabric-message-history.',
|
||||||
|
parameters: {
|
||||||
|
type: 'object',
|
||||||
|
additionalProperties: false,
|
||||||
|
required: ['guildNodeId'],
|
||||||
|
properties: {
|
||||||
|
guildNodeId: { type: 'string' },
|
||||||
|
nameFilter: { type: 'string', description: 'optional substring match on channel name (case-insensitive)' },
|
||||||
|
xType: {
|
||||||
|
type: 'string',
|
||||||
|
enum: ['general', 'work', 'report', 'discuss', 'triage', 'custom', 'dm'],
|
||||||
|
description: 'optional filter by x_type',
|
||||||
|
},
|
||||||
|
includeClosed: { type: 'boolean', description: 'default false — closed channels filtered out' },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
execute: async (p: {
|
||||||
|
guildNodeId: string;
|
||||||
|
nameFilter?: string;
|
||||||
|
xType?: string;
|
||||||
|
includeClosed?: boolean;
|
||||||
|
}) => {
|
||||||
|
const agentId = ctx.agentId;
|
||||||
|
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||||
|
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||||
|
const all = await client.listChannels(guild.endpoint, token, p.guildNodeId);
|
||||||
|
const needle = (p.nameFilter ?? '').toLowerCase();
|
||||||
|
const filtered = all.filter((c) => {
|
||||||
|
if (!p.includeClosed && c.closed) return false;
|
||||||
|
if (p.xType && c.xType !== p.xType) return false;
|
||||||
|
if (needle && !c.name.toLowerCase().includes(needle)) return false;
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
ok: true,
|
||||||
|
count: filtered.length,
|
||||||
|
channels: filtered.map((c) => ({
|
||||||
|
id: c.id,
|
||||||
|
name: c.name,
|
||||||
|
xType: c.xType,
|
||||||
|
isPublic: c.isPublic,
|
||||||
|
closed: c.closed,
|
||||||
|
lastSeq: c.lastSeq,
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------
|
||||||
|
// fabric-message-history: read a channel's recent message history by
|
||||||
|
// `seq`. Tail-by-default: when `seqFrom`/`seqTo` are omitted, returns
|
||||||
|
// the last `limit` messages (limit defaults to 20, max 200).
|
||||||
|
//
|
||||||
|
// Use cases: catch-up on a channel that was muted while the agent was
|
||||||
|
// gated; verify a previous message went through; lookup recent
|
||||||
|
// duplicates before opening a new task in triage.
|
||||||
|
// -----------------------------------------------------------------
|
||||||
|
api.registerTool((ctx: Ctx) => ({
|
||||||
|
name: 'fabric-message-history',
|
||||||
|
description:
|
||||||
|
"Read a channel's recent message history. Omit seqFrom/seqTo to " +
|
||||||
|
'tail (last `limit` messages, default 20, max 200). Backend ' +
|
||||||
|
'requires the calling agent to be a channel participant.',
|
||||||
|
parameters: {
|
||||||
|
type: 'object',
|
||||||
|
additionalProperties: false,
|
||||||
|
required: ['guildNodeId', 'channelId'],
|
||||||
|
properties: {
|
||||||
|
guildNodeId: { type: 'string' },
|
||||||
|
channelId: { type: 'string' },
|
||||||
|
seqFrom: { type: 'integer', minimum: 1, description: 'inclusive lower bound; default = tail' },
|
||||||
|
seqTo: { type: 'integer', minimum: 1, description: 'inclusive upper bound; default = channel head' },
|
||||||
|
limit: { type: 'integer', minimum: 1, maximum: 200, description: 'default 20' },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
execute: async (p: {
|
||||||
|
guildNodeId: string;
|
||||||
|
channelId: string;
|
||||||
|
seqFrom?: number;
|
||||||
|
seqTo?: number;
|
||||||
|
limit?: number;
|
||||||
|
}) => {
|
||||||
|
const agentId = ctx.agentId;
|
||||||
|
if (!agentId) return { ok: false, error: 'no agent context' };
|
||||||
|
const { guild, token } = await ctxGuild(agentId, p.guildNodeId);
|
||||||
|
const limit = p.limit ?? 20;
|
||||||
|
|
||||||
|
// Tail mode: discover channel head via channel listing, then ask
|
||||||
|
// for [head-limit+1, head]. Avoids needing the agent to know seq.
|
||||||
|
let seqFrom = p.seqFrom;
|
||||||
|
let seqTo = p.seqTo;
|
||||||
|
if (seqFrom === undefined && seqTo === undefined) {
|
||||||
|
const channels = await client.listChannels(guild.endpoint, token, p.guildNodeId);
|
||||||
|
const ch = channels.find((c) => c.id === p.channelId);
|
||||||
|
const head = ch?.lastSeq ?? 0;
|
||||||
|
seqFrom = Math.max(1, head - limit + 1);
|
||||||
|
seqTo = head;
|
||||||
|
}
|
||||||
|
|
||||||
|
const res = await client.listMessages(guild.endpoint, token, p.channelId, {
|
||||||
|
seqFrom,
|
||||||
|
seqTo,
|
||||||
|
limit,
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
ok: true,
|
||||||
|
page: res.page,
|
||||||
|
messages: res.items.map((m) => ({
|
||||||
|
messageId: m.messageId,
|
||||||
|
seq: m.seq,
|
||||||
|
authorUserId: m.authorUserId,
|
||||||
|
content: m.content,
|
||||||
|
createdAt: m.createdAt,
|
||||||
|
isDeleted: m.isDeleted,
|
||||||
|
})),
|
||||||
|
};
|
||||||
|
},
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user