From 578493edc106911410d4e20f5a062b00c237e29d Mon Sep 17 00:00:00 2001 From: orion Date: Sat, 4 Apr 2026 16:46:04 +0000 Subject: [PATCH] feat: expose calendar agent heartbeat api --- app/api/routers/calendar.py | 132 +++++++++++++++++++++++++++++++++++- app/schemas/calendar.py | 27 ++++++++ 2 files changed, 157 insertions(+), 2 deletions(-) diff --git a/app/api/routers/calendar.py b/app/api/routers/calendar.py index b641c41..8062463 100644 --- a/app/api/routers/calendar.py +++ b/app/api/routers/calendar.py @@ -10,17 +10,20 @@ BE-CAL-API-006: Plan edit / plan cancel endpoints. BE-CAL-API-007: Date-list endpoint. """ -from datetime import date as date_type +from datetime import date as date_type, datetime, timezone from typing import Optional -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, Header, HTTPException, Query from sqlalchemy.orm import Session from app.api.deps import get_current_user from app.core.config import get_db from app.models.calendar import SchedulePlan, SlotStatus, TimeSlot from app.models.models import User +from app.models.agent import Agent, AgentStatus, ExhaustReason from app.schemas.calendar import ( + AgentHeartbeatResponse, + AgentStatusUpdateRequest, CalendarDayResponse, CalendarSlotItem, DateListResponse, @@ -32,7 +35,9 @@ from app.schemas.calendar import ( SchedulePlanEdit, SchedulePlanListResponse, SchedulePlanResponse, + SlotStatusEnum, SlotConflictItem, + SlotAgentUpdate, TimeSlotCancelResponse, TimeSlotCreate, TimeSlotCreateResponse, @@ -40,6 +45,14 @@ from app.schemas.calendar import ( TimeSlotEditResponse, TimeSlotResponse, ) +from app.services.agent_heartbeat import get_pending_slots_for_agent +from app.services.agent_status import ( + record_heartbeat, + transition_to_busy, + transition_to_idle, + transition_to_offline, + transition_to_exhausted, +) from app.services.minimum_workload import ( get_workload_config, get_workload_warnings_for_date, @@ -264,6 +277,121 @@ def _virtual_slot_to_item(vs: dict) -> CalendarSlotItem: ) +def _require_agent(db: Session, agent_id: str, claw_identifier: str) -> Agent: + agent = ( + db.query(Agent) + .filter(Agent.agent_id == agent_id, Agent.claw_identifier == claw_identifier) + .first() + ) + if agent is None: + raise HTTPException(status_code=404, detail="Agent not found") + return agent + + +def _apply_agent_slot_update(slot: TimeSlot, payload: SlotAgentUpdate) -> None: + slot.status = payload.status.value + if payload.started_at is not None: + slot.started_at = payload.started_at + slot.attended = True + if payload.actual_duration is not None: + slot.actual_duration = payload.actual_duration + if payload.status == SlotStatusEnum.ONGOING: + slot.attended = True + + +@router.get( + "/agent/heartbeat", + response_model=AgentHeartbeatResponse, + summary="Get all due slots for the calling agent", +) +def agent_heartbeat( + x_agent_id: str = Header(..., alias="X-Agent-ID"), + x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"), + db: Session = Depends(get_db), +): + agent = _require_agent(db, x_agent_id, x_claw_identifier) + record_heartbeat(db, agent) + slots = get_pending_slots_for_agent(db, agent.user_id, now=datetime.now(timezone.utc)) + db.commit() + return AgentHeartbeatResponse( + slots=[_real_slot_to_item(slot) for slot in slots], + agent_status=agent.status.value if hasattr(agent.status, 'value') else str(agent.status), + message=f"{len(slots)} due slot(s)", + ) + + +@router.patch( + "/slots/{slot_id}/agent-update", + response_model=TimeSlotEditResponse, + summary="Agent updates a real slot status", +) +def agent_update_real_slot( + slot_id: int, + payload: SlotAgentUpdate, + x_agent_id: str = Header(..., alias="X-Agent-ID"), + x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"), + db: Session = Depends(get_db), +): + agent = _require_agent(db, x_agent_id, x_claw_identifier) + slot = db.query(TimeSlot).filter(TimeSlot.id == slot_id, TimeSlot.user_id == agent.user_id).first() + if slot is None: + raise HTTPException(status_code=404, detail="Slot not found") + _apply_agent_slot_update(slot, payload) + db.commit() + db.refresh(slot) + return TimeSlotEditResponse(slot=_slot_to_response(slot), warnings=[]) + + +@router.patch( + "/slots/virtual/{virtual_id}/agent-update", + response_model=TimeSlotEditResponse, + summary="Agent materializes and updates a virtual slot status", +) +def agent_update_virtual_slot( + virtual_id: str, + payload: SlotAgentUpdate, + x_agent_id: str = Header(..., alias="X-Agent-ID"), + x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"), + db: Session = Depends(get_db), +): + agent = _require_agent(db, x_agent_id, x_claw_identifier) + slot = materialize_from_virtual_id(db, virtual_id) + if slot.user_id != agent.user_id: + db.rollback() + raise HTTPException(status_code=404, detail="Slot not found") + _apply_agent_slot_update(slot, payload) + db.commit() + db.refresh(slot) + return TimeSlotEditResponse(slot=_slot_to_response(slot), warnings=[]) + + +@router.post( + "/agent/status", + summary="Update agent runtime status from plugin", +) +def update_agent_status( + payload: AgentStatusUpdateRequest, + db: Session = Depends(get_db), +): + agent = _require_agent(db, payload.agent_id, payload.claw_identifier) + target = (payload.status or '').lower().strip() + if target == AgentStatus.IDLE.value: + transition_to_idle(db, agent) + elif target == AgentStatus.BUSY.value: + transition_to_busy(db, agent, slot_type=SlotTypeEnum.WORK) + elif target == AgentStatus.ON_CALL.value: + transition_to_busy(db, agent, slot_type=SlotTypeEnum.ON_CALL) + elif target == AgentStatus.OFFLINE.value: + transition_to_offline(db, agent) + elif target == AgentStatus.EXHAUSTED.value: + reason = ExhaustReason.BILLING if payload.exhaust_reason == 'billing' else ExhaustReason.RATE_LIMIT + transition_to_exhausted(db, agent, reason=reason, recovery_at=payload.recovery_at) + else: + raise HTTPException(status_code=400, detail="Unsupported agent status") + db.commit() + return {"ok": True, "agent_id": agent.agent_id, "status": agent.status.value if hasattr(agent.status, 'value') else str(agent.status)} + + @router.get( "/day", response_model=CalendarDayResponse, diff --git a/app/schemas/calendar.py b/app/schemas/calendar.py index 7d5c08e..f5891a3 100644 --- a/app/schemas/calendar.py +++ b/app/schemas/calendar.py @@ -407,3 +407,30 @@ class DateListResponse(BaseModel): default_factory=list, description="Sorted list of future dates with materialized slots", ) + + +# --------------------------------------------------------------------------- +# Agent heartbeat / agent-driven slot updates +# --------------------------------------------------------------------------- + +class AgentHeartbeatResponse(BaseModel): + """Slots that are due for a specific agent plus its current runtime status.""" + slots: list[CalendarSlotItem] = Field(default_factory=list) + agent_status: str + message: Optional[str] = None + + +class SlotAgentUpdate(BaseModel): + """Plugin-driven slot status update payload.""" + status: SlotStatusEnum + started_at: Optional[dt_time] = None + actual_duration: Optional[int] = Field(None, ge=0, le=65535) + + +class AgentStatusUpdateRequest(BaseModel): + """Plugin-driven agent status report.""" + agent_id: str + claw_identifier: str + status: str + recovery_at: Optional[dt_datetime] = None + exhaust_reason: Optional[str] = None