feat(guild-events): add outbound retry queue with exponential backoff

This commit is contained in:
nav
2026-05-12 11:28:21 +00:00
parent 7e458ad6d3
commit 24cbee3135
2 changed files with 102 additions and 24 deletions

View File

@@ -2,10 +2,18 @@ import { Injectable, Logger } from '@nestjs/common';
import { createHmac, randomUUID } from 'crypto'; import { createHmac, randomUUID } from 'crypto';
import { FabricEventEnvelope } from './event-envelope'; import { FabricEventEnvelope } from './event-envelope';
type RetryTask = {
envelope: FabricEventEnvelope;
attempts: number;
nextRunAt: number;
};
@Injectable() @Injectable()
export class EventsService { export class EventsService {
private readonly logger = new Logger(EventsService.name); private readonly logger = new Logger(EventsService.name);
private readonly sentEventIds = new Map<string, number>(); private readonly sentEventIds = new Map<string, number>();
private readonly retryQueue: RetryTask[] = [];
private retryTimer: NodeJS.Timeout | null = null;
private cleanupSentCache(now: number): void { private cleanupSentCache(now: number): void {
const ttlMs = 10 * 60 * 1000; const ttlMs = 10 * 60 * 1000;
@@ -21,6 +29,90 @@ export class EventsService {
return createHmac('sha256', secret).update(canonical).digest('hex'); return createHmac('sha256', secret).update(canonical).digest('hex');
} }
private scheduleRetryPump(): void {
if (this.retryTimer) return;
this.retryTimer = setInterval(() => {
void this.processRetryQueue();
}, 1000);
}
private enqueueRetry(envelope: FabricEventEnvelope, attempts: number): void {
if (attempts >= 5) {
this.logger.warn(`drop event after max retries: ${envelope.event_id}`);
return;
}
const delayMs = Math.pow(2, attempts) * 1000; // 1s,2s,4s,8s,16s
this.retryQueue.push({
envelope,
attempts: attempts + 1,
nextRunAt: Date.now() + delayMs,
});
this.scheduleRetryPump();
}
private async deliverEnvelope(envelope: FabricEventEnvelope): Promise<boolean> {
const webhookUrl = process.env.FABRIC_WEBHOOK_URL;
if (!webhookUrl) {
this.logger.log(`event(no-webhook): ${JSON.stringify(envelope)}`);
return true;
}
const timestamp = new Date().toISOString();
const nonce = randomUUID();
const payload = JSON.stringify(envelope);
const signature = this.signWebhook(payload, timestamp, nonce);
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-fabric-version': '1',
'x-fabric-timestamp': timestamp,
'x-fabric-nonce': nonce,
'x-fabric-signature': signature,
},
body: payload,
});
if (response.status >= 200 && response.status < 300) {
return true;
}
// retry only transient statuses
if ([429, 500, 502, 503, 504].includes(response.status)) {
return false;
}
// permanent failure: don't retry
this.logger.warn(`event delivery permanent failure: ${response.status}`);
return true;
}
private async processRetryQueue(): Promise<void> {
const now = Date.now();
const due = this.retryQueue.filter((t) => t.nextRunAt <= now);
if (due.length === 0) return;
for (const task of due) {
const idx = this.retryQueue.indexOf(task);
if (idx >= 0) this.retryQueue.splice(idx, 1);
try {
const delivered = await this.deliverEnvelope(task.envelope);
if (delivered) {
this.sentEventIds.set(task.envelope.event_id, Date.now());
} else {
this.enqueueRetry(task.envelope, task.attempts);
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`retry delivery failed: ${message}`);
this.enqueueRetry(task.envelope, task.attempts);
}
}
}
buildEnvelope(input: { buildEnvelope(input: {
eventType: string; eventType: string;
guildId?: string | null; guildId?: string | null;
@@ -56,33 +148,19 @@ export class EventsService {
} }
const webhookUrl = process.env.FABRIC_WEBHOOK_URL; const webhookUrl = process.env.FABRIC_WEBHOOK_URL;
if (!webhookUrl) {
this.logger.log(`event(no-webhook): ${JSON.stringify(envelope)}`);
this.sentEventIds.set(envelope.event_id, now);
return envelope;
}
try { try {
const timestamp = new Date(now).toISOString(); const delivered = await this.deliverEnvelope(envelope);
const nonce = randomUUID(); if (delivered) {
const payload = JSON.stringify(envelope); this.sentEventIds.set(envelope.event_id, now);
const signature = this.signWebhook(payload, timestamp, nonce); } else {
this.enqueueRetry(envelope, 0);
await fetch(webhookUrl, { }
method: 'POST',
headers: {
'content-type': 'application/json',
'x-fabric-version': '1',
'x-fabric-timestamp': timestamp,
'x-fabric-nonce': nonce,
'x-fabric-signature': signature,
},
body: payload,
});
this.sentEventIds.set(envelope.event_id, now);
} catch (error) { } catch (error) {
const message = error instanceof Error ? error.message : String(error); const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`event delivery failed: ${message}`); this.logger.warn(`event delivery failed: ${message}`);
if (webhookUrl) {
this.enqueueRetry(envelope, 0);
}
} }
return envelope; return envelope;

View File

@@ -73,7 +73,7 @@
## 4. 插件与扩展面(为 OpenclawPlugin 准备) ## 4. 插件与扩展面(为 OpenclawPlugin 准备)
- [x] Webhook 事件信封落地event_id/event_type/occurred_at/data - [x] Webhook 事件信封落地event_id/event_type/occurred_at/data
- [x] HMAC 签名与重放防护 - [x] HMAC 签名与重放防护
- [ ] 出站重试队列(指数退避) - [x] 出站重试队列(指数退避)
- [ ] Bot Token 入站调用鉴权 - [ ] Bot Token 入站调用鉴权
--- ---