"""Monitor API routes.

Exposes a public overview, admin-only management of provider accounts and
monitored servers, and a temporary heartbeat ingestion endpoint.
"""
from datetime import datetime, timedelta, timezone
import json
from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey
from app.models import models
from app.models.monitor import (
    MonitoredServer,
    ProviderAccount,
    ProviderUsageSnapshot,
    ServerState,
)
from app.services.monitoring import (
    get_issue_stats_cached,
    get_provider_usage_view,
    get_server_states_view,
    test_provider_connection,
)

router = APIRouter(prefix='/monitor', tags=['Monitor'])


SUPPORTED_PROVIDERS = {'anthropic', 'openai', 'minimax', 'kimi', 'qwen'}

# A credential must be longer than this before any of it is echoed back;
# otherwise the "last four characters" hint would reveal most or all of it.
_MASK_MIN_LENGTH = 8
_MASK_VISIBLE_SUFFIX = 4


class ProviderAccountCreate(BaseModel):
    """Request body for registering a provider account."""

    provider: str
    label: str
    credential: str


class ProviderTestRequest(BaseModel):
    """Request body for probing a provider credential without storing it."""

    provider: str
    credential: str


class MonitoredServerCreate(BaseModel):
    """Request body for registering a server to monitor."""

    identifier: str
    display_name: str | None = None


def _mask_credential(credential):
    """Return a display-safe form of *credential*.

    Only credentials longer than ``_MASK_MIN_LENGTH`` expose their last
    ``_MASK_VISIBLE_SUFFIX`` characters; shorter or empty ones are fully masked.
    """
    if credential and len(credential) > _MASK_MIN_LENGTH:
        return '***' + credential[-_MASK_VISIBLE_SUFFIX:]
    return '***'


def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)):
    """Dependency that returns the current user, or raises 403 if not an admin."""
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail='Admin required')
    return current_user


@router.get('/public/overview')
def public_overview(db: Session = Depends(get_db)):
    """Return issue statistics, provider usage and server states in one payload.

    No authentication dependency is declared on this route. Issue statistics
    are requested with a 1800-second cache TTL and servers are evaluated with
    a 7-minute offline threshold.
    """
    return {
        'issues': get_issue_stats_cached(db, ttl_seconds=1800),
        'providers': get_provider_usage_view(db),
        'servers': get_server_states_view(db, offline_after_minutes=7),
        'generated_at': datetime.now(timezone.utc).isoformat(),
    }


@router.get('/admin/providers/accounts')
def list_provider_accounts(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """List all provider accounts, newest first, with the credential masked."""
    accounts = db.query(ProviderAccount).order_by(ProviderAccount.created_at.desc()).all()
    return [
        {
            'id': a.id,
            'provider': a.provider,
            'label': a.label,
            'is_enabled': a.is_enabled,
            'created_at': a.created_at,
            'credential_masked': _mask_credential(a.credential),
        }
        for a in accounts
    ]


@router.post('/admin/providers/accounts', status_code=status.HTTP_201_CREATED)
def create_provider_account(payload: ProviderAccountCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
    """Create an enabled provider account owned by the calling admin.

    Raises:
        HTTPException: 400 when the provider is not in ``SUPPORTED_PROVIDERS``
            or when the label or credential is empty after stripping.
    """
    provider = payload.provider.lower().strip()
    if provider not in SUPPORTED_PROVIDERS:
        raise HTTPException(status_code=400, detail=f'Unsupported provider: {provider}')
    label = payload.label.strip()
    if not label:
        raise HTTPException(status_code=400, detail='label required')
    credential = payload.credential.strip()
    if not credential:
        raise HTTPException(status_code=400, detail='credential required')
    obj = ProviderAccount(
        provider=provider,
        label=label,
        credential=credential,
        is_enabled=True,
        created_by=user.id,
    )
    db.add(obj)
    db.commit()
    db.refresh(obj)
    return {'id': obj.id, 'provider': obj.provider, 'label': obj.label, 'is_enabled': obj.is_enabled}


@router.post('/admin/providers/test')
def test_provider(payload: ProviderTestRequest, _: models.User = Depends(require_admin)):
    """Probe a provider credential and report the outcome without storing it."""
    ok, message = test_provider_connection(payload.provider.lower().strip(), payload.credential.strip())
    return {'ok': ok, 'message': message}


@router.delete('/admin/providers/accounts/{account_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_provider_account(account_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Delete a provider account together with its usage snapshots.

    Raises:
        HTTPException: 404 when no account has the given id.
    """
    obj = db.query(ProviderAccount).filter(ProviderAccount.id == account_id).first()
    if not obj:
        raise HTTPException(status_code=404, detail='Provider account not found')
    # Snapshots hold a foreign key to the account, so they must go first;
    # otherwise the delete fails on backends that enforce the constraint and
    # leaves orphan rows on those that do not.
    db.query(ProviderUsageSnapshot).filter(
        ProviderUsageSnapshot.account_id == account_id
    ).delete(synchronize_session=False)
    db.delete(obj)
    db.commit()
    return None


@router.get('/admin/servers')
def list_servers(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Return the state view of enabled servers (7-minute offline threshold)."""
    return get_server_states_view(db, offline_after_minutes=7)


@router.post('/admin/servers', status_code=status.HTTP_201_CREATED)
def add_server(payload: MonitoredServerCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
    """Register a new enabled server under a unique identifier.

    Raises:
        HTTPException: 400 when the identifier is empty after stripping or is
            already registered.
    """
    identifier = payload.identifier.strip()
    if not identifier:
        raise HTTPException(status_code=400, detail='identifier required')
    exists = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier).first()
    if exists:
        raise HTTPException(status_code=400, detail='identifier already exists')
    obj = MonitoredServer(identifier=identifier, display_name=payload.display_name, is_enabled=True, created_by=user.id)
    db.add(obj)
    db.commit()
    db.refresh(obj)
    return {'id': obj.id, 'identifier': obj.identifier, 'display_name': obj.display_name, 'is_enabled': obj.is_enabled}


@router.delete('/admin/servers/{server_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_server(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Delete a monitored server and its state row, if one exists.

    Raises:
        HTTPException: 404 when no server has the given id.
    """
    obj = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first()
    if not obj:
        raise HTTPException(status_code=404, detail='Server not found')
    # The state row references the server, so it is removed first.
    state = db.query(ServerState).filter(ServerState.server_id == server_id).first()
    if state:
        db.delete(state)
    db.delete(obj)
    db.commit()
    return None


# Temporary ingestion endpoint before WS plugin lands
class ServerHeartbeat(BaseModel):
    """Heartbeat payload reported for a monitored server."""

    identifier: str
    openclaw_version: str | None = None
    agents: List[dict] = []
    cpu_pct: float | None = None
    mem_pct: float | None = None
    disk_pct: float | None = None
    swap_pct: float | None = None


@router.post('/server/heartbeat')
def server_heartbeat(payload: ServerHeartbeat, db: Session = Depends(get_db)):
    """Record the latest heartbeat for an enabled, registered server.

    Creates the server's state row on first contact and overwrites it on
    every later heartbeat.

    Raises:
        HTTPException: 404 when the identifier matches no enabled server.
    """
    # NOTE(review): no authentication dependency is declared here, so any
    # caller that knows an identifier can post state for it - confirm this is
    # acceptable until the WS plugin replaces this endpoint.
    server = db.query(MonitoredServer).filter(
        MonitoredServer.identifier == payload.identifier,
        MonitoredServer.is_enabled.is_(True),
    ).first()
    if not server:
        raise HTTPException(status_code=404, detail='unknown server identifier')
    st = db.query(ServerState).filter(ServerState.server_id == server.id).first()
    if not st:
        st = ServerState(server_id=server.id)
        db.add(st)
    st.openclaw_version = payload.openclaw_version
    st.agents_json = json.dumps(payload.agents, ensure_ascii=False)
    st.cpu_pct = payload.cpu_pct
    st.mem_pct = payload.mem_pct
    st.disk_pct = payload.disk_pct
    st.swap_pct = payload.swap_pct
    st.last_seen_at = datetime.now(timezone.utc)
    db.commit()
    return {'ok': True, 'server_id': server.id, 'last_seen_at': st.last_seen_at}
"""ORM models for the monitor feature: provider accounts, usage snapshots, servers and server state."""
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Float, ForeignKey
from sqlalchemy.sql import func
from app.core.config import Base


class ProviderAccount(Base):
    """A stored credential for one external provider, identified by a label."""

    __tablename__ = 'provider_accounts'

    id = Column(Integer, primary_key=True, index=True)
    provider = Column(String(32), nullable=False, index=True)  # anthropic/openai/minimax/kimi/qwen
    label = Column(String(128), nullable=False)
    # Stored as plain text for now.
    credential = Column(Text, nullable=False)  # TODO: encrypt at rest
    is_enabled = Column(Boolean, default=True)
    # Plain integer with no foreign key declared; the monitor router sets it
    # to the id of the admin user who created the row.
    created_by = Column(Integer, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Only an on-update value is declared, so this has no insert-time default.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())


class ProviderUsageSnapshot(Base):
    """One usage/status reading taken for a provider account at ``fetched_at``."""

    __tablename__ = 'provider_usage_snapshots'

    id = Column(Integer, primary_key=True, index=True)
    # References the owning account; no ON DELETE behaviour is declared.
    account_id = Column(Integer, ForeignKey('provider_accounts.id'), nullable=False, index=True)
    window_label = Column(String(32), nullable=True)  # e.g. 1h / 7d
    used = Column(Float, nullable=True)
    limit = Column(Float, nullable=True)
    usage_pct = Column(Float, nullable=True)
    reset_at = Column(DateTime(timezone=True), nullable=True)
    status = Column(String(32), nullable=False, default='unknown')  # ok/error/pending/unsupported
    error = Column(Text, nullable=True)
    # Free-form text; the monitoring service stores a JSON-encoded message here.
    raw_payload = Column(Text, nullable=True)
    fetched_at = Column(DateTime(timezone=True), server_default=func.now(), index=True)


class MonitoredServer(Base):
    """A server registered for monitoring under a unique identifier."""

    __tablename__ = 'monitored_servers'

    id = Column(Integer, primary_key=True, index=True)
    identifier = Column(String(128), nullable=False, unique=True)
    display_name = Column(String(128), nullable=True)
    is_enabled = Column(Boolean, default=True)
    # Plain integer with no foreign key declared; the monitor router sets it
    # to the id of the admin user who registered the server.
    created_by = Column(Integer, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())


class ServerState(Base):
    """Latest reported state of a monitored server (at most one row per server)."""

    __tablename__ = 'server_states'

    id = Column(Integer, primary_key=True, index=True)
    # unique=True limits each server to a single state row.
    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, unique=True)
    openclaw_version = Column(String(64), nullable=True)
    agents_json = Column(Text, nullable=True)  # json list
    cpu_pct = Column(Float, nullable=True)
    mem_pct = Column(Float, nullable=True)
    disk_pct = Column(Float, nullable=True)
    swap_pct = Column(Float, nullable=True)
    # Set by the heartbeat endpoint to the UTC time the heartbeat was received.
    last_seen_at = Column(DateTime(timezone=True), nullable=True)
    # Only an on-update value is declared, so this has no insert-time default.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Model-listing endpoints used as a cheap credential/connectivity probe.
_PROBE_URLS = {
    'openai': 'https://api.openai.com/v1/models',
    'anthropic': 'https://api.anthropic.com/v1/models',
}
# Providers accepted without a live check until a provider-specific adapter exists.
_UNVERIFIED_PROVIDERS = frozenset({'minimax', 'kimi', 'qwen'})
_PROBE_TIMEOUT_SECONDS = 12


def _now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _as_utc(ts):
    """Return *ts* as an aware datetime, treating a naive value as UTC.

    Some database backends return naive datetimes even for timezone-enabled
    columns; subtracting those from an aware ``_now()`` raises ``TypeError``.
    ``None`` is passed through unchanged.
    """
    if ts is not None and ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts


def _load_agents(raw):
    """Decode a stored agents JSON string, returning ``[]`` if empty or malformed."""
    if not raw:
        return []
    try:
        return json.loads(raw)
    except ValueError:
        # One corrupt row must not break the whole server listing.
        return []


def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800):
    """Return issue counters, recomputing them at most once per *ttl_seconds*.

    The result is kept in the module-level ``_CACHE`` under a single key, so
    it is shared by every caller in the process.

    Args:
        db: Session used to count issues when the cache is stale or empty.
        ttl_seconds: Maximum age of a cached result before it is recomputed.

    Returns:
        A dict with ``total_issues``, ``new_issues_24h``,
        ``processed_issues_24h``, ``computed_at`` and ``cache_ttl_seconds``.
    """
    key = 'issue_stats_24h'
    now = _now()
    hit = _CACHE.get(key)
    if hit and (now - hit['at']).total_seconds() < ttl_seconds:
        return hit['data']

    since = now - timedelta(hours=24)
    total = db.query(Issue).count()
    new_24h = db.query(Issue).filter(Issue.created_at >= since).count()
    # "Processed" means: updated within the window and now resolved or closed.
    processed_24h = db.query(Issue).filter(
        Issue.updated_at.isnot(None),
        Issue.updated_at >= since,
        Issue.status.in_(['resolved', 'closed']),
    ).count()
    data = {
        'total_issues': total,
        'new_issues_24h': new_24h,
        'processed_issues_24h': processed_24h,
        'computed_at': now.isoformat(),
        'cache_ttl_seconds': ttl_seconds,
    }
    _CACHE[key] = {'at': now, 'data': data}
    return data


def _provider_headers(provider: str, credential: str):
    """Return the auth headers for *provider*, or ``None`` if it has none defined."""
    if provider == 'openai':
        return {'Authorization': f'Bearer {credential}'}
    if provider == 'anthropic':
        return {'x-api-key': credential, 'anthropic-version': '2023-06-01'}
    return None


def test_provider_connection(provider: str, credential: str):
    """Probe a provider credential and report whether it was accepted.

    Args:
        provider: Provider name; compared case-insensitively.
        credential: API key sent in the provider's auth header.

    Returns:
        ``(ok, message)``. For openai and anthropic, ``ok`` is true only on
        HTTP 200 and ``message`` is ``'status=<code>'``. For minimax, kimi and
        qwen no request is made and ``ok`` is always true. Any other provider
        yields ``(False, 'unsupported provider')``. A failed request yields
        ``(False, <exception text>)`` instead of raising.
    """
    provider = provider.lower()
    if provider in _UNVERIFIED_PROVIDERS:
        # Endpoints/usage API vary by deployment; keep as accepted-but-unverified for now.
        return True, 'accepted (connectivity check pending provider-specific adapter)'
    url = _PROBE_URLS.get(provider)
    if url is None:
        return False, 'unsupported provider'
    try:
        r = requests.get(url, headers=_provider_headers(provider, credential), timeout=_PROBE_TIMEOUT_SECONDS)
    except Exception as e:
        # Deliberately broad: callers rely on a failure tuple, never an exception.
        return False, str(e)
    return r.status_code == 200, f'status={r.status_code}'


def refresh_provider_usage_once(db: Session):
    """Probe every enabled provider account and store one snapshot per account.

    Usage figures are not fetched yet: ``used``, ``limit``, ``usage_pct`` and
    ``reset_at`` are stored as ``None`` and only the probe outcome is recorded.
    All snapshots share one ``fetched_at`` and are committed together.
    """
    accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled.is_(True)).all()
    now = _now()
    for a in accounts:
        # NOTE(review): providers without a live check report ok=True, so their
        # snapshots are stored as 'ok' although nothing was verified.
        ok, msg = test_provider_connection(a.provider, a.credential)
        snap = ProviderUsageSnapshot(
            account_id=a.id,
            window_label='provider-default',
            used=None,
            limit=None,
            usage_pct=None,
            reset_at=None,
            status='ok' if ok else 'error',
            error=None if ok else msg,
            raw_payload=json.dumps({'message': msg}, ensure_ascii=False),
            fetched_at=now,
        )
        db.add(snap)
    db.commit()


def get_provider_usage_view(db: Session):
    """Return one row per enabled provider account with its newest snapshot.

    Accounts without any snapshot get ``'status': 'pending'`` and ``None`` for
    every snapshot-derived field.
    """
    accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled.is_(True)).all()
    rows = []
    for a in accounts:
        snap = (
            db.query(ProviderUsageSnapshot)
            .filter(ProviderUsageSnapshot.account_id == a.id)
            .order_by(ProviderUsageSnapshot.fetched_at.desc())
            .first()
        )
        rows.append({
            'account_id': a.id,
            'provider': a.provider,
            'label': a.label,
            'window': snap.window_label if snap else None,
            'usage_pct': snap.usage_pct if snap else None,
            'used': snap.used if snap else None,
            'limit': snap.limit if snap else None,
            'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None,
            'status': snap.status if snap else 'pending',
            'error': snap.error if snap else None,
            'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None,
        })
    return rows


def get_server_states_view(db: Session, offline_after_minutes: int = 7):
    """Return one row per enabled server with its last reported state.

    Args:
        db: Session used to load servers and their state rows.
        offline_after_minutes: A server is online only if its last heartbeat
            is at most this many minutes old.

    Returns:
        A list of dicts. Servers that never reported have ``online`` false,
        ``agents`` empty and every metric ``None``.
    """
    now = _now()
    servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled.is_(True)).all()
    out = []
    for s in servers:
        st = db.query(ServerState).filter(ServerState.server_id == s.id).first()
        # Heartbeats are written in UTC, so a naive value read back is UTC too.
        last_seen = _as_utc(st.last_seen_at) if st else None
        online = bool(last_seen and (now - last_seen).total_seconds() <= offline_after_minutes * 60)
        out.append({
            'server_id': s.id,
            'identifier': s.identifier,
            'display_name': s.display_name or s.identifier,
            'online': online,
            'openclaw_version': st.openclaw_version if st else None,
            'cpu_pct': st.cpu_pct if st else None,
            'mem_pct': st.mem_pct if st else None,
            'disk_pct': st.disk_pct if st else None,
            'swap_pct': st.swap_pct if st else None,
            'agents': _load_agents(st.agents_json) if st else [],
            'last_seen_at': last_seen.isoformat() if last_seen else None,
        })
    return out