Files
HarborForge.Backend/app/services/monitoring.py

240 lines
9.3 KiB
Python

import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Tuple
import requests
from sqlalchemy.orm import Session
from app.models.models import Issue
from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState
_CACHE: Dict[str, Dict[str, Any]] = {}
def _now():
return datetime.now(timezone.utc)
def _parse_credential(raw: str) -> Dict[str, Any]:
raw = (raw or '').strip()
if raw.startswith('{'):
try:
return json.loads(raw)
except Exception:
return {'api_key': raw}
return {'api_key': raw}
def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800):
key = 'issue_stats_24h'
now = _now()
hit = _CACHE.get(key)
if hit and (now - hit['at']).total_seconds() < ttl_seconds:
return hit['data']
since = now - timedelta(hours=24)
total = db.query(Issue).count()
new_24h = db.query(Issue).filter(Issue.created_at >= since).count()
processed_24h = db.query(Issue).filter(
Issue.updated_at != None,
Issue.updated_at >= since,
Issue.status.in_(['resolved', 'closed'])
).count()
data = {
'total_issues': total,
'new_issues_24h': new_24h,
'processed_issues_24h': processed_24h,
'computed_at': now.isoformat(),
'cache_ttl_seconds': ttl_seconds,
}
_CACHE[key] = {'at': now, 'data': data}
return data
def _provider_headers(provider: str, credential: str, extra: Dict[str, Any] | None = None):
extra = extra or {}
if extra.get('auth_header'):
val = extra.get('auth_value')
if not val:
scheme = extra.get('auth_scheme')
val = f"{scheme} {credential}" if scheme else credential
return {extra['auth_header']: val}
if provider == 'openai':
return {'Authorization': f'Bearer {credential}'}
if provider == 'anthropic':
return {'x-api-key': credential, 'anthropic-version': '2023-06-01'}
return {}
def test_provider_connection(provider: str, credential: str):
provider = provider.lower()
info = _parse_credential(credential)
key = info.get('api_key') or credential
try:
if provider == 'openai':
r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12)
return r.status_code == 200, f'status={r.status_code}'
if provider == 'anthropic':
r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12)
return r.status_code == 200, f'status={r.status_code}'
usage_url = info.get('usage_url') or info.get('test_url')
if usage_url:
r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12)
return r.status_code < 500, f'status={r.status_code}'
if provider in {'minimax', 'kimi', 'qwen'}:
return True, 'accepted (connectivity check pending provider-specific adapter)'
return False, 'unsupported provider'
except Exception as e:
return False, str(e)
def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
info = _parse_credential(credential)
key = info.get('api_key') or credential
headers = _provider_headers('openai', key, info)
today = _now().date()
start = (today - timedelta(days=7)).isoformat()
end = today.isoformat()
usage_url = info.get('usage_url') or f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}'
sub_url = info.get('subscription_url') or 'https://api.openai.com/v1/dashboard/billing/subscription'
u = requests.get(usage_url, headers=headers, timeout=12)
s = requests.get(sub_url, headers=headers, timeout=12)
if u.status_code != 200 or s.status_code != 200:
return 'error', {'error': f'usage:{u.status_code}, subscription:{s.status_code}'}
usage = u.json()
sub = s.json()
total_usage = usage.get('total_usage')
hard_limit = sub.get('hard_limit_usd')
reset_at_ts = sub.get('billing_cycle_anchor')
reset_at = datetime.fromtimestamp(reset_at_ts, tz=timezone.utc) if reset_at_ts else None
usage_pct = None
if total_usage is not None and hard_limit:
usage_pct = round(total_usage / hard_limit * 100, 2)
return 'ok', {
'window_label': '7d',
'used': total_usage,
'limit': hard_limit,
'usage_pct': usage_pct,
'reset_at': reset_at,
'raw': {'usage': usage, 'subscription': sub},
}
def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
info = _parse_credential(credential)
key = info.get('api_key') or credential
usage_url = info.get('usage_url')
if not usage_url:
return 'unsupported', {'error': 'anthropic usage API not configured'}
r = requests.get(usage_url, headers=_provider_headers('anthropic', key, info), timeout=12)
if r.status_code != 200:
return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
payload = r.json()
# Try to normalize
return 'ok', {
'window_label': payload.get('window') or payload.get('window_label'),
'used': payload.get('used'),
'limit': payload.get('limit'),
'usage_pct': payload.get('usage_pct'),
'reset_at': None,
'raw': payload,
}
def _generic_usage(provider: str, credential: str) -> Tuple[str, Dict[str, Any]]:
info = _parse_credential(credential)
key = info.get('api_key') or credential
usage_url = info.get('usage_url')
if not usage_url:
return 'unsupported', {'error': f'{provider} usage API not configured'}
r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12)
if r.status_code != 200:
return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
payload = r.json()
return 'ok', {
'window_label': payload.get('window') or payload.get('window_label'),
'used': payload.get('used'),
'limit': payload.get('limit'),
'usage_pct': payload.get('usage_pct'),
'reset_at': None,
'raw': payload,
}
def refresh_provider_usage_once(db: Session):
accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
now = _now()
for a in accounts:
status = 'pending'
payload: Dict[str, Any] = {}
if a.provider == 'openai':
status, payload = _openai_usage(a.credential)
elif a.provider == 'anthropic':
status, payload = _anthropic_usage(a.credential)
elif a.provider in {'minimax', 'kimi', 'qwen'}:
status, payload = _generic_usage(a.provider, a.credential)
else:
ok, msg = test_provider_connection(a.provider, a.credential)
status = 'ok' if ok else 'error'
payload = {'error': None if ok else msg}
snap = ProviderUsageSnapshot(
account_id=a.id,
window_label=payload.get('window_label'),
used=payload.get('used'),
limit=payload.get('limit'),
usage_pct=payload.get('usage_pct'),
reset_at=payload.get('reset_at'),
status=status,
error=payload.get('error'),
raw_payload=json.dumps(payload.get('raw') or payload, ensure_ascii=False),
fetched_at=now,
)
db.add(snap)
db.commit()
def get_provider_usage_view(db: Session):
accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
rows = []
for a in accounts:
snap = db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == a.id).order_by(ProviderUsageSnapshot.fetched_at.desc()).first()
rows.append({
'account_id': a.id,
'provider': a.provider,
'label': a.label,
'window': snap.window_label if snap else None,
'usage_pct': snap.usage_pct if snap else None,
'used': snap.used if snap else None,
'limit': snap.limit if snap else None,
'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None,
'status': snap.status if snap else 'pending',
'error': snap.error if snap else None,
'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None,
})
return rows
def get_server_states_view(db: Session, offline_after_minutes: int = 7):
now = _now()
servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all()
out = []
for s in servers:
st = db.query(ServerState).filter(ServerState.server_id == s.id).first()
last_seen = st.last_seen_at if st else None
online = bool(last_seen and (now - last_seen).total_seconds() <= offline_after_minutes * 60)
out.append({
'server_id': s.id,
'identifier': s.identifier,
'display_name': s.display_name or s.identifier,
'online': online,
'openclaw_version': st.openclaw_version if st else None,
'cpu_pct': st.cpu_pct if st else None,
'mem_pct': st.mem_pct if st else None,
'disk_pct': st.disk_pct if st else None,
'swap_pct': st.swap_pct if st else None,
'agents': json.loads(st.agents_json) if st and st.agents_json else [],
'last_seen_at': last_seen.isoformat() if last_seen else None,
})
return out