feat: expose calendar agent heartbeat api
This commit is contained in:
@@ -10,17 +10,20 @@ BE-CAL-API-006: Plan edit / plan cancel endpoints.
|
||||
BE-CAL-API-007: Date-list endpoint.
|
||||
"""
|
||||
|
||||
from datetime import date as date_type
|
||||
from datetime import date as date_type, datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Query
|
||||
from fastapi import APIRouter, Depends, Header, HTTPException, Query
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.api.deps import get_current_user
|
||||
from app.core.config import get_db
|
||||
from app.models.calendar import SchedulePlan, SlotStatus, TimeSlot
|
||||
from app.models.models import User
|
||||
from app.models.agent import Agent, AgentStatus, ExhaustReason
|
||||
from app.schemas.calendar import (
|
||||
AgentHeartbeatResponse,
|
||||
AgentStatusUpdateRequest,
|
||||
CalendarDayResponse,
|
||||
CalendarSlotItem,
|
||||
DateListResponse,
|
||||
@@ -32,7 +35,9 @@ from app.schemas.calendar import (
|
||||
SchedulePlanEdit,
|
||||
SchedulePlanListResponse,
|
||||
SchedulePlanResponse,
|
||||
SlotStatusEnum,
|
||||
SlotConflictItem,
|
||||
SlotAgentUpdate,
|
||||
TimeSlotCancelResponse,
|
||||
TimeSlotCreate,
|
||||
TimeSlotCreateResponse,
|
||||
@@ -40,6 +45,14 @@ from app.schemas.calendar import (
|
||||
TimeSlotEditResponse,
|
||||
TimeSlotResponse,
|
||||
)
|
||||
from app.services.agent_heartbeat import get_pending_slots_for_agent
|
||||
from app.services.agent_status import (
|
||||
record_heartbeat,
|
||||
transition_to_busy,
|
||||
transition_to_idle,
|
||||
transition_to_offline,
|
||||
transition_to_exhausted,
|
||||
)
|
||||
from app.services.minimum_workload import (
|
||||
get_workload_config,
|
||||
get_workload_warnings_for_date,
|
||||
@@ -264,6 +277,121 @@ def _virtual_slot_to_item(vs: dict) -> CalendarSlotItem:
|
||||
)
|
||||
|
||||
|
||||
def _require_agent(db: Session, agent_id: str, claw_identifier: str) -> Agent:
|
||||
agent = (
|
||||
db.query(Agent)
|
||||
.filter(Agent.agent_id == agent_id, Agent.claw_identifier == claw_identifier)
|
||||
.first()
|
||||
)
|
||||
if agent is None:
|
||||
raise HTTPException(status_code=404, detail="Agent not found")
|
||||
return agent
|
||||
|
||||
|
||||
def _apply_agent_slot_update(slot: TimeSlot, payload: SlotAgentUpdate) -> None:
|
||||
slot.status = payload.status.value
|
||||
if payload.started_at is not None:
|
||||
slot.started_at = payload.started_at
|
||||
slot.attended = True
|
||||
if payload.actual_duration is not None:
|
||||
slot.actual_duration = payload.actual_duration
|
||||
if payload.status == SlotStatusEnum.ONGOING:
|
||||
slot.attended = True
|
||||
|
||||
|
||||
@router.get(
|
||||
"/agent/heartbeat",
|
||||
response_model=AgentHeartbeatResponse,
|
||||
summary="Get all due slots for the calling agent",
|
||||
)
|
||||
def agent_heartbeat(
|
||||
x_agent_id: str = Header(..., alias="X-Agent-ID"),
|
||||
x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
agent = _require_agent(db, x_agent_id, x_claw_identifier)
|
||||
record_heartbeat(db, agent)
|
||||
slots = get_pending_slots_for_agent(db, agent.user_id, now=datetime.now(timezone.utc))
|
||||
db.commit()
|
||||
return AgentHeartbeatResponse(
|
||||
slots=[_real_slot_to_item(slot) for slot in slots],
|
||||
agent_status=agent.status.value if hasattr(agent.status, 'value') else str(agent.status),
|
||||
message=f"{len(slots)} due slot(s)",
|
||||
)
|
||||
|
||||
|
||||
@router.patch(
|
||||
"/slots/{slot_id}/agent-update",
|
||||
response_model=TimeSlotEditResponse,
|
||||
summary="Agent updates a real slot status",
|
||||
)
|
||||
def agent_update_real_slot(
|
||||
slot_id: int,
|
||||
payload: SlotAgentUpdate,
|
||||
x_agent_id: str = Header(..., alias="X-Agent-ID"),
|
||||
x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
agent = _require_agent(db, x_agent_id, x_claw_identifier)
|
||||
slot = db.query(TimeSlot).filter(TimeSlot.id == slot_id, TimeSlot.user_id == agent.user_id).first()
|
||||
if slot is None:
|
||||
raise HTTPException(status_code=404, detail="Slot not found")
|
||||
_apply_agent_slot_update(slot, payload)
|
||||
db.commit()
|
||||
db.refresh(slot)
|
||||
return TimeSlotEditResponse(slot=_slot_to_response(slot), warnings=[])
|
||||
|
||||
|
||||
@router.patch(
|
||||
"/slots/virtual/{virtual_id}/agent-update",
|
||||
response_model=TimeSlotEditResponse,
|
||||
summary="Agent materializes and updates a virtual slot status",
|
||||
)
|
||||
def agent_update_virtual_slot(
|
||||
virtual_id: str,
|
||||
payload: SlotAgentUpdate,
|
||||
x_agent_id: str = Header(..., alias="X-Agent-ID"),
|
||||
x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
agent = _require_agent(db, x_agent_id, x_claw_identifier)
|
||||
slot = materialize_from_virtual_id(db, virtual_id)
|
||||
if slot.user_id != agent.user_id:
|
||||
db.rollback()
|
||||
raise HTTPException(status_code=404, detail="Slot not found")
|
||||
_apply_agent_slot_update(slot, payload)
|
||||
db.commit()
|
||||
db.refresh(slot)
|
||||
return TimeSlotEditResponse(slot=_slot_to_response(slot), warnings=[])
|
||||
|
||||
|
||||
@router.post(
|
||||
"/agent/status",
|
||||
summary="Update agent runtime status from plugin",
|
||||
)
|
||||
def update_agent_status(
|
||||
payload: AgentStatusUpdateRequest,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
agent = _require_agent(db, payload.agent_id, payload.claw_identifier)
|
||||
target = (payload.status or '').lower().strip()
|
||||
if target == AgentStatus.IDLE.value:
|
||||
transition_to_idle(db, agent)
|
||||
elif target == AgentStatus.BUSY.value:
|
||||
transition_to_busy(db, agent, slot_type=SlotTypeEnum.WORK)
|
||||
elif target == AgentStatus.ON_CALL.value:
|
||||
transition_to_busy(db, agent, slot_type=SlotTypeEnum.ON_CALL)
|
||||
elif target == AgentStatus.OFFLINE.value:
|
||||
transition_to_offline(db, agent)
|
||||
elif target == AgentStatus.EXHAUSTED.value:
|
||||
reason = ExhaustReason.BILLING if payload.exhaust_reason == 'billing' else ExhaustReason.RATE_LIMIT
|
||||
transition_to_exhausted(db, agent, reason=reason, recovery_at=payload.recovery_at)
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail="Unsupported agent status")
|
||||
db.commit()
|
||||
return {"ok": True, "agent_id": agent.agent_id, "status": agent.status.value if hasattr(agent.status, 'value') else str(agent.status)}
|
||||
|
||||
|
||||
@router.get(
|
||||
"/day",
|
||||
response_model=CalendarDayResponse,
|
||||
|
||||
@@ -407,3 +407,30 @@ class DateListResponse(BaseModel):
|
||||
default_factory=list,
|
||||
description="Sorted list of future dates with materialized slots",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent heartbeat / agent-driven slot updates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class AgentHeartbeatResponse(BaseModel):
|
||||
"""Slots that are due for a specific agent plus its current runtime status."""
|
||||
slots: list[CalendarSlotItem] = Field(default_factory=list)
|
||||
agent_status: str
|
||||
message: Optional[str] = None
|
||||
|
||||
|
||||
class SlotAgentUpdate(BaseModel):
|
||||
"""Plugin-driven slot status update payload."""
|
||||
status: SlotStatusEnum
|
||||
started_at: Optional[dt_time] = None
|
||||
actual_duration: Optional[int] = Field(None, ge=0, le=65535)
|
||||
|
||||
|
||||
class AgentStatusUpdateRequest(BaseModel):
|
||||
"""Plugin-driven agent status report."""
|
||||
agent_id: str
|
||||
claw_identifier: str
|
||||
status: str
|
||||
recovery_at: Optional[dt_datetime] = None
|
||||
exhaust_reason: Optional[str] = None
|
||||
|
||||
Reference in New Issue
Block a user