diff --git a/app/api/routers/monitor.py b/app/api/routers/monitor.py
new file mode 100644
index 0000000..1435e6f
--- /dev/null
+++ b/app/api/routers/monitor.py
@@ -0,0 +1,323 @@
+from datetime import datetime, timedelta, timezone
+import json
+import logging
+import uuid
+from typing import List, Dict
+
+from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect
+from pydantic import BaseModel
+from sqlalchemy.orm import Session
+
+from app.core.config import get_db, SessionLocal
+from app.api.deps import get_current_user_or_apikey
+from app.models import models
+from app.models.monitor import (
+    ProviderAccount,
+    ProviderUsageSnapshot,
+    MonitoredServer,
+    ServerState,
+    ServerChallenge,
+    ServerHandshakeNonce,
+)
+from app.services.monitoring import (
+    get_issue_stats_cached,
+    get_provider_usage_view,
+    get_server_states_view,
+    test_provider_connection,
+)
+from app.services.crypto_box import get_public_key_info, decrypt_payload_b64, ts_within
+
+logger = logging.getLogger(__name__)
+router = APIRouter(prefix='/monitor', tags=['Monitor'])
+SUPPORTED_PROVIDERS = {'anthropic', 'openai', 'minimax', 'kimi', 'qwen'}
+ACTIVE_WS: Dict[int, WebSocket] = {}
+
+
+def _as_utc(dt: datetime) -> datetime:
+    """Return *dt* as an aware datetime, treating naive values as UTC."""
+    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
+
+
+class ProviderAccountCreate(BaseModel):
+    """Request body for creating a provider account."""
+    provider: str
+    label: str
+    credential: str
+
+
+class ProviderTestRequest(BaseModel):
+    """Request body for testing a provider credential without storing it."""
+    provider: str
+    credential: str
+
+
+class MonitoredServerCreate(BaseModel):
+    """Request body for registering a monitored server."""
+    identifier: str
+    display_name: str | None = None
+
+
+class ChallengeResponse(BaseModel):
+    """Handshake challenge returned to an admin."""
+    identifier: str
+    challenge_uuid: str
+    expires_at: str
+
+
+def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)):
+    """Dependency that returns the authenticated user or raises 403 when they are not an admin."""
+    if not current_user.is_admin:
+        raise HTTPException(status_code=403, detail='Admin required')
+    return current_user
+
+
+@router.get('/public/server-public-key')
+def monitor_public_key():
+    """Return the monitor public key (PEM) and its key id."""
+    return get_public_key_info()
+
+
+@router.get('/public/overview')
+def public_overview(db: Session = Depends(get_db)):
+    """Return issue statistics, provider usage and server states; no auth dependency is applied."""
+    return {
+        'issues': get_issue_stats_cached(db, ttl_seconds=1800),
+        'providers': get_provider_usage_view(db),
+        'servers': get_server_states_view(db, offline_after_minutes=7),
+        'generated_at': datetime.now(timezone.utc).isoformat(),
+    }
+
+
+@router.get('/admin/providers/accounts')
+def list_provider_accounts(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
+    """List provider accounts, newest first, exposing only the last four credential characters."""
+    accounts = db.query(ProviderAccount).order_by(ProviderAccount.created_at.desc()).all()
+    return [
+        {
+            'id': a.id,
+            'provider': a.provider,
+            'label': a.label,
+            'is_enabled': a.is_enabled,
+            'created_at': a.created_at,
+            'credential_masked': '***' + (a.credential[-4:] if a.credential else ''),
+        }
+        for a in accounts
+    ]
+
+
+@router.post('/admin/providers/accounts', status_code=status.HTTP_201_CREATED)
+def create_provider_account(payload: ProviderAccountCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
+    """Store a new provider account after normalising and validating the provider name."""
+    provider = payload.provider.lower().strip()
+    if provider not in SUPPORTED_PROVIDERS:
+        raise HTTPException(status_code=400, detail=f'Unsupported provider: {provider}')
+    obj = ProviderAccount(
+        provider=provider,
+        label=payload.label.strip(),
+        credential=payload.credential.strip(),
+        is_enabled=True,
+        created_by=user.id,
+    )
+    db.add(obj)
+    db.commit()
+    db.refresh(obj)
+    return {'id': obj.id, 'provider': obj.provider, 'label': obj.label, 'is_enabled': obj.is_enabled}
+
+
+@router.post('/admin/providers/test')
+def test_provider(payload: ProviderTestRequest, _: models.User = Depends(require_admin)):
+    """Run a connectivity check for the given provider credential."""
+    ok, message = test_provider_connection(payload.provider.lower().strip(), payload.credential.strip())
+    return {'ok': ok, 'message': message}
+
+
+@router.delete('/admin/providers/accounts/{account_id}', status_code=status.HTTP_204_NO_CONTENT)
+def delete_provider_account(account_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
+    """Delete a provider account and its usage snapshots; 404 when it does not exist."""
+    obj = db.query(ProviderAccount).filter(ProviderAccount.id == account_id).first()
+    if not obj:
+        raise HTTPException(status_code=404, detail='Provider account not found')
+    # Snapshots hold a foreign key to the account, so remove them first (mirrors delete_server).
+    db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == account_id).delete()
+    db.delete(obj)
+    db.commit()
+    return None
+
+
+@router.get('/admin/servers')
+def list_servers(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
+    """Return the state view of all enabled monitored servers."""
+    return get_server_states_view(db, offline_after_minutes=7)
+
+
+@router.post('/admin/servers', status_code=status.HTTP_201_CREATED)
+def add_server(payload: MonitoredServerCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
+    """Register a monitored server; rejects empty or duplicate identifiers with 400."""
+    identifier = payload.identifier.strip()
+    if not identifier:
+        raise HTTPException(status_code=400, detail='identifier required')
+    exists = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier).first()
+    if exists:
+        raise HTTPException(status_code=400, detail='identifier already exists')
+    obj = MonitoredServer(identifier=identifier, display_name=payload.display_name, is_enabled=True, created_by=user.id)
+    db.add(obj)
+    db.commit()
+    db.refresh(obj)
+    return {'id': obj.id, 'identifier': obj.identifier, 'display_name': obj.display_name, 'is_enabled': obj.is_enabled}
+
+
+@router.post('/admin/servers/{server_id}/challenge', response_model=ChallengeResponse)
+def issue_server_challenge(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
+    """Create a challenge for the server's WebSocket handshake, valid for 10 minutes."""
+    server = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first()
+    if not server:
+        raise HTTPException(status_code=404, detail='Server not found')
+    challenge_uuid = str(uuid.uuid4())
+    expires_at = datetime.now(timezone.utc) + timedelta(minutes=10)
+    ch = ServerChallenge(server_id=server_id, challenge_uuid=challenge_uuid, expires_at=expires_at)
+    db.add(ch)
+    db.commit()
+    return ChallengeResponse(identifier=server.identifier, challenge_uuid=challenge_uuid, expires_at=expires_at.isoformat())
+
+
+@router.delete('/admin/servers/{server_id}', status_code=status.HTTP_204_NO_CONTENT)
+def delete_server(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
+    """Delete a monitored server together with its state, challenges and nonces."""
+    obj = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first()
+    if not obj:
+        raise HTTPException(status_code=404, detail='Server not found')
+    state = db.query(ServerState).filter(ServerState.server_id == server_id).first()
+    if state:
+        db.delete(state)
+    db.query(ServerChallenge).filter(ServerChallenge.server_id == server_id).delete()
+    db.query(ServerHandshakeNonce).filter(ServerHandshakeNonce.server_id == server_id).delete()
+    db.delete(obj)
+    db.commit()
+    return None
+
+
+class ServerHeartbeat(BaseModel):
+    """Heartbeat payload reported by a monitored server."""
+    identifier: str
+    openclaw_version: str | None = None
+    agents: List[dict] = []
+    cpu_pct: float | None = None
+    mem_pct: float | None = None
+    disk_pct: float | None = None
+    swap_pct: float | None = None
+
+
+@router.post('/server/heartbeat')
+def server_heartbeat(payload: ServerHeartbeat, db: Session = Depends(get_db)):
+    """Upsert the reported state of an enabled server; note that no auth dependency is applied."""
+    server = db.query(MonitoredServer).filter(MonitoredServer.identifier == payload.identifier, MonitoredServer.is_enabled == True).first()
+    if not server:
+        raise HTTPException(status_code=404, detail='unknown server identifier')
+    st = db.query(ServerState).filter(ServerState.server_id == server.id).first()
+    if not st:
+        st = ServerState(server_id=server.id)
+        db.add(st)
+    st.openclaw_version = payload.openclaw_version
+    st.agents_json = json.dumps(payload.agents, ensure_ascii=False)
+    st.cpu_pct = payload.cpu_pct
+    st.mem_pct = payload.mem_pct
+    st.disk_pct = payload.disk_pct
+    st.swap_pct = payload.swap_pct
+    st.last_seen_at = datetime.now(timezone.utc)
+    db.commit()
+    return {'ok': True, 'server_id': server.id, 'last_seen_at': st.last_seen_at}
+
+
+@router.websocket('/server/ws')
+async def server_ws(websocket: WebSocket):
+    """Authenticate a server with a one-time challenge, then persist the events it streams."""
+    await websocket.accept()
+    db = SessionLocal()
+    server_id = None
+    try:
+        hello = await websocket.receive_json()
+
+        encrypted_payload = (hello.get('encrypted_payload') or '').strip()
+        if encrypted_payload:
+            try:
+                data = decrypt_payload_b64(encrypted_payload)
+            except ValueError:
+                data = None
+            if not isinstance(data, dict):
+                # Undecryptable or non-object payloads are a client error, not a server fault.
+                await websocket.close(code=4400)
+                return
+            identifier = (data.get('identifier') or '').strip()
+            challenge_uuid = (data.get('challenge_uuid') or '').strip()
+            nonce = (data.get('nonce') or '').strip()
+            ts = data.get('ts')
+            if not ts_within(ts, max_minutes=10):
+                await websocket.close(code=4401)
+                return
+        else:
+            # backward compatible mode: plaintext handshake, no timestamp check
+            identifier = (hello.get('identifier') or '').strip()
+            challenge_uuid = (hello.get('challenge_uuid') or '').strip()
+            nonce = (hello.get('nonce') or '').strip()
+
+        if not identifier or not challenge_uuid or not nonce:
+            await websocket.close(code=4400)
+            return
+
+        server = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier, MonitoredServer.is_enabled == True).first()
+        if not server:
+            await websocket.close(code=4404)
+            return
+
+        ch = db.query(ServerChallenge).filter(ServerChallenge.challenge_uuid == challenge_uuid, ServerChallenge.server_id == server.id).first()
+        if not ch or ch.used_at is not None or _as_utc(ch.expires_at) < datetime.now(timezone.utc):
+            await websocket.close(code=4401)
+            return
+
+        nonce_used = db.query(ServerHandshakeNonce).filter(ServerHandshakeNonce.server_id == server.id, ServerHandshakeNonce.nonce == nonce).first()
+        if nonce_used:
+            await websocket.close(code=4409)
+            return
+
+        db.add(ServerHandshakeNonce(server_id=server.id, nonce=nonce))
+        ch.used_at = datetime.now(timezone.utc)
+        db.commit()
+
+        server_id = server.id
+        ACTIVE_WS[server.id] = websocket
+        await websocket.send_json({'ok': True, 'server_id': server.id, 'message': 'connected'})
+
+        while True:
+            msg = await websocket.receive_json()
+            event = msg.get('event')
+            payload = msg.get('payload') or {}
+            st = db.query(ServerState).filter(ServerState.server_id == server.id).first()
+            if not st:
+                st = ServerState(server_id=server.id)
+                db.add(st)
+
+            if event == 'server.hello':
+                st.openclaw_version = payload.get('openclaw_version')
+                st.agents_json = json.dumps(payload.get('agents') or [], ensure_ascii=False)
+            elif event in {'server.metrics', 'agent.status_changed'}:
+                st.cpu_pct = payload.get('cpu_pct', st.cpu_pct)
+                st.mem_pct = payload.get('mem_pct', st.mem_pct)
+                st.disk_pct = payload.get('disk_pct', st.disk_pct)
+                st.swap_pct = payload.get('swap_pct', st.swap_pct)
+                if 'agents' in payload:
+                    st.agents_json = json.dumps(payload.get('agents') or [], ensure_ascii=False)
+
+            st.last_seen_at = datetime.now(timezone.utc)
+            db.commit()
+    except WebSocketDisconnect:
+        pass
+    except Exception:
+        logger.exception('monitor websocket handler failed')
+        try:
+            await websocket.close(code=1011)
+        except Exception:
+            pass
+    finally:
+        if server_id is not None and ACTIVE_WS.get(server_id) is websocket:
+            ACTIVE_WS.pop(server_id, None)
+        db.close()
diff --git a/app/main.py b/app/main.py
index f43e6d4..2ea5790 100644
--- a/app/main.py
+++ b/app/main.py
@@ -34,6 +34,7 @@ from app.api.routers.users import router as users_router
 from app.api.routers.comments import router as comments_router
 from app.api.routers.webhooks import router as webhooks_router
 from app.api.routers.misc import router as misc_router
+from app.api.routers.monitor import router as monitor_router
 
 app.include_router(auth_router)
 app.include_router(issues_router)
@@ -42,12 +43,13 @@ app.include_router(users_router)
 app.include_router(comments_router)
 app.include_router(webhooks_router)
 app.include_router(misc_router)
+app.include_router(monitor_router)
 
 # Run database migration on startup
 @app.on_event("startup")
 def startup():
     from app.core.config import Base, engine, SessionLocal
-    from app.models import webhook, apikey, activity, milestone, notification, worklog
+    from app.models import webhook, apikey, activity, milestone, notification, worklog, monitor
     Base.metadata.create_all(bind=engine)
 
     # Initialize from AbstractWizard (admin user, default project, etc.)
@@ -57,3 +59,22 @@ def startup():
         run_init(db)
     finally:
         db.close()
+
+    # Start lightweight monitor polling thread (every 10 minutes)
+    import logging
+    import threading, time
+    from app.services.monitoring import refresh_provider_usage_once
+
+    def _monitor_poll_loop():
+        while True:
+            db2 = SessionLocal()
+            try:
+                refresh_provider_usage_once(db2)
+            except Exception:
+                logging.getLogger(__name__).exception('monitor provider usage refresh failed')
+            finally:
+                db2.close()
+            time.sleep(600)
+
+    t = threading.Thread(target=_monitor_poll_loop, daemon=True)
+    t.start()
diff --git a/app/models/monitor.py b/app/models/monitor.py
new file mode 100644
index 0000000..533dbcb
--- /dev/null
+++ b/app/models/monitor.py
@@ -0,0 +1,84 @@
+from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Float, ForeignKey
+from sqlalchemy.sql import func
+from app.core.config import Base
+
+
+class ProviderAccount(Base):
+    """Stored credential for one upstream provider account."""
+    __tablename__ = 'provider_accounts'
+
+    id = Column(Integer, primary_key=True, index=True)
+    provider = Column(String(32), nullable=False, index=True)  # anthropic/openai/minimax/kimi/qwen
+    label = Column(String(128), nullable=False)
+    credential = Column(Text, nullable=False)  # TODO: encrypt at rest
+    is_enabled = Column(Boolean, default=True)
+    created_by = Column(Integer, nullable=True)
+    created_at = Column(DateTime(timezone=True), server_default=func.now())
+    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
+
+
+class ProviderUsageSnapshot(Base):
+    """One usage reading fetched for a provider account."""
+    __tablename__ = 'provider_usage_snapshots'
+
+    id = Column(Integer, primary_key=True, index=True)
+    account_id = Column(Integer, ForeignKey('provider_accounts.id'), nullable=False, index=True)
+    window_label = Column(String(32), nullable=True)  # e.g. 1h / 7d
+    used = Column(Float, nullable=True)
+    limit = Column(Float, nullable=True)
+    usage_pct = Column(Float, nullable=True)
+    reset_at = Column(DateTime(timezone=True), nullable=True)
+    status = Column(String(32), nullable=False, default='unknown')  # ok/error/pending/unsupported
+    error = Column(Text, nullable=True)
+    raw_payload = Column(Text, nullable=True)
+    fetched_at = Column(DateTime(timezone=True), server_default=func.now(), index=True)
+
+
+class MonitoredServer(Base):
+    """A server registered for monitoring, keyed by a unique identifier."""
+    __tablename__ = 'monitored_servers'
+
+    id = Column(Integer, primary_key=True, index=True)
+    identifier = Column(String(128), nullable=False, unique=True)
+    display_name = Column(String(128), nullable=True)
+    is_enabled = Column(Boolean, default=True)
+    created_by = Column(Integer, nullable=True)
+    created_at = Column(DateTime(timezone=True), server_default=func.now())
+
+
+class ServerState(Base):
+    """Latest reported state of a monitored server (one row per server)."""
+    __tablename__ = 'server_states'
+
+    id = Column(Integer, primary_key=True, index=True)
+    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, unique=True)
+    openclaw_version = Column(String(64), nullable=True)
+    agents_json = Column(Text, nullable=True)  # json list
+    cpu_pct = Column(Float, nullable=True)
+    mem_pct = Column(Float, nullable=True)
+    disk_pct = Column(Float, nullable=True)
+    swap_pct = Column(Float, nullable=True)
+    last_seen_at = Column(DateTime(timezone=True), nullable=True)
+    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
+
+
+class ServerChallenge(Base):
+    """Expiring handshake challenge issued for a monitored server."""
+    __tablename__ = 'server_challenges'
+
+    id = Column(Integer, primary_key=True, index=True)
+    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, index=True)
+    challenge_uuid = Column(String(64), nullable=False, unique=True, index=True)
+    expires_at = Column(DateTime(timezone=True), nullable=False)
+    used_at = Column(DateTime(timezone=True), nullable=True)
+    created_at = Column(DateTime(timezone=True), server_default=func.now())
+
+
+class ServerHandshakeNonce(Base):
+    """Nonce recorded from a completed handshake, used to reject replays."""
+    __tablename__ = 'server_handshake_nonces'
+
+    id = Column(Integer, primary_key=True, index=True)
+    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, index=True)
+    nonce = Column(String(128), nullable=False, index=True)
+    created_at = Column(DateTime(timezone=True), server_default=func.now())
diff --git a/app/services/crypto_box.py b/app/services/crypto_box.py
new file mode 100644
index 0000000..38423ba
--- /dev/null
+++ b/app/services/crypto_box.py
@@ -0,0 +1,74 @@
+import base64
+import hashlib
+import json
+import os
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Dict, Any
+
+from cryptography.hazmat.primitives import serialization, hashes
+from cryptography.hazmat.primitives.asymmetric import rsa, padding
+
+KEY_DIR = Path(os.getenv('MONITOR_KEY_DIR', '/config/monitor_keys'))
+PRIV_PATH = KEY_DIR / 'monitor_private.pem'
+PUB_PATH = KEY_DIR / 'monitor_public.pem'
+
+
+def ensure_keypair() -> None:
+    """Create the RSA key pair under KEY_DIR unless both PEM files already exist."""
+    KEY_DIR.mkdir(parents=True, exist_ok=True)
+    if PRIV_PATH.exists() and PUB_PATH.exists():
+        return
+    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
+    private_pem = private_key.private_bytes(
+        encoding=serialization.Encoding.PEM,
+        format=serialization.PrivateFormat.TraditionalOpenSSL,
+        encryption_algorithm=serialization.NoEncryption(),
+    )
+    public_pem = private_key.public_key().public_bytes(
+        encoding=serialization.Encoding.PEM,
+        format=serialization.PublicFormat.SubjectPublicKeyInfo,
+    )
+    # Create the private key file owner-only before any key material is written to it.
+    PRIV_PATH.touch(mode=0o600, exist_ok=True)
+    PRIV_PATH.chmod(0o600)
+    PRIV_PATH.write_bytes(private_pem)
+    PUB_PATH.write_bytes(public_pem)
+
+
+def get_public_key_info() -> Dict[str, str]:
+    """Return the public key PEM and a key id (first 16 hex chars of the PEM's SHA-256)."""
+    ensure_keypair()
+    pem = PUB_PATH.read_text()
+    kid = hashlib.sha256(pem.encode()).hexdigest()[:16]
+    return {'public_key_pem': pem, 'key_id': kid}
+
+
+def decrypt_payload_b64(ciphertext_b64: str) -> Dict[str, Any]:
+    """Decrypt a base64 RSA-OAEP(SHA-256) ciphertext and parse the plaintext as JSON."""
+    ensure_keypair()
+    private_key = serialization.load_pem_private_key(PRIV_PATH.read_bytes(), password=None)
+    plaintext = private_key.decrypt(
+        base64.b64decode(ciphertext_b64),
+        padding.OAEP(
+            mgf=padding.MGF1(algorithm=hashes.SHA256()),
+            algorithm=hashes.SHA256(),
+            label=None,
+        ),
+    )
+    obj = json.loads(plaintext.decode())
+    return obj
+
+
+def ts_within(ts_iso: str, max_minutes: int = 10) -> bool:
+    """Return True when ISO-8601 *ts_iso* lies within *max_minutes* of the current UTC time."""
+    try:
+        ts = datetime.fromisoformat(ts_iso.replace('Z', '+00:00'))
+    except (AttributeError, TypeError, ValueError):
+        # Missing, non-string or unparseable timestamps are simply rejected.
+        return False
+    if ts.tzinfo is None:
+        # A naive value cannot be subtracted from an aware "now"; treat it as UTC.
+        ts = ts.replace(tzinfo=timezone.utc)
+    now = datetime.now(timezone.utc)
+    return abs((now - ts).total_seconds()) <= max_minutes * 60
diff --git a/app/services/monitoring.py b/app/services/monitoring.py
new file mode 100644
index 0000000..df053fa
--- /dev/null
+++ b/app/services/monitoring.py
@@ -0,0 +1,341 @@
+import json
+from datetime import datetime, timedelta, timezone
+from typing import Any, Dict, Tuple
+
+import requests
+from sqlalchemy.orm import Session
+
+from app.models.models import Issue
+from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState
+
+_CACHE: Dict[str, Dict[str, Any]] = {}
+
+
+def _now():
+    """Return the current time as an aware UTC datetime."""
+    return datetime.now(timezone.utc)
+
+
+def _as_utc(dt: datetime) -> datetime:
+    """Return *dt* as an aware datetime, treating naive values as UTC."""
+    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
+
+
+def _parse_credential(raw: str) -> Dict[str, Any]:
+    """Parse a stored credential: a JSON object string, or a bare API key wrapped as {'api_key': ...}."""
+    raw = (raw or '').strip()
+    if raw.startswith('{'):
+        try:
+            return json.loads(raw)
+        except Exception:
+            return {'api_key': raw}
+    return {'api_key': raw}
+
+
+def _parse_reset_at(value) -> datetime | None:
+    """Convert a reset marker (datetime, epoch seconds or ISO-8601 string) to a datetime, else None."""
+    if not value:
+        return None
+    if isinstance(value, datetime):
+        return value
+    if isinstance(value, (int, float)):
+        return datetime.fromtimestamp(value, tz=timezone.utc)
+    if isinstance(value, str):
+        try:
+            return datetime.fromisoformat(value.replace('Z', '+00:00'))
+        except Exception:
+            return None
+    return None
+
+
+def _first_present(payload: Dict[str, Any], *keys: str) -> Any:
+    """Return the first value among *keys* that is present and not None."""
+    for k in keys:
+        value = payload.get(k)
+        if value is not None:
+            return value
+    return None
+
+
+def _normalize_usage_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
+    """Map a provider usage response with varying key names onto the common snapshot fields."""
+    # Numeric fields use a None check so that a legitimate 0 is not mistaken for "missing".
+    used = _first_present(payload, 'used', 'usage', 'consumed', 'total_usage')
+    limit = _first_present(payload, 'limit', 'quota', 'hard_limit', 'total')
+    remaining = _first_present(payload, 'remain', 'remaining', 'left')
+    usage_pct = _first_present(payload, 'usage_pct', 'percent', 'usage_percent')
+    window_label = payload.get('window') or payload.get('window_label')
+    reset_at = payload.get('reset_at') or payload.get('reset_time') or payload.get('reset')
+
+    if used is None and remaining is not None and limit is not None:
+        try:
+            used = float(limit) - float(remaining)
+        except Exception:
+            pass
+
+    if usage_pct is None and used is not None and limit:
+        try:
+            usage_pct = round(float(used) / float(limit) * 100, 2)
+        except Exception:
+            pass
+
+    return {
+        'window_label': window_label,
+        'used': used,
+        'limit': limit,
+        'usage_pct': usage_pct,
+        'reset_at': _parse_reset_at(reset_at),
+        'raw': payload,
+    }
+
+
+def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800):
+    """Return total and last-24h issue counters, cached in-process for *ttl_seconds*."""
+    key = 'issue_stats_24h'
+    now = _now()
+    hit = _CACHE.get(key)
+    if hit and (now - hit['at']).total_seconds() < ttl_seconds:
+        return hit['data']
+
+    since = now - timedelta(hours=24)
+    total = db.query(Issue).count()
+    new_24h = db.query(Issue).filter(Issue.created_at >= since).count()
+    processed_24h = db.query(Issue).filter(
+        Issue.updated_at.isnot(None),
+        Issue.updated_at >= since,
+        Issue.status.in_(['resolved', 'closed'])
+    ).count()
+    data = {
+        'total_issues': total,
+        'new_issues_24h': new_24h,
+        'processed_issues_24h': processed_24h,
+        'computed_at': now.isoformat(),
+        'cache_ttl_seconds': ttl_seconds,
+    }
+    _CACHE[key] = {'at': now, 'data': data}
+    return data
+
+
+def _provider_headers(provider: str, credential: str, extra: Dict[str, Any] | None = None):
+    """Build auth headers: a custom header from *extra* when configured, else the provider default."""
+    extra = extra or {}
+    if extra.get('auth_header'):
+        val = extra.get('auth_value')
+        if not val:
+            scheme = extra.get('auth_scheme')
+            val = f"{scheme} {credential}" if scheme else credential
+        return {extra['auth_header']: val}
+    if provider == 'openai':
+        return {'Authorization': f'Bearer {credential}'}
+    if provider == 'anthropic':
+        return {'x-api-key': credential, 'anthropic-version': '2023-06-01'}
+    return {}
+
+
+def test_provider_connection(provider: str, credential: str):
+    """Probe a provider with the credential and return (ok, message); exceptions become (False, str)."""
+    provider = provider.lower()
+    info = _parse_credential(credential)
+    key = info.get('api_key') or credential
+    try:
+        if provider == 'openai':
+            r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12)
+            return r.status_code == 200, f'status={r.status_code}'
+        if provider == 'anthropic':
+            r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12)
+            return r.status_code == 200, f'status={r.status_code}'
+        usage_url = info.get('usage_url') or info.get('test_url')
+        if provider == 'kimi' and not usage_url:
+            usage_url = 'https://www.kimi.com/api/user/usage'
+        if usage_url:
+            r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12)
+            return r.status_code < 500, f'status={r.status_code}'
+        if provider in {'minimax', 'kimi', 'qwen'}:
+            return True, 'accepted (connectivity check pending provider-specific adapter)'
+        return False, 'unsupported provider'
+    except Exception as e:
+        return False, str(e)
+
+
+def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
+    """Fetch the 7-day OpenAI billing usage and subscription limit; returns (status, payload)."""
+    info = _parse_credential(credential)
+    key = info.get('api_key') or credential
+    headers = _provider_headers('openai', key, info)
+    today = _now().date()
+    start = (today - timedelta(days=7)).isoformat()
+    end = today.isoformat()
+    usage_url = info.get('usage_url') or f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}'
+    sub_url = info.get('subscription_url') or 'https://api.openai.com/v1/dashboard/billing/subscription'
+    u = requests.get(usage_url, headers=headers, timeout=12)
+    s = requests.get(sub_url, headers=headers, timeout=12)
+    if u.status_code != 200 or s.status_code != 200:
+        return 'error', {'error': f'usage:{u.status_code}, subscription:{s.status_code}'}
+    usage = u.json()
+    sub = s.json()
+    total_usage = usage.get('total_usage')
+    hard_limit = sub.get('hard_limit_usd')
+    reset_at_ts = sub.get('billing_cycle_anchor')
+    reset_at = datetime.fromtimestamp(reset_at_ts, tz=timezone.utc) if reset_at_ts else None
+    usage_pct = None
+    # NOTE(review): this endpoint is believed to report total_usage in cents while hard_limit_usd
+    # is in dollars; confirm the units before relying on usage_pct.
+    if total_usage is not None and hard_limit:
+        usage_pct = round(total_usage / hard_limit * 100, 2)
+    return 'ok', {
+        'window_label': '7d',
+        'used': total_usage,
+        'limit': hard_limit,
+        'usage_pct': usage_pct,
+        'reset_at': reset_at,
+        'raw': {'usage': usage, 'subscription': sub},
+    }
+
+
+def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
+    """Fetch Anthropic usage from the credential's usage_url; 'unsupported' when none is configured."""
+    info = _parse_credential(credential)
+    key = info.get('api_key') or credential
+    usage_url = info.get('usage_url')
+    if not usage_url:
+        return 'unsupported', {'error': 'anthropic usage API not configured'}
+    r = requests.get(usage_url, headers=_provider_headers('anthropic', key, info), timeout=12)
+    if r.status_code != 200:
+        return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
+    payload = r.json()
+    if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict):
+        payload = payload['data']
+    return 'ok', _normalize_usage_payload(payload)
+
+
+def _kimi_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
+    """Fetch Kimi usage from usage_url, defaulting to the kimi.com user usage endpoint."""
+    info = _parse_credential(credential)
+    key = info.get('api_key') or credential
+    usage_url = info.get('usage_url') or 'https://www.kimi.com/api/user/usage'
+    r = requests.get(usage_url, headers=_provider_headers('kimi', key, info), timeout=12)
+    if r.status_code != 200:
+        return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
+    payload = r.json()
+    if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict):
+        payload = payload['data']
+    return 'ok', _normalize_usage_payload(payload)
+
+
+def _minimax_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
+    """Fetch Minimax usage from the credential's usage_url; 'unsupported' when none is configured."""
+    info = _parse_credential(credential)
+    key = info.get('api_key') or credential
+    usage_url = info.get('usage_url')
+    if not usage_url:
+        return 'unsupported', {'error': 'minimax usage API not configured'}
+    r = requests.get(usage_url, headers=_provider_headers('minimax', key, info), timeout=12)
+    if r.status_code != 200:
+        return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
+    payload = r.json()
+    if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict):
+        payload = payload['data']
+    return 'ok', _normalize_usage_payload(payload)
+
+
+def _generic_usage(provider: str, credential: str) -> Tuple[str, Dict[str, Any]]:
+    """Fetch usage from usage_url for *provider* without unwrapping a 'data' envelope."""
+    info = _parse_credential(credential)
+    key = info.get('api_key') or credential
+    usage_url = info.get('usage_url')
+    if not usage_url:
+        return 'unsupported', {'error': f'{provider} usage API not configured'}
+    r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12)
+    if r.status_code != 200:
+        return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
+    payload = r.json()
+    return 'ok', _normalize_usage_payload(payload)
+
+
+def refresh_provider_usage_once(db: Session):
+    """Fetch usage for every enabled provider account and store one snapshot per account."""
+    accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
+    now = _now()
+    for a in accounts:
+        status = 'pending'
+        payload: Dict[str, Any] = {}
+        try:
+            if a.provider == 'openai':
+                status, payload = _openai_usage(a.credential)
+            elif a.provider == 'anthropic':
+                status, payload = _anthropic_usage(a.credential)
+            elif a.provider == 'kimi':
+                status, payload = _kimi_usage(a.credential)
+            elif a.provider == 'minimax':
+                status, payload = _minimax_usage(a.credential)
+            elif a.provider == 'qwen':
+                status, payload = _generic_usage(a.provider, a.credential)
+            else:
+                ok, msg = test_provider_connection(a.provider, a.credential)
+                status = 'ok' if ok else 'error'
+                payload = {'error': None if ok else msg}
+        except Exception as exc:
+            # One failing provider must not abort the snapshot run for the remaining accounts.
+            status, payload = 'error', {'error': str(exc)}
+
+        snap = ProviderUsageSnapshot(
+            account_id=a.id,
+            window_label=payload.get('window_label'),
+            used=payload.get('used'),
+            limit=payload.get('limit'),
+            usage_pct=payload.get('usage_pct'),
+            reset_at=payload.get('reset_at'),
+            status=status,
+            error=payload.get('error'),
+            raw_payload=json.dumps(payload.get('raw') or payload, ensure_ascii=False),
+            fetched_at=now,
+        )
+        db.add(snap)
+    db.commit()
+
+
+def get_provider_usage_view(db: Session):
+    """Return the latest usage snapshot (or a 'pending' placeholder) for each enabled account."""
+    accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
+    rows = []
+    for a in accounts:
+        snap = db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == a.id).order_by(ProviderUsageSnapshot.fetched_at.desc()).first()
+        rows.append({
+            'account_id': a.id,
+            'provider': a.provider,
+            'label': a.label,
+            'window': snap.window_label if snap else None,
+            'usage_pct': snap.usage_pct if snap else None,
+            'used': snap.used if snap else None,
+            'limit': snap.limit if snap else None,
+            'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None,
+            'status': snap.status if snap else 'pending',
+            'error': snap.error if snap else None,
+            'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None,
+        })
+    return rows
+
+
+def get_server_states_view(db: Session, offline_after_minutes: int = 7):
+    """Return state rows for enabled servers; 'online' means seen within *offline_after_minutes*."""
+    now = _now()
+    servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all()
+    out = []
+    for s in servers:
+        st = db.query(ServerState).filter(ServerState.server_id == s.id).first()
+        last_seen = st.last_seen_at if st else None
+        online = bool(last_seen and (now - _as_utc(last_seen)).total_seconds() <= offline_after_minutes * 60)
+        out.append({
+            'server_id': s.id,
+            'identifier': s.identifier,
+            'display_name': s.display_name or s.identifier,
+            'online': online,
+            'openclaw_version': st.openclaw_version if st else None,
+            'cpu_pct': st.cpu_pct if st else None,
+            'mem_pct': st.mem_pct if st else None,
+            'disk_pct': st.disk_pct if st else None,
+            'swap_pct': st.swap_pct if st else None,
+            'agents': json.loads(st.agents_json) if st and st.agents_json else [],
+            'last_seen_at': last_seen.isoformat() if last_seen else None,
+        })
+    return out
diff --git a/docs/monitor-provider-credentials.md b/docs/monitor-provider-credentials.md
new file mode 100644
index 0000000..a9031bc
--- /dev/null
+++ b/docs/monitor-provider-credentials.md
@@ -0,0 +1,63 @@
+# Provider 账号凭证格式(Monitor)
+
+默认情况下,`credential` 可以直接填写 API Key 字符串。
+如果需要配置自定义 usage 端点,请使用 JSON 字符串。
+
+## 基础格式
+```json
+{
+  "api_key": "sk-...",
+  "usage_url": "https://.../usage",
+  "auth_header": "Authorization",
+  "auth_scheme": "Bearer"
+}
+```
+
+### 字段说明
+- `api_key`: API key(必填)
+- `usage_url`: 统计用量的 GET 端点(可选,minimax/kimi/qwen 推荐填写)
+- `auth_header`: 自定义鉴权头名(可选)
+- `auth_scheme`: 鉴权 scheme(如 `Bearer`),会拼成 `Bearer <api_key>`
+- `auth_value`: 直接指定头值(优先级高于 scheme)
+
+## OpenAI
+默认使用 OpenAI 官方 billing endpoints(7天窗口):
+- `https://api.openai.com/v1/dashboard/billing/usage`
+- `https://api.openai.com/v1/dashboard/billing/subscription`
+
+如需自定义可使用 JSON:
+```json
+{
+  "api_key": "sk-...",
+  "usage_url": "https://api.openai.com/v1/dashboard/billing/usage?start_date=YYYY-MM-DD&end_date=YYYY-MM-DD",
+  "subscription_url": "https://api.openai.com/v1/dashboard/billing/subscription"
+}
+```
+
+## Anthropic
+官方 usage API 需要自行提供 `usage_url`(不同组织可能不同):
+```json
+{
+  "api_key": "ak-...",
+  "usage_url": "https://api.anthropic.com/.../usage"
+}
+```
+
+## Minimax / Kimi / Qwen
+Minimax / Qwen 需要你提供 `usage_url`(具体端点取决于部署/账号);Kimi 未提供时默认使用 `https://www.kimi.com/api/user/usage`:
+```json
+{
+  "api_key": "...",
+  "usage_url": "https://.../usage",
+  "auth_header": "Authorization",
+  "auth_scheme": "Bearer"
+}
+```
+
+## Kimi
+推荐 usage_url: https://www.kimi.com/api/user/usage
+Authorization: Bearer <api_key>
+
+## Minimax
+推荐 usage_url: https://platform.minimax.io/v1/api/openplatform/coding_plan/remains
+Authorization: Bearer <api_key>
diff --git a/docs/openclaw-monitor-plugin-plan.md b/docs/openclaw-monitor-plugin-plan.md
new file mode 100644
index 0000000..80c0f18
--- /dev/null
+++ b/docs/openclaw-monitor-plugin-plan.md
@@ -0,0 +1,68 @@
+# OpenClaw Monitor Agent Plugin 开发计划(草案)
+
+## 目标
+让被监测服务器通过 WebSocket 主动接入 HarborForge Backend,并持续上报:
+- OpenClaw 版本
+- agent 列表
+- 每 5 分钟主机指标(CPU/MEM/DISK/SWAP)
+- agent 状态变更事件
+
+## 握手流程
+1. Admin 在 HarborForge 后台添加 server identifier
+2. Admin 生成 challenge(10 分钟有效)
+3. 插件请求 `GET /monitor/public/server-public-key` 获取公钥
+4. 插件构造 payload:
+   - `identifier`
+   - `challenge_uuid`
+   - `nonce`(随机)
+   - `ts`(ISO8601)
+5. 使用 RSA-OAEP(SHA256) 公钥加密,base64 后作为 `encrypted_payload` 发给 `WS /monitor/server/ws`
+6. 握手成功后进入事件上报通道
+
+## 插件事件协议
+### server.hello
+```json
+{
+  "event": "server.hello",
+  "payload": {
+    "openclaw_version": "x.y.z",
+    "agents": [{"id": "a1", "name": "agent-1", "status": "idle"}]
+  }
+}
+```
+
+### server.metrics(每 5 分钟)
+```json
+{
+  "event": "server.metrics",
+  "payload": {
+    "cpu_pct": 21.3,
+    "mem_pct": 42.1,
+    "disk_pct": 55.9,
+    "swap_pct": 0.0,
+    "agents": [{"id": "a1", "name": "agent-1", "status": "busy"}]
+  }
+}
+```
+
+### agent.status_changed(可选)
+```json
+{
+  "event": "agent.status_changed",
+  "payload": {
+    "agents": [{"id": "a1", "name": "agent-1", "status": "focus"}]
+  }
+}
+```
+
+## 实施里程碑
+- M1: Node/Python CLI 插件最小握手联通
+- M2: 指标采集 + 周期上报
+- M3: agent 状态采集与变更事件
+- M4: 守护化(systemd)+ 断线重连 + 本地日志
+
+## 风险与注意事项
+- 时钟漂移会导致 `ts` 校验失败(建议 NTP)
+- challenge 仅一次可用,重复使用会被拒绝
+- nonce 重放会被拒绝
+- 需要保证插件本地安全保存 identifier/challenge(短期)
diff --git a/requirements.txt b/requirements.txt
index 9db19f8..85208cb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,3 +11,4 @@ python-multipart==0.0.6
 alembic==1.13.1
 python-dotenv==1.0.0
 httpx==0.27.0
+requests==2.31.0