import { Injectable, Logger, type OnModuleDestroy } from '@nestjs/common';
import { createHmac, randomUUID } from 'crypto';
import { FabricEventEnvelope } from './event-envelope.js';

/** A failed delivery waiting in memory for its next attempt. */
type RetryTask = {
  envelope: FabricEventEnvelope;
  /** Number of retry slots already consumed, including the one this task occupies. */
  attempts: number;
  /** Epoch milliseconds at which the task becomes due. */
  nextRunAt: number;
};

/** Input accepted by `buildEnvelope` and `emit`. */
type EmitInput = {
  eventType: string;
  guildId?: string | null;
  channelId?: string | null;
  actorId?: string | null;
  // Typed from the envelope so it always matches the field it is copied into.
  data: FabricEventEnvelope['data'];
};

/** How long a delivered event id is remembered. */
const SENT_CACHE_TTL_MS = 10 * 60 * 1000;
/** How often the retry pump looks for due tasks. */
const RETRY_PUMP_INTERVAL_MS = 1000;
/** Retries scheduled per event before it is dropped. */
const MAX_RETRY_ATTEMPTS = 5;
/** HTTP statuses treated as transient, i.e. worth retrying. */
const RETRYABLE_STATUSES: ReadonlySet<number> = new Set([429, 500, 502, 503, 504]);

/**
 * Builds event envelopes and POSTs them to the webhook named by
 * `FABRIC_WEBHOOK_URL`, signing each request with `FABRIC_WEBHOOK_SECRET`.
 *
 * Transient failures are retried from an in-memory queue with exponential
 * backoff. The queue is not persisted: tasks still queued when the process
 * stops are lost.
 */
@Injectable()
export class EventsService implements OnModuleDestroy {
  private readonly logger = new Logger(EventsService.name);
  /** event_id -> epoch ms at which delivery was recorded. */
  private readonly sentEventIds = new Map<string, number>();
  private readonly retryQueue: RetryTask[] = [];
  private retryTimer: NodeJS.Timeout | null = null;
  /** Set on shutdown so late failures cannot restart the retry pump. */
  private shuttingDown = false;

  /**
   * Stops the retry pump and discards queued retries so the interval does
   * not outlive the module.
   */
  onModuleDestroy(): void {
    this.shuttingDown = true;
    this.stopRetryPump();
    if (this.retryQueue.length > 0) {
      this.logger.warn(`dropping ${this.retryQueue.length} queued event(s) on shutdown`);
      this.retryQueue.length = 0;
    }
  }

  /** Forgets delivered event ids recorded more than the TTL before `now`. */
  private cleanupSentCache(now: number): void {
    for (const [eventId, ts] of this.sentEventIds.entries()) {
      if (now - ts > SENT_CACHE_TTL_MS) this.sentEventIds.delete(eventId);
    }
  }

  /**
   * Computes the hex HMAC-SHA256 of the canonical request string
   * (method, path, timestamp, nonce and payload joined by newlines).
   *
   * @returns The signature, or an empty string when no secret is configured.
   */
  private signWebhook(payload: string, timestamp: string, nonce: string): string {
    const secret = process.env.FABRIC_WEBHOOK_SECRET;
    if (!secret) return '';
    const canonical = ['POST', '/webhook/events', timestamp, nonce, payload].join('\n');
    return createHmac('sha256', secret).update(canonical).digest('hex');
  }

  /** Starts the periodic retry pump if it is not already running. */
  private scheduleRetryPump(): void {
    if (this.retryTimer) return;
    this.retryTimer = setInterval(() => {
      void this.processRetryQueue();
    }, RETRY_PUMP_INTERVAL_MS);
  }

  /** Stops the retry pump if it is running. */
  private stopRetryPump(): void {
    if (!this.retryTimer) return;
    clearInterval(this.retryTimer);
    this.retryTimer = null;
  }

  /**
   * Queues another delivery attempt with exponential backoff, or drops the
   * event once the retry budget is used up.
   *
   * @param envelope Event to redeliver.
   * @param attempts Retries already consumed for this event (0 on first failure).
   */
  private enqueueRetry(envelope: FabricEventEnvelope, attempts: number): void {
    if (this.shuttingDown) {
      this.logger.warn(`drop event during shutdown: ${envelope.event_id}`);
      return;
    }
    if (attempts >= MAX_RETRY_ATTEMPTS) {
      this.logger.warn(`drop event after max retries: ${envelope.event_id}`);
      return;
    }
    const delayMs = 2 ** attempts * 1000; // 1s,2s,4s,8s,16s
    this.retryQueue.push({
      envelope,
      attempts: attempts + 1,
      nextRunAt: Date.now() + delayMs,
    });
    this.scheduleRetryPump();
  }

  /**
   * Sends one envelope to the configured webhook.
   *
   * @returns `true` when no further attempt should be made: the webhook
   *   answered 2xx, no webhook is configured (the envelope is only logged),
   *   or the status is a permanent failure. `false` when the status is
   *   transient and the caller should retry.
   * @throws Whatever `fetch` rejects with, e.g. on a network error.
   */
  private async deliverEnvelope(envelope: FabricEventEnvelope): Promise<boolean> {
    const webhookUrl = process.env.FABRIC_WEBHOOK_URL;
    if (!webhookUrl) {
      this.logger.log(`event(no-webhook): ${JSON.stringify(envelope)}`);
      return true;
    }

    const timestamp = new Date().toISOString();
    const nonce = randomUUID();
    const payload = JSON.stringify(envelope);
    const signature = this.signWebhook(payload, timestamp, nonce);

    const response = await fetch(webhookUrl, {
      method: 'POST',
      headers: {
        'content-type': 'application/json',
        'x-fabric-version': '1',
        'x-fabric-timestamp': timestamp,
        'x-fabric-nonce': nonce,
        'x-fabric-signature': signature,
      },
      body: payload,
    });

    if (response.status >= 200 && response.status < 300) {
      return true;
    }

    // retry only transient statuses
    if (RETRYABLE_STATUSES.has(response.status)) {
      return false;
    }

    // permanent failure: don't retry
    this.logger.warn(`event delivery permanent failure: ${response.status}`);
    return true;
  }

  /**
   * Delivers every queued task that is due, re-queuing the ones that fail.
   * Stops the pump once the queue is empty.
   */
  private async processRetryQueue(): Promise<void> {
    const now = Date.now();
    const due: RetryTask[] = [];
    const pending: RetryTask[] = [];
    for (const task of this.retryQueue) {
      (task.nextRunAt <= now ? due : pending).push(task);
    }
    // Claim all due tasks before the first await. The interval keeps firing
    // while deliveries are in flight, so a task left in the queue could be
    // picked up and delivered again by an overlapping invocation.
    this.retryQueue.splice(0, this.retryQueue.length, ...pending);

    for (const task of due) {
      try {
        const delivered = await this.deliverEnvelope(task.envelope);
        if (delivered) {
          this.sentEventIds.set(task.envelope.event_id, Date.now());
        } else {
          this.enqueueRetry(task.envelope, task.attempts);
        }
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);
        this.logger.warn(`retry delivery failed: ${message}`);
        this.enqueueRetry(task.envelope, task.attempts);
      }
    }

    // Nothing left to retry: stop ticking. enqueueRetry restarts the pump
    // when a later failure is queued.
    if (this.retryQueue.length === 0) this.stopRetryPump();
  }

  /**
   * Creates an envelope with a fresh random `event_id` and the current time
   * as `occurred_at`. Omitted ids are stored as `null`.
   */
  buildEnvelope(input: EmitInput): FabricEventEnvelope {
    return {
      event_id: randomUUID(),
      event_type: input.eventType,
      occurred_at: new Date().toISOString(),
      guild_id: input.guildId ?? null,
      channel_id: input.channelId ?? null,
      actor_id: input.actorId ?? null,
      data: input.data,
    };
  }

  /**
   * Builds an envelope and attempts one immediate delivery. Failures are
   * logged and queued for retry rather than thrown.
   *
   * @returns The envelope that was built, whether or not delivery succeeded.
   */
  async emit(input: EmitInput): Promise<FabricEventEnvelope> {
    const envelope = this.buildEnvelope(input);
    const now = Date.now();
    this.cleanupSentCache(now);

    // buildEnvelope assigns a fresh random id on every call, so this guard
    // only triggers if an id repeats.
    if (this.sentEventIds.has(envelope.event_id)) {
      this.logger.warn(`skip duplicate event_id: ${envelope.event_id}`);
      return envelope;
    }

    const webhookUrl = process.env.FABRIC_WEBHOOK_URL;
    try {
      const delivered = await this.deliverEnvelope(envelope);
      if (delivered) {
        this.sentEventIds.set(envelope.event_id, now);
      } else {
        this.enqueueRetry(envelope, 0);
      }
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.warn(`event delivery failed: ${message}`);
      // Without a webhook there is nothing to retry against.
      if (webhookUrl) {
        this.enqueueRetry(envelope, 0);
      }
    }
    return envelope;
  }
}