"""Agent status transitions — BE-AGT-002. Implements the state machine for Agent runtime status: Idle ──→ Busy (woken by a Work slot) Idle ──→ OnCall (woken by an OnCall slot) Busy ──→ Idle (task finished / no more pending slots) OnCall──→ Idle (task finished / no more pending slots) * ──→ Offline (heartbeat timeout — no heartbeat for > 2 min) * ──→ Exhausted (API quota / rate-limit error) Exhausted → Idle (recovery_at reached) Design reference: NEXT_WAVE_DEV_DIRECTION.md §6.4 (Status transitions) """ from __future__ import annotations from datetime import datetime, timedelta, timezone from email.utils import parsedate_to_datetime import re from typing import Mapping, Optional from sqlalchemy.orm import Session from app.models.agent import Agent, AgentStatus, ExhaustReason from app.models.calendar import SlotType # Heartbeat timeout threshold in seconds (2 minutes per spec §6.4) HEARTBEAT_TIMEOUT_SECONDS = 120 # Default recovery duration when we can't parse a retry-after header DEFAULT_RECOVERY_HOURS = 5 # Fallback wording patterns commonly emitted by model providers / gateways. _RESET_IN_PATTERN = re.compile( r"(?:reset(?:s)?|retry)(?:\s+again)?\s+(?:in|after)\s+(?P\d+)\s*(?Pseconds?|secs?|s|minutes?|mins?|m|hours?|hrs?|h)", re.IGNORECASE, ) _RESET_AT_ISO_PATTERN = re.compile( r"resets?\s+at\s+(?P\d{4}-\d{2}-\d{2}[tT ][^\s,;]+(?:Z|[+-]\d{2}:?\d{2})?)", re.IGNORECASE, ) _RESET_AT_GENERIC_PATTERN = re.compile( r"resets?\s+at\s+(?P[^\n]+?)(?:[.,;]|$)", re.IGNORECASE, ) # --------------------------------------------------------------------------- # Transition helpers # --------------------------------------------------------------------------- class AgentStatusError(Exception): """Raised when a requested status transition is invalid.""" def _assert_current(agent: Agent, *expected: AgentStatus) -> None: """Raise if the agent is not in one of the expected statuses.""" if agent.status not in expected: allowed = ", ".join(s.value for s in expected) raise AgentStatusError( f"Agent '{agent.agent_id}' is {agent.status.value}; " f"expected one of [{allowed}]" ) def _to_utc(dt: datetime) -> datetime: """Normalize aware / naive datetimes to UTC-aware timestamps.""" if dt.tzinfo is None: return dt.replace(tzinfo=timezone.utc) return dt.astimezone(timezone.utc) def _duration_from_match(value: str, unit: str) -> timedelta: """Convert a parsed numeric duration to ``timedelta``.""" amount = int(value) unit_normalized = unit.lower() if unit_normalized.startswith(("second", "sec")) or unit_normalized == "s": return timedelta(seconds=amount) if unit_normalized.startswith(("minute", "min")) or unit_normalized == "m": return timedelta(minutes=amount) if unit_normalized.startswith(("hour", "hr")) or unit_normalized == "h": return timedelta(hours=amount) raise ValueError(f"Unsupported duration unit: {unit}") def parse_exhausted_recovery_at( *, now: datetime | None = None, headers: Mapping[str, str] | None = None, message: str | None = None, ) -> datetime: """Infer the next recovery time for an exhausted agent. Parsing order follows the design intent in NEXT_WAVE_DEV_DIRECTION.md §6.5: 1. ``Retry-After`` response header - integer seconds - HTTP-date 2. Error text like ``reset in 12 mins`` / ``retry after 30 seconds`` 3. Error text like ``resets at 2026-04-01T10:00:00Z`` 4. Fallback to ``now + DEFAULT_RECOVERY_HOURS`` """ if now is None: now = datetime.now(timezone.utc) now = _to_utc(now) normalized_headers = {k.lower(): v for k, v in (headers or {}).items()} retry_after = normalized_headers.get("retry-after") if retry_after: retry_after = retry_after.strip() if retry_after.isdigit(): return now + timedelta(seconds=int(retry_after)) try: return _to_utc(parsedate_to_datetime(retry_after)) except (TypeError, ValueError, IndexError, OverflowError): pass if message: duration_match = _RESET_IN_PATTERN.search(message) if duration_match: return now + _duration_from_match( duration_match.group("value"), duration_match.group("unit"), ) iso_match = _RESET_AT_ISO_PATTERN.search(message) if iso_match: ts = iso_match.group("ts") normalized_ts = ts.replace(" ", "T") if normalized_ts.endswith("Z"): normalized_ts = normalized_ts[:-1] + "+00:00" try: return _to_utc(datetime.fromisoformat(normalized_ts)) except ValueError: pass generic_match = _RESET_AT_GENERIC_PATTERN.search(message) if generic_match: ts = generic_match.group("ts").strip() try: return _to_utc(parsedate_to_datetime(ts)) except (TypeError, ValueError, IndexError, OverflowError): pass return now + timedelta(hours=DEFAULT_RECOVERY_HOURS) # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def transition_to_busy( db: Session, agent: Agent, *, slot_type: SlotType, now: datetime | None = None, ) -> Agent: """Idle → Busy or OnCall depending on *slot_type*. Parameters ---------- slot_type : SlotType The type of the slot that triggered the wakeup. ``SlotType.ON_CALL`` → ``AgentStatus.ON_CALL``, everything else → ``AgentStatus.BUSY``. """ _assert_current(agent, AgentStatus.IDLE) if slot_type == SlotType.ON_CALL: agent.status = AgentStatus.ON_CALL else: agent.status = AgentStatus.BUSY if now is None: now = datetime.now(timezone.utc) agent.last_heartbeat = now db.flush() return agent def transition_to_idle( db: Session, agent: Agent, *, now: datetime | None = None, ) -> Agent: """Busy / OnCall / Exhausted (recovered) → Idle. For Exhausted agents this should only be called when ``recovery_at`` has been reached; the caller is responsible for checking that. """ _assert_current( agent, AgentStatus.BUSY, AgentStatus.ON_CALL, AgentStatus.EXHAUSTED, AgentStatus.OFFLINE, ) agent.status = AgentStatus.IDLE # Clear exhausted metadata if transitioning out of Exhausted agent.exhausted_at = None agent.recovery_at = None agent.exhaust_reason = None if now is None: now = datetime.now(timezone.utc) agent.last_heartbeat = now db.flush() return agent def transition_to_offline( db: Session, agent: Agent, ) -> Agent: """Any status → Offline (heartbeat timeout). Typically called by a background check that detects ``last_heartbeat`` is older than ``HEARTBEAT_TIMEOUT_SECONDS``. """ # Already offline — no-op if agent.status == AgentStatus.OFFLINE: return agent agent.status = AgentStatus.OFFLINE db.flush() return agent def transition_to_exhausted( db: Session, agent: Agent, *, reason: ExhaustReason, recovery_at: datetime | None = None, headers: Mapping[str, str] | None = None, message: str | None = None, now: datetime | None = None, ) -> Agent: """Any active status → Exhausted (API quota error). Parameters ---------- reason : ExhaustReason ``RATE_LIMIT`` or ``BILLING``. recovery_at : datetime, optional Explicit recovery timestamp. If omitted, attempts to parse from ``headers`` / ``message``; falls back to ``now + DEFAULT_RECOVERY_HOURS``. headers : Mapping[str, str], optional Response headers that may contain ``Retry-After``. message : str, optional Error text that may contain ``reset in`` / ``retry after`` / ``resets at`` hints. """ if now is None: now = datetime.now(timezone.utc) now = _to_utc(now) agent.status = AgentStatus.EXHAUSTED agent.exhausted_at = now agent.exhaust_reason = reason if recovery_at is not None: agent.recovery_at = _to_utc(recovery_at) else: agent.recovery_at = parse_exhausted_recovery_at( now=now, headers=headers, message=message, ) db.flush() return agent # --------------------------------------------------------------------------- # Heartbeat-driven checks # --------------------------------------------------------------------------- def check_heartbeat_timeout( db: Session, agent: Agent, *, now: datetime | None = None, ) -> bool: """Mark agent Offline if heartbeat has timed out. Returns ``True`` if the agent was transitioned to Offline. """ if agent.status == AgentStatus.OFFLINE: return False if now is None: now = datetime.now(timezone.utc) if agent.last_heartbeat is None: # Never sent a heartbeat — treat as offline transition_to_offline(db, agent) return True elapsed = (now - agent.last_heartbeat).total_seconds() if elapsed > HEARTBEAT_TIMEOUT_SECONDS: transition_to_offline(db, agent) return True return False def check_exhausted_recovery( db: Session, agent: Agent, *, now: datetime | None = None, ) -> bool: """Recover an Exhausted agent if ``recovery_at`` has been reached. Returns ``True`` if the agent was transitioned back to Idle. """ if agent.status != AgentStatus.EXHAUSTED: return False if now is None: now = datetime.now(timezone.utc) if agent.recovery_at is not None and now >= agent.recovery_at: transition_to_idle(db, agent, now=now) return True return False def record_heartbeat( db: Session, agent: Agent, *, now: datetime | None = None, ) -> Agent: """Update ``last_heartbeat`` timestamp. If the agent was Offline and a heartbeat arrives, transition back to Idle (the agent has come back online). """ if now is None: now = datetime.now(timezone.utc) agent.last_heartbeat = now if agent.status == AgentStatus.OFFLINE: agent.status = AgentStatus.IDLE # Clear any stale exhausted metadata agent.exhausted_at = None agent.recovery_at = None agent.exhaust_reason = None db.flush() return agent