import json from datetime import datetime, timedelta, timezone from typing import Any, Dict, Tuple import requests from sqlalchemy.orm import Session from app.models.models import Issue from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState _CACHE: Dict[str, Dict[str, Any]] = {} def _now(): return datetime.now(timezone.utc) def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800): key = 'issue_stats_24h' now = _now() hit = _CACHE.get(key) if hit and (now - hit['at']).total_seconds() < ttl_seconds: return hit['data'] since = now - timedelta(hours=24) total = db.query(Issue).count() new_24h = db.query(Issue).filter(Issue.created_at >= since).count() processed_24h = db.query(Issue).filter( Issue.updated_at != None, Issue.updated_at >= since, Issue.status.in_(['resolved', 'closed']) ).count() data = { 'total_issues': total, 'new_issues_24h': new_24h, 'processed_issues_24h': processed_24h, 'computed_at': now.isoformat(), 'cache_ttl_seconds': ttl_seconds, } _CACHE[key] = {'at': now, 'data': data} return data def _provider_headers(provider: str, credential: str): if provider == 'openai': return {'Authorization': f'Bearer {credential}'} if provider == 'anthropic': return {'x-api-key': credential, 'anthropic-version': '2023-06-01'} return None def test_provider_connection(provider: str, credential: str): provider = provider.lower() try: if provider == 'openai': r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, credential), timeout=12) return r.status_code == 200, f'status={r.status_code}' if provider == 'anthropic': r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, credential), timeout=12) return r.status_code == 200, f'status={r.status_code}' if provider in {'minimax', 'kimi', 'qwen'}: # Endpoints/usage API vary by deployment; keep as accepted-but-unverified for now. return True, 'accepted (connectivity check pending provider-specific adapter)' return False, 'unsupported provider' except Exception as e: return False, str(e) def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]: # Uses legacy billing endpoints; may be disabled in some orgs. headers = _provider_headers('openai', credential) today = _now().date() start = (today - timedelta(days=7)).isoformat() end = today.isoformat() usage_url = f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}' sub_url = 'https://api.openai.com/v1/dashboard/billing/subscription' u = requests.get(usage_url, headers=headers, timeout=12) s = requests.get(sub_url, headers=headers, timeout=12) if u.status_code != 200 or s.status_code != 200: return 'error', {'error': f'usage:{u.status_code}, subscription:{s.status_code}'} usage = u.json() sub = s.json() total_usage = usage.get('total_usage') hard_limit = sub.get('hard_limit_usd') reset_at = sub.get('billing_cycle_anchor') if reset_at: reset_at = datetime.fromtimestamp(reset_at, tz=timezone.utc).isoformat() usage_pct = None if total_usage is not None and hard_limit: usage_pct = round(total_usage / hard_limit * 100, 2) return 'ok', { 'window_label': '7d', 'used': total_usage, 'limit': hard_limit, 'usage_pct': usage_pct, 'reset_at': reset_at, 'raw': {'usage': usage, 'subscription': sub}, } def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]: # Anthropic usage endpoints are enterprise-specific; keep placeholder. # We return accepted status; detail will be filled once API confirmed. return 'unsupported', {'error': 'anthropic usage API not configured'} def refresh_provider_usage_once(db: Session): accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() now = _now() for a in accounts: status = 'pending' payload: Dict[str, Any] = {} if a.provider == 'openai': status, payload = _openai_usage(a.credential) elif a.provider == 'anthropic': status, payload = _anthropic_usage(a.credential) else: ok, msg = test_provider_connection(a.provider, a.credential) status = 'ok' if ok else 'error' payload = {'error': None if ok else msg} snap = ProviderUsageSnapshot( account_id=a.id, window_label=payload.get('window_label'), used=payload.get('used'), limit=payload.get('limit'), usage_pct=payload.get('usage_pct'), reset_at=payload.get('reset_at'), status=status, error=payload.get('error'), raw_payload=json.dumps(payload.get('raw') or payload, ensure_ascii=False), fetched_at=now, ) db.add(snap) db.commit() def get_provider_usage_view(db: Session): accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() rows = [] for a in accounts: snap = db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == a.id).order_by(ProviderUsageSnapshot.fetched_at.desc()).first() rows.append({ 'account_id': a.id, 'provider': a.provider, 'label': a.label, 'window': snap.window_label if snap else None, 'usage_pct': snap.usage_pct if snap else None, 'used': snap.used if snap else None, 'limit': snap.limit if snap else None, 'reset_at': snap.reset_at if snap else None, 'status': snap.status if snap else 'pending', 'error': snap.error if snap else None, 'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None, }) return rows def get_server_states_view(db: Session, offline_after_minutes: int = 7): now = _now() servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all() out = [] for s in servers: st = db.query(ServerState).filter(ServerState.server_id == s.id).first() last_seen = st.last_seen_at if st else None online = bool(last_seen and (now - last_seen).total_seconds() <= offline_after_minutes * 60) out.append({ 'server_id': s.id, 'identifier': s.identifier, 'display_name': s.display_name or s.identifier, 'online': online, 'openclaw_version': st.openclaw_version if st else None, 'cpu_pct': st.cpu_pct if st else None, 'mem_pct': st.mem_pct if st else None, 'disk_pct': st.disk_pct if st else None, 'swap_pct': st.swap_pct if st else None, 'agents': json.loads(st.agents_json) if st and st.agents_json else [], 'last_seen_at': last_seen.isoformat() if last_seen else None, }) return out