From 074f1e9eef8b0d1fe46bb4b032d06b9f04612460 Mon Sep 17 00:00:00 2001 From: zhi Date: Thu, 19 Mar 2026 16:21:46 +0000 Subject: [PATCH] feat(telemetry): implement offline caching with batch upload - Add offline cache with disk persistence - Flush cached payloads on successful heartbeat - Store failed payloads to cache for retry - Configurable cache path and max size - Load cache on startup --- server/telemetry.mjs | 114 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 113 insertions(+), 1 deletion(-) diff --git a/server/telemetry.mjs b/server/telemetry.mjs index 3ceafa4..85437e5 100644 --- a/server/telemetry.mjs +++ b/server/telemetry.mjs @@ -13,6 +13,7 @@ import { platform, hostname, freemem, totalmem, uptime } from 'os'; const execAsync = promisify(exec); // Config from environment (set by plugin) +const openclawPath = process.env.OPENCLAW_PATH || `${process.env.HOME}/.openclaw`; const CONFIG = { backendUrl: process.env.HF_MONITOR_BACKEND_URL || 'https://monitor.hangman-lab.top', identifier: process.env.HF_MONITOR_IDENTIFIER || hostname(), @@ -20,8 +21,10 @@ const CONFIG = { reportIntervalSec: parseInt(process.env.HF_MONITOR_REPORT_INTERVAL || '30', 10), httpFallbackIntervalSec: parseInt(process.env.HF_MONITOR_HTTP_FALLBACK_INTERVAL || '60', 10), logLevel: process.env.HF_MONITOR_LOG_LEVEL || 'info', - openclawPath: process.env.OPENCLAW_PATH || `${process.env.HOME}/.openclaw`, + openclawPath, openclawVersion: process.env.OPENCLAW_VERSION || 'unknown', + cachePath: process.env.HF_MONITOR_CACHE_PATH || `${openclawPath}/telemetry_cache.json`, + maxCacheSize: parseInt(process.env.HF_MONITOR_MAX_CACHE_SIZE || '100', 10), }; // Logging @@ -37,6 +40,95 @@ let wsConnection = null; let lastSuccessfulSend = null; let consecutiveFailures = 0; let isShuttingDown = false; +let offlineCache = []; + +/** + * Load offline cache from disk + */ +async function loadCache() { + try { + await access(CONFIG.cachePath, constants.R_OK); + const data = JSON.parse(await readFile(CONFIG.cachePath, 'utf8')); + offlineCache = Array.isArray(data) ? data : []; + log.info(`Loaded ${offlineCache.length} cached payloads from disk`); + } catch { + offlineCache = []; + } +} + +/** + * Save offline cache to disk + */ +async function saveCache() { + try { + const { writeFile } = await import('fs/promises'); + // Keep only the most recent entries + const toSave = offlineCache.slice(-CONFIG.maxCacheSize); + await writeFile(CONFIG.cachePath, JSON.stringify(toSave, null, 2)); + log.debug(`Saved ${toSave.length} payloads to cache`); + } catch (err) { + log.warn('Failed to save cache:', err.message); + } +} + +/** + * Add payload to offline cache + */ +async function addToCache(payload) { + offlineCache.push(payload); + // Trim to max size + if (offlineCache.length > CONFIG.maxCacheSize) { + offlineCache = offlineCache.slice(-CONFIG.maxCacheSize); + } + await saveCache(); +} + +/** + * Flush cached payloads to server + */ +async function flushCache() { + if (offlineCache.length === 0) { + return true; + } + + log.info(`Flushing ${offlineCache.length} cached payloads...`); + + const headers = { + 'Content-Type': 'application/json', + 'X-Server-Identifier': CONFIG.identifier, + }; + + if (CONFIG.apiKey) { + headers['X-API-Key'] = CONFIG.apiKey; + } + + let successCount = 0; + const toRetry = []; + + for (const payload of offlineCache) { + try { + const response = await fetch(`${CONFIG.backendUrl}/monitor/server/heartbeat-v2`, { + method: 'POST', + headers, + body: JSON.stringify(payload), + }); + + if (response.ok) { + successCount++; + } else { + toRetry.push(payload); + } + } catch { + toRetry.push(payload); + } + } + + offlineCache = toRetry; + await saveCache(); + + log.info(`Flushed ${successCount} of ${offlineCache.length + successCount} cached payloads`); + return successCount > 0; +} /** * Collect system metrics @@ -190,6 +282,11 @@ async function buildPayload() { */ async function sendHttpHeartbeat() { try { + // First, try to flush any cached data + if (offlineCache.length > 0) { + await flushCache(); + } + const payload = await buildPayload(); log.debug('Sending HTTP heartbeat...'); @@ -220,6 +317,16 @@ async function sendHttpHeartbeat() { } catch (err) { log.error('HTTP heartbeat failed:', err.message); consecutiveFailures++; + + // Add current payload to cache on failure + try { + const payload = await buildPayload(); + await addToCache(payload); + log.info(`Added failed payload to cache (${offlineCache.length} total)`); + } catch (cacheErr) { + log.warn('Failed to cache payload:', cacheErr.message); + } + return false; } } @@ -274,10 +381,15 @@ log.info('Config:', { backendUrl: CONFIG.backendUrl, reportIntervalSec: CONFIG.reportIntervalSec, hasApiKey: !!CONFIG.apiKey, + cachePath: CONFIG.cachePath, + maxCacheSize: CONFIG.maxCacheSize, }); if (!CONFIG.apiKey) { log.warn('Missing HF_MONITOR_API_KEY environment variable - API authentication will be disabled'); } +// Load offline cache +await loadCache(); + reportingLoop();