feat/public-monitor-and-agent-telemetry #4

Merged
hzhang merged 11 commits from feat/public-monitor-and-agent-telemetry into main 2026-03-11 22:15:25 +00:00
8 changed files with 886 additions and 1 deletions

287
app/api/routers/monitor.py Normal file
View File

@@ -0,0 +1,287 @@
from datetime import datetime, timedelta, timezone
import json
import uuid
from typing import List, Dict
from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.config import get_db, SessionLocal
from app.api.deps import get_current_user_or_apikey
from app.models import models
from app.models.monitor import (
ProviderAccount,
MonitoredServer,
ServerState,
ServerChallenge,
ServerHandshakeNonce,
)
from app.services.monitoring import (
get_issue_stats_cached,
get_provider_usage_view,
get_server_states_view,
test_provider_connection,
)
from app.services.crypto_box import get_public_key_info, decrypt_payload_b64, ts_within
router = APIRouter(prefix='/monitor', tags=['Monitor'])
# Provider identifiers accepted by the admin provider-account endpoints.
SUPPORTED_PROVIDERS = {'anthropic', 'openai', 'minimax', 'kimi', 'qwen'}
# Live websocket per connected server id. NOTE(review): process-local only —
# with multiple workers each process sees only its own sockets; confirm the
# deployment runs a single worker.
ACTIVE_WS: Dict[int, WebSocket] = {}
class ProviderAccountCreate(BaseModel):
    """Request body for creating a provider account (admin only)."""
    provider: str  # must be one of SUPPORTED_PROVIDERS (case-insensitive)
    label: str
    credential: str  # raw API key, or a JSON blob with usage_url/auth options
class ProviderTestRequest(BaseModel):
    """Request body for probing a provider credential without persisting it."""
    provider: str
    credential: str
class MonitoredServerCreate(BaseModel):
    """Request body for registering a server to be monitored."""
    identifier: str  # unique handshake identity of the server
    display_name: str | None = None
class ChallengeResponse(BaseModel):
    """Response for an issued one-time server handshake challenge."""
    identifier: str
    challenge_uuid: str
    expires_at: str  # ISO8601 timestamp
def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)):
    """FastAPI dependency: pass through admin users, reject everyone else with 403."""
    if current_user.is_admin:
        return current_user
    raise HTTPException(status_code=403, detail='Admin required')
@router.get('/public/server-public-key')
def monitor_public_key():
    """Public endpoint: return the monitor RSA public key (PEM) and its key id."""
    return get_public_key_info()
@router.get('/public/overview')
def public_overview(db: Session = Depends(get_db)):
    """Public dashboard snapshot: cached issue stats, provider usage, server states."""
    issues = get_issue_stats_cached(db, ttl_seconds=1800)
    providers = get_provider_usage_view(db)
    servers = get_server_states_view(db, offline_after_minutes=7)
    return {
        'issues': issues,
        'providers': providers,
        'servers': servers,
        'generated_at': datetime.now(timezone.utc).isoformat(),
    }
@router.get('/admin/providers/accounts')
def list_provider_accounts(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """List provider accounts, newest first, with the stored credential masked."""
    rows = []
    for acct in db.query(ProviderAccount).order_by(ProviderAccount.created_at.desc()).all():
        # Expose only the last 4 characters of the credential.
        tail = acct.credential[-4:] if acct.credential else ''
        rows.append({
            'id': acct.id,
            'provider': acct.provider,
            'label': acct.label,
            'is_enabled': acct.is_enabled,
            'created_at': acct.created_at,
            'credential_masked': '***' + tail,
        })
    return rows
@router.post('/admin/providers/accounts', status_code=status.HTTP_201_CREATED)
def create_provider_account(payload: ProviderAccountCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
    """Create a provider account; rejects providers outside SUPPORTED_PROVIDERS."""
    provider = payload.provider.lower().strip()
    if provider not in SUPPORTED_PROVIDERS:
        raise HTTPException(status_code=400, detail=f'Unsupported provider: {provider}')
    account = ProviderAccount(
        provider=provider,
        label=payload.label.strip(),
        credential=payload.credential.strip(),
        is_enabled=True,
        created_by=user.id,
    )
    db.add(account)
    db.commit()
    db.refresh(account)
    return {
        'id': account.id,
        'provider': account.provider,
        'label': account.label,
        'is_enabled': account.is_enabled,
    }
@router.post('/admin/providers/test')
def test_provider(payload: ProviderTestRequest, _: models.User = Depends(require_admin)):
    """Probe a provider credential for connectivity without persisting it."""
    provider = payload.provider.lower().strip()
    credential = payload.credential.strip()
    ok, message = test_provider_connection(provider, credential)
    return {'ok': ok, 'message': message}
@router.delete('/admin/providers/accounts/{account_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_provider_account(account_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Delete a provider account by id; 404 when it does not exist."""
    account = db.query(ProviderAccount).filter(ProviderAccount.id == account_id).first()
    if account is None:
        raise HTTPException(status_code=404, detail='Provider account not found')
    db.delete(account)
    db.commit()
    return None
@router.get('/admin/servers')
def list_servers(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Admin view of all monitored servers (same shape as the public overview)."""
    return get_server_states_view(db, offline_after_minutes=7)
@router.post('/admin/servers', status_code=status.HTTP_201_CREATED)
def add_server(payload: MonitoredServerCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
    """Register a monitored server; the identifier must be non-empty and unique."""
    identifier = payload.identifier.strip()
    if not identifier:
        raise HTTPException(status_code=400, detail='identifier required')
    duplicate = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier).first()
    if duplicate:
        raise HTTPException(status_code=400, detail='identifier already exists')
    server = MonitoredServer(
        identifier=identifier,
        display_name=payload.display_name,
        is_enabled=True,
        created_by=user.id,
    )
    db.add(server)
    db.commit()
    db.refresh(server)
    return {
        'id': server.id,
        'identifier': server.identifier,
        'display_name': server.display_name,
        'is_enabled': server.is_enabled,
    }
@router.post('/admin/servers/{server_id}/challenge', response_model=ChallengeResponse)
def issue_server_challenge(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Issue a one-time handshake challenge for a server, valid for 10 minutes."""
    server = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first()
    if server is None:
        raise HTTPException(status_code=404, detail='Server not found')
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=10)
    challenge_uuid = str(uuid.uuid4())
    db.add(ServerChallenge(server_id=server_id, challenge_uuid=challenge_uuid, expires_at=expires_at))
    db.commit()
    return ChallengeResponse(
        identifier=server.identifier,
        challenge_uuid=challenge_uuid,
        expires_at=expires_at.isoformat(),
    )
@router.delete('/admin/servers/{server_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_server(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
    """Delete a monitored server along with its state, challenges and nonces."""
    server = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first()
    if server is None:
        raise HTTPException(status_code=404, detail='Server not found')
    # Remove dependent rows first; no DB-level cascade is configured on the models.
    state = db.query(ServerState).filter(ServerState.server_id == server_id).first()
    if state is not None:
        db.delete(state)
    db.query(ServerChallenge).filter(ServerChallenge.server_id == server_id).delete()
    db.query(ServerHandshakeNonce).filter(ServerHandshakeNonce.server_id == server_id).delete()
    db.delete(server)
    db.commit()
    return None
class ServerHeartbeat(BaseModel):
    """Plain-HTTP heartbeat body (alternative to the websocket channel)."""
    identifier: str  # must match an enabled MonitoredServer.identifier
    openclaw_version: str | None = None
    agents: List[dict] = []  # pydantic copies mutable defaults per instance
    cpu_pct: float | None = None
    mem_pct: float | None = None
    disk_pct: float | None = None
    swap_pct: float | None = None
@router.post('/server/heartbeat')
def server_heartbeat(payload: ServerHeartbeat, db: Session = Depends(get_db)):
    """Upsert the latest state row for the server named by `identifier`.

    NOTE(review): this endpoint is unauthenticated — anyone who knows (or
    guesses) an enabled identifier can overwrite that server's state. Confirm
    whether heartbeats should require the challenge/nonce handshake like the
    websocket path does.
    """
    server = db.query(MonitoredServer).filter(MonitoredServer.identifier == payload.identifier, MonitoredServer.is_enabled == True).first()
    if not server:
        raise HTTPException(status_code=404, detail='unknown server identifier')
    st = db.query(ServerState).filter(ServerState.server_id == server.id).first()
    if not st:
        # First heartbeat for this server: create its singleton state row.
        st = ServerState(server_id=server.id)
        db.add(st)
    st.openclaw_version = payload.openclaw_version
    st.agents_json = json.dumps(payload.agents, ensure_ascii=False)
    st.cpu_pct = payload.cpu_pct
    st.mem_pct = payload.mem_pct
    st.disk_pct = payload.disk_pct
    st.swap_pct = payload.swap_pct
    st.last_seen_at = datetime.now(timezone.utc)
    db.commit()
    return {'ok': True, 'server_id': server.id, 'last_seen_at': st.last_seen_at}
@router.websocket('/server/ws')
async def server_ws(websocket: WebSocket):
    """Authenticated telemetry channel for monitor agent plugins.

    Handshake: the first frame carries either an RSA-encrypted payload
    (identifier/challenge_uuid/nonce/ts) or, in backward-compatible mode, the
    same fields in plaintext. The challenge must be unused and unexpired, and
    the nonce must never have been seen for this server (replay guard). After
    a successful handshake the socket streams events that update the server's
    ServerState row.

    Close codes: 4400 missing fields, 4401 bad/expired challenge or stale ts,
    4404 unknown/disabled server, 4409 nonce replay, 1011 internal error.
    """
    await websocket.accept()
    # Long-lived connection: open a dedicated session rather than using the
    # request-scoped get_db dependency.
    db = SessionLocal()
    server_id = None
    try:
        hello = await websocket.receive_json()
        encrypted_payload = (hello.get('encrypted_payload') or '').strip()
        if encrypted_payload:
            data = decrypt_payload_b64(encrypted_payload)
            identifier = (data.get('identifier') or '').strip()
            challenge_uuid = (data.get('challenge_uuid') or '').strip()
            nonce = (data.get('nonce') or '').strip()
            ts = data.get('ts')
            # Encrypted mode additionally requires a fresh timestamp (anti-replay).
            if not ts_within(ts, max_minutes=10):
                await websocket.close(code=4401)
                return
        else:
            # backward compatible mode
            identifier = (hello.get('identifier') or '').strip()
            challenge_uuid = (hello.get('challenge_uuid') or '').strip()
            nonce = (hello.get('nonce') or '').strip()
        if not identifier or not challenge_uuid or not nonce:
            await websocket.close(code=4400)
            return
        server = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier, MonitoredServer.is_enabled == True).first()
        if not server:
            await websocket.close(code=4404)
            return
        ch = db.query(ServerChallenge).filter(ServerChallenge.challenge_uuid == challenge_uuid, ServerChallenge.server_id == server.id).first()
        # Reject missing, already-consumed, or expired challenges.
        if not ch or ch.used_at is not None or ch.expires_at < datetime.now(timezone.utc):
            await websocket.close(code=4401)
            return
        nonce_used = db.query(ServerHandshakeNonce).filter(ServerHandshakeNonce.server_id == server.id, ServerHandshakeNonce.nonce == nonce).first()
        if nonce_used:
            await websocket.close(code=4409)
            return
        # Record the nonce and consume the challenge before accepting events.
        db.add(ServerHandshakeNonce(server_id=server.id, nonce=nonce))
        ch.used_at = datetime.now(timezone.utc)
        db.commit()
        server_id = server.id
        ACTIVE_WS[server.id] = websocket
        await websocket.send_json({'ok': True, 'server_id': server.id, 'message': 'connected'})
        while True:
            msg = await websocket.receive_json()
            event = msg.get('event')
            payload = msg.get('payload') or {}
            st = db.query(ServerState).filter(ServerState.server_id == server.id).first()
            if not st:
                st = ServerState(server_id=server.id)
                db.add(st)
            if event == 'server.hello':
                st.openclaw_version = payload.get('openclaw_version')
                st.agents_json = json.dumps(payload.get('agents') or [], ensure_ascii=False)
            elif event in {'server.metrics', 'agent.status_changed'}:
                # Keys missing from the payload keep their previous values.
                st.cpu_pct = payload.get('cpu_pct', st.cpu_pct)
                st.mem_pct = payload.get('mem_pct', st.mem_pct)
                st.disk_pct = payload.get('disk_pct', st.disk_pct)
                st.swap_pct = payload.get('swap_pct', st.swap_pct)
                if 'agents' in payload:
                    st.agents_json = json.dumps(payload.get('agents') or [], ensure_ascii=False)
            st.last_seen_at = datetime.now(timezone.utc)
            db.commit()
    except WebSocketDisconnect:
        pass
    except Exception:
        try:
            await websocket.close(code=1011)
        except Exception:
            pass
    finally:
        # Unregister only if this socket is still the live one for the server.
        if server_id and ACTIVE_WS.get(server_id) is websocket:
            ACTIVE_WS.pop(server_id, None)
        db.close()

View File

@@ -34,6 +34,7 @@ from app.api.routers.users import router as users_router
from app.api.routers.comments import router as comments_router
from app.api.routers.webhooks import router as webhooks_router
from app.api.routers.misc import router as misc_router
from app.api.routers.monitor import router as monitor_router
app.include_router(auth_router)
app.include_router(issues_router)
@@ -42,12 +43,13 @@ app.include_router(users_router)
app.include_router(comments_router)
app.include_router(webhooks_router)
app.include_router(misc_router)
app.include_router(monitor_router)
# Run database migration on startup
@app.on_event("startup")
def startup():
from app.core.config import Base, engine, SessionLocal
from app.models import webhook, apikey, activity, milestone, notification, worklog
from app.models import webhook, apikey, activity, milestone, notification, worklog, monitor
Base.metadata.create_all(bind=engine)
# Initialize from AbstractWizard (admin user, default project, etc.)
@@ -57,3 +59,21 @@ def startup():
run_init(db)
finally:
db.close()
# Start lightweight monitor polling thread (every 10 minutes)
import threading, time
from app.services.monitoring import refresh_provider_usage_once
def _monitor_poll_loop():
while True:
db2 = SessionLocal()
try:
refresh_provider_usage_once(db2)
except Exception:
pass
finally:
db2.close()
time.sleep(600)
t = threading.Thread(target=_monitor_poll_loop, daemon=True)
t.start()

78
app/models/monitor.py Normal file
View File

@@ -0,0 +1,78 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Float, ForeignKey
from sqlalchemy.sql import func
from app.core.config import Base
class ProviderAccount(Base):
    """An admin-registered LLM provider credential used for usage polling."""
    __tablename__ = 'provider_accounts'
    id = Column(Integer, primary_key=True, index=True)
    provider = Column(String(32), nullable=False, index=True)  # anthropic/openai/minimax/kimi/qwen
    label = Column(String(128), nullable=False)  # human-readable account name
    credential = Column(Text, nullable=False)  # raw key or JSON blob; TODO: encrypt at rest
    is_enabled = Column(Boolean, default=True)  # disabled accounts are skipped by polling
    created_by = Column(Integer, nullable=True)  # id of the creating admin user
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class ProviderUsageSnapshot(Base):
    """Point-in-time usage reading for one provider account."""
    __tablename__ = 'provider_usage_snapshots'
    id = Column(Integer, primary_key=True, index=True)
    account_id = Column(Integer, ForeignKey('provider_accounts.id'), nullable=False, index=True)
    window_label = Column(String(32), nullable=True)  # e.g. 1h / 7d
    used = Column(Float, nullable=True)  # consumed amount within the window
    limit = Column(Float, nullable=True)  # quota for the window
    usage_pct = Column(Float, nullable=True)  # used/limit * 100, pre-computed
    reset_at = Column(DateTime(timezone=True), nullable=True)  # when the quota window resets
    status = Column(String(32), nullable=False, default='unknown')  # ok/error/pending/unsupported
    error = Column(Text, nullable=True)  # error detail when status is 'error'
    raw_payload = Column(Text, nullable=True)  # original provider response, JSON-encoded
    fetched_at = Column(DateTime(timezone=True), server_default=func.now(), index=True)
class MonitoredServer(Base):
    """A server registered for telemetry monitoring."""
    __tablename__ = 'monitored_servers'
    id = Column(Integer, primary_key=True, index=True)
    identifier = Column(String(128), nullable=False, unique=True)  # handshake identity
    display_name = Column(String(128), nullable=True)
    is_enabled = Column(Boolean, default=True)
    created_by = Column(Integer, nullable=True)  # id of the creating admin user
    created_at = Column(DateTime(timezone=True), server_default=func.now())
class ServerState(Base):
    """Latest reported telemetry for a monitored server (one row per server)."""
    __tablename__ = 'server_states'
    id = Column(Integer, primary_key=True, index=True)
    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, unique=True)
    openclaw_version = Column(String(64), nullable=True)
    agents_json = Column(Text, nullable=True)  # json list
    cpu_pct = Column(Float, nullable=True)
    mem_pct = Column(Float, nullable=True)
    disk_pct = Column(Float, nullable=True)
    swap_pct = Column(Float, nullable=True)
    last_seen_at = Column(DateTime(timezone=True), nullable=True)  # drives online/offline state
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class ServerChallenge(Base):
    """One-time handshake challenge issued by an admin for a server."""
    __tablename__ = 'server_challenges'
    id = Column(Integer, primary_key=True, index=True)
    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, index=True)
    challenge_uuid = Column(String(64), nullable=False, unique=True, index=True)
    expires_at = Column(DateTime(timezone=True), nullable=False)
    used_at = Column(DateTime(timezone=True), nullable=True)  # non-null means already consumed
    created_at = Column(DateTime(timezone=True), server_default=func.now())
class ServerHandshakeNonce(Base):
    """Replay guard: every nonce a server has used during a handshake."""
    __tablename__ = 'server_handshake_nonces'
    id = Column(Integer, primary_key=True, index=True)
    server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, index=True)
    nonce = Column(String(128), nullable=False, index=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

View File

@@ -0,0 +1,63 @@
import base64
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
# Directory holding the monitor RSA keypair; override with MONITOR_KEY_DIR.
KEY_DIR = Path(os.getenv('MONITOR_KEY_DIR', '/config/monitor_keys'))
PRIV_PATH = KEY_DIR / 'monitor_private.pem'
PUB_PATH = KEY_DIR / 'monitor_public.pem'
def ensure_keypair() -> None:
    """Generate the RSA-2048 keypair on first use; no-op if both PEMs exist.

    The private key is stored unencrypted, so its file mode is tightened to
    owner-only (0600) right after writing — the original left it with the
    process umask, typically world-readable.
    """
    KEY_DIR.mkdir(parents=True, exist_ok=True)
    if PRIV_PATH.exists() and PUB_PATH.exists():
        return
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    PRIV_PATH.write_bytes(private_pem)
    PRIV_PATH.chmod(0o600)  # private key must not be readable by other users
    PUB_PATH.write_bytes(public_pem)
def get_public_key_info() -> Dict[str, str]:
    """Return the public key PEM and a short key id (sha256 prefix of the PEM)."""
    ensure_keypair()
    pem_text = PUB_PATH.read_text()
    digest = hashlib.sha256(pem_text.encode()).hexdigest()
    return {'public_key_pem': pem_text, 'key_id': digest[:16]}
def decrypt_payload_b64(ciphertext_b64: str) -> Dict[str, Any]:
    """Decrypt a base64 RSA-OAEP(SHA256) ciphertext and parse the plaintext as JSON."""
    ensure_keypair()
    key = serialization.load_pem_private_key(PRIV_PATH.read_bytes(), password=None)
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    plaintext = key.decrypt(base64.b64decode(ciphertext_b64), oaep)
    return json.loads(plaintext.decode())
def ts_within(ts_iso: str, max_minutes: int = 10) -> bool:
    """Return True when *ts_iso* parses and lies within ±max_minutes of now.

    Accepts a trailing 'Z' as UTC. Fix vs the original: a timestamp without
    any UTC offset parsed as naive, and the aware-minus-naive subtraction
    then raised an uncaught TypeError; such timestamps are now assumed UTC.
    Any unparsable value (including non-strings) yields False.
    """
    try:
        ts = datetime.fromisoformat(ts_iso.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        # Non-string input or malformed ISO timestamp.
        return False
    if ts.tzinfo is None:
        # Naive timestamp: interpret as UTC instead of crashing on subtraction.
        ts = ts.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    return abs((now - ts).total_seconds()) <= max_minutes * 60

305
app/services/monitoring.py Normal file
View File

@@ -0,0 +1,305 @@
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Tuple
import requests
from sqlalchemy.orm import Session
from app.models.models import Issue
from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState
# Module-level in-process cache: key -> {'at': datetime, 'data': ...}.
_CACHE: Dict[str, Dict[str, Any]] = {}
def _now() -> datetime:
    # Single source of "now" so all timestamps in this module are UTC-aware.
    return datetime.now(timezone.utc)
def _parse_credential(raw: str) -> Dict[str, Any]:
raw = (raw or '').strip()
if raw.startswith('{'):
try:
return json.loads(raw)
except Exception:
return {'api_key': raw}
return {'api_key': raw}
def _parse_reset_at(value) -> datetime | None:
if not value:
return None
if isinstance(value, datetime):
return value
if isinstance(value, (int, float)):
return datetime.fromtimestamp(value, tz=timezone.utc)
if isinstance(value, str):
try:
return datetime.fromisoformat(value.replace('Z', '+00:00'))
except Exception:
return None
return None
def _normalize_usage_payload(payload: Dict[str, Any]) -> Dict[str, Any]:
used = payload.get('used') or payload.get('usage') or payload.get('consumed') or payload.get('total_usage')
limit = payload.get('limit') or payload.get('quota') or payload.get('hard_limit') or payload.get('total')
remaining = payload.get('remain') or payload.get('remaining') or payload.get('left')
usage_pct = payload.get('usage_pct') or payload.get('percent') or payload.get('usage_percent')
window_label = payload.get('window') or payload.get('window_label')
reset_at = payload.get('reset_at') or payload.get('reset_time') or payload.get('reset')
if used is None and remaining is not None and limit is not None:
try:
used = float(limit) - float(remaining)
except Exception:
pass
if usage_pct is None and used is not None and limit:
try:
usage_pct = round(float(used) / float(limit) * 100, 2)
except Exception:
pass
return {
'window_label': window_label,
'used': used,
'limit': limit,
'usage_pct': usage_pct,
'reset_at': _parse_reset_at(reset_at),
'raw': payload,
}
def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800):
    """Return 24h issue statistics, cached in-process for *ttl_seconds*.

    Counts total issues, issues created in the last 24h, and issues in
    resolved/closed status that were updated in the last 24h.
    """
    key = 'issue_stats_24h'
    now = _now()
    hit = _CACHE.get(key)
    if hit and (now - hit['at']).total_seconds() < ttl_seconds:
        # Fresh cache entry: serve the previously computed stats.
        return hit['data']
    since = now - timedelta(hours=24)
    total = db.query(Issue).count()
    new_24h = db.query(Issue).filter(Issue.created_at >= since).count()
    # Approximation: a recent update on a resolved/closed issue counts as "processed".
    processed_24h = db.query(Issue).filter(
        Issue.updated_at != None,
        Issue.updated_at >= since,
        Issue.status.in_(['resolved', 'closed'])
    ).count()
    data = {
        'total_issues': total,
        'new_issues_24h': new_24h,
        'processed_issues_24h': processed_24h,
        'computed_at': now.isoformat(),
        'cache_ttl_seconds': ttl_seconds,
    }
    _CACHE[key] = {'at': now, 'data': data}
    return data
def _provider_headers(provider: str, credential: str, extra: Dict[str, Any] | None = None):
extra = extra or {}
if extra.get('auth_header'):
val = extra.get('auth_value')
if not val:
scheme = extra.get('auth_scheme')
val = f"{scheme} {credential}" if scheme else credential
return {extra['auth_header']: val}
if provider == 'openai':
return {'Authorization': f'Bearer {credential}'}
if provider == 'anthropic':
return {'x-api-key': credential, 'anthropic-version': '2023-06-01'}
return {}
def test_provider_connection(provider: str, credential: str):
    """Best-effort connectivity probe for a provider credential.

    Returns (ok, message). Never raises: network and parsing errors come
    back as (False, str(error)).
    """
    provider = provider.lower()
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    try:
        if provider == 'openai':
            resp = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12)
            return resp.status_code == 200, f'status={resp.status_code}'
        if provider == 'anthropic':
            resp = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, key, info), timeout=12)
            return resp.status_code == 200, f'status={resp.status_code}'
        usage_url = info.get('usage_url') or info.get('test_url')
        if provider == 'kimi' and not usage_url:
            usage_url = 'https://www.kimi.com/api/user/usage'
        if usage_url:
            resp = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12)
            # Any non-5xx response counts as "reachable" for custom endpoints.
            return resp.status_code < 500, f'status={resp.status_code}'
        if provider in {'minimax', 'kimi', 'qwen'}:
            return True, 'accepted (connectivity check pending provider-specific adapter)'
        return False, 'unsupported provider'
    except Exception as e:
        return False, str(e)
def _openai_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch a 7-day OpenAI usage window via the dashboard billing endpoints.

    Returns (status, payload) where status is 'ok' or 'error'.
    NOTE(review): the dashboard billing endpoints are not part of the public
    API surface and may reject standard API keys — confirm against the
    target account type.
    """
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    headers = _provider_headers('openai', key, info)
    today = _now().date()
    start = (today - timedelta(days=7)).isoformat()
    end = today.isoformat()
    usage_url = info.get('usage_url') or f'https://api.openai.com/v1/dashboard/billing/usage?start_date={start}&end_date={end}'
    sub_url = info.get('subscription_url') or 'https://api.openai.com/v1/dashboard/billing/subscription'
    u = requests.get(usage_url, headers=headers, timeout=12)
    s = requests.get(sub_url, headers=headers, timeout=12)
    if u.status_code != 200 or s.status_code != 200:
        return 'error', {'error': f'usage:{u.status_code}, subscription:{s.status_code}'}
    usage = u.json()
    sub = s.json()
    total_usage = usage.get('total_usage')
    hard_limit = sub.get('hard_limit_usd')
    reset_at_ts = sub.get('billing_cycle_anchor')
    reset_at = datetime.fromtimestamp(reset_at_ts, tz=timezone.utc) if reset_at_ts else None
    usage_pct = None
    if total_usage is not None and hard_limit:
        # hard_limit truthiness also avoids division by zero.
        usage_pct = round(total_usage / hard_limit * 100, 2)
    return 'ok', {
        'window_label': '7d',
        'used': total_usage,
        'limit': hard_limit,
        'usage_pct': usage_pct,
        'reset_at': reset_at,
        'raw': {'usage': usage, 'subscription': sub},
    }
def _anthropic_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch Anthropic usage via a user-configured usage_url; no endpoint is
    hard-coded because it varies per organization."""
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    usage_url = info.get('usage_url')
    if not usage_url:
        return 'unsupported', {'error': 'anthropic usage API not configured'}
    resp = requests.get(usage_url, headers=_provider_headers('anthropic', key, info), timeout=12)
    if resp.status_code != 200:
        return 'error', {'error': f'usage:{resp.status_code}', 'raw': resp.text}
    payload = resp.json()
    # Unwrap a {'data': {...}} envelope when present.
    if isinstance(payload, dict) and isinstance(payload.get('data'), dict):
        payload = payload['data']
    return 'ok', _normalize_usage_payload(payload)
def _kimi_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch Kimi usage; falls back to the public kimi.com usage endpoint."""
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    usage_url = info.get('usage_url') or 'https://www.kimi.com/api/user/usage'
    resp = requests.get(usage_url, headers=_provider_headers('kimi', key, info), timeout=12)
    if resp.status_code != 200:
        return 'error', {'error': f'usage:{resp.status_code}', 'raw': resp.text}
    payload = resp.json()
    # Unwrap a {'data': {...}} envelope when present.
    if isinstance(payload, dict) and isinstance(payload.get('data'), dict):
        payload = payload['data']
    return 'ok', _normalize_usage_payload(payload)
def _minimax_usage(credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch Minimax usage via a user-configured usage_url."""
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    usage_url = info.get('usage_url')
    if not usage_url:
        return 'unsupported', {'error': 'minimax usage API not configured'}
    resp = requests.get(usage_url, headers=_provider_headers('minimax', key, info), timeout=12)
    if resp.status_code != 200:
        return 'error', {'error': f'usage:{resp.status_code}', 'raw': resp.text}
    payload = resp.json()
    # Unwrap a {'data': {...}} envelope when present.
    if isinstance(payload, dict) and isinstance(payload.get('data'), dict):
        payload = payload['data']
    return 'ok', _normalize_usage_payload(payload)
def _generic_usage(provider: str, credential: str) -> Tuple[str, Dict[str, Any]]:
    """Fetch usage for providers without a dedicated adapter; requires a
    user-supplied usage_url in the credential JSON."""
    info = _parse_credential(credential)
    key = info.get('api_key') or credential
    usage_url = info.get('usage_url')
    if not usage_url:
        return 'unsupported', {'error': f'{provider} usage API not configured'}
    resp = requests.get(usage_url, headers=_provider_headers(provider, key, info), timeout=12)
    if resp.status_code != 200:
        return 'error', {'error': f'usage:{resp.status_code}', 'raw': resp.text}
    # No envelope unwrapping here: the generic adapter normalizes the raw body.
    return 'ok', _normalize_usage_payload(resp.json())
def refresh_provider_usage_once(db: Session):
    """Fetch one usage snapshot per enabled provider account and persist them.

    Fix vs the original: each account is polled inside its own try/except, so
    a network failure or bad credential on one account no longer aborts the
    whole refresh (losing the snapshots of every account after it); the
    failure is recorded as an 'error' snapshot for that account instead.
    """
    # Dispatch table for providers with a dedicated adapter.
    fetchers = {
        'openai': _openai_usage,
        'anthropic': _anthropic_usage,
        'kimi': _kimi_usage,
        'minimax': _minimax_usage,
    }
    accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
    now = _now()
    for a in accounts:
        status = 'pending'
        payload: Dict[str, Any] = {}
        try:
            fetcher = fetchers.get(a.provider)
            if fetcher is not None:
                status, payload = fetcher(a.credential)
            elif a.provider == 'qwen':
                status, payload = _generic_usage(a.provider, a.credential)
            else:
                # Unknown provider: fall back to a plain connectivity probe.
                ok, msg = test_provider_connection(a.provider, a.credential)
                status = 'ok' if ok else 'error'
                payload = {'error': None if ok else msg}
        except Exception as e:
            # Isolate per-account failures so one bad account doesn't stop the loop.
            status, payload = 'error', {'error': str(e)}
        snap = ProviderUsageSnapshot(
            account_id=a.id,
            window_label=payload.get('window_label'),
            used=payload.get('used'),
            limit=payload.get('limit'),
            usage_pct=payload.get('usage_pct'),
            reset_at=payload.get('reset_at'),
            status=status,
            error=payload.get('error'),
            raw_payload=json.dumps(payload.get('raw') or payload, ensure_ascii=False),
            fetched_at=now,
        )
        db.add(snap)
    db.commit()
def get_provider_usage_view(db: Session):
    """Latest usage snapshot per enabled provider account, as plain dicts."""
    rows = []
    accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
    for account in accounts:
        latest = (
            db.query(ProviderUsageSnapshot)
            .filter(ProviderUsageSnapshot.account_id == account.id)
            .order_by(ProviderUsageSnapshot.fetched_at.desc())
            .first()
        )
        # Defaults describe an account that has never been polled.
        row = {
            'account_id': account.id,
            'provider': account.provider,
            'label': account.label,
            'window': None,
            'usage_pct': None,
            'used': None,
            'limit': None,
            'reset_at': None,
            'status': 'pending',
            'error': None,
            'fetched_at': None,
        }
        if latest is not None:
            row['window'] = latest.window_label
            row['usage_pct'] = latest.usage_pct
            row['used'] = latest.used
            row['limit'] = latest.limit
            row['reset_at'] = latest.reset_at.isoformat() if latest.reset_at else None
            row['status'] = latest.status
            row['error'] = latest.error
            row['fetched_at'] = latest.fetched_at.isoformat() if latest.fetched_at else None
        rows.append(row)
    return rows
def get_server_states_view(db: Session, offline_after_minutes: int = 7):
    """Build the public/admin server list with a derived online flag.

    A server is online when its last_seen_at is within *offline_after_minutes*
    of now. Robustness fixes vs the original:
    - `last_seen_at` read back from the DB may be naive (e.g. SQLite drops
      tzinfo); it is assumed UTC and made aware before subtraction, which
      previously raised TypeError against the aware `now`.
    - a corrupt `agents_json` blob now degrades to an empty list instead of
      raising and failing the whole endpoint.
    """
    now = _now()
    servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all()
    out = []
    for s in servers:
        st = db.query(ServerState).filter(ServerState.server_id == s.id).first()
        last_seen = st.last_seen_at if st else None
        if last_seen is not None and last_seen.tzinfo is None:
            # Stored as naive UTC by some backends — TODO confirm for this DB.
            last_seen = last_seen.replace(tzinfo=timezone.utc)
        online = bool(last_seen and (now - last_seen).total_seconds() <= offline_after_minutes * 60)
        agents = []
        if st and st.agents_json:
            try:
                agents = json.loads(st.agents_json)
            except ValueError:
                agents = []
        out.append({
            'server_id': s.id,
            'identifier': s.identifier,
            'display_name': s.display_name or s.identifier,
            'online': online,
            'openclaw_version': st.openclaw_version if st else None,
            'cpu_pct': st.cpu_pct if st else None,
            'mem_pct': st.mem_pct if st else None,
            'disk_pct': st.disk_pct if st else None,
            'swap_pct': st.swap_pct if st else None,
            'agents': agents,
            'last_seen_at': last_seen.isoformat() if last_seen else None,
        })
    return out

View File

@@ -0,0 +1,63 @@
# Provider 账号凭证格式(Monitor)
默认情况下,`credential` 可以直接填写 API Key 字符串。
如果需要配置自定义 usage 端点,请使用 JSON 字符串。
## 基础格式
```json
{
"api_key": "sk-...",
"usage_url": "https://.../usage",
"auth_header": "Authorization",
"auth_scheme": "Bearer"
}
```
### 字段说明
- `api_key`: API key(必填)
- `usage_url`: 统计用量的 GET 端点(可选,minimax/kimi/qwen 推荐填写)
- `auth_header`: 自定义鉴权头名(可选)
- `auth_scheme`: 鉴权 scheme(如 `Bearer`),会拼成 `Bearer <api_key>`
- `auth_value`: 直接指定头值(优先级高于 scheme)
## OpenAI
默认使用 OpenAI 官方 billing endpoints(7 天窗口):
- `https://api.openai.com/v1/dashboard/billing/usage`
- `https://api.openai.com/v1/dashboard/billing/subscription`
如需自定义,可使用 JSON:
```json
{
"api_key": "sk-...",
"usage_url": "https://api.openai.com/v1/dashboard/billing/usage?start_date=YYYY-MM-DD&end_date=YYYY-MM-DD",
"subscription_url": "https://api.openai.com/v1/dashboard/billing/subscription"
}
```
## Anthropic
官方 usage API 需要自行提供 `usage_url`(不同组织可能不同):
```json
{
"api_key": "ak-...",
"usage_url": "https://api.anthropic.com/.../usage"
}
```
## Minimax / Kimi / Qwen
目前需要你提供 `usage_url`(具体端点取决于部署/账号):
```json
{
"api_key": "...",
"usage_url": "https://.../usage",
"auth_header": "Authorization",
"auth_scheme": "Bearer"
}
```
## Kimi
推荐 usage_url: https://www.kimi.com/api/user/usage
Authorization: Bearer <API_KEY>
## Minimax
推荐 usage_url: https://platform.minimax.io/v1/api/openplatform/coding_plan/remains
Authorization: Bearer <API_KEY>

View File

@@ -0,0 +1,68 @@
# OpenClaw Monitor Agent Plugin 开发计划(草案)
## 目标
让被监测服务器通过 WebSocket 主动接入 HarborForge Backend并持续上报
- OpenClaw 版本
- agent 列表
- 每 5 分钟主机指标CPU/MEM/DISK/SWAP
- agent 状态变更事件
## 握手流程
1. Admin 在 HarborForge 后台添加 server identifier
2. Admin 生成 challenge10 分钟有效)
3. 插件请求 `GET /monitor/public/server-public-key` 获取公钥
4. 插件构造 payload
- `identifier`
- `challenge_uuid`
- `nonce`(随机)
- `ts`(ISO8601 时间戳)
5. 使用 RSA-OAEP(SHA256) 公钥加密base64 后作为 `encrypted_payload` 发给 `WS /monitor/server/ws`
6. 握手成功后进入事件上报通道
## 插件事件协议
### server.hello
```json
{
"event": "server.hello",
"payload": {
"openclaw_version": "x.y.z",
"agents": [{"id": "a1", "name": "agent-1", "status": "idle"}]
}
}
```
### server.metrics每 5 分钟)
```json
{
"event": "server.metrics",
"payload": {
"cpu_pct": 21.3,
"mem_pct": 42.1,
"disk_pct": 55.9,
"swap_pct": 0.0,
"agents": [{"id": "a1", "name": "agent-1", "status": "busy"}]
}
}
```
### agent.status_changed(可选)
```json
{
"event": "agent.status_changed",
"payload": {
"agents": [{"id": "a1", "name": "agent-1", "status": "focus"}]
}
}
```
## 实施里程碑
- M1: Node/Python CLI 插件最小握手联通
- M2: 指标采集 + 周期上报
- M3: agent 状态采集与变更事件
- M4: 守护化(systemd)+ 断线重连 + 本地日志
## 风险与注意事项
- 时钟漂移会导致 `ts` 校验失败(建议启用 NTP 校时)
- challenge 仅一次可用,重复使用会被拒绝
- nonce 重放会被拒绝
- 需要保证插件本地安全保存 identifier/challenge短期

View File

@@ -11,3 +11,4 @@ python-multipart==0.0.6
alembic==1.13.1
python-dotenv==1.0.0
httpx==0.27.0
requests==2.31.0