"""Agent status transitions — BE-AGT-002. Implements the state machine for Agent runtime status: Idle ──→ Busy (woken by a Work slot) Idle ──→ OnCall (woken by an OnCall slot) Busy ──→ Idle (task finished / no more pending slots) OnCall──→ Idle (task finished / no more pending slots) * ──→ Offline (heartbeat timeout — no heartbeat for > 2 min) * ──→ Exhausted (API quota / rate-limit error) Exhausted → Idle (recovery_at reached) Design reference: NEXT_WAVE_DEV_DIRECTION.md §6.4 (Status transitions) """ from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Optional from sqlalchemy.orm import Session from app.models.agent import Agent, AgentStatus, ExhaustReason from app.models.calendar import SlotType # Heartbeat timeout threshold in seconds (2 minutes per spec §6.4) HEARTBEAT_TIMEOUT_SECONDS = 120 # Default recovery duration when we can't parse a retry-after header DEFAULT_RECOVERY_HOURS = 5 # --------------------------------------------------------------------------- # Transition helpers # --------------------------------------------------------------------------- class AgentStatusError(Exception): """Raised when a requested status transition is invalid.""" def _assert_current(agent: Agent, *expected: AgentStatus) -> None: """Raise if the agent is not in one of the expected statuses.""" if agent.status not in expected: allowed = ", ".join(s.value for s in expected) raise AgentStatusError( f"Agent '{agent.agent_id}' is {agent.status.value}; " f"expected one of [{allowed}]" ) # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def transition_to_busy( db: Session, agent: Agent, *, slot_type: SlotType, now: datetime | None = None, ) -> Agent: """Idle → Busy or OnCall depending on *slot_type*. Parameters ---------- slot_type : SlotType The type of the slot that triggered the wakeup. ``SlotType.ON_CALL`` → ``AgentStatus.ON_CALL``, everything else → ``AgentStatus.BUSY``. """ _assert_current(agent, AgentStatus.IDLE) if slot_type == SlotType.ON_CALL: agent.status = AgentStatus.ON_CALL else: agent.status = AgentStatus.BUSY if now is None: now = datetime.now(timezone.utc) agent.last_heartbeat = now db.flush() return agent def transition_to_idle( db: Session, agent: Agent, *, now: datetime | None = None, ) -> Agent: """Busy / OnCall / Exhausted (recovered) → Idle. For Exhausted agents this should only be called when ``recovery_at`` has been reached; the caller is responsible for checking that. """ _assert_current( agent, AgentStatus.BUSY, AgentStatus.ON_CALL, AgentStatus.EXHAUSTED, AgentStatus.OFFLINE, ) agent.status = AgentStatus.IDLE # Clear exhausted metadata if transitioning out of Exhausted agent.exhausted_at = None agent.recovery_at = None agent.exhaust_reason = None if now is None: now = datetime.now(timezone.utc) agent.last_heartbeat = now db.flush() return agent def transition_to_offline( db: Session, agent: Agent, ) -> Agent: """Any status → Offline (heartbeat timeout). Typically called by a background check that detects ``last_heartbeat`` is older than ``HEARTBEAT_TIMEOUT_SECONDS``. """ # Already offline — no-op if agent.status == AgentStatus.OFFLINE: return agent agent.status = AgentStatus.OFFLINE db.flush() return agent def transition_to_exhausted( db: Session, agent: Agent, *, reason: ExhaustReason, recovery_at: datetime | None = None, now: datetime | None = None, ) -> Agent: """Any active status → Exhausted (API quota error). Parameters ---------- reason : ExhaustReason ``RATE_LIMIT`` or ``BILLING``. recovery_at : datetime, optional Parsed from retry-after / reset headers. If *None*, defaults to ``now + DEFAULT_RECOVERY_HOURS``. """ if now is None: now = datetime.now(timezone.utc) agent.status = AgentStatus.EXHAUSTED agent.exhausted_at = now agent.exhaust_reason = reason if recovery_at is not None: agent.recovery_at = recovery_at else: agent.recovery_at = now + timedelta(hours=DEFAULT_RECOVERY_HOURS) db.flush() return agent # --------------------------------------------------------------------------- # Heartbeat-driven checks # --------------------------------------------------------------------------- def check_heartbeat_timeout( db: Session, agent: Agent, *, now: datetime | None = None, ) -> bool: """Mark agent Offline if heartbeat has timed out. Returns ``True`` if the agent was transitioned to Offline. """ if agent.status == AgentStatus.OFFLINE: return False if now is None: now = datetime.now(timezone.utc) if agent.last_heartbeat is None: # Never sent a heartbeat — treat as offline transition_to_offline(db, agent) return True elapsed = (now - agent.last_heartbeat).total_seconds() if elapsed > HEARTBEAT_TIMEOUT_SECONDS: transition_to_offline(db, agent) return True return False def check_exhausted_recovery( db: Session, agent: Agent, *, now: datetime | None = None, ) -> bool: """Recover an Exhausted agent if ``recovery_at`` has been reached. Returns ``True`` if the agent was transitioned back to Idle. """ if agent.status != AgentStatus.EXHAUSTED: return False if now is None: now = datetime.now(timezone.utc) if agent.recovery_at is not None and now >= agent.recovery_at: transition_to_idle(db, agent, now=now) return True return False def record_heartbeat( db: Session, agent: Agent, *, now: datetime | None = None, ) -> Agent: """Update ``last_heartbeat`` timestamp. If the agent was Offline and a heartbeat arrives, transition back to Idle (the agent has come back online). """ if now is None: now = datetime.now(timezone.utc) agent.last_heartbeat = now if agent.status == AgentStatus.OFFLINE: agent.status = AgentStatus.IDLE # Clear any stale exhausted metadata agent.exhausted_at = None agent.recovery_at = None agent.exhaust_reason = None db.flush() return agent