import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Tuple import requests from sqlalchemy.orm import Session from app.models.task import Task, TaskStatus from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState _CACHE: Dict[str, Dict[str, Any]] = {} def _now(): return datetime.now(timezone.utc) def _parse_credential(raw: str) -> Dict[str, Any]: raw = (raw or '').strip() if raw.startswith('{'): try: return json.loads(raw) except Exception: return {'api_key': raw} return {'api_key': raw} def _parse_reset_at(value) -> datetime | None: if not value: return None if isinstance(value, datetime): return value if isinstance(value, (int, float)): return datetime.fromtimestamp(value, tz=timezone.utc) if isinstance(value, str): try: return datetime.fromisoformat(value.replace('Z', '+00:00')) except Exception: return None return None def _normalize_usage_payload(payload: Dict[str, Any]) -> Dict[str, Any]: used = payload.get('used') or payload.get('usage') or payload.get('consumed') or payload.get('total_usage') limit = payload.get('limit') or payload.get('quota') or payload.get('hard_limit') or payload.get('total') remaining = payload.get('remain') or payload.get('remaining') or payload.get('left') usage_pct = payload.get('usage_pct') or payload.get('percent') or payload.get('usage_percent') window_label = payload.get('window') or payload.get('window_label') reset_at = payload.get('reset_at') or payload.get('reset_time') or payload.get('reset') if used is None and remaining is not None and limit is not None: try: used = float(limit) - float(remaining) except Exception: pass if usage_pct is None and used is not None and limit: try: usage_pct = round(float(used) / float(limit) * 100, 2) except Exception: pass return { 'window_label': window_label, 'used': used, 'limit': limit, 'usage_pct': usage_pct, 'reset_at': _parse_reset_at(reset_at), 'raw': payload, } def get_task_stats_cached(db: Session, ttl_seconds: int = 1800): key = 'task_stats_24h' now = _now() hit = _CACHE.get(key) if hit and (now - hit['at']).total_seconds() < ttl_seconds: return hit['data'] since = now - timedelta(hours=24) total = db.query(Task).count() new_24h = db.query(Task).filter(Task.created_at >= since).count() processed_24h = db.query(Task).filter( Task.updated_at != None, Task.updated_at >= since, Task.status == TaskStatus.CLOSED, ).count() data = { 'total_tasks': total, 'new_tasks_24h': new_24h, 'processed_tasks_24h': processed_24h, 'computed_at': now.isoformat(), 'cache_ttl_seconds': ttl_seconds, } _CACHE[key] = {'at': now, 'data': data} return data def _provider_headers(provider: str, credential: str, extra: Dict[str, Any] | None = None): extra = extra or {} if extra.get('auth_header'): val = extra.get('auth_value') if not val: scheme = extra.get('auth_scheme') val = f"{scheme} {credential}" if scheme else credential return {extra['auth_header']: val} if provider == 'openai': return {'Authorization': f'Bearer {credential}'} if provider == 'anthropic': return {'x-api-key': credential, 'anthropic-version': '2023-06-01'} return {} def test_provider_connection(provider: str, credential: str): provider = provider.lower() info = _parse_credential(credential) key = info.get('api_key') or credential try: if provider == 'openai': r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12) return r.status_code == 200, f'status={r.status_code}' if provider == 'anthropic': r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12) return r.status_code == 200, f'status={r.status_code}' usage_url = info.get('usage_url') or info.get('test_url') if provider == 'kimi' and not usage_url: usage_url = 'https://www.kimi.com/api/user/usage' if usage_url: r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12) return r.status_code < 500, f'status={r.status_code}' if provider in {'minimax', 'kimi', 'qwen'}: return True, 'accepted (connectivity check pending provider-specific adapter)' return False, 'unsupported provider' except Exception as e: return False, str(e) def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]: info = _parse_credential(credential) key = info.get('api_key') or credential headers = _provider_headers('openai', key, info) today = _now().date() start = (today - timedelta(days=7)).isoformat() end = today.isoformat() usage_url = info.get('usage_url') or f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}' sub_url = info.get('subscription_url') or 'https://api.openai.com/v1/dashboard/billing/subscription' u = requests.get(usage_url, headers=headers, timeout=12) s = requests.get(sub_url, headers=headers, timeout=12) if u.status_code != 200 or s.status_code != 200: return 'error', {'error': f'usage:{u.status_code}, subscription:{s.status_code}'} usage = u.json() sub = s.json() total_usage = usage.get('total_usage') hard_limit = sub.get('hard_limit_usd') reset_at_ts = sub.get('billing_cycle_anchor') reset_at = datetime.fromtimestamp(reset_at_ts, tz=timezone.utc) if reset_at_ts else None usage_pct = None if total_usage is not None and hard_limit: usage_pct = round(total_usage / hard_limit * 100, 2) return 'ok', { 'window_label': '7d', 'used': total_usage, 'limit': hard_limit, 'usage_pct': usage_pct, 'reset_at': reset_at, 'raw': {'usage': usage, 'subscription': sub}, } def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]: info = _parse_credential(credential) key = info.get('api_key') or credential usage_url = info.get('usage_url') if not usage_url: return 'unsupported', {'error': 'anthropic usage API not configured'} r = requests.get(usage_url, headers=_provider_headers('anthropic', key, info), timeout=12) if r.status_code != 200: return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} payload = r.json() if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict): payload = payload['data'] return 'ok', _normalize_usage_payload(payload) def _kimi_usage(credential: str) -> Tuple[str, Dict[str, Any]]: info = _parse_credential(credential) key = info.get('api_key') or credential usage_url = info.get('usage_url') or 'https://www.kimi.com/api/user/usage' r = requests.get(usage_url, headers=_provider_headers('kimi', key, info), timeout=12) if r.status_code != 200: return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} payload = r.json() if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict): payload = payload['data'] return 'ok', _normalize_usage_payload(payload) def _minimax_usage(credential: str) -> Tuple[str, Dict[str, Any]]: info = _parse_credential(credential) key = info.get('api_key') or credential usage_url = info.get('usage_url') if not usage_url: return 'unsupported', {'error': 'minimax usage API not configured'} r = requests.get(usage_url, headers=_provider_headers('minimax', key, info), timeout=12) if r.status_code != 200: return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} payload = r.json() if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict): payload = payload['data'] return 'ok', _normalize_usage_payload(payload) def _generic_usage(provider: str, credential: str) -> Tuple[str, Dict[str, Any]]: info = _parse_credential(credential) key = info.get('api_key') or credential usage_url = info.get('usage_url') if not usage_url: return 'unsupported', {'error': f'{provider} usage API not configured'} r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12) if r.status_code != 200: return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} payload = r.json() return 'ok', _normalize_usage_payload(payload) def refresh_provider_usage_once(db: Session): accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() now = _now() for a in accounts: status = 'pending' payload: Dict[str, Any] = {} if a.provider == 'openai': status, payload = _openai_usage(a.credential) elif a.provider == 'anthropic': status, payload = _anthropic_usage(a.credential) elif a.provider == 'kimi': status, payload = _kimi_usage(a.credential) elif a.provider == 'minimax': status, payload = _minimax_usage(a.credential) elif a.provider == 'qwen': status, payload = _generic_usage(a.provider, a.credential) else: ok, msg = test_provider_connection(a.provider, a.credential) status = 'ok' if ok else 'error' payload = {'error': None if ok else msg} snap = ProviderUsageSnapshot( account_id=a.id, window_label=payload.get('window_label'), used=payload.get('used'), limit=payload.get('limit'), usage_pct=payload.get('usage_pct'), reset_at=payload.get('reset_at'), status=status, error=payload.get('error'), raw_payload=json.dumps(payload.get('raw') or payload, ensure_ascii=False), fetched_at=now, ) db.add(snap) db.commit() def get_provider_usage_view(db: Session): accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() rows = [] for a in accounts: snap = db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == a.id).order_by(ProviderUsageSnapshot.fetched_at.desc()).first() rows.append({ 'account_id': a.id, 'provider': a.provider, 'label': a.label, 'window': snap.window_label if snap else None, 'usage_pct': snap.usage_pct if snap else None, 'used': snap.used if snap else None, 'limit': snap.limit if snap else None, 'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None, 'status': snap.status if snap else 'pending', 'error': snap.error if snap else None, 'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None, }) return rows def get_server_states_view(db: Session, offline_after_minutes: int = 7): now = _now() servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all() out = [] for s in servers: st = db.query(ServerState).filter(ServerState.server_id == s.id).first() last_seen = st.last_seen_at if st else None # Handle timezone-naive datetimes from database if last_seen and last_seen.tzinfo is None: last_seen = last_seen.replace(tzinfo=timezone.utc) online = bool(last_seen and (now - last_seen).total_seconds() <= offline_after_minutes * 60) out.append({ 'server_id': s.id, 'identifier': s.identifier, 'display_name': s.display_name or s.identifier, 'online': online, 'openclaw_version': st.openclaw_version if st else None, 'plugin_version': st.plugin_version if st else None, 'cpu_pct': st.cpu_pct if st else None, 'mem_pct': st.mem_pct if st else None, 'disk_pct': st.disk_pct if st else None, 'swap_pct': st.swap_pct if st else None, 'agents': json.loads(st.agents_json) if st and st.agents_json else [], 'nginx_installed': st.nginx_installed if st else None, 'nginx_sites': json.loads(st.nginx_sites_json) if st and st.nginx_sites_json else [], 'last_seen_at': last_seen.isoformat() if last_seen else None, }) return out