From 24cbee313583440d6ca62769c285f7ee1d676708 Mon Sep 17 00:00:00 2001 From: nav Date: Tue, 12 May 2026 11:28:21 +0000 Subject: [PATCH] feat(guild-events): add outbound retry queue with exponential backoff --- .../src/events/events.service.ts | 124 ++++++++++++++---- docs/TODO-backend-center-guild.md | 2 +- 2 files changed, 102 insertions(+), 24 deletions(-) diff --git a/Fabric.Backend.Guild/src/events/events.service.ts b/Fabric.Backend.Guild/src/events/events.service.ts index d24ea66..fa26bbd 100644 --- a/Fabric.Backend.Guild/src/events/events.service.ts +++ b/Fabric.Backend.Guild/src/events/events.service.ts @@ -2,10 +2,18 @@ import { Injectable, Logger } from '@nestjs/common'; import { createHmac, randomUUID } from 'crypto'; import { FabricEventEnvelope } from './event-envelope'; +type RetryTask = { + envelope: FabricEventEnvelope; + attempts: number; + nextRunAt: number; +}; + @Injectable() export class EventsService { private readonly logger = new Logger(EventsService.name); private readonly sentEventIds = new Map(); + private readonly retryQueue: RetryTask[] = []; + private retryTimer: NodeJS.Timeout | null = null; private cleanupSentCache(now: number): void { const ttlMs = 10 * 60 * 1000; @@ -21,6 +29,90 @@ export class EventsService { return createHmac('sha256', secret).update(canonical).digest('hex'); } + private scheduleRetryPump(): void { + if (this.retryTimer) return; + this.retryTimer = setInterval(() => { + void this.processRetryQueue(); + }, 1000); + } + + private enqueueRetry(envelope: FabricEventEnvelope, attempts: number): void { + if (attempts >= 5) { + this.logger.warn(`drop event after max retries: ${envelope.event_id}`); + return; + } + + const delayMs = Math.pow(2, attempts) * 1000; // 1s,2s,4s,8s,16s + this.retryQueue.push({ + envelope, + attempts: attempts + 1, + nextRunAt: Date.now() + delayMs, + }); + this.scheduleRetryPump(); + } + + private async deliverEnvelope(envelope: FabricEventEnvelope): Promise { + const webhookUrl = process.env.FABRIC_WEBHOOK_URL; + if (!webhookUrl) { + this.logger.log(`event(no-webhook): ${JSON.stringify(envelope)}`); + return true; + } + + const timestamp = new Date().toISOString(); + const nonce = randomUUID(); + const payload = JSON.stringify(envelope); + const signature = this.signWebhook(payload, timestamp, nonce); + + const response = await fetch(webhookUrl, { + method: 'POST', + headers: { + 'content-type': 'application/json', + 'x-fabric-version': '1', + 'x-fabric-timestamp': timestamp, + 'x-fabric-nonce': nonce, + 'x-fabric-signature': signature, + }, + body: payload, + }); + + if (response.status >= 200 && response.status < 300) { + return true; + } + + // retry only transient statuses + if ([429, 500, 502, 503, 504].includes(response.status)) { + return false; + } + + // permanent failure: don't retry + this.logger.warn(`event delivery permanent failure: ${response.status}`); + return true; + } + + private async processRetryQueue(): Promise { + const now = Date.now(); + const due = this.retryQueue.filter((t) => t.nextRunAt <= now); + if (due.length === 0) return; + + for (const task of due) { + const idx = this.retryQueue.indexOf(task); + if (idx >= 0) this.retryQueue.splice(idx, 1); + + try { + const delivered = await this.deliverEnvelope(task.envelope); + if (delivered) { + this.sentEventIds.set(task.envelope.event_id, Date.now()); + } else { + this.enqueueRetry(task.envelope, task.attempts); + } + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + this.logger.warn(`retry delivery failed: ${message}`); + this.enqueueRetry(task.envelope, task.attempts); + } + } + } + buildEnvelope(input: { eventType: string; guildId?: string | null; @@ -56,33 +148,19 @@ export class EventsService { } const webhookUrl = process.env.FABRIC_WEBHOOK_URL; - if (!webhookUrl) { - this.logger.log(`event(no-webhook): ${JSON.stringify(envelope)}`); - this.sentEventIds.set(envelope.event_id, now); - return envelope; - } - try { - const timestamp = new Date(now).toISOString(); - const nonce = randomUUID(); - const payload = JSON.stringify(envelope); - const signature = this.signWebhook(payload, timestamp, nonce); - - await fetch(webhookUrl, { - method: 'POST', - headers: { - 'content-type': 'application/json', - 'x-fabric-version': '1', - 'x-fabric-timestamp': timestamp, - 'x-fabric-nonce': nonce, - 'x-fabric-signature': signature, - }, - body: payload, - }); - this.sentEventIds.set(envelope.event_id, now); + const delivered = await this.deliverEnvelope(envelope); + if (delivered) { + this.sentEventIds.set(envelope.event_id, now); + } else { + this.enqueueRetry(envelope, 0); + } } catch (error) { const message = error instanceof Error ? error.message : String(error); this.logger.warn(`event delivery failed: ${message}`); + if (webhookUrl) { + this.enqueueRetry(envelope, 0); + } } return envelope; diff --git a/docs/TODO-backend-center-guild.md b/docs/TODO-backend-center-guild.md index 570e1fa..7d40f49 100644 --- a/docs/TODO-backend-center-guild.md +++ b/docs/TODO-backend-center-guild.md @@ -73,7 +73,7 @@ ## 4. 插件与扩展面(为 OpenclawPlugin 准备) - [x] Webhook 事件信封落地(event_id/event_type/occurred_at/data) - [x] HMAC 签名与重放防护 -- [ ] 出站重试队列(指数退避) +- [x] 出站重试队列(指数退避) - [ ] Bot Token 入站调用鉴权 ---