diff --git a/app/services/agent_status.py b/app/services/agent_status.py index 80e590f..b579750 100644 --- a/app/services/agent_status.py +++ b/app/services/agent_status.py @@ -16,7 +16,9 @@ Design reference: NEXT_WAVE_DEV_DIRECTION.md §6.4 (Status transitions) from __future__ import annotations from datetime import datetime, timedelta, timezone -from typing import Optional +from email.utils import parsedate_to_datetime +import re +from typing import Mapping, Optional from sqlalchemy.orm import Session @@ -29,6 +31,20 @@ HEARTBEAT_TIMEOUT_SECONDS = 120 # Default recovery duration when we can't parse a retry-after header DEFAULT_RECOVERY_HOURS = 5 +# Fallback wording patterns commonly emitted by model providers / gateways. +_RESET_IN_PATTERN = re.compile( + r"(?:reset(?:s)?|retry)(?:\s+again)?\s+(?:in|after)\s+(?P\d+)\s*(?Pseconds?|secs?|s|minutes?|mins?|m|hours?|hrs?|h)", + re.IGNORECASE, +) +_RESET_AT_ISO_PATTERN = re.compile( + r"resets?\s+at\s+(?P\d{4}-\d{2}-\d{2}[tT ][^\s,;]+(?:Z|[+-]\d{2}:?\d{2})?)", + re.IGNORECASE, +) +_RESET_AT_GENERIC_PATTERN = re.compile( + r"resets?\s+at\s+(?P[^\n]+?)(?:[.,;]|$)", + re.IGNORECASE, +) + # --------------------------------------------------------------------------- # Transition helpers @@ -48,6 +64,90 @@ def _assert_current(agent: Agent, *expected: AgentStatus) -> None: ) +def _to_utc(dt: datetime) -> datetime: + """Normalize aware / naive datetimes to UTC-aware timestamps.""" + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def _duration_from_match(value: str, unit: str) -> timedelta: + """Convert a parsed numeric duration to ``timedelta``.""" + amount = int(value) + unit_normalized = unit.lower() + + if unit_normalized.startswith(("second", "sec")) or unit_normalized == "s": + return timedelta(seconds=amount) + if unit_normalized.startswith(("minute", "min")) or unit_normalized == "m": + return timedelta(minutes=amount) + if unit_normalized.startswith(("hour", "hr")) or unit_normalized == "h": + return timedelta(hours=amount) + + raise ValueError(f"Unsupported duration unit: {unit}") + + +def parse_exhausted_recovery_at( + *, + now: datetime | None = None, + headers: Mapping[str, str] | None = None, + message: str | None = None, +) -> datetime: + """Infer the next recovery time for an exhausted agent. + + Parsing order follows the design intent in NEXT_WAVE_DEV_DIRECTION.md §6.5: + + 1. ``Retry-After`` response header + - integer seconds + - HTTP-date + 2. Error text like ``reset in 12 mins`` / ``retry after 30 seconds`` + 3. Error text like ``resets at 2026-04-01T10:00:00Z`` + 4. Fallback to ``now + DEFAULT_RECOVERY_HOURS`` + """ + if now is None: + now = datetime.now(timezone.utc) + now = _to_utc(now) + + normalized_headers = {k.lower(): v for k, v in (headers or {}).items()} + retry_after = normalized_headers.get("retry-after") + if retry_after: + retry_after = retry_after.strip() + if retry_after.isdigit(): + return now + timedelta(seconds=int(retry_after)) + try: + return _to_utc(parsedate_to_datetime(retry_after)) + except (TypeError, ValueError, IndexError, OverflowError): + pass + + if message: + duration_match = _RESET_IN_PATTERN.search(message) + if duration_match: + return now + _duration_from_match( + duration_match.group("value"), + duration_match.group("unit"), + ) + + iso_match = _RESET_AT_ISO_PATTERN.search(message) + if iso_match: + ts = iso_match.group("ts") + normalized_ts = ts.replace(" ", "T") + if normalized_ts.endswith("Z"): + normalized_ts = normalized_ts[:-1] + "+00:00" + try: + return _to_utc(datetime.fromisoformat(normalized_ts)) + except ValueError: + pass + + generic_match = _RESET_AT_GENERIC_PATTERN.search(message) + if generic_match: + ts = generic_match.group("ts").strip() + try: + return _to_utc(parsedate_to_datetime(ts)) + except (TypeError, ValueError, IndexError, OverflowError): + pass + + return now + timedelta(hours=DEFAULT_RECOVERY_HOURS) + + # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- @@ -141,6 +241,8 @@ def transition_to_exhausted( *, reason: ExhaustReason, recovery_at: datetime | None = None, + headers: Mapping[str, str] | None = None, + message: str | None = None, now: datetime | None = None, ) -> Agent: """Any active status → Exhausted (API quota error). @@ -150,20 +252,30 @@ def transition_to_exhausted( reason : ExhaustReason ``RATE_LIMIT`` or ``BILLING``. recovery_at : datetime, optional - Parsed from retry-after / reset headers. If *None*, defaults to - ``now + DEFAULT_RECOVERY_HOURS``. + Explicit recovery timestamp. If omitted, attempts to parse from + ``headers`` / ``message``; falls back to ``now + DEFAULT_RECOVERY_HOURS``. + headers : Mapping[str, str], optional + Response headers that may contain ``Retry-After``. + message : str, optional + Error text that may contain ``reset in`` / ``retry after`` / + ``resets at`` hints. """ if now is None: now = datetime.now(timezone.utc) + now = _to_utc(now) agent.status = AgentStatus.EXHAUSTED agent.exhausted_at = now agent.exhaust_reason = reason if recovery_at is not None: - agent.recovery_at = recovery_at + agent.recovery_at = _to_utc(recovery_at) else: - agent.recovery_at = now + timedelta(hours=DEFAULT_RECOVERY_HOURS) + agent.recovery_at = parse_exhausted_recovery_at( + now=now, + headers=headers, + message=message, + ) db.flush() return agent diff --git a/tests/test_agent_status.py b/tests/test_agent_status.py index eeef32d..995cdd3 100644 --- a/tests/test_agent_status.py +++ b/tests/test_agent_status.py @@ -18,6 +18,7 @@ from app.services.agent_status import ( AgentStatusError, HEARTBEAT_TIMEOUT_SECONDS, DEFAULT_RECOVERY_HOURS, + parse_exhausted_recovery_at, transition_to_busy, transition_to_idle, transition_to_offline, @@ -170,6 +171,55 @@ class TestTransitionToOffline: assert result.status == AgentStatus.OFFLINE +# --------------------------------------------------------------------------- +# Recovery time parsing +# --------------------------------------------------------------------------- + +class TestParseExhaustedRecoveryAt: + def test_parses_retry_after_seconds_header(self): + recovery = parse_exhausted_recovery_at( + now=NOW, + headers={"Retry-After": "120"}, + ) + assert recovery == NOW + timedelta(seconds=120) + + def test_parses_retry_after_http_date_header(self): + recovery = parse_exhausted_recovery_at( + now=NOW, + headers={"Retry-After": "Wed, 01 Apr 2026 12:05:00 GMT"}, + ) + assert recovery == datetime(2026, 4, 1, 12, 5, 0, tzinfo=timezone.utc) + + def test_parses_reset_in_minutes_from_message(self): + recovery = parse_exhausted_recovery_at( + now=NOW, + message="rate limit exceeded, reset in 7 mins", + ) + assert recovery == NOW + timedelta(minutes=7) + + def test_parses_retry_after_seconds_from_message(self): + recovery = parse_exhausted_recovery_at( + now=NOW, + message="429 too many requests; retry after 45 seconds", + ) + assert recovery == NOW + timedelta(seconds=45) + + def test_parses_resets_at_iso_timestamp_from_message(self): + recovery = parse_exhausted_recovery_at( + now=NOW, + message="quota exhausted, resets at 2026-04-01T14:30:00Z", + ) + assert recovery == datetime(2026, 4, 1, 14, 30, 0, tzinfo=timezone.utc) + + def test_falls_back_to_default_when_unparseable(self): + recovery = parse_exhausted_recovery_at( + now=NOW, + headers={"Retry-After": "not-a-date"}, + message="please try later maybe soon", + ) + assert recovery == NOW + timedelta(hours=DEFAULT_RECOVERY_HOURS) + + # --------------------------------------------------------------------------- # * → Exhausted (API quota) # --------------------------------------------------------------------------- @@ -210,6 +260,28 @@ class TestTransitionToExhausted: ) assert result.status == AgentStatus.EXHAUSTED + def test_parses_recovery_from_headers_when_timestamp_not_explicitly_provided(self, db): + agent = _make_agent(db, status=AgentStatus.BUSY) + result = transition_to_exhausted( + db, + agent, + reason=ExhaustReason.RATE_LIMIT, + headers={"Retry-After": "90"}, + now=NOW, + ) + assert result.recovery_at == NOW + timedelta(seconds=90) + + def test_parses_recovery_from_message_when_timestamp_not_explicitly_provided(self, db): + agent = _make_agent(db, status=AgentStatus.BUSY) + result = transition_to_exhausted( + db, + agent, + reason=ExhaustReason.BILLING, + message="billing quota exhausted, resets at 2026-04-01T15:00:00Z", + now=NOW, + ) + assert result.recovery_at == datetime(2026, 4, 1, 15, 0, 0, tzinfo=timezone.utc) + # --------------------------------------------------------------------------- # Heartbeat timeout check