feat/public-monitor-and-agent-telemetry #4
287
app/api/routers/monitor.py
Normal file
287
app/api/routers/monitor.py
Normal file
@@ -0,0 +1,287 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import json
|
||||
import uuid
|
||||
from typing import List, Dict
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.core.config import get_db, SessionLocal
|
||||
from app.api.deps import get_current_user_or_apikey
|
||||
from app.models import models
|
||||
from app.models.monitor import (
|
||||
ProviderAccount,
|
||||
MonitoredServer,
|
||||
ServerState,
|
||||
ServerChallenge,
|
||||
ServerHandshakeNonce,
|
||||
)
|
||||
from app.services.monitoring import (
|
||||
get_issue_stats_cached,
|
||||
get_provider_usage_view,
|
||||
get_server_states_view,
|
||||
test_provider_connection,
|
||||
)
|
||||
from app.services.crypto_box import get_public_key_info, decrypt_payload_b64, ts_within
|
||||
|
||||
router = APIRouter(prefix='/monitor', tags=['Monitor'])
|
||||
SUPPORTED_PROVIDERS = {'anthropic', 'openai', 'minimax', 'kimi', 'qwen'}
|
||||
ACTIVE_WS: Dict[int, WebSocket] = {}
|
||||
|
||||
|
||||
class ProviderAccountCreate(BaseModel):
    """Request body for registering a provider credential (admin API)."""
    provider: str  # must be one of SUPPORTED_PROVIDERS; validated in the endpoint
    label: str  # human-readable name shown in dashboards
    credential: str  # raw API key, or a JSON string (see docs/monitor-provider-credentials.md)
|
||||
|
||||
|
||||
class ProviderTestRequest(BaseModel):
    """Request body for a one-off connectivity probe; nothing is persisted."""
    provider: str
    credential: str
|
||||
|
||||
|
||||
class MonitoredServerCreate(BaseModel):
    """Request body for registering a server to be monitored."""
    identifier: str  # unique handle the agent will present in heartbeats/handshakes
    display_name: str | None = None  # optional label; falls back to identifier in views
|
||||
|
||||
|
||||
class ChallengeResponse(BaseModel):
    """Handshake challenge minted for an admin to hand to a server agent."""
    identifier: str
    challenge_uuid: str  # single-use token, checked during the websocket handshake
    expires_at: str  # ISO-8601 UTC expiry timestamp
|
||||
|
||||
|
||||
def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)):
    """FastAPI dependency: pass the request through only for admin users."""
    if current_user.is_admin:
        return current_user
    raise HTTPException(status_code=403, detail='Admin required')
|
||||
|
||||
|
||||
@router.get('/public/server-public-key')
def monitor_public_key():
    """Expose the monitor's RSA public key so agents can encrypt handshakes."""
    info = get_public_key_info()
    return info
|
||||
|
||||
|
||||
@router.get('/public/overview')
def public_overview(db: Session = Depends(get_db)):
    """Unauthenticated dashboard payload: issue stats, provider usage, server states."""
    generated_at = datetime.now(timezone.utc).isoformat()
    overview = {
        'issues': get_issue_stats_cached(db, ttl_seconds=1800),
        'providers': get_provider_usage_view(db),
        'servers': get_server_states_view(db, offline_after_minutes=7),
        'generated_at': generated_at,
    }
    return overview
|
||||
|
||||
|
||||
@router.get('/admin/providers/accounts')
def list_provider_accounts(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Admin listing of provider accounts, newest first, with masked credentials."""
    rows = []
    query = db.query(ProviderAccount).order_by(ProviderAccount.created_at.desc())
    for acct in query.all():
        tail = acct.credential[-4:] if acct.credential else ''
        rows.append({
            'id': acct.id,
            'provider': acct.provider,
            'label': acct.label,
            'is_enabled': acct.is_enabled,
            'created_at': acct.created_at,
            # Never return the full secret; only the last four characters.
            'credential_masked': '***' + tail,
        })
    return rows
|
||||
|
||||
|
||||
@router.post('/admin/providers/accounts', status_code=status.HTTP_201_CREATED)
def create_provider_account(payload: ProviderAccountCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
    """Register a new provider credential.  Rejects unsupported providers with 400."""
    provider = payload.provider.lower().strip()
    if provider not in SUPPORTED_PROVIDERS:
        raise HTTPException(status_code=400, detail=f'Unsupported provider: {provider}')
    account = ProviderAccount(
        provider=provider,
        label=payload.label.strip(),
        credential=payload.credential.strip(),
        is_enabled=True,
        created_by=user.id,
    )
    db.add(account)
    db.commit()
    db.refresh(account)
    return {
        'id': account.id,
        'provider': account.provider,
        'label': account.label,
        'is_enabled': account.is_enabled,
    }
|
||||
|
||||
|
||||
@router.post('/admin/providers/test')
def test_provider(payload: ProviderTestRequest, _: models.User = Depends(require_admin)):
    """Probe a credential against its provider without persisting anything."""
    provider = payload.provider.lower().strip()
    credential = payload.credential.strip()
    ok, message = test_provider_connection(provider, credential)
    return {'ok': ok, 'message': message}
|
||||
|
||||
|
||||
@router.delete('/admin/providers/accounts/{account_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_provider_account(account_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Remove a provider account; 404 when the id is unknown."""
    account = db.query(ProviderAccount).filter(ProviderAccount.id == account_id).first()
    if account is None:
        raise HTTPException(status_code=404, detail='Provider account not found')
    db.delete(account)
    db.commit()
    return None
|
||||
|
||||
|
||||
@router.get('/admin/servers')
def list_servers(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Admin listing of monitored servers (same shape as the public view)."""
    view = get_server_states_view(db, offline_after_minutes=7)
    return view
|
||||
|
||||
|
||||
@router.post('/admin/servers', status_code=status.HTTP_201_CREATED)
def add_server(payload: MonitoredServerCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
    """Register a server identifier for heartbeat/websocket monitoring."""
    identifier = payload.identifier.strip()
    if not identifier:
        raise HTTPException(status_code=400, detail='identifier required')
    duplicate = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier).first()
    if duplicate:
        raise HTTPException(status_code=400, detail='identifier already exists')
    server = MonitoredServer(
        identifier=identifier,
        display_name=payload.display_name,
        is_enabled=True,
        created_by=user.id,
    )
    db.add(server)
    db.commit()
    db.refresh(server)
    return {
        'id': server.id,
        'identifier': server.identifier,
        'display_name': server.display_name,
        'is_enabled': server.is_enabled,
    }
|
||||
|
||||
|
||||
@router.post('/admin/servers/{server_id}/challenge', response_model=ChallengeResponse)
def issue_server_challenge(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Mint a short-lived (10 minute) single-use challenge for a server handshake."""
    server = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first()
    if server is None:
        raise HTTPException(status_code=404, detail='Server not found')
    token = str(uuid.uuid4())
    deadline = datetime.now(timezone.utc) + timedelta(minutes=10)
    db.add(ServerChallenge(server_id=server_id, challenge_uuid=token, expires_at=deadline))
    db.commit()
    return ChallengeResponse(
        identifier=server.identifier,
        challenge_uuid=token,
        expires_at=deadline.isoformat(),
    )
|
||||
|
||||
|
||||
@router.delete('/admin/servers/{server_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_server(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Delete a monitored server together with its state, challenges and nonces."""
    server = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first()
    if server is None:
        raise HTTPException(status_code=404, detail='Server not found')
    # Dependent rows are removed explicitly (no DB-level ON DELETE CASCADE here).
    state = db.query(ServerState).filter(ServerState.server_id == server_id).first()
    if state is not None:
        db.delete(state)
    db.query(ServerChallenge).filter(ServerChallenge.server_id == server_id).delete()
    db.query(ServerHandshakeNonce).filter(ServerHandshakeNonce.server_id == server_id).delete()
    db.delete(server)
    db.commit()
    return None
|
||||
|
||||
|
||||
class ServerHeartbeat(BaseModel):
    """Payload a monitored server's agent POSTs on each heartbeat."""
    identifier: str  # must match a MonitoredServer.identifier
    openclaw_version: str | None = None
    agents: List[dict] = []  # pydantic copies defaults per instance, so the mutable literal is safe
    cpu_pct: float | None = None
    mem_pct: float | None = None
    disk_pct: float | None = None
    swap_pct: float | None = None
|
||||
|
||||
|
||||
@router.post('/server/heartbeat')
def server_heartbeat(payload: ServerHeartbeat, db: Session = Depends(get_db)):
    """Plain-HTTP heartbeat: upsert the reporting server's latest state row."""
    server = db.query(MonitoredServer).filter(
        MonitoredServer.identifier == payload.identifier,
        MonitoredServer.is_enabled == True,
    ).first()
    if server is None:
        raise HTTPException(status_code=404, detail='unknown server identifier')
    state = db.query(ServerState).filter(ServerState.server_id == server.id).first()
    if state is None:
        state = ServerState(server_id=server.id)
        db.add(state)
    state.openclaw_version = payload.openclaw_version
    state.agents_json = json.dumps(payload.agents, ensure_ascii=False)
    state.cpu_pct = payload.cpu_pct
    state.mem_pct = payload.mem_pct
    state.disk_pct = payload.disk_pct
    state.swap_pct = payload.swap_pct
    state.last_seen_at = datetime.now(timezone.utc)
    db.commit()
    return {'ok': True, 'server_id': server.id, 'last_seen_at': state.last_seen_at}
|
||||
|
||||
|
||||
@router.websocket('/server/ws')
async def server_ws(websocket: WebSocket):
    """Authenticated realtime channel for monitored servers.

    Handshake: the first frame carries either an RSA-encrypted payload
    (identifier / challenge_uuid / nonce / ts) or, for older agents, the same
    fields in plaintext.  The challenge must be unused and unexpired; the
    nonce must never have been seen before (replay protection).  After the
    handshake, each frame updates the server's ServerState row.

    Close codes: 4400 missing fields, 4401 auth/timestamp failure,
    4404 unknown server, 4409 nonce replay, 1011 internal error.
    """
    await websocket.accept()
    # Manual session: the get_db request dependency is not used for websockets.
    db = SessionLocal()
    server_id = None
    try:
        hello = await websocket.receive_json()

        encrypted_payload = (hello.get('encrypted_payload') or '').strip()
        if encrypted_payload:
            # Encrypted mode: decrypt with the monitor private key and reject
            # timestamps outside a 10-minute window.
            data = decrypt_payload_b64(encrypted_payload)
            identifier = (data.get('identifier') or '').strip()
            challenge_uuid = (data.get('challenge_uuid') or '').strip()
            nonce = (data.get('nonce') or '').strip()
            ts = data.get('ts')
            if not ts_within(ts, max_minutes=10):
                await websocket.close(code=4401)
                return
        else:
            # backward compatible mode: plaintext handshake fields
            identifier = (hello.get('identifier') or '').strip()
            challenge_uuid = (hello.get('challenge_uuid') or '').strip()
            nonce = (hello.get('nonce') or '').strip()

        if not identifier or not challenge_uuid or not nonce:
            await websocket.close(code=4400)
            return

        server = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier, MonitoredServer.is_enabled == True).first()
        if not server:
            await websocket.close(code=4404)
            return

        # The challenge must belong to this server, be unused and unexpired.
        ch = db.query(ServerChallenge).filter(ServerChallenge.challenge_uuid == challenge_uuid, ServerChallenge.server_id == server.id).first()
        if not ch or ch.used_at is not None or ch.expires_at < datetime.now(timezone.utc):
            await websocket.close(code=4401)
            return

        # Replay protection: each handshake nonce may be used exactly once.
        nonce_used = db.query(ServerHandshakeNonce).filter(ServerHandshakeNonce.server_id == server.id, ServerHandshakeNonce.nonce == nonce).first()
        if nonce_used:
            await websocket.close(code=4409)
            return

        db.add(ServerHandshakeNonce(server_id=server.id, nonce=nonce))
        ch.used_at = datetime.now(timezone.utc)
        db.commit()

        server_id = server.id
        ACTIVE_WS[server.id] = websocket
        await websocket.send_json({'ok': True, 'server_id': server.id, 'message': 'connected'})

        # Event loop: persist each incoming frame into ServerState.
        while True:
            msg = await websocket.receive_json()
            event = msg.get('event')
            payload = msg.get('payload') or {}
            st = db.query(ServerState).filter(ServerState.server_id == server.id).first()
            if not st:
                st = ServerState(server_id=server.id)
                db.add(st)

            if event == 'server.hello':
                st.openclaw_version = payload.get('openclaw_version')
                st.agents_json = json.dumps(payload.get('agents') or [], ensure_ascii=False)
            elif event in {'server.metrics', 'agent.status_changed'}:
                # Absent keys keep the previously stored value.
                st.cpu_pct = payload.get('cpu_pct', st.cpu_pct)
                st.mem_pct = payload.get('mem_pct', st.mem_pct)
                st.disk_pct = payload.get('disk_pct', st.disk_pct)
                st.swap_pct = payload.get('swap_pct', st.swap_pct)
                if 'agents' in payload:
                    st.agents_json = json.dumps(payload.get('agents') or [], ensure_ascii=False)

            st.last_seen_at = datetime.now(timezone.utc)
            db.commit()
    except WebSocketDisconnect:
        pass
    except Exception:
        # Unexpected failure: best-effort close with "internal error".
        try:
            await websocket.close(code=1011)
        except Exception:
            pass
    finally:
        # Only remove our own registration — a reconnect may have replaced it.
        if server_id and ACTIVE_WS.get(server_id) is websocket:
            ACTIVE_WS.pop(server_id, None)
        db.close()
|
||||
22
app/main.py
22
app/main.py
@@ -34,6 +34,7 @@ from app.api.routers.users import router as users_router
|
||||
from app.api.routers.comments import router as comments_router
|
||||
from app.api.routers.webhooks import router as webhooks_router
|
||||
from app.api.routers.misc import router as misc_router
|
||||
from app.api.routers.monitor import router as monitor_router
|
||||
|
||||
app.include_router(auth_router)
|
||||
app.include_router(issues_router)
|
||||
@@ -42,12 +43,13 @@ app.include_router(users_router)
|
||||
app.include_router(comments_router)
|
||||
app.include_router(webhooks_router)
|
||||
app.include_router(misc_router)
|
||||
app.include_router(monitor_router)
|
||||
|
||||
# Run database migration on startup
|
||||
@app.on_event("startup")
|
||||
def startup():
|
||||
from app.core.config import Base, engine, SessionLocal
|
||||
from app.models import webhook, apikey, activity, milestone, notification, worklog
|
||||
from app.models import webhook, apikey, activity, milestone, notification, worklog, monitor
|
||||
Base.metadata.create_all(bind=engine)
|
||||
|
||||
# Initialize from AbstractWizard (admin user, default project, etc.)
|
||||
@@ -57,3 +59,21 @@ def startup():
|
||||
run_init(db)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
# Start lightweight monitor polling thread (every 10 minutes)
|
||||
import threading, time
|
||||
from app.services.monitoring import refresh_provider_usage_once
|
||||
|
||||
def _monitor_poll_loop():
|
||||
while True:
|
||||
db2 = SessionLocal()
|
||||
try:
|
||||
refresh_provider_usage_once(db2)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
db2.close()
|
||||
time.sleep(600)
|
||||
|
||||
t = threading.Thread(target=_monitor_poll_loop, daemon=True)
|
||||
t.start()
|
||||
|
||||
78
app/models/monitor.py
Normal file
78
app/models/monitor.py
Normal file
@@ -0,0 +1,78 @@
|
||||
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Float, ForeignKey
|
||||
from sqlalchemy.sql import func
|
||||
from app.core.config import Base
|
||||
|
||||
|
||||
class ProviderAccount(Base):
    """An upstream LLM provider credential registered for usage monitoring."""
    __tablename__ = 'provider_accounts'

    id = Column(Integer, primary_key=True, index=True)
    provider = Column(String(32), nullable=False, index=True)  # anthropic/openai/minimax/kimi/qwen
    label = Column(String(128), nullable=False)  # human-readable name for dashboards
    credential = Column(Text, nullable=False)  # TODO: encrypt at rest
    is_enabled = Column(Boolean, default=True)
    created_by = Column(Integer, nullable=True)  # user id of the creating admin
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
|
||||
|
||||
|
||||
class ProviderUsageSnapshot(Base):
    """One point-in-time usage reading for a provider account (append-only)."""
    __tablename__ = 'provider_usage_snapshots'

    id = Column(Integer, primary_key=True, index=True)
    account_id = Column(Integer, ForeignKey('provider_accounts.id'), nullable=False, index=True)
    window_label = Column(String(32), nullable=True)  # e.g. 1h / 7d
    used = Column(Float, nullable=True)
    limit = Column(Float, nullable=True)
    usage_pct = Column(Float, nullable=True)  # derived when the provider omits it
    reset_at = Column(DateTime(timezone=True), nullable=True)
    status = Column(String(32), nullable=False, default='unknown')  # ok/error/pending/unsupported
    error = Column(Text, nullable=True)
    raw_payload = Column(Text, nullable=True)  # original provider response (JSON) for auditing
    fetched_at = Column(DateTime(timezone=True), server_default=func.now(), index=True)
|
||||
|
||||
|
||||
class MonitoredServer(Base):
    """A server registered to report heartbeats/telemetry to the monitor."""
    __tablename__ = 'monitored_servers'

    id = Column(Integer, primary_key=True, index=True)
    identifier = Column(String(128), nullable=False, unique=True)  # handle presented by the agent
    display_name = Column(String(128), nullable=True)
    is_enabled = Column(Boolean, default=True)
    created_by = Column(Integer, nullable=True)  # user id of the creating admin
    created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||
|
||||
|
||||
class ServerState(Base):
    """Latest reported telemetry for a server (one row per server, upserted)."""
    __tablename__ = 'server_states'

    id = Column(Integer, primary_key=True, index=True)
    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, unique=True)
    openclaw_version = Column(String(64), nullable=True)
    agents_json = Column(Text, nullable=True)  # json list
    cpu_pct = Column(Float, nullable=True)
    mem_pct = Column(Float, nullable=True)
    disk_pct = Column(Float, nullable=True)
    swap_pct = Column(Float, nullable=True)
    last_seen_at = Column(DateTime(timezone=True), nullable=True)  # drives the online/offline flag
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
|
||||
|
||||
|
||||
class ServerChallenge(Base):
    """Single-use, short-lived token authorizing one websocket handshake."""
    __tablename__ = 'server_challenges'

    id = Column(Integer, primary_key=True, index=True)
    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, index=True)
    challenge_uuid = Column(String(64), nullable=False, unique=True, index=True)
    expires_at = Column(DateTime(timezone=True), nullable=False)
    used_at = Column(DateTime(timezone=True), nullable=True)  # set on successful handshake; non-null means consumed
    created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||
|
||||
|
||||
class ServerHandshakeNonce(Base):
    """Record of a consumed handshake nonce, used for replay protection."""
    __tablename__ = 'server_handshake_nonces'

    id = Column(Integer, primary_key=True, index=True)
    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, index=True)
    nonce = Column(String(128), nullable=False, index=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
|
||||
63
app/services/crypto_box.py
Normal file
63
app/services/crypto_box.py
Normal file
@@ -0,0 +1,63 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any
|
||||
|
||||
from cryptography.hazmat.primitives import serialization, hashes
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||||
|
||||
KEY_DIR = Path(os.getenv('MONITOR_KEY_DIR', '/config/monitor_keys'))
|
||||
PRIV_PATH = KEY_DIR / 'monitor_private.pem'
|
||||
PUB_PATH = KEY_DIR / 'monitor_public.pem'
|
||||
|
||||
|
||||
def ensure_keypair() -> None:
    """Create the monitor RSA keypair on first use.

    Idempotent: if both PEM files exist, nothing happens.  If only the public
    file is missing, the public key is re-derived from the surviving private
    key instead of generating a fresh pair — the original code regenerated
    both files, silently invalidating every agent that had already pinned
    the old public key.
    """
    KEY_DIR.mkdir(parents=True, exist_ok=True)
    if PRIV_PATH.exists() and PUB_PATH.exists():
        return
    if PRIV_PATH.exists():
        # Recover: derive the public half from the existing private key.
        private_key = serialization.load_pem_private_key(PRIV_PATH.read_bytes(), password=None)
    else:
        private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        PRIV_PATH.write_bytes(private_pem)
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    PUB_PATH.write_bytes(public_pem)
|
||||
|
||||
|
||||
def get_public_key_info() -> Dict[str, str]:
    """Return the monitor public key PEM plus a short SHA-256 fingerprint id."""
    ensure_keypair()
    pem_text = PUB_PATH.read_text()
    fingerprint = hashlib.sha256(pem_text.encode()).hexdigest()
    return {'public_key_pem': pem_text, 'key_id': fingerprint[:16]}
|
||||
|
||||
|
||||
def decrypt_payload_b64(ciphertext_b64: str) -> Dict[str, Any]:
    """Decrypt a base64 RSA-OAEP(SHA-256) ciphertext and parse it as JSON.

    Raises on invalid base64, wrong key/padding, or non-JSON plaintext;
    callers treat any exception as an authentication failure.
    """
    ensure_keypair()
    private_key = serialization.load_pem_private_key(PRIV_PATH.read_bytes(), password=None)
    plaintext = private_key.decrypt(
        base64.b64decode(ciphertext_b64),
        # Must mirror the agent side: OAEP with MGF1/SHA-256.
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        ),
    )
    obj = json.loads(plaintext.decode())
    return obj
|
||||
|
||||
|
||||
def ts_within(ts_iso: str, max_minutes: int = 10) -> bool:
    """Return True when *ts_iso* is within ``max_minutes`` of the current time.

    Accepts ISO-8601 strings, including a trailing 'Z'.  Unparseable or
    missing input returns False.  Naive timestamps are assumed to be UTC —
    the original code raised TypeError (outside its try block) when
    subtracting a naive timestamp from the aware ``now``.
    """
    try:
        ts = datetime.fromisoformat(ts_iso.replace('Z', '+00:00'))
    except Exception:
        return False
    if ts.tzinfo is None:
        # Assume UTC rather than crashing on naive/aware subtraction.
        ts = ts.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    return abs((now - ts).total_seconds()) <= max_minutes * 60
|
||||
305
app/services/monitoring.py
Normal file
305
app/services/monitoring.py
Normal file
@@ -0,0 +1,305 @@
|
||||
import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any, Dict, Tuple
|
||||
|
||||
import requests
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models.models import Issue
|
||||
from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState
|
||||
|
||||
_CACHE: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
|
||||
def _now():
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
def _parse_credential(raw: str) -> Dict[str, Any]:
|
||||
raw = (raw or '').strip()
|
||||
if raw.startswith('{'):
|
||||
try:
|
||||
return json.loads(raw)
|
||||
except Exception:
|
||||
return {'api_key': raw}
|
||||
return {'api_key': raw}
|
||||
|
||||
|
||||
def _parse_reset_at(value) -> datetime | None:
|
||||
if not value:
|
||||
return None
|
||||
if isinstance(value, datetime):
|
||||
return value
|
||||
if isinstance(value, (int, float)):
|
||||
return datetime.fromtimestamp(value, tz=timezone.utc)
|
||||
if isinstance(value, str):
|
||||
try:
|
||||
return datetime.fromisoformat(value.replace('Z', '+00:00'))
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _normalize_usage_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
|
||||
used = payload.get('used') or payload.get('usage') or payload.get('consumed') or payload.get('total_usage')
|
||||
limit = payload.get('limit') or payload.get('quota') or payload.get('hard_limit') or payload.get('total')
|
||||
remaining = payload.get('remain') or payload.get('remaining') or payload.get('left')
|
||||
usage_pct = payload.get('usage_pct') or payload.get('percent') or payload.get('usage_percent')
|
||||
window_label = payload.get('window') or payload.get('window_label')
|
||||
reset_at = payload.get('reset_at') or payload.get('reset_time') or payload.get('reset')
|
||||
|
||||
if used is None and remaining is not None and limit is not None:
|
||||
try:
|
||||
used = float(limit) - float(remaining)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if usage_pct is None and used is not None and limit:
|
||||
try:
|
||||
usage_pct = round(float(used) / float(limit) * 100, 2)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
'window_label': window_label,
|
||||
'used': used,
|
||||
'limit': limit,
|
||||
'usage_pct': usage_pct,
|
||||
'reset_at': _parse_reset_at(reset_at),
|
||||
'raw': payload,
|
||||
}
|
||||
|
||||
|
||||
def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800):
    """24-hour issue statistics, memoized in an in-process TTL cache.

    NOTE(review): _CACHE is per-process and unsynchronized — fine for a
    single worker, but stale/duplicated across multiple workers.
    """
    key = 'issue_stats_24h'
    now = _now()
    hit = _CACHE.get(key)
    if hit and (now - hit['at']).total_seconds() < ttl_seconds:
        return hit['data']

    since = now - timedelta(hours=24)
    total = db.query(Issue).count()
    new_24h = db.query(Issue).filter(Issue.created_at >= since).count()
    # "Processed" = touched within the window and now resolved/closed.
    processed_24h = db.query(Issue).filter(
        Issue.updated_at != None,
        Issue.updated_at >= since,
        Issue.status.in_(['resolved', 'closed'])
    ).count()
    data = {
        'total_issues': total,
        'new_issues_24h': new_24h,
        'processed_issues_24h': processed_24h,
        'computed_at': now.isoformat(),
        'cache_ttl_seconds': ttl_seconds,
    }
    _CACHE[key] = {'at': now, 'data': data}
    return data
|
||||
|
||||
|
||||
def _provider_headers(provider: str, credential: str, extra: Dict[str, Any] | None = None):
|
||||
extra = extra or {}
|
||||
if extra.get('auth_header'):
|
||||
val = extra.get('auth_value')
|
||||
if not val:
|
||||
scheme = extra.get('auth_scheme')
|
||||
val = f"{scheme} {credential}" if scheme else credential
|
||||
return {extra['auth_header']: val}
|
||||
if provider == 'openai':
|
||||
return {'Authorization': f'Bearer {credential}'}
|
||||
if provider == 'anthropic':
|
||||
return {'x-api-key': credential, 'anthropic-version': '2023-06-01'}
|
||||
return {}
|
||||
|
||||
|
||||
def test_provider_connection(provider: str, credential: str):
    """Cheap connectivity probe for a provider credential.

    Returns ``(ok, message)`` and never raises — any network or parse error
    is reported as ``(False, str(e))``.
    """
    provider = provider.lower()
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    try:
        if provider == 'openai':
            r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12)
            return r.status_code == 200, f'status={r.status_code}'
        if provider == 'anthropic':
            r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12)
            return r.status_code == 200, f'status={r.status_code}'
        # Other providers: hit a user-configured (or known default) URL.
        usage_url = info.get('usage_url') or info.get('test_url')
        if provider == 'kimi' and not usage_url:
            usage_url = 'https://www.kimi.com/api/user/usage'
        if usage_url:
            # Any non-5xx response counts as "reachable".
            r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12)
            return r.status_code < 500, f'status={r.status_code}'
        if provider in {'minimax', 'kimi', 'qwen'}:
            return True, 'accepted (connectivity check pending provider-specific adapter)'
        return False, 'unsupported provider'
    except Exception as e:
        return False, str(e)
|
||||
|
||||
|
||||
def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch OpenAI billing usage over the last 7 days.

    Uses the dashboard billing endpoints (overridable via ``usage_url`` /
    ``subscription_url`` in a JSON credential).  Returns ``(status, payload)``
    where status is 'ok' or 'error' and payload matches the snapshot fields.
    """
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    headers = _provider_headers('openai', key, info)
    today = _now().date()
    start = (today - timedelta(days=7)).isoformat()
    end = today.isoformat()
    usage_url = info.get('usage_url') or f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}'
    sub_url = info.get('subscription_url') or 'https://api.openai.com/v1/dashboard/billing/subscription'
    u = requests.get(usage_url, headers=headers, timeout=12)
    s = requests.get(sub_url, headers=headers, timeout=12)
    if u.status_code != 200 or s.status_code != 200:
        return 'error', {'error': f'usage:{u.status_code}, subscription:{s.status_code}'}
    usage = u.json()
    sub = s.json()
    total_usage = usage.get('total_usage')
    hard_limit = sub.get('hard_limit_usd')
    reset_at_ts = sub.get('billing_cycle_anchor')  # unix timestamp, may be absent
    reset_at = datetime.fromtimestamp(reset_at_ts, tz=timezone.utc) if reset_at_ts else None
    usage_pct = None
    if total_usage is not None and hard_limit:
        usage_pct = round(total_usage / hard_limit * 100, 2)
    return 'ok', {
        'window_label': '7d',
        'used': total_usage,
        'limit': hard_limit,
        'usage_pct': usage_pct,
        'reset_at': reset_at,
        'raw': {'usage': usage, 'subscription': sub},
    }
|
||||
|
||||
|
||||
def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch Anthropic usage from a user-configured ``usage_url``.

    Returns ``(status, payload)``; 'unsupported' when no usage_url is set.
    """
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    usage_url = info.get('usage_url')
    if not usage_url:
        return 'unsupported', {'error': 'anthropic usage API not configured'}
    r = requests.get(usage_url, headers=_provider_headers('anthropic', key, info), timeout=12)
    if r.status_code != 200:
        return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
    payload = r.json()
    # Unwrap common {'data': {...}} envelopes before normalizing.
    if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict):
        payload = payload['data']
    return 'ok', _normalize_usage_payload(payload)
|
||||
|
||||
|
||||
def _kimi_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch Kimi usage; falls back to the known default endpoint.

    Returns ``(status, payload)`` in the common snapshot shape.
    """
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    usage_url = info.get('usage_url') or 'https://www.kimi.com/api/user/usage'
    r = requests.get(usage_url, headers=_provider_headers('kimi', key, info), timeout=12)
    if r.status_code != 200:
        return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
    payload = r.json()
    # Unwrap common {'data': {...}} envelopes before normalizing.
    if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict):
        payload = payload['data']
    return 'ok', _normalize_usage_payload(payload)
|
||||
|
||||
|
||||
def _minimax_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch MiniMax usage from a user-configured ``usage_url``.

    Returns ``(status, payload)``; 'unsupported' when no usage_url is set.
    """
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    usage_url = info.get('usage_url')
    if not usage_url:
        return 'unsupported', {'error': 'minimax usage API not configured'}
    r = requests.get(usage_url, headers=_provider_headers('minimax', key, info), timeout=12)
    if r.status_code != 200:
        return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
    payload = r.json()
    # Unwrap common {'data': {...}} envelopes before normalizing.
    if isinstance(payload, dict) and 'data' in payload and isinstance(payload['data'], dict):
        payload = payload['data']
    return 'ok', _normalize_usage_payload(payload)
|
||||
|
||||
|
||||
def _generic_usage(provider: str, credential: str) -> Tuple[str, Dict[str, Any]]:
    """Generic usage adapter for providers without a dedicated one (e.g. qwen).

    Requires a ``usage_url`` in the JSON credential; returns ``(status, payload)``.
    Unlike the provider-specific adapters, the response is normalized as-is
    (no {'data': ...} unwrapping).
    """
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    usage_url = info.get('usage_url')
    if not usage_url:
        return 'unsupported', {'error': f'{provider} usage API not configured'}
    r = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12)
    if r.status_code != 200:
        return 'error', {'error': f'usage:{r.status_code}', 'raw': r.text}
    payload = r.json()
    return 'ok', _normalize_usage_payload(payload)
|
||||
|
||||
|
||||
def refresh_provider_usage_once(db: Session):
    """Fetch current usage for every enabled provider account and persist one
    ProviderUsageSnapshot per account.

    Called periodically from the background polling thread started at app
    startup; per-adapter network errors are captured in the snapshot's
    status/error fields rather than raised.
    """
    accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
    now = _now()
    for a in accounts:
        status = 'pending'
        payload: Dict[str, Any] = {}
        # Dispatch to the provider-specific usage adapter.
        if a.provider == 'openai':
            status, payload = _openai_usage(a.credential)
        elif a.provider == 'anthropic':
            status, payload = _anthropic_usage(a.credential)
        elif a.provider == 'kimi':
            status, payload = _kimi_usage(a.credential)
        elif a.provider == 'minimax':
            status, payload = _minimax_usage(a.credential)
        elif a.provider == 'qwen':
            status, payload = _generic_usage(a.provider, a.credential)
        else:
            # Unknown provider: fall back to a bare connectivity probe.
            ok, msg = test_provider_connection(a.provider, a.credential)
            status = 'ok' if ok else 'error'
            payload = {'error': None if ok else msg}

        snap = ProviderUsageSnapshot(
            account_id=a.id,
            window_label=payload.get('window_label'),
            used=payload.get('used'),
            limit=payload.get('limit'),
            usage_pct=payload.get('usage_pct'),
            reset_at=payload.get('reset_at'),
            status=status,
            error=payload.get('error'),
            raw_payload=json.dumps(payload.get('raw') or payload, ensure_ascii=False),
            fetched_at=now,
        )
        db.add(snap)
        db.commit()
|
||||
|
||||
|
||||
def get_provider_usage_view(db: Session):
    """Latest usage snapshot per enabled provider account, for dashboards.

    NOTE(review): issues one snapshot query per account (N+1); acceptable
    for a small number of accounts.
    """
    accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
    rows = []
    for a in accounts:
        # Most recent snapshot for this account, if any exists yet.
        snap = db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == a.id).order_by(ProviderUsageSnapshot.fetched_at.desc()).first()
        rows.append({
            'account_id': a.id,
            'provider': a.provider,
            'label': a.label,
            'window': snap.window_label if snap else None,
            'usage_pct': snap.usage_pct if snap else None,
            'used': snap.used if snap else None,
            'limit': snap.limit if snap else None,
            'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None,
            'status': snap.status if snap else 'pending',
            'error': snap.error if snap else None,
            'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None,
        })
    return rows
|
||||
|
||||
|
||||
def get_server_states_view(db: Session, offline_after_minutes: int = 7):
    """Describe every enabled monitored server together with its latest state.

    A server counts as online when its ``ServerState`` row was refreshed
    within the last ``offline_after_minutes`` minutes; servers without a
    state row are reported offline with null metrics.

    Args:
        db: Active SQLAlchemy session.
        offline_after_minutes: Heartbeat staleness threshold in minutes.

    Returns:
        list[dict]: One row per enabled server with identity, online flag,
        reported OpenClaw version, host metrics, decoded agent list and the
        ISO-formatted last-seen timestamp.
    """
    moment = _now()
    cutoff_seconds = offline_after_minutes * 60
    results = []
    for server in db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all():
        state = db.query(ServerState).filter(ServerState.server_id == server.id).first()
        seen = state.last_seen_at if state else None
        # Online iff a heartbeat landed within the staleness window.
        is_online = bool(seen is not None and (moment - seen).total_seconds() <= cutoff_seconds)
        # agents_json is a JSON-encoded list persisted by the WS handler.
        agents = json.loads(state.agents_json) if state and state.agents_json else []
        results.append({
            'server_id': server.id,
            'identifier': server.identifier,
            'display_name': server.display_name or server.identifier,
            'online': is_online,
            'openclaw_version': state.openclaw_version if state else None,
            'cpu_pct': state.cpu_pct if state else None,
            'mem_pct': state.mem_pct if state else None,
            'disk_pct': state.disk_pct if state else None,
            'swap_pct': state.swap_pct if state else None,
            'agents': agents,
            'last_seen_at': seen.isoformat() if seen else None,
        })
    return results
|
||||
63
docs/monitor-provider-credentials.md
Normal file
63
docs/monitor-provider-credentials.md
Normal file
@@ -0,0 +1,63 @@
|
||||
# Provider 账号凭证格式(Monitor)
|
||||
|
||||
默认情况下,`credential` 可以直接填写 API Key 字符串。
|
||||
如果需要配置自定义 usage 端点,请使用 JSON 字符串。
|
||||
|
||||
## 基础格式
|
||||
```json
|
||||
{
|
||||
"api_key": "sk-...",
|
||||
"usage_url": "https://.../usage",
|
||||
"auth_header": "Authorization",
|
||||
"auth_scheme": "Bearer"
|
||||
}
|
||||
```
|
||||
|
||||
### 字段说明
|
||||
- `api_key`: API key(必填)
|
||||
- `usage_url`: 统计用量的 GET 端点(可选,minimax/kimi/qwen 推荐填写)
|
||||
- `auth_header`: 自定义鉴权头名(可选)
|
||||
- `auth_scheme`: 鉴权 scheme(如 `Bearer`),会拼成 `Bearer <api_key>`
|
||||
- `auth_value`: 直接指定头值(优先级高于 scheme)
|
||||
|
||||
## OpenAI
|
||||
默认使用 OpenAI 官方 billing endpoints(7天窗口):
|
||||
- `https://api.openai.com/v1/dashboard/billing/usage`
|
||||
- `https://api.openai.com/v1/dashboard/billing/subscription`
|
||||
|
||||
如需自定义可使用 JSON:
|
||||
```json
|
||||
{
|
||||
"api_key": "sk-...",
|
||||
"usage_url": "https://api.openai.com/v1/dashboard/billing/usage?start_date=YYYY-MM-DD&end_date=YYYY-MM-DD",
|
||||
"subscription_url": "https://api.openai.com/v1/dashboard/billing/subscription"
|
||||
}
|
||||
```
|
||||
|
||||
## Anthropic
|
||||
官方 usage API 需要自行提供 `usage_url`(不同组织可能不同):
|
||||
```json
|
||||
{
|
||||
"api_key": "ak-...",
|
||||
"usage_url": "https://api.anthropic.com/.../usage"
|
||||
}
|
||||
```
|
||||
|
||||
## Minimax / Kimi / Qwen
|
||||
目前需要你提供 `usage_url`(具体端点取决于部署/账号):
|
||||
```json
|
||||
{
|
||||
"api_key": "...",
|
||||
"usage_url": "https://.../usage",
|
||||
"auth_header": "Authorization",
|
||||
"auth_scheme": "Bearer"
|
||||
}
|
||||
```
|
||||
|
||||
## Kimi
|
||||
推荐 usage_url: https://www.kimi.com/api/user/usage
|
||||
Authorization: Bearer <API_KEY>
|
||||
|
||||
## Minimax
|
||||
推荐 usage_url: https://platform.minimax.io/v1/api/openplatform/coding_plan/remains
|
||||
Authorization: Bearer <API_KEY>
|
||||
68
docs/openclaw-monitor-plugin-plan.md
Normal file
68
docs/openclaw-monitor-plugin-plan.md
Normal file
@@ -0,0 +1,68 @@
|
||||
# OpenClaw Monitor Agent Plugin 开发计划(草案)
|
||||
|
||||
## 目标
|
||||
让被监测服务器通过 WebSocket 主动接入 HarborForge Backend,并持续上报:
|
||||
- OpenClaw 版本
|
||||
- agent 列表
|
||||
- 每 5 分钟主机指标(CPU/MEM/DISK/SWAP)
|
||||
- agent 状态变更事件
|
||||
|
||||
## 握手流程
|
||||
1. Admin 在 HarborForge 后台添加 server identifier
|
||||
2. Admin 生成 challenge(10 分钟有效)
|
||||
3. 插件请求 `GET /monitor/public/server-public-key` 获取公钥
|
||||
4. 插件构造 payload:
|
||||
- `identifier`
|
||||
- `challenge_uuid`
|
||||
- `nonce`(随机)
|
||||
- `ts`(ISO8601)
|
||||
5. 使用 RSA-OAEP(SHA256) 公钥加密,base64 后作为 `encrypted_payload` 发给 `WS /monitor/server/ws`
|
||||
6. 握手成功后进入事件上报通道
|
||||
|
||||
## 插件事件协议
|
||||
### server.hello
|
||||
```json
|
||||
{
|
||||
"event": "server.hello",
|
||||
"payload": {
|
||||
"openclaw_version": "x.y.z",
|
||||
"agents": [{"id": "a1", "name": "agent-1", "status": "idle"}]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### server.metrics(每 5 分钟)
|
||||
```json
|
||||
{
|
||||
"event": "server.metrics",
|
||||
"payload": {
|
||||
"cpu_pct": 21.3,
|
||||
"mem_pct": 42.1,
|
||||
"disk_pct": 55.9,
|
||||
"swap_pct": 0.0,
|
||||
"agents": [{"id": "a1", "name": "agent-1", "status": "busy"}]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### agent.status_changed(可选)
|
||||
```json
|
||||
{
|
||||
"event": "agent.status_changed",
|
||||
"payload": {
|
||||
"agents": [{"id": "a1", "name": "agent-1", "status": "focus"}]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 实施里程碑
|
||||
- M1: Node/Python CLI 插件最小握手联通
|
||||
- M2: 指标采集 + 周期上报
|
||||
- M3: agent 状态采集与变更事件
|
||||
- M4: 守护化(systemd)+ 断线重连 + 本地日志
|
||||
|
||||
## 风险与注意事项
|
||||
- 时钟漂移会导致 `ts` 校验失败(建议 NTP)
|
||||
- challenge 仅一次可用,重复使用会被拒绝
|
||||
- nonce 重放会被拒绝
|
||||
- 需要保证插件本地安全保存 identifier/challenge(短期)
|
||||
@@ -11,3 +11,4 @@ python-multipart==0.0.6
|
||||
alembic==1.13.1
|
||||
python-dotenv==1.0.0
|
||||
httpx==0.27.0
|
||||
requests==2.31.0
|
||||
|
||||
Reference in New Issue
Block a user