import json from datetime import datetime, timedelta, timezone from typing import Any, Dict import requests from sqlalchemy.orm import Session from app.models.models import Issue from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState _CACHE: Dict[str, Dict[str, Any]] = {} def _now(): return datetime.now(timezone.utc) def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800): key = 'issue_stats_24h' now = _now() hit = _CACHE.get(key) if hit and (now - hit['at']).total_seconds() < ttl_seconds: return hit['data'] since = now - timedelta(hours=24) total = db.query(Issue).count() new_24h = db.query(Issue).filter(Issue.created_at >= since).count() processed_24h = db.query(Issue).filter( Issue.updated_at != None, Issue.updated_at >= since, Issue.status.in_(['resolved', 'closed']) ).count() data = { 'total_issues': total, 'new_issues_24h': new_24h, 'processed_issues_24h': processed_24h, 'computed_at': now.isoformat(), 'cache_ttl_seconds': ttl_seconds, } _CACHE[key] = {'at': now, 'data': data} return data def _provider_headers(provider: str, credential: str): if provider == 'openai': return {'Authorization': f'Bearer {credential}'} if provider == 'anthropic': return {'x-api-key': credential, 'anthropic-version': '2023-06-01'} return None def test_provider_connection(provider: str, credential: str): provider = provider.lower() try: if provider == 'openai': r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, credential), timeout=12) return r.status_code == 200, f'status={r.status_code}' if provider == 'anthropic': r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, credential), timeout=12) return r.status_code == 200, f'status={r.status_code}' if provider in {'minimax', 'kimi', 'qwen'}: # Endpoints/usage API vary by deployment; keep as accepted-but-unverified for now. return True, 'accepted (connectivity check pending provider-specific adapter)' return False, 'unsupported provider' except Exception as e: return False, str(e) def refresh_provider_usage_once(db: Session): accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() now = _now() for a in accounts: ok, msg = test_provider_connection(a.provider, a.credential) snap = ProviderUsageSnapshot( account_id=a.id, window_label='provider-default', used=None, limit=None, usage_pct=None, reset_at=None, status='ok' if ok else 'error', error=None if ok else msg, raw_payload=json.dumps({'message': msg}, ensure_ascii=False), fetched_at=now, ) db.add(snap) db.commit() def get_provider_usage_view(db: Session): accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() rows = [] for a in accounts: snap = db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == a.id).order_by(ProviderUsageSnapshot.fetched_at.desc()).first() rows.append({ 'account_id': a.id, 'provider': a.provider, 'label': a.label, 'window': snap.window_label if snap else None, 'usage_pct': snap.usage_pct if snap else None, 'used': snap.used if snap else None, 'limit': snap.limit if snap else None, 'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None, 'status': snap.status if snap else 'pending', 'error': snap.error if snap else None, 'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None, }) return rows def get_server_states_view(db: Session, offline_after_minutes: int = 7): now = _now() servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all() out = [] for s in servers: st = db.query(ServerState).filter(ServerState.server_id == s.id).first() last_seen = st.last_seen_at if st else None online = bool(last_seen and (now - last_seen).total_seconds() <= offline_after_minutes * 60) out.append({ 'server_id': s.id, 'identifier': s.identifier, 'display_name': s.display_name or s.identifier, 'online': online, 'openclaw_version': st.openclaw_version if st else None, 'cpu_pct': st.cpu_pct if st else None, 'mem_pct': st.mem_pct if st else None, 'disk_pct': st.disk_pct if st else None, 'swap_pct': st.swap_pct if st else None, 'agents': json.loads(st.agents_json) if st and st.agents_json else [], 'last_seen_at': last_seen.isoformat() if last_seen else None, }) return out