feat/public-monitor-and-agent-telemetry #4

Merged
hzhang merged 11 commits from feat/public-monitor-and-agent-telemetry into main 2026-03-11 22:15:25 +00:00
4 changed files with 380 additions and 1 deletions
Showing only changes of commit d299428d35 - Show all commits

169
app/api/routers/monitor.py Normal file
View File

@@ -0,0 +1,169 @@
from datetime import datetime, timedelta, timezone
import json
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey
from app.models import models
from app.models.monitor import ProviderAccount, MonitoredServer, ServerState
from app.services.monitoring import (
get_issue_stats_cached,
get_provider_usage_view,
get_server_states_view,
test_provider_connection,
)
router = APIRouter(prefix='/monitor', tags=['Monitor'])
SUPPORTED_PROVIDERS = {'anthropic', 'openai', 'minimax', 'kimi', 'qwen'}
class ProviderAccountCreate(BaseModel):
provider: str
label: str
credential: str
class ProviderTestRequest(BaseModel):
provider: str
credential: str
class MonitoredServerCreate(BaseModel):
identifier: str
display_name: str | None = None
def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail='Admin required')
return current_user
@router.get('/public/overview')
def public_overview(db: Session = Depends(get_db)):
return {
'issues': get_issue_stats_cached(db, ttl_seconds=1800),
'providers': get_provider_usage_view(db),
'servers': get_server_states_view(db, offline_after_minutes=7),
'generated_at': datetime.now(timezone.utc).isoformat(),
}
@router.get('/admin/providers/accounts')
def list_provider_accounts(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
accounts = db.query(ProviderAccount).order_by(ProviderAccount.created_at.desc()).all()
return [
{
'id': a.id,
'provider': a.provider,
'label': a.label,
'is_enabled': a.is_enabled,
'created_at': a.created_at,
'credential_masked': '***' + (a.credential[-4:] if a.credential else ''),
}
for a in accounts
]
@router.post('/admin/providers/accounts', status_code=status.HTTP_201_CREATED)
def create_provider_account(payload: ProviderAccountCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
provider = payload.provider.lower().strip()
if provider not in SUPPORTED_PROVIDERS:
raise HTTPException(status_code=400, detail=f'Unsupported provider: {provider}')
obj = ProviderAccount(
provider=provider,
label=payload.label.strip(),
credential=payload.credential.strip(),
is_enabled=True,
created_by=user.id,
)
db.add(obj)
db.commit()
db.refresh(obj)
return {'id': obj.id, 'provider': obj.provider, 'label': obj.label, 'is_enabled': obj.is_enabled}
@router.post('/admin/providers/test')
def test_provider(payload: ProviderTestRequest, _: models.User = Depends(require_admin)):
ok, message = test_provider_connection(payload.provider.lower().strip(), payload.credential.strip())
return {'ok': ok, 'message': message}
@router.delete('/admin/providers/accounts/{account_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_provider_account(account_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
obj = db.query(ProviderAccount).filter(ProviderAccount.id == account_id).first()
if not obj:
raise HTTPException(status_code=404, detail='Provider account not found')
db.delete(obj)
db.commit()
return None
@router.get('/admin/servers')
def list_servers(db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
return get_server_states_view(db, offline_after_minutes=7)
@router.post('/admin/servers', status_code=status.HTTP_201_CREATED)
def add_server(payload: MonitoredServerCreate, db: Session = Depends(get_db), user: models.User = Depends(require_admin)):
identifier = payload.identifier.strip()
if not identifier:
raise HTTPException(status_code=400, detail='identifier required')
exists = db.query(MonitoredServer).filter(MonitoredServer.identifier == identifier).first()
if exists:
raise HTTPException(status_code=400, detail='identifier already exists')
obj = MonitoredServer(identifier=identifier, display_name=payload.display_name, is_enabled=True, created_by=user.id)
db.add(obj)
db.commit()
db.refresh(obj)
return {'id': obj.id, 'identifier': obj.identifier, 'display_name': obj.display_name, 'is_enabled': obj.is_enabled}
@router.delete('/admin/servers/{server_id}', status_code=status.HTTP_204_NO_CONTENT)
def delete_server(server_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin)):
obj = db.query(MonitoredServer).filter(MonitoredServer.id == server_id).first()
if not obj:
raise HTTPException(status_code=404, detail='Server not found')
state = db.query(ServerState).filter(ServerState.server_id == server_id).first()
if state:
db.delete(state)
db.delete(obj)
db.commit()
return None
# Temporary ingestion endpoint before WS plugin lands
class ServerHeartbeat(BaseModel):
identifier: str
openclaw_version: str | None = None
agents: List[dict] = []
cpu_pct: float | None = None
mem_pct: float | None = None
disk_pct: float | None = None
swap_pct: float | None = None
@router.post('/server/heartbeat')
def server_heartbeat(payload: ServerHeartbeat, db: Session = Depends(get_db)):
server = db.query(MonitoredServer).filter(MonitoredServer.identifier == payload.identifier, MonitoredServer.is_enabled == True).first()
if not server:
raise HTTPException(status_code=404, detail='unknown server identifier')
st = db.query(ServerState).filter(ServerState.server_id == server.id).first()
if not st:
st = ServerState(server_id=server.id)
db.add(st)
st.openclaw_version = payload.openclaw_version
st.agents_json = json.dumps(payload.agents, ensure_ascii=False)
st.cpu_pct = payload.cpu_pct
st.mem_pct = payload.mem_pct
st.disk_pct = payload.disk_pct
st.swap_pct = payload.swap_pct
st.last_seen_at = datetime.now(timezone.utc)
db.commit()
return {'ok': True, 'server_id': server.id, 'last_seen_at': st.last_seen_at}

View File

@@ -34,6 +34,7 @@ from app.api.routers.users import router as users_router
from app.api.routers.comments import router as comments_router
from app.api.routers.webhooks import router as webhooks_router
from app.api.routers.misc import router as misc_router
from app.api.routers.monitor import router as monitor_router
app.include_router(auth_router)
app.include_router(issues_router)
@@ -42,12 +43,13 @@ app.include_router(users_router)
app.include_router(comments_router)
app.include_router(webhooks_router)
app.include_router(misc_router)
app.include_router(monitor_router)
# Run database migration on startup
@app.on_event("startup")
def startup():
from app.core.config import Base, engine, SessionLocal
from app.models import webhook, apikey, activity, milestone, notification, worklog
from app.models import webhook, apikey, activity, milestone, notification, worklog, monitor
Base.metadata.create_all(bind=engine)
# Initialize from AbstractWizard (admin user, default project, etc.)
@@ -57,3 +59,21 @@ def startup():
run_init(db)
finally:
db.close()
# Start lightweight monitor polling thread (every 10 minutes)
import threading, time
from app.services.monitoring import refresh_provider_usage_once
def _monitor_poll_loop():
while True:
db2 = SessionLocal()
try:
refresh_provider_usage_once(db2)
except Exception:
pass
finally:
db2.close()
time.sleep(600)
t = threading.Thread(target=_monitor_poll_loop, daemon=True)
t.start()

58
app/models/monitor.py Normal file
View File

@@ -0,0 +1,58 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Float, ForeignKey
from sqlalchemy.sql import func
from app.core.config import Base
class ProviderAccount(Base):
__tablename__ = 'provider_accounts'
id = Column(Integer, primary_key=True, index=True)
provider = Column(String(32), nullable=False, index=True) # anthropic/openai/minimax/kimi/qwen
label = Column(String(128), nullable=False)
credential = Column(Text, nullable=False) # TODO: encrypt at rest
is_enabled = Column(Boolean, default=True)
created_by = Column(Integer, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class ProviderUsageSnapshot(Base):
__tablename__ = 'provider_usage_snapshots'
id = Column(Integer, primary_key=True, index=True)
account_id = Column(Integer, ForeignKey('provider_accounts.id'), nullable=False, index=True)
window_label = Column(String(32), nullable=True) # e.g. 1h / 7d
used = Column(Float, nullable=True)
limit = Column(Float, nullable=True)
usage_pct = Column(Float, nullable=True)
reset_at = Column(DateTime(timezone=True), nullable=True)
status = Column(String(32), nullable=False, default='unknown') # ok/error/pending/unsupported
error = Column(Text, nullable=True)
raw_payload = Column(Text, nullable=True)
fetched_at = Column(DateTime(timezone=True), server_default=func.now(), index=True)
class MonitoredServer(Base):
__tablename__ = 'monitored_servers'
id = Column(Integer, primary_key=True, index=True)
identifier = Column(String(128), nullable=False, unique=True)
display_name = Column(String(128), nullable=True)
is_enabled = Column(Boolean, default=True)
created_by = Column(Integer, nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class ServerState(Base):
__tablename__ = 'server_states'
id = Column(Integer, primary_key=True, index=True)
server_id = Column(Integer, ForeignKey('monitored_servers.id'), nullable=False, unique=True)
openclaw_version = Column(String(64), nullable=True)
agents_json = Column(Text, nullable=True) # json list
cpu_pct = Column(Float, nullable=True)
mem_pct = Column(Float, nullable=True)
disk_pct = Column(Float, nullable=True)
swap_pct = Column(Float, nullable=True)
last_seen_at = Column(DateTime(timezone=True), nullable=True)
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

132
app/services/monitoring.py Normal file
View File

@@ -0,0 +1,132 @@
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict
import requests
from sqlalchemy.orm import Session
from app.models.models import Issue
from app.models.monitor import ProviderAccount, ProviderUsageSnapshot, MonitoredServer, ServerState
_CACHE: Dict[str, Dict[str, Any]] = {}
def _now():
return datetime.now(timezone.utc)
def get_issue_stats_cached(db: Session, ttl_seconds: int = 1800):
key = 'issue_stats_24h'
now = _now()
hit = _CACHE.get(key)
if hit and (now - hit['at']).total_seconds() < ttl_seconds:
return hit['data']
since = now - timedelta(hours=24)
total = db.query(Issue).count()
new_24h = db.query(Issue).filter(Issue.created_at >= since).count()
processed_24h = db.query(Issue).filter(
Issue.updated_at != None,
Issue.updated_at >= since,
Issue.status.in_(['resolved', 'closed'])
).count()
data = {
'total_issues': total,
'new_issues_24h': new_24h,
'processed_issues_24h': processed_24h,
'computed_at': now.isoformat(),
'cache_ttl_seconds': ttl_seconds,
}
_CACHE[key] = {'at': now, 'data': data}
return data
def _provider_headers(provider: str, credential: str):
if provider == 'openai':
return {'Authorization': f'Bearer {credential}'}
if provider == 'anthropic':
return {'x-api-key': credential, 'anthropic-version': '2023-06-01'}
return None
def test_provider_connection(provider: str, credential: str):
provider = provider.lower()
try:
if provider == 'openai':
r = requests.get('https://api.openai.com/v1/models', headers=_provider_headers(provider, credential), timeout=12)
return r.status_code == 200, f'status={r.status_code}'
if provider == 'anthropic':
r = requests.get('https://api.anthropic.com/v1/models', headers=_provider_headers(provider, credential), timeout=12)
return r.status_code == 200, f'status={r.status_code}'
if provider in {'minimax', 'kimi', 'qwen'}:
# Endpoints/usage API vary by deployment; keep as accepted-but-unverified for now.
return True, 'accepted (connectivity check pending provider-specific adapter)'
return False, 'unsupported provider'
except Exception as e:
return False, str(e)
def refresh_provider_usage_once(db: Session):
accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
now = _now()
for a in accounts:
ok, msg = test_provider_connection(a.provider, a.credential)
snap = ProviderUsageSnapshot(
account_id=a.id,
window_label='provider-default',
used=None,
limit=None,
usage_pct=None,
reset_at=None,
status='ok' if ok else 'error',
error=None if ok else msg,
raw_payload=json.dumps({'message': msg}, ensure_ascii=False),
fetched_at=now,
)
db.add(snap)
db.commit()
def get_provider_usage_view(db: Session):
accounts = db.query(ProviderAccount).filter(ProviderAccount.is_enabled == True).all()
rows = []
for a in accounts:
snap = db.query(ProviderUsageSnapshot).filter(ProviderUsageSnapshot.account_id == a.id).order_by(ProviderUsageSnapshot.fetched_at.desc()).first()
rows.append({
'account_id': a.id,
'provider': a.provider,
'label': a.label,
'window': snap.window_label if snap else None,
'usage_pct': snap.usage_pct if snap else None,
'used': snap.used if snap else None,
'limit': snap.limit if snap else None,
'reset_at': snap.reset_at.isoformat() if snap and snap.reset_at else None,
'status': snap.status if snap else 'pending',
'error': snap.error if snap else None,
'fetched_at': snap.fetched_at.isoformat() if snap and snap.fetched_at else None,
})
return rows
def get_server_states_view(db: Session, offline_after_minutes: int = 7):
now = _now()
servers = db.query(MonitoredServer).filter(MonitoredServer.is_enabled == True).all()
out = []
for s in servers:
st = db.query(ServerState).filter(ServerState.server_id == s.id).first()
last_seen = st.last_seen_at if st else None
online = bool(last_seen and (now - last_seen).total_seconds() <= offline_after_minutes * 60)
out.append({
'server_id': s.id,
'identifier': s.identifier,
'display_name': s.display_name or s.identifier,
'online': online,
'openclaw_version': st.openclaw_version if st else None,
'cpu_pct': st.cpu_pct if st else None,
'mem_pct': st.mem_pct if st else None,
'disk_pct': st.disk_pct if st else None,
'swap_pct': st.swap_pct if st else None,
'agents': json.loads(st.agents_json) if st and st.agents_json else [],
'last_seen_at': last_seen.isoformat() if last_seen else None,
})
return out