feat(telemetry): implement offline caching with batch upload
- Add offline cache with disk persistence - Flush cached payloads on successful heartbeat - Store failed payloads to cache for retry - Configurable cache path and max size - Load cache on startup
This commit is contained in:
@@ -13,6 +13,7 @@ import { platform, hostname, freemem, totalmem, uptime } from 'os';
|
||||
const execAsync = promisify(exec);
|
||||
|
||||
// Config from environment (set by plugin)
|
||||
const openclawPath = process.env.OPENCLAW_PATH || `${process.env.HOME}/.openclaw`;
|
||||
const CONFIG = {
|
||||
backendUrl: process.env.HF_MONITOR_BACKEND_URL || 'https://monitor.hangman-lab.top',
|
||||
identifier: process.env.HF_MONITOR_IDENTIFIER || hostname(),
|
||||
@@ -20,8 +21,10 @@ const CONFIG = {
|
||||
reportIntervalSec: parseInt(process.env.HF_MONITOR_REPORT_INTERVAL || '30', 10),
|
||||
httpFallbackIntervalSec: parseInt(process.env.HF_MONITOR_HTTP_FALLBACK_INTERVAL || '60', 10),
|
||||
logLevel: process.env.HF_MONITOR_LOG_LEVEL || 'info',
|
||||
openclawPath: process.env.OPENCLAW_PATH || `${process.env.HOME}/.openclaw`,
|
||||
openclawPath,
|
||||
openclawVersion: process.env.OPENCLAW_VERSION || 'unknown',
|
||||
cachePath: process.env.HF_MONITOR_CACHE_PATH || `${openclawPath}/telemetry_cache.json`,
|
||||
maxCacheSize: parseInt(process.env.HF_MONITOR_MAX_CACHE_SIZE || '100', 10),
|
||||
};
|
||||
|
||||
// Logging
|
||||
@@ -37,6 +40,95 @@ let wsConnection = null;
|
||||
let lastSuccessfulSend = null;
|
||||
let consecutiveFailures = 0;
|
||||
let isShuttingDown = false;
|
||||
let offlineCache = [];
|
||||
|
||||
/**
|
||||
* Load offline cache from disk
|
||||
*/
|
||||
async function loadCache() {
|
||||
try {
|
||||
await access(CONFIG.cachePath, constants.R_OK);
|
||||
const data = JSON.parse(await readFile(CONFIG.cachePath, 'utf8'));
|
||||
offlineCache = Array.isArray(data) ? data : [];
|
||||
log.info(`Loaded ${offlineCache.length} cached payloads from disk`);
|
||||
} catch {
|
||||
offlineCache = [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save offline cache to disk
|
||||
*/
|
||||
async function saveCache() {
|
||||
try {
|
||||
const { writeFile } = await import('fs/promises');
|
||||
// Keep only the most recent entries
|
||||
const toSave = offlineCache.slice(-CONFIG.maxCacheSize);
|
||||
await writeFile(CONFIG.cachePath, JSON.stringify(toSave, null, 2));
|
||||
log.debug(`Saved ${toSave.length} payloads to cache`);
|
||||
} catch (err) {
|
||||
log.warn('Failed to save cache:', err.message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add payload to offline cache
|
||||
*/
|
||||
async function addToCache(payload) {
|
||||
offlineCache.push(payload);
|
||||
// Trim to max size
|
||||
if (offlineCache.length > CONFIG.maxCacheSize) {
|
||||
offlineCache = offlineCache.slice(-CONFIG.maxCacheSize);
|
||||
}
|
||||
await saveCache();
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush cached payloads to server
|
||||
*/
|
||||
async function flushCache() {
|
||||
if (offlineCache.length === 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
log.info(`Flushing ${offlineCache.length} cached payloads...`);
|
||||
|
||||
const headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Server-Identifier': CONFIG.identifier,
|
||||
};
|
||||
|
||||
if (CONFIG.apiKey) {
|
||||
headers['X-API-Key'] = CONFIG.apiKey;
|
||||
}
|
||||
|
||||
let successCount = 0;
|
||||
const toRetry = [];
|
||||
|
||||
for (const payload of offlineCache) {
|
||||
try {
|
||||
const response = await fetch(`${CONFIG.backendUrl}/monitor/server/heartbeat-v2`, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify(payload),
|
||||
});
|
||||
|
||||
if (response.ok) {
|
||||
successCount++;
|
||||
} else {
|
||||
toRetry.push(payload);
|
||||
}
|
||||
} catch {
|
||||
toRetry.push(payload);
|
||||
}
|
||||
}
|
||||
|
||||
offlineCache = toRetry;
|
||||
await saveCache();
|
||||
|
||||
log.info(`Flushed ${successCount} of ${offlineCache.length + successCount} cached payloads`);
|
||||
return successCount > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect system metrics
|
||||
@@ -190,6 +282,11 @@ async function buildPayload() {
|
||||
*/
|
||||
async function sendHttpHeartbeat() {
|
||||
try {
|
||||
// First, try to flush any cached data
|
||||
if (offlineCache.length > 0) {
|
||||
await flushCache();
|
||||
}
|
||||
|
||||
const payload = await buildPayload();
|
||||
log.debug('Sending HTTP heartbeat...');
|
||||
|
||||
@@ -220,6 +317,16 @@ async function sendHttpHeartbeat() {
|
||||
} catch (err) {
|
||||
log.error('HTTP heartbeat failed:', err.message);
|
||||
consecutiveFailures++;
|
||||
|
||||
// Add current payload to cache on failure
|
||||
try {
|
||||
const payload = await buildPayload();
|
||||
await addToCache(payload);
|
||||
log.info(`Added failed payload to cache (${offlineCache.length} total)`);
|
||||
} catch (cacheErr) {
|
||||
log.warn('Failed to cache payload:', cacheErr.message);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -274,10 +381,15 @@ log.info('Config:', {
|
||||
backendUrl: CONFIG.backendUrl,
|
||||
reportIntervalSec: CONFIG.reportIntervalSec,
|
||||
hasApiKey: !!CONFIG.apiKey,
|
||||
cachePath: CONFIG.cachePath,
|
||||
maxCacheSize: CONFIG.maxCacheSize,
|
||||
});
|
||||
|
||||
if (!CONFIG.apiKey) {
|
||||
log.warn('Missing HF_MONITOR_API_KEY environment variable - API authentication will be disabled');
|
||||
}
|
||||
|
||||
// Load offline cache
|
||||
await loadCache();
|
||||
|
||||
reportingLoop();
|
||||
|
||||
Reference in New Issue
Block a user