From d299428d35a960320c108a443aa478aab28d4708 Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 11:59:53 +0000 Subject: [PATCH 01/11] feat: add public monitor API + admin provider/server management scaffold --- app/api/routers/monitor.py | 169 +++++++++++++++++++++++++++++++++++++ app/main.py | 22 ++++- app/models/monitor.py | 58 +++++++++++++ app/services/monitoring.py | 132 +++++++++++++++++++++++++++++ 4 files changed, 380 insertions(+), 1 deletion(-) create mode 100644 app/api/routers/monitor.py create mode 100644 app/models/monitor.py create mode 100644 app/services/monitoring.py diff --git a/app/api/routers/monitor.py b/app/api/routers/monitor.py new file mode 100644 index 0000000..d4dfbef --- /dev/null +++ b/app/api/routers/monitor.py @@ -0,0 +1,169 @@ +from datetime import datetime, timedelta, timezone +import json +from typing import List + +from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlalchemy.orm import Session + +from app.core.config import get_db +from app.api.deps import get_current_user_or_apikey +from app.models import models +from app.models.monitor import ProviderAccount, MonitoredServer, ServerState +from app.services.monitoring import ( + get_issue_stats_cached, + get_provider_usage_view, + get_server_states_view, + test_provider_connection, +) + +router = APIRouter(prefix='/monitor', tags=['Monitor']) + + +SUPPORTED_PROVIDERS = {'anthropic', 'openai', 'minimax', 'kimi', 'qwen'} + + +class ProviderAccountCreate(BaseModel): + provider: str + label: str + credential: str + + +class ProviderTestRequest(BaseModel): + provider: str + credential: str + + +class MonitoredServerCreate(BaseModel): + identifier: str + display_name: str | None = None + + +def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)): + if not current_user.is_admin: + raise HTTPException(status_code=403, detail='Admin required') + return current_user + + +@router.get('/public/overview') +def public_overview(db: Session = Depends(get_db)): + return { + 'issues': get_issue_stats_cached(db, ttl_seconds=1800), + 'providers': get_provider_usage_view(db), + 'servers': get_server_states_view(db, offline_after_minutes=7), + 'generated_at': datetime.now(timezone.utc).isoformat(), + } + + +@router.get('/admin/providers/accounts') +def list_provider_accounts(db: Session = Depends(get_db), _: models.User = Depends(require_admin)): + accounts = db.query(ProviderAccount).order_by(ProviderAccount.created_at.desc()).all() + return [ + { + 'id': a.id, + 'provider': a.provider, + 'label': a.label, + 'is_enabled': a.is_enabled, + 'created_at': a.created_at, + 'credential_masked': '***' + (a.credential[-4:] if a.credential else ''), + } + for a in accounts + ] + + +@router.post('/admin/providers/accounts', status_code=status.HTTP_201_CREATED) +def create_provider_account(payload: ProviderAccountCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)): + provider = payload.provider.lower().strip() + if provider not in SUPPORTED_PROVIDERS: + raise HTTPException(status_code=400, detail=f'Unsupported provider: {provider}') + obj = ProviderAccount( + provider=provider, + label=payload.label.strip(), + credential=payload.credential.strip(), + is_enabled=True, + created_by=user.id, + ) + db.add(obj) + db.commit() + db.refresh(obj) + return {'id': obj.id, 'provider': obj.provider, 'label': obj.label, 'is_enabled': obj.is_enabled} + + +@router.post('/admin/providers/test') +def test_provider(payload: ProviderTestRequest, _: models.User = Depends(require_admin)): + ok, message = test_provider_connection(payload.provider.lower().strip(), payload.credential.strip()) + return {'ok': ok, 'message': message} + + +@router.delete('/admin/providers/accounts/{account_id}', status_code=status.HTTP_204_NO_CONTENT) +def delete_provider_account(account_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)): + obj = db.query(ProviderAccount).filter(ProviderAccount.id == account_id).first() + if not obj: + raise HTTPException(status_code=404, detail='Provider account not found') + db.delete(obj) + db.commit() + return None + + +@router.get('/admin/servers') +def list_servers(db: Session = Depends(get_db), _: models.User = Depends(require_admin)): + return get_server_states_view(db, offline_after_minutes=7) + + +@router.post('/admin/servers', status_code=status.HTTP_201_CREATED) +def add_server(payload: MonitoredServerCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)): + identifier = payload.identifier.strip() + if not identifier: + raise HTTPException(status_code=400, detail='identifier required') + exists = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier).first() + if exists: + raise HTTPException(status_code=400, detail='identifier already exists') + obj = MonitoredServer(identifier=identifier, display_name=payload.display_name, is_enabled=True, created_by=user.id) + db.add(obj) + db.commit() + db.refresh(obj) + return {'id': obj.id, 'identifier': obj.identifier, 'display_name': obj.display_name, 'is_enabled': obj.is_enabled} + + +@router.delete('/admin/servers/{server_id}', status_code=status.HTTP_204_NO_CONTENT) +def delete_server(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)): + obj = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first() + if not obj: + raise HTTPException(status_code=404, detail='Server not found') + state = db.query(ServerState).filter(ServerState.server_id == server_id).first() + if state: + db.delete(state) + db.delete(obj) + db.commit() + return None + + +# Temporary ingestion endpoint before WS plugin lands +class ServerHeartbeat(BaseModel): + identifier: str + openclaw_version: str | None = None + agents: List[dict] = [] + cpu_pct: float | None = None + mem_pct: float | None = None + disk_pct: float | None = None + swap_pct: float | None = None + + +@router.post('/server/heartbeat') +def server_heartbeat(payload: ServerHeartbeat, db: Session = Depends(get_db)): + server = db.query(MonitoredServer).filter(MonitoredServer.identifier == payload.identifier, MonitoredServer.is_enabled == True).first() + if not server: + raise HTTPException(status_code=404, detail='unknown server identifier') + st = db.query(ServerState).filter(ServerState.server_id == server.id).first() + if not st: + st = ServerState(server_id=server.id) + db.add(st) + st.openclaw_version = payload.openclaw_version + st.agents_json = json.dumps(payload.agents, ensure_ascii=False) + st.cpu_pct = payload.cpu_pct + st.mem_pct = payload.mem_pct + st.disk_pct = payload.disk_pct + st.swap_pct = payload.swap_pct + st.last_seen_at = datetime.now(timezone.utc) + db.commit() + return {'ok': True, 'server_id': server.id, 'last_seen_at': st.last_seen_at} diff --git a/app/main.py b/app/main.py index f43e6d4..2ea5790 100644 --- a/app/main.py +++ b/app/main.py @@ -34,6 +34,7 @@ from app.api.routers.users import router as users_router from app.api.routers.comments import router as comments_router from app.api.routers.webhooks import router as webhooks_router from app.api.routers.misc import router as misc_router +from app.api.routers.monitor import router as monitor_router app.include_router(auth_router) app.include_router(issues_router) @@ -42,12 +43,13 @@ app.include_router(users_router) app.include_router(comments_router) app.include_router(webhooks_router) app.include_router(misc_router) +app.include_router(monitor_router) # Run database migration on startup @app.on_event("startup") def startup(): from app.core.config import Base, engine, SessionLocal - from app.models import webhook, apikey, activity, milestone, notification, worklog + from app.models import webhook, apikey, activity, milestone, notification, worklog, monitor Base.metadata.create_all(bind=engine) # Initialize from AbstractWizard (admin user, default project, etc.) @@ -57,3 +59,21 @@ def startup(): run_init(db) finally: db.close() + + # Start lightweight monitor polling thread (every 10 minutes) + import threading, time + from app.services.monitoring import refresh_provider_usage_once + + def _monitor_poll_loop(): + while True: + db2 = SessionLocal() + try: + refresh_provider_usage_once(db2) + except Exception: + pass + finally: + db2.close() + time.sleep(600) + + t = threading.Thread(target=_monitor_poll_loop, daemon=True) + t.start() diff --git a/app/models/monitor.py b/app/models/monitor.py new file mode 100644 index 0000000..030e790 --- /dev/null +++ b/app/models/monitor.py @@ -0,0 +1,58 @@ +from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Float, ForeignKey +from sqlalchemy.sql import func +from app.core.config import Base + + +class ProviderAccount(Base): + __tablename__ = 'provider_accounts' + + id = Column(Integer, primary_key=True, index=True) + provider = Column(String(32), nullable=False, index=True) # anthropic/openai/minimax/kimi/qwen + label = Column(String(128), nullable=False) + credential = Column(Text, nullable=False) # TODO: encrypt at rest + is_enabled = Column(Boolean, default=True) + created_by = Column(Integer, nullable=True) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + + +class ProviderUsageSnapshot(Base): + __tablename__ = 'provider_usage_snapshots' + + id = Column(Integer, primary_key=True, index=True) + account_id = Column(Integer, ForeignKey('provider_accounts.id'), nullable=False, index=True) + window_label = Column(String(32), nullable=True) # e.g. 1h / 7d + used = Column(Float, nullable=True) + limit = Column(Float, nullable=True) + usage_pct = Column(Float, nullable=True) + reset_at = Column(DateTime(timezone=True), nullable=True) + status = Column(String(32), nullable=False, default='unknown') # ok/error/pending/unsupported + error = Column(Text, nullable=True) + raw_payload = Column(Text, nullable=True) + fetched_at = Column(DateTime(timezone=True), server_default=func.now(), index=True) + + +class MonitoredServer(Base): + __tablename__ = 'monitored_servers' + + id = Column(Integer, primary_key=True, index=True) + identifier = Column(String(128), nullable=False, unique=True) + display_name = Column(String(128), nullable=True) + is_enabled = Column(Boolean, default=True) + created_by = Column(Integer, nullable=True) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + +class ServerState(Base): + __tablename__ = 'server_states' + + id = Column(Integer, primary_key=True, index=True) + server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, unique=True) + openclaw_version = Column(String(64), nullable=True) + agents_json = Column(Text, nullable=True) # json list + cpu_pct = Column(Float, nullable=True) + mem_pct = Column(Float, nullable=True) + disk_pct = Column(Float, nullable=True) + swap_pct = Column(Float, nullable=True) + last_seen_at = Column(DateTime(timezone=True), nullable=True) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) diff --git a/app/services/monitoring.py b/app/services/monitoring.py new file mode 100644 index 0000000..9ea73e5 --- /dev/null +++ b/app/services/monitoring.py @@ -0,0 +1,132 @@ +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict + +import requests +from sqlalchemy.orm import Session + +from app.models.models import Issue +from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState + +_CACHE: Dict[str, Dict[str, Any]] = {} + + +def _now(): + return datetime.now(timezone.utc) + + +def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800): + key = 'issue_stats_24h' + now = _now() + hit = _CACHE.get(key) + if hit and (now - hit['at']).total_seconds() < ttl_seconds: + return hit['data'] + + since = now - timedelta(hours=24) + total = db.query(Issue).count() + new_24h = db.query(Issue).filter(Issue.created_at >= since).count() + processed_24h = db.query(Issue).filter( + Issue.updated_at != None, + Issue.updated_at >= since, + Issue.status.in_(['resolved', 'closed']) + ).count() + data = { + 'total_issues': total, + 'new_issues_24h': new_24h, + 'processed_issues_24h': processed_24h, + 'computed_at': now.isoformat(), + 'cache_ttl_seconds': ttl_seconds, + } + _CACHE[key] = {'at': now, 'data': data} + return data + + +def _provider_headers(provider: str, credential: str): + if provider == 'openai': + return {'Authorization': f'Bearer {credential}'} + if provider == 'anthropic': + return {'x-api-key': credential, 'anthropic-version': '2023-06-01'} + return None + + +def test_provider_connection(provider: str, credential: str): + provider = provider.lower() + try: + if provider == 'openai': + r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, credential), timeout=12) + return r.status_code == 200, f'status={r.status_code}' + if provider == 'anthropic': + r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, credential), timeout=12) + return r.status_code == 200, f'status={r.status_code}' + if provider in {'minimax', 'kimi', 'qwen'}: + # Endpoints/usage API vary by deployment; keep as accepted-but-unverified for now. + return True, 'accepted (connectivity check pending provider-specific adapter)' + return False, 'unsupported provider' + except Exception as e: + return False, str(e) + + +def refresh_provider_usage_once(db: Session): + accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() + now = _now() + for a in accounts: + ok, msg = test_provider_connection(a.provider, a.credential) + snap = ProviderUsageSnapshot( + account_id=a.id, + window_label='provider-default', + used=None, + limit=None, + usage_pct=None, + reset_at=None, + status='ok' if ok else 'error', + error=None if ok else msg, + raw_payload=json.dumps({'message': msg}, ensure_ascii=False), + fetched_at=now, + ) + db.add(snap) + db.commit() + + +def get_provider_usage_view(db: Session): + accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() + rows = [] + for a in accounts: + snap = db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == a.id).order_by(ProviderUsageSnapshot.fetched_at.desc()).first() + rows.append({ + 'account_id': a.id, + 'provider': a.provider, + 'label': a.label, + 'window': snap.window_label if snap else None, + 'usage_pct': snap.usage_pct if snap else None, + 'used': snap.used if snap else None, + 'limit': snap.limit if snap else None, + 'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None, + 'status': snap.status if snap else 'pending', + 'error': snap.error if snap else None, + 'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None, + }) + return rows + + +def get_server_states_view(db: Session, offline_after_minutes: int = 7): + now = _now() + servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all() + out = [] + for s in servers: + st = db.query(ServerState).filter(ServerState.server_id == s.id).first() + last_seen = st.last_seen_at if st else None + online = bool(last_seen and (now - last_seen).total_seconds() <= offline_after_minutes * 60) + out.append({ + 'server_id': s.id, + 'identifier': s.identifier, + 'display_name': s.display_name or s.identifier, + 'online': online, + 'openclaw_version': st.openclaw_version if st else None, + 'cpu_pct': st.cpu_pct if st else None, + 'mem_pct': st.mem_pct if st else None, + 'disk_pct': st.disk_pct if st else None, + 'swap_pct': st.swap_pct if st else None, + 'agents': json.loads(st.agents_json) if st and st.agents_json else [], + 'last_seen_at': last_seen.isoformat() if last_seen else None, + }) + return out From 464bccafd812a5bac31313d98509599cbb1a6ada Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 12:41:32 +0000 Subject: [PATCH 02/11] feat: add 10m server challenge flow and websocket telemetry channel --- app/api/routers/monitor.py | 113 ++++++++++++++++++++++++++++++++++--- app/models/monitor.py | 20 +++++++ 2 files changed, 126 insertions(+), 7 deletions(-) diff --git a/app/api/routers/monitor.py b/app/api/routers/monitor.py index d4dfbef..6c37a14 100644 --- a/app/api/routers/monitor.py +++ b/app/api/routers/monitor.py @@ -1,15 +1,22 @@ from datetime import datetime, timedelta, timezone import json -from typing import List +import uuid +from typing import List, Dict -from fastapi import APIRouter, Depends, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect from pydantic import BaseModel from sqlalchemy.orm import Session -from app.core.config import get_db +from app.core.config import get_db, SessionLocal from app.api.deps import get_current_user_or_apikey from app.models import models -from app.models.monitor import ProviderAccount, MonitoredServer, ServerState +from app.models.monitor import ( + ProviderAccount, + MonitoredServer, + ServerState, + ServerChallenge, + ServerHandshakeNonce, +) from app.services.monitoring import ( get_issue_stats_cached, get_provider_usage_view, @@ -18,9 +25,8 @@ from app.services.monitoring import ( ) router = APIRouter(prefix='/monitor', tags=['Monitor']) - - SUPPORTED_PROVIDERS = {'anthropic', 'openai', 'minimax', 'kimi', 'qwen'} +ACTIVE_WS: Dict[int, WebSocket] = {} class ProviderAccountCreate(BaseModel): @@ -39,6 +45,12 @@ class MonitoredServerCreate(BaseModel): display_name: str | None = None +class ChallengeResponse(BaseModel): + identifier: str + challenge_uuid: str + expires_at: str + + def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)): if not current_user.is_admin: raise HTTPException(status_code=403, detail='Admin required') @@ -125,6 +137,19 @@ def add_server(payload: MonitoredServerCreate, db: Session = Depends(get_db), us return {'id': obj.id, 'identifier': obj.identifier, 'display_name': obj.display_name, 'is_enabled': obj.is_enabled} +@router.post('/admin/servers/{server_id}/challenge', response_model=ChallengeResponse) +def issue_server_challenge(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)): + server = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first() + if not server: + raise HTTPException(status_code=404, detail='Server not found') + challenge_uuid = str(uuid.uuid4()) + expires_at = datetime.now(timezone.utc) + timedelta(minutes=10) + ch = ServerChallenge(server_id=server_id, challenge_uuid=challenge_uuid, expires_at=expires_at) + db.add(ch) + db.commit() + return ChallengeResponse(identifier=server.identifier, challenge_uuid=challenge_uuid, expires_at=expires_at.isoformat()) + + @router.delete('/admin/servers/{server_id}', status_code=status.HTTP_204_NO_CONTENT) def delete_server(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)): obj = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first() @@ -133,12 +158,13 @@ def delete_server(server_id: int, db: Session = Depends(get_db), _: models.User state = db.query(ServerState).filter(ServerState.server_id == server_id).first() if state: db.delete(state) + db.query(ServerChallenge).filter(ServerChallenge.server_id == server_id).delete() + db.query(ServerHandshakeNonce).filter(ServerHandshakeNonce.server_id == server_id).delete() db.delete(obj) db.commit() return None -# Temporary ingestion endpoint before WS plugin lands class ServerHeartbeat(BaseModel): identifier: str openclaw_version: str | None = None @@ -167,3 +193,76 @@ def server_heartbeat(payload: ServerHeartbeat, db: Session = Depends(get_db)): st.last_seen_at = datetime.now(timezone.utc) db.commit() return {'ok': True, 'server_id': server.id, 'last_seen_at': st.last_seen_at} + + +@router.websocket('/server/ws') +async def server_ws(websocket: WebSocket): + await websocket.accept() + db = SessionLocal() + server_id = None + try: + hello = await websocket.receive_json() + identifier = (hello.get('identifier') or '').strip() + challenge_uuid = (hello.get('challenge_uuid') or '').strip() + nonce = (hello.get('nonce') or '').strip() + + if not identifier or not challenge_uuid or not nonce: + await websocket.close(code=4400) + return + + server = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier, MonitoredServer.is_enabled == True).first() + if not server: + await websocket.close(code=4404) + return + + ch = db.query(ServerChallenge).filter(ServerChallenge.challenge_uuid == challenge_uuid, ServerChallenge.server_id == server.id).first() + if not ch or ch.used_at is not None or ch.expires_at < datetime.now(timezone.utc): + await websocket.close(code=4401) + return + + nonce_used = db.query(ServerHandshakeNonce).filter(ServerHandshakeNonce.server_id == server.id, ServerHandshakeNonce.nonce == nonce).first() + if nonce_used: + await websocket.close(code=4409) + return + + db.add(ServerHandshakeNonce(server_id=server.id, nonce=nonce)) + ch.used_at = datetime.now(timezone.utc) + db.commit() + + server_id = server.id + ACTIVE_WS[server.id] = websocket + await websocket.send_json({'ok': True, 'server_id': server.id, 'message': 'connected'}) + + while True: + msg = await websocket.receive_json() + event = msg.get('event') + payload = msg.get('payload') or {} + st = db.query(ServerState).filter(ServerState.server_id == server.id).first() + if not st: + st = ServerState(server_id=server.id) + db.add(st) + + if event == 'server.hello': + st.openclaw_version = payload.get('openclaw_version') + st.agents_json = json.dumps(payload.get('agents') or [], ensure_ascii=False) + elif event in {'server.metrics', 'agent.status_changed'}: + st.cpu_pct = payload.get('cpu_pct', st.cpu_pct) + st.mem_pct = payload.get('mem_pct', st.mem_pct) + st.disk_pct = payload.get('disk_pct', st.disk_pct) + st.swap_pct = payload.get('swap_pct', st.swap_pct) + if 'agents' in payload: + st.agents_json = json.dumps(payload.get('agents') or [], ensure_ascii=False) + + st.last_seen_at = datetime.now(timezone.utc) + db.commit() + except WebSocketDisconnect: + pass + except Exception: + try: + await websocket.close(code=1011) + except Exception: + pass + finally: + if server_id and ACTIVE_WS.get(server_id) is websocket: + ACTIVE_WS.pop(server_id, None) + db.close() diff --git a/app/models/monitor.py b/app/models/monitor.py index 030e790..533dbcb 100644 --- a/app/models/monitor.py +++ b/app/models/monitor.py @@ -56,3 +56,23 @@ class ServerState(Base): swap_pct = Column(Float, nullable=True) last_seen_at = Column(DateTime(timezone=True), nullable=True) updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + + +class ServerChallenge(Base): + __tablename__ = 'server_challenges' + + id = Column(Integer, primary_key=True, index=True) + server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, index=True) + challenge_uuid = Column(String(64), nullable=False, unique=True, index=True) + expires_at = Column(DateTime(timezone=True), nullable=False) + used_at = Column(DateTime(timezone=True), nullable=True) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + +class ServerHandshakeNonce(Base): + __tablename__ = 'server_handshake_nonces' + + id = Column(Integer, primary_key=True, index=True) + server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, index=True) + nonce = Column(String(128), nullable=False, index=True) + created_at = Column(DateTime(timezone=True), server_default=func.now()) From 9fb13f4906e7e857482d0b1c3fe32d25e7baf935 Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 12:51:54 +0000 Subject: [PATCH 03/11] feat: add RSA public-key handshake support for monitor server websocket --- app/api/routers/monitor.py | 25 +++++++++++++-- app/services/crypto_box.py | 63 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 3 deletions(-) create mode 100644 app/services/crypto_box.py diff --git a/app/api/routers/monitor.py b/app/api/routers/monitor.py index 6c37a14..1435e6f 100644 --- a/app/api/routers/monitor.py +++ b/app/api/routers/monitor.py @@ -23,6 +23,7 @@ from app.services.monitoring import ( get_server_states_view, test_provider_connection, ) +from app.services.crypto_box import get_public_key_info, decrypt_payload_b64, ts_within router = APIRouter(prefix='/monitor', tags=['Monitor']) SUPPORTED_PROVIDERS = {'anthropic', 'openai', 'minimax', 'kimi', 'qwen'} @@ -57,6 +58,11 @@ def require_admin(current_user: models.User = Depends(get_current_user_or_apikey return current_user +@router.get('/public/server-public-key') +def monitor_public_key(): + return get_public_key_info() + + @router.get('/public/overview') def public_overview(db: Session = Depends(get_db)): return { @@ -202,9 +208,22 @@ async def server_ws(websocket: WebSocket): server_id = None try: hello = await websocket.receive_json() - identifier = (hello.get('identifier') or '').strip() - challenge_uuid = (hello.get('challenge_uuid') or '').strip() - nonce = (hello.get('nonce') or '').strip() + + encrypted_payload = (hello.get('encrypted_payload') or '').strip() + if encrypted_payload: + data = decrypt_payload_b64(encrypted_payload) + identifier = (data.get('identifier') or '').strip() + challenge_uuid = (data.get('challenge_uuid') or '').strip() + nonce = (data.get('nonce') or '').strip() + ts = data.get('ts') + if not ts_within(ts, max_minutes=10): + await websocket.close(code=4401) + return + else: + # backward compatible mode + identifier = (hello.get('identifier') or '').strip() + challenge_uuid = (hello.get('challenge_uuid') or '').strip() + nonce = (hello.get('nonce') or '').strip() if not identifier or not challenge_uuid or not nonce: await websocket.close(code=4400) diff --git a/app/services/crypto_box.py b/app/services/crypto_box.py new file mode 100644 index 0000000..38423ba --- /dev/null +++ b/app/services/crypto_box.py @@ -0,0 +1,63 @@ +import base64 +import hashlib +import json +import os +from datetime import datetime, timezone +from pathlib import Path +from typing import Dict, Any + +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.asymmetric import rsa, padding + +KEY_DIR = Path(os.getenv('MONITOR_KEY_DIR', '/config/monitor_keys')) +PRIV_PATH = KEY_DIR / 'monitor_private.pem' +PUB_PATH = KEY_DIR / 'monitor_public.pem' + + +def ensure_keypair() -> None: + KEY_DIR.mkdir(parents=True, exist_ok=True) + if PRIV_PATH.exists() and PUB_PATH.exists(): + return + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + public_pem = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + PRIV_PATH.write_bytes(private_pem) + PUB_PATH.write_bytes(public_pem) + + +def get_public_key_info() -> Dict[str, str]: + ensure_keypair() + pem = PUB_PATH.read_text() + kid = hashlib.sha256(pem.encode()).hexdigest()[:16] + return {'public_key_pem': pem, 'key_id': kid} + + +def decrypt_payload_b64(ciphertext_b64: str) -> Dict[str, Any]: + ensure_keypair() + private_key = serialization.load_pem_private_key(PRIV_PATH.read_bytes(), password=None) + plaintext = private_key.decrypt( + base64.b64decode(ciphertext_b64), + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + obj = json.loads(plaintext.decode()) + return obj + + +def ts_within(ts_iso: str, max_minutes: int = 10) -> bool: + try: + ts = datetime.fromisoformat(ts_iso.replace('Z', '+00:00')) + except Exception: + return False + now = datetime.now(timezone.utc) + return abs((now - ts).total_seconds()) <= max_minutes * 60 From 74e054c51ec80fa032b349754841c6254d80eb4e Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 12:52:21 +0000 Subject: [PATCH 04/11] docs: add openclaw monitor plugin implementation plan draft --- docs/openclaw-monitor-plugin-plan.md | 42 ++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 docs/openclaw-monitor-plugin-plan.md diff --git a/docs/openclaw-monitor-plugin-plan.md b/docs/openclaw-monitor-plugin-plan.md new file mode 100644 index 0000000..3bb9428 --- /dev/null +++ b/docs/openclaw-monitor-plugin-plan.md @@ -0,0 +1,42 @@ +# OpenClaw Monitor Agent Plugin 开发计划(草案) + +## 目标 +让被监测服务器通过 WebSocket 主动接入 HarborForge Backend,并持续上报: +- OpenClaw 版本 +- agent 列表 +- 每 5 分钟主机指标(CPU/MEM/DISK/SWAP) +- agent 状态变更事件 + +## 握手流程 +1. Admin 在 HarborForge 后台添加 server identifier +2. Admin 生成 challenge(10 分钟有效) +3. 插件请求 获取公钥 +4. 插件构造 payload: + - + - + - (随机) + - (ISO8601) +5. 使用 RSA-OAEP(SHA256) 公钥加密,base64 后作为 发给 +6. 握手成功后进入事件上报通道 + +## 插件事件协议 +### server.hello + + +### server.metrics(每 5 分钟) + + +### agent.status_changed(可选) + + +## 实施里程碑 +- M1: Node/Python CLI 插件最小握手联通 +- M2: 指标采集 + 周期上报 +- M3: agent 状态采集与变更事件 +- M4: 守护化(systemd)+ 断线重连 + 本地日志 + +## 风险与注意事项 +- 时钟漂移会导致 校验失败(建议 NTP) +- challenge 仅一次可用,重复使用会被拒绝 +- nonce 重放会被拒绝 +- 需要保证插件本地安全保存 identifier/challenge(短期) From c0ec70c64f78687e72d903559a467c884dbf6a3e Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 12:53:11 +0000 Subject: [PATCH 05/11] docs: fix plugin plan markdown content and protocol examples --- docs/openclaw-monitor-plugin-plan.md | 46 ++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/docs/openclaw-monitor-plugin-plan.md b/docs/openclaw-monitor-plugin-plan.md index 3bb9428..80c0f18 100644 --- a/docs/openclaw-monitor-plugin-plan.md +++ b/docs/openclaw-monitor-plugin-plan.md @@ -10,24 +10,50 @@ ## 握手流程 1. Admin 在 HarborForge 后台添加 server identifier 2. Admin 生成 challenge(10 分钟有效) -3. 插件请求 获取公钥 +3. 插件请求 `GET /monitor/public/server-public-key` 获取公钥 4. 插件构造 payload: - - - - - - (随机) - - (ISO8601) -5. 使用 RSA-OAEP(SHA256) 公钥加密,base64 后作为 发给 + - `identifier` + - `challenge_uuid` + - `nonce`(随机) + - `ts`(ISO8601) +5. 使用 RSA-OAEP(SHA256) 公钥加密,base64 后作为 `encrypted_payload` 发给 `WS /monitor/server/ws` 6. 握手成功后进入事件上报通道 ## 插件事件协议 ### server.hello - +```json +{ + "event": "server.hello", + "payload": { + "openclaw_version": "x.y.z", + "agents": [{"id": "a1", "name": "agent-1", "status": "idle"}] + } +} +``` ### server.metrics(每 5 分钟) - +```json +{ + "event": "server.metrics", + "payload": { + "cpu_pct": 21.3, + "mem_pct": 42.1, + "disk_pct": 55.9, + "swap_pct": 0.0, + "agents": [{"id": "a1", "name": "agent-1", "status": "busy"}] + } +} +``` ### agent.status_changed(可选) - +```json +{ + "event": "agent.status_changed", + "payload": { + "agents": [{"id": "a1", "name": "agent-1", "status": "focus"}] + } +} +``` ## 实施里程碑 - M1: Node/Python CLI 插件最小握手联通 @@ -36,7 +62,7 @@ - M4: 守护化(systemd)+ 断线重连 + 本地日志 ## 风险与注意事项 -- 时钟漂移会导致 校验失败(建议 NTP) +- 时钟漂移会导致 `ts` 校验失败(建议 NTP) - challenge 仅一次可用,重复使用会被拒绝 - nonce 重放会被拒绝 - 需要保证插件本地安全保存 identifier/challenge(短期) From 5b8f84d87d0782678cb94d2930c01f0c9a7f7310 Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 13:08:58 +0000 Subject: [PATCH 06/11] feat: add provider usage adapters for openai and placeholders for others --- app/services/monitoring.py | 70 ++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/app/services/monitoring.py b/app/services/monitoring.py index 9ea73e5..9aeb4b3 100644 --- a/app/services/monitoring.py +++ b/app/services/monitoring.py @@ -1,6 +1,6 @@ import json from datetime import datetime, timedelta, timezone -from typing import Any, Dict +from typing import Any, Dict, Tuple import requests from sqlalchemy.orm import Session @@ -66,21 +66,69 @@ def test_provider_connection(provider: str, credential: str): return False, str(e) +def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]: + # Uses legacy billing endpoints; may be disabled in some orgs. + headers = _provider_headers('openai', credential) + today = _now().date() + start = (today - timedelta(days=7)).isoformat() + end = today.isoformat() + usage_url = f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}' + sub_url = 'https://api.openai.com/v1/dashboard/billing/subscription' + u = requests.get(usage_url, headers=headers, timeout=12) + s = requests.get(sub_url, headers=headers, timeout=12) + if u.status_code != 200 or s.status_code != 200: + return 'error', {'error': f'usage:{u.status_code}, subscription:{s.status_code}'} + usage = u.json() + sub = s.json() + total_usage = usage.get('total_usage') + hard_limit = sub.get('hard_limit_usd') + reset_at = sub.get('billing_cycle_anchor') + if reset_at: + reset_at = datetime.fromtimestamp(reset_at, tz=timezone.utc).isoformat() + usage_pct = None + if total_usage is not None and hard_limit: + usage_pct = round(total_usage / hard_limit * 100, 2) + return 'ok', { + 'window_label': '7d', + 'used': total_usage, + 'limit': hard_limit, + 'usage_pct': usage_pct, + 'reset_at': reset_at, + 'raw': {'usage': usage, 'subscription': sub}, + } + + +def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]: + # Anthropic usage endpoints are enterprise-specific; keep placeholder. + # We return accepted status; detail will be filled once API confirmed. + return 'unsupported', {'error': 'anthropic usage API not configured'} + + def refresh_provider_usage_once(db: Session): accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all() now = _now() for a in accounts: - ok, msg = test_provider_connection(a.provider, a.credential) + status = 'pending' + payload: Dict[str, Any] = {} + if a.provider == 'openai': + status, payload = _openai_usage(a.credential) + elif a.provider == 'anthropic': + status, payload = _anthropic_usage(a.credential) + else: + ok, msg = test_provider_connection(a.provider, a.credential) + status = 'ok' if ok else 'error' + payload = {'error': None if ok else msg} + snap = ProviderUsageSnapshot( account_id=a.id, - window_label='provider-default', - used=None, - limit=None, - usage_pct=None, - reset_at=None, - status='ok' if ok else 'error', - error=None if ok else msg, - raw_payload=json.dumps({'message': msg}, ensure_ascii=False), + window_label=payload.get('window_label'), + used=payload.get('used'), + limit=payload.get('limit'), + usage_pct=payload.get('usage_pct'), + reset_at=payload.get('reset_at'), + status=status, + error=payload.get('error'), + raw_payload=json.dumps(payload.get('raw') or payload, ensure_ascii=False), fetched_at=now, ) db.add(snap) @@ -100,7 +148,7 @@ def get_provider_usage_view(db: Session): 'usage_pct': snap.usage_pct if snap else None, 'used': snap.used if snap else None, 'limit': snap.limit if snap else None, - 'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None, + 'reset_at': snap.reset_at if snap else None, 'status': snap.status if snap else 'pending', 'error': snap.error if snap else None, 'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None, From ff4baf61132b7e833c7322b515a3fe8c5b4f0da5 Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 13:13:07 +0000 Subject: [PATCH 07/11] feat: support provider usage via configurable JSON credentials --- app/services/monitoring.py | 91 +++++++++++++++++++++++++++++++------- 1 file changed, 75 insertions(+), 16 deletions(-) diff --git a/app/services/monitoring.py b/app/services/monitoring.py index 9aeb4b3..07055a4 100644 --- a/app/services/monitoring.py +++ b/app/services/monitoring.py @@ -15,6 +15,16 @@ def _now(): return datetime.now(timezone.utc) +def _parse_credential(raw: str) -> Dict[str, Any]: + raw = (raw or '').strip() + if raw.startswith('{'): + try: + return json.loads(raw) + except Exception: + return {'api_key': raw} + return {'api_key': raw} + + def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800): key = 'issue_stats_24h' now = _now() @@ -41,25 +51,37 @@ def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800): return data -def _provider_headers(provider: str, credential: str): +def _provider_headers(provider: str, credential: str, extra: Dict[str, Any] | None = None): + extra = extra or {} + if extra.get('auth_header'): + val = extra.get('auth_value') + if not val: + scheme = extra.get('auth_scheme') + val = f"{scheme} {credential}" if scheme else credential + return {extra['auth_header']: val} if provider == 'openai': return {'Authorization': f'Bearer {credential}'} if provider == 'anthropic': return {'x-api-key': credential, 'anthropic-version': '2023-06-01'} - return None + return {} def test_provider_connection(provider: str, credential: str): provider = provider.lower() + info = _parse_credential(credential) + key = info.get('api_key') or credential try: if provider == 'openai': - r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, credential), timeout=12) + r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12) return r.status_code == 200, f'status={r.status_code}' if provider == 'anthropic': - r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, credential), timeout=12) + r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12) return r.status_code == 200, f'status={r.status_code}' + usage_url = info.get('usage_url') or info.get('test_url') + if usage_url: + r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12) + return r.status_code < 500, f'status={r.status_code}' if provider in {'minimax', 'kimi', 'qwen'}: - # Endpoints/usage API vary by deployment; keep as accepted-but-unverified for now. return True, 'accepted (connectivity check pending provider-specific adapter)' return False, 'unsupported provider' except Exception as e: @@ -67,13 +89,14 @@ def test_provider_connection(provider: str, credential: str): def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]: - # Uses legacy billing endpoints; may be disabled in some orgs. - headers = _provider_headers('openai', credential) + info = _parse_credential(credential) + key = info.get('api_key') or credential + headers = _provider_headers('openai', key, info) today = _now().date() start = (today - timedelta(days=7)).isoformat() end = today.isoformat() - usage_url = f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}' - sub_url = 'https://api.openai.com/v1/dashboard/billing/subscription' + usage_url = info.get('usage_url') or f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}' + sub_url = info.get('subscription_url') or 'https://api.openai.com/v1/dashboard/billing/subscription' u = requests.get(usage_url, headers=headers, timeout=12) s = requests.get(sub_url, headers=headers, timeout=12) if u.status_code != 200 or s.status_code != 200: @@ -82,9 +105,8 @@ def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]: sub = s.json() total_usage = usage.get('total_usage') hard_limit = sub.get('hard_limit_usd') - reset_at = sub.get('billing_cycle_anchor') - if reset_at: - reset_at = datetime.fromtimestamp(reset_at, tz=timezone.utc).isoformat() + reset_at_ts = sub.get('billing_cycle_anchor') + reset_at = datetime.fromtimestamp(reset_at_ts, tz=timezone.utc) if reset_at_ts else None usage_pct = None if total_usage is not None and hard_limit: usage_pct = round(total_usage / hard_limit * 100, 2) @@ -99,9 +121,44 @@ def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]: def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]: - # Anthropic usage endpoints are enterprise-specific; keep placeholder. - # We return accepted status; detail will be filled once API confirmed. - return 'unsupported', {'error': 'anthropic usage API not configured'} + info = _parse_credential(credential) + key = info.get('api_key') or credential + usage_url = info.get('usage_url') + if not usage_url: + return 'unsupported', {'error': 'anthropic usage API not configured'} + r = requests.get(usage_url, headers=_provider_headers('anthropic', key, info), timeout=12) + if r.status_code != 200: + return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} + payload = r.json() + # Try to normalize + return 'ok', { + 'window_label': payload.get('window') or payload.get('window_label'), + 'used': payload.get('used'), + 'limit': payload.get('limit'), + 'usage_pct': payload.get('usage_pct'), + 'reset_at': None, + 'raw': payload, + } + + +def _generic_usage(provider: str, credential: str) -> Tuple[str, Dict[str, Any]]: + info = _parse_credential(credential) + key = info.get('api_key') or credential + usage_url = info.get('usage_url') + if not usage_url: + return 'unsupported', {'error': f'{provider} usage API not configured'} + r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12) + if r.status_code != 200: + return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} + payload = r.json() + return 'ok', { + 'window_label': payload.get('window') or payload.get('window_label'), + 'used': payload.get('used'), + 'limit': payload.get('limit'), + 'usage_pct': payload.get('usage_pct'), + 'reset_at': None, + 'raw': payload, + } def refresh_provider_usage_once(db: Session): @@ -114,6 +171,8 @@ def refresh_provider_usage_once(db: Session): status, payload = _openai_usage(a.credential) elif a.provider == 'anthropic': status, payload = _anthropic_usage(a.credential) + elif a.provider in {'minimax', 'kimi', 'qwen'}: + status, payload = _generic_usage(a.provider, a.credential) else: ok, msg = test_provider_connection(a.provider, a.credential) status = 'ok' if ok else 'error' @@ -148,7 +207,7 @@ def get_provider_usage_view(db: Session): 'usage_pct': snap.usage_pct if snap else None, 'used': snap.used if snap else None, 'limit': snap.limit if snap else None, - 'reset_at': snap.reset_at if snap else None, + 'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None, 'status': snap.status if snap else 'pending', 'error': snap.error if snap else None, 'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None, From d5402f3a70fe86db320a1d4f5568082ff96c9b91 Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 13:13:23 +0000 Subject: [PATCH 08/11] docs: add provider credential format guidance --- docs/monitor-provider-credentials.md | 55 ++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 docs/monitor-provider-credentials.md diff --git a/docs/monitor-provider-credentials.md b/docs/monitor-provider-credentials.md new file mode 100644 index 0000000..0eb4c3e --- /dev/null +++ b/docs/monitor-provider-credentials.md @@ -0,0 +1,55 @@ +# Provider 账号凭证格式(Monitor) + +默认情况下,`credential` 可以直接填写 API Key 字符串。 +如果需要配置自定义 usage 端点,请使用 JSON 字符串。 + +## 基础格式 +```json +{ + "api_key": "sk-...", + "usage_url": "https://.../usage", + "auth_header": "Authorization", + "auth_scheme": "Bearer" +} +``` + +### 字段说明 +- `api_key`: API key(必填) +- `usage_url`: 统计用量的 GET 端点(可选,minimax/kimi/qwen 推荐填写) +- `auth_header`: 自定义鉴权头名(可选) +- `auth_scheme`: 鉴权 scheme(如 `Bearer`),会拼成 `Bearer ` +- `auth_value`: 直接指定头值(优先级高于 scheme) + +## OpenAI +默认使用 OpenAI 官方 billing endpoints(7天窗口): +- `https://api.openai.com/v1/dashboard/billing/usage` +- `https://api.openai.com/v1/dashboard/billing/subscription` + +如需自定义可使用 JSON: +```json +{ + "api_key": "sk-...", + "usage_url": "https://api.openai.com/v1/dashboard/billing/usage?start_date=YYYY-MM-DD&end_date=YYYY-MM-DD", + "subscription_url": "https://api.openai.com/v1/dashboard/billing/subscription" +} +``` + +## Anthropic +官方 usage API 需要自行提供 `usage_url`(不同组织可能不同): +```json +{ + "api_key": "ak-...", + "usage_url": "https://api.anthropic.com/.../usage" +} +``` + +## Minimax / Kimi / Qwen +目前需要你提供 `usage_url`(具体端点取决于部署/账号): +```json +{ + "api_key": "...", + "usage_url": "https://.../usage", + "auth_header": "Authorization", + "auth_scheme": "Bearer" +} +``` From c81654739c82f5af422daac464b539171b1c586b Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 17:05:18 +0000 Subject: [PATCH 09/11] feat: add kimi/minimax usage adapters and update provider docs --- app/services/monitoring.py | 102 ++++++++++++++++++++++----- docs/monitor-provider-credentials.md | 8 +++ 2 files changed, 92 insertions(+), 18 deletions(-) diff --git a/app/services/monitoring.py b/app/services/monitoring.py index 07055a4..df053fa 100644 --- a/app/services/monitoring.py +++ b/app/services/monitoring.py @@ -25,6 +25,51 @@ def _parse_credential(raw: str) -> Dict[str, Any]: return {'api_key': raw} +def _parse_reset_at(value) -> datetime | None: + if not value: + return None + if isinstance(value, datetime): + return value + if isinstance(value, (int, float)): + return datetime.fromtimestamp(value, tz=timezone.utc) + if isinstance(value, str): + try: + return datetime.fromisoformat(value.replace('Z', '+00:00')) + except Exception: + return None + return None + + +def _normalize_usage_payload(payload: Dict[str, Any]) -> Dict[str, Any]: + used = payload.get('used') or payload.get('usage') or payload.get('consumed') or payload.get('total_usage') + limit = payload.get('limit') or payload.get('quota') or payload.get('hard_limit') or payload.get('total') + remaining = payload.get('remain') or payload.get('remaining') or payload.get('left') + usage_pct = payload.get('usage_pct') or payload.get('percent') or payload.get('usage_percent') + window_label = payload.get('window') or payload.get('window_label') + reset_at = payload.get('reset_at') or payload.get('reset_time') or payload.get('reset') + + if used is None and remaining is not None and limit is not None: + try: + used = float(limit) - float(remaining) + except Exception: + pass + + if usage_pct is None and used is not None and limit: + try: + usage_pct = round(float(used) / float(limit) * 100, 2) + except Exception: + pass + + return { + 'window_label': window_label, + 'used': used, + 'limit': limit, + 'usage_pct': usage_pct, + 'reset_at': _parse_reset_at(reset_at), + 'raw': payload, + } + + def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800): key = 'issue_stats_24h' now = _now() @@ -78,6 +123,8 @@ def test_provider_connection(provider: str, credential: str): r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12) return r.status_code == 200, f'status={r.status_code}' usage_url = info.get('usage_url') or info.get('test_url') + if provider == 'kimi' and not usage_url: + usage_url = 'https://www.kimi.com/api/user/usage' if usage_url: r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12) return r.status_code < 500, f'status={r.status_code}' @@ -130,15 +177,37 @@ def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]: if r.status_code != 200: return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} payload = r.json() - # Try to normalize - return 'ok', { - 'window_label': payload.get('window') or payload.get('window_label'), - 'used': payload.get('used'), - 'limit': payload.get('limit'), - 'usage_pct': payload.get('usage_pct'), - 'reset_at': None, - 'raw': payload, - } + if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict): + payload = payload['data'] + return 'ok', _normalize_usage_payload(payload) + + +def _kimi_usage(credential: str) -> Tuple[str, Dict[str, Any]]: + info = _parse_credential(credential) + key = info.get('api_key') or credential + usage_url = info.get('usage_url') or 'https://www.kimi.com/api/user/usage' + r = requests.get(usage_url, headers=_provider_headers('kimi', key, info), timeout=12) + if r.status_code != 200: + return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} + payload = r.json() + if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict): + payload = payload['data'] + return 'ok', _normalize_usage_payload(payload) + + +def _minimax_usage(credential: str) -> Tuple[str, Dict[str, Any]]: + info = _parse_credential(credential) + key = info.get('api_key') or credential + usage_url = info.get('usage_url') + if not usage_url: + return 'unsupported', {'error': 'minimax usage API not configured'} + r = requests.get(usage_url, headers=_provider_headers('minimax', key, info), timeout=12) + if r.status_code != 200: + return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} + payload = r.json() + if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict): + payload = payload['data'] + return 'ok', _normalize_usage_payload(payload) def _generic_usage(provider: str, credential: str) -> Tuple[str, Dict[str, Any]]: @@ -151,14 +220,7 @@ def _generic_usage(provider: str, credential: str) -> Tuple[str, Dict[str, Any]] if r.status_code != 200: return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text} payload = r.json() - return 'ok', { - 'window_label': payload.get('window') or payload.get('window_label'), - 'used': payload.get('used'), - 'limit': payload.get('limit'), - 'usage_pct': payload.get('usage_pct'), - 'reset_at': None, - 'raw': payload, - } + return 'ok', _normalize_usage_payload(payload) def refresh_provider_usage_once(db: Session): @@ -171,7 +233,11 @@ def refresh_provider_usage_once(db: Session): status, payload = _openai_usage(a.credential) elif a.provider == 'anthropic': status, payload = _anthropic_usage(a.credential) - elif a.provider in {'minimax', 'kimi', 'qwen'}: + elif a.provider == 'kimi': + status, payload = _kimi_usage(a.credential) + elif a.provider == 'minimax': + status, payload = _minimax_usage(a.credential) + elif a.provider == 'qwen': status, payload = _generic_usage(a.provider, a.credential) else: ok, msg = test_provider_connection(a.provider, a.credential) diff --git a/docs/monitor-provider-credentials.md b/docs/monitor-provider-credentials.md index 0eb4c3e..dad5244 100644 --- a/docs/monitor-provider-credentials.md +++ b/docs/monitor-provider-credentials.md @@ -53,3 +53,11 @@ "auth_scheme": "Bearer" } ``` + +## Kimi +推荐 usage_url: https://www.kimi.com/api/user/usage +Authorization: Bearer + +## Minimax +推荐 usage_url: https://platform.minimax.io/v1/api/openplatform/coding_plan/remains?GroupId=YOUR_GROUP_ID +Authorization: Bearer From 863c79ef3ef1df63f7ed02e41dd50454a14b6711 Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 17:15:53 +0000 Subject: [PATCH 10/11] docs: remove GroupId query param from minimax example --- docs/monitor-provider-credentials.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/monitor-provider-credentials.md b/docs/monitor-provider-credentials.md index dad5244..a9031bc 100644 --- a/docs/monitor-provider-credentials.md +++ b/docs/monitor-provider-credentials.md @@ -59,5 +59,5 @@ Authorization: Bearer ## Minimax -推荐 usage_url: https://platform.minimax.io/v1/api/openplatform/coding_plan/remains?GroupId=YOUR_GROUP_ID +推荐 usage_url: https://platform.minimax.io/v1/api/openplatform/coding_plan/remains Authorization: Bearer From 5f47a177948eb29c7a59076c842307a3c9b5c3af Mon Sep 17 00:00:00 2001 From: zhi Date: Wed, 11 Mar 2026 17:29:17 +0000 Subject: [PATCH 11/11] fix: add requests dependency for provider usage adapters --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 9db19f8..85208cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ python-multipart==0.0.6 alembic==1.13.1 python-dotenv==1.0.0 httpx==0.27.0 +requests==2.31.0