From 70f343fbac92feaac499a2b6406ca4c04d5c8cba Mon Sep 17 00:00:00 2001
From: zhi
Date: Wed, 1 Apr 2026 00:46:16 +0000
Subject: [PATCH] BE-AGT-002: implement Agent status transition service
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- New service: app/services/agent_status.py
  - transition_to_busy(): Idle → Busy/OnCall based on slot type
  - transition_to_idle(): Busy/OnCall/Exhausted/Offline → Idle
  - transition_to_offline(): Any → Offline (heartbeat timeout)
  - transition_to_exhausted(): Any → Exhausted (rate-limit/billing)
  - check_heartbeat_timeout(): auto-detect >2min heartbeat gap
  - check_exhausted_recovery(): auto-recover when recovery_at reached
  - record_heartbeat(): update timestamp, recover Offline agents
  - heartbeat/recovery checks read naive stored timestamps as UTC
- Tests: tests/test_agent_status.py (29 test cases)
---
Review note: the blob ids on the "index" lines below predate the review edits.

 app/services/agent_status.py | 270 ++++++++++++++++++++++++++++
 tests/test_agent_status.py   | 335 +++++++++++++++++++++++++++++++++++
 2 files changed, 605 insertions(+)
 create mode 100644 app/services/agent_status.py
 create mode 100644 tests/test_agent_status.py

diff --git a/app/services/agent_status.py b/app/services/agent_status.py
new file mode 100644
index 0000000..80e590f
--- /dev/null
+++ b/app/services/agent_status.py
@@ -0,0 +1,270 @@
+"""Agent status transitions — BE-AGT-002.
+
+Implements the state machine for Agent runtime status:
+
+    Idle      ──→ Busy      (woken by a Work slot)
+    Idle      ──→ OnCall    (woken by an OnCall slot)
+    Busy      ──→ Idle      (task finished / no more pending slots)
+    OnCall    ──→ Idle      (task finished / no more pending slots)
+    *         ──→ Offline   (heartbeat timeout — no heartbeat for > 2 min)
+    *         ──→ Exhausted (API quota / rate-limit error)
+    Exhausted ──→ Idle      (recovery_at reached)
+    Offline   ──→ Idle      (heartbeat received again)
+
+Design reference: NEXT_WAVE_DEV_DIRECTION.md §6.4 (Status transitions)
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+from typing import Optional
+
+from sqlalchemy.orm import Session
+
+from app.models.agent import Agent, AgentStatus, ExhaustReason
+from app.models.calendar import SlotType
+
+# Heartbeat timeout threshold in seconds (2 minutes per spec §6.4)
+HEARTBEAT_TIMEOUT_SECONDS = 120
+
+# Default recovery duration when we can't parse a retry-after header
+DEFAULT_RECOVERY_HOURS = 5
+
+
+# ---------------------------------------------------------------------------
+# Transition helpers
+# ---------------------------------------------------------------------------
+
+class AgentStatusError(Exception):
+    """Raised when a requested status transition is invalid."""
+
+
+def _assert_current(agent: Agent, *expected: AgentStatus) -> None:
+    """Raise if the agent is not in one of the expected statuses."""
+    if agent.status not in expected:
+        allowed = ", ".join(s.value for s in expected)
+        raise AgentStatusError(
+            f"Agent '{agent.agent_id}' is {agent.status.value}; "
+            f"expected one of [{allowed}]"
+        )
+
+
+def _as_utc(moment: datetime) -> datetime:
+    """Return *moment* as an aware datetime, reading naive values as UTC.
+
+    Timestamps generated in this module are UTC-aware, but a stored value may
+    come back naive (NOTE(review): confirm against the Agent column types);
+    mixing naive and aware datetimes in arithmetic raises ``TypeError``.
+    """
+    if moment.tzinfo is None:
+        return moment.replace(tzinfo=timezone.utc)
+    return moment
+
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
+
+def transition_to_busy(
+    db: Session,
+    agent: Agent,
+    *,
+    slot_type: SlotType,
+    now: datetime | None = None,
+) -> Agent:
+    """Idle → Busy or OnCall depending on *slot_type*.
+
+    Parameters
+    ----------
+    slot_type : SlotType
+        The type of the slot that triggered the wakeup.
+        ``SlotType.ON_CALL`` → ``AgentStatus.ON_CALL``, everything else
+        → ``AgentStatus.BUSY``.
+    """
+    _assert_current(agent, AgentStatus.IDLE)
+
+    if slot_type == SlotType.ON_CALL:
+        agent.status = AgentStatus.ON_CALL
+    else:
+        agent.status = AgentStatus.BUSY
+
+    if now is None:
+        now = datetime.now(timezone.utc)
+    agent.last_heartbeat = now
+
+    db.flush()
+    return agent
+
+
+def transition_to_idle(
+    db: Session,
+    agent: Agent,
+    *,
+    now: datetime | None = None,
+) -> Agent:
+    """Busy / OnCall / Exhausted (recovered) / Offline → Idle.
+
+    For Exhausted agents this should only be called when ``recovery_at``
+    has been reached; the caller is responsible for checking that.
+    """
+    _assert_current(
+        agent,
+        AgentStatus.BUSY,
+        AgentStatus.ON_CALL,
+        AgentStatus.EXHAUSTED,
+        AgentStatus.OFFLINE,
+    )
+
+    agent.status = AgentStatus.IDLE
+
+    # Exhausted metadata is reset unconditionally, whatever the source status
+    agent.exhausted_at = None
+    agent.recovery_at = None
+    agent.exhaust_reason = None
+
+    if now is None:
+        now = datetime.now(timezone.utc)
+    agent.last_heartbeat = now
+
+    db.flush()
+    return agent
+
+
+def transition_to_offline(
+    db: Session,
+    agent: Agent,
+) -> Agent:
+    """Any status → Offline (heartbeat timeout).
+
+    Typically called by a background check that detects
+    ``last_heartbeat`` is older than ``HEARTBEAT_TIMEOUT_SECONDS``.
+    """
+    # Already offline — no-op
+    if agent.status == AgentStatus.OFFLINE:
+        return agent
+
+    agent.status = AgentStatus.OFFLINE
+    db.flush()
+    return agent
+
+
+def transition_to_exhausted(
+    db: Session,
+    agent: Agent,
+    *,
+    reason: ExhaustReason,
+    recovery_at: datetime | None = None,
+    now: datetime | None = None,
+) -> Agent:
+    """Any status → Exhausted (API quota error).
+
+    The current status is not validated, so this applies from any state.
+
+    Parameters
+    ----------
+    reason : ExhaustReason
+        ``RATE_LIMIT`` or ``BILLING``.
+    recovery_at : datetime, optional
+        Parsed from retry-after / reset headers.  If *None*, defaults to
+        ``now + DEFAULT_RECOVERY_HOURS``.
+    """
+    if now is None:
+        now = datetime.now(timezone.utc)
+
+    agent.status = AgentStatus.EXHAUSTED
+    agent.exhausted_at = now
+    agent.exhaust_reason = reason
+
+    if recovery_at is not None:
+        agent.recovery_at = recovery_at
+    else:
+        agent.recovery_at = now + timedelta(hours=DEFAULT_RECOVERY_HOURS)
+
+    db.flush()
+    return agent
+
+
+# ---------------------------------------------------------------------------
+# Heartbeat-driven checks
+# ---------------------------------------------------------------------------
+
+def check_heartbeat_timeout(
+    db: Session,
+    agent: Agent,
+    *,
+    now: datetime | None = None,
+) -> bool:
+    """Mark agent Offline if heartbeat has timed out.
+
+    Returns ``True`` if the agent was transitioned to Offline.
+    """
+    if agent.status == AgentStatus.OFFLINE:
+        return False
+
+    if now is None:
+        now = datetime.now(timezone.utc)
+
+    if agent.last_heartbeat is None:
+        # Never sent a heartbeat — treat as offline
+        transition_to_offline(db, agent)
+        return True
+
+    # Normalise both sides so a naive timestamp cannot raise TypeError
+    elapsed = (_as_utc(now) - _as_utc(agent.last_heartbeat)).total_seconds()
+    if elapsed > HEARTBEAT_TIMEOUT_SECONDS:
+        transition_to_offline(db, agent)
+        return True
+
+    return False
+
+
+def check_exhausted_recovery(
+    db: Session,
+    agent: Agent,
+    *,
+    now: datetime | None = None,
+) -> bool:
+    """Recover an Exhausted agent if ``recovery_at`` has been reached.
+
+    Returns ``True`` if the agent was transitioned back to Idle.
+    """
+    if agent.status != AgentStatus.EXHAUSTED:
+        return False
+
+    if now is None:
+        now = datetime.now(timezone.utc)
+
+    # Normalise both sides so a naive timestamp cannot raise TypeError
+    recovery_at = agent.recovery_at
+    if recovery_at is not None and _as_utc(now) >= _as_utc(recovery_at):
+        transition_to_idle(db, agent, now=now)
+        return True
+
+    return False
+
+
+def record_heartbeat(
+    db: Session,
+    agent: Agent,
+    *,
+    now: datetime | None = None,
+) -> Agent:
+    """Update ``last_heartbeat`` timestamp.
+
+    If the agent was Offline and a heartbeat arrives, transition back to
+    Idle (the agent has come back online).
+
+    NOTE(review): an agent that timed out while Exhausted also returns to
+    Idle here, even if its ``recovery_at`` has not passed — confirm intended.
+    """
+    if now is None:
+        now = datetime.now(timezone.utc)
+
+    if agent.status == AgentStatus.OFFLINE:
+        # Reuse the shared transition: it sets Idle, clears the exhausted
+        # metadata, stamps ``last_heartbeat`` and flushes.
+        return transition_to_idle(db, agent, now=now)
+
+    agent.last_heartbeat = now
+    db.flush()
+    return agent
diff --git a/tests/test_agent_status.py b/tests/test_agent_status.py
new file mode 100644
index 0000000..eeef32d
--- /dev/null
+++ b/tests/test_agent_status.py
@@ -0,0 +1,335 @@
+"""Tests for Agent status transition service — BE-AGT-002.
+
+Covers:
+  - Idle → Busy / OnCall
+  - Busy / OnCall → Idle
+  - Heartbeat timeout → Offline
+  - API quota error → Exhausted
+  - Exhausted recovery → Idle
+  - Heartbeat recording / Offline → Idle recovery
+  - Naive (timezone-less) stored timestamps
+  - Invalid transition errors
+"""
+
+from datetime import datetime, timedelta, timezone
+
+import pytest
+
+from app.models.agent import Agent, AgentStatus, ExhaustReason
+from app.models.calendar import SlotType
+from app.services.agent_status import (
+    AgentStatusError,
+    HEARTBEAT_TIMEOUT_SECONDS,
+    DEFAULT_RECOVERY_HOURS,
+    transition_to_busy,
+    transition_to_idle,
+    transition_to_offline,
+    transition_to_exhausted,
+    check_heartbeat_timeout,
+    check_exhausted_recovery,
+    record_heartbeat,
+)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+NOW = datetime(2026, 4, 1, 12, 0, 0, tzinfo=timezone.utc)
+
+
+def _make_agent(
+    db,
+    *,
+    status=AgentStatus.IDLE,
+    last_hb=None,
+    agent_id="test-agent-001",
+    **kwargs,
+):
+    """Insert and return an Agent row with a linked user.
+
+    Extra keyword arguments are passed straight through to ``Agent``.
+    """
+    from app.models import models
+    from app.api.deps import get_password_hash
+
+    # Ensure we have a user
+    user = db.query(models.User).filter_by(id=99).first()
+    if user is None:
+        # Need a role first
+        from app.models.role_permission import Role
+        role = db.query(Role).filter_by(id=99).first()
+        if role is None:
+            role = Role(id=99, name="agent_test_role", is_global=False)
+            db.add(role)
+            db.flush()
+        user = models.User(
+            id=99, username="agent_user", email="agent@test.com",
+            hashed_password=get_password_hash("test123"),
+            is_admin=False, role_id=role.id,
+        )
+        db.add(user)
+        db.flush()
+
+    agent = Agent(
+        user_id=user.id,
+        agent_id=agent_id,
+        claw_identifier="test-claw",
+        status=status,
+        last_heartbeat=last_hb,
+        **kwargs,
+    )
+    db.add(agent)
+    db.flush()
+    return agent
+
+
+# ---------------------------------------------------------------------------
+# Idle → Busy / OnCall
+# ---------------------------------------------------------------------------
+
+class TestTransitionToBusy:
+    def test_idle_to_busy_for_work_slot(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE)
+        result = transition_to_busy(db, agent, slot_type=SlotType.WORK, now=NOW)
+        assert result.status == AgentStatus.BUSY
+        assert result.last_heartbeat == NOW
+
+    def test_idle_to_on_call_for_on_call_slot(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE)
+        result = transition_to_busy(db, agent, slot_type=SlotType.ON_CALL, now=NOW)
+        assert result.status == AgentStatus.ON_CALL
+
+    def test_idle_to_busy_for_system_slot(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE)
+        result = transition_to_busy(db, agent, slot_type=SlotType.SYSTEM, now=NOW)
+        assert result.status == AgentStatus.BUSY
+
+    def test_idle_to_busy_for_entertainment_slot(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE)
+        result = transition_to_busy(db, agent, slot_type=SlotType.ENTERTAINMENT, now=NOW)
+        assert result.status == AgentStatus.BUSY
+
+    def test_busy_to_busy_raises(self, db):
+        agent = _make_agent(db, status=AgentStatus.BUSY)
+        with pytest.raises(AgentStatusError, match="busy"):
+            transition_to_busy(db, agent, slot_type=SlotType.WORK)
+
+    def test_exhausted_to_busy_raises(self, db):
+        agent = _make_agent(db, status=AgentStatus.EXHAUSTED)
+        with pytest.raises(AgentStatusError):
+            transition_to_busy(db, agent, slot_type=SlotType.WORK)
+
+
+# ---------------------------------------------------------------------------
+# Busy / OnCall → Idle
+# ---------------------------------------------------------------------------
+
+class TestTransitionToIdle:
+    def test_busy_to_idle(self, db):
+        agent = _make_agent(db, status=AgentStatus.BUSY)
+        result = transition_to_idle(db, agent, now=NOW)
+        assert result.status == AgentStatus.IDLE
+        assert result.last_heartbeat == NOW
+
+    def test_on_call_to_idle(self, db):
+        agent = _make_agent(db, status=AgentStatus.ON_CALL)
+        result = transition_to_idle(db, agent, now=NOW)
+        assert result.status == AgentStatus.IDLE
+
+    def test_exhausted_to_idle_clears_metadata(self, db):
+        agent = _make_agent(
+            db,
+            status=AgentStatus.EXHAUSTED,
+            exhausted_at=NOW - timedelta(hours=1),
+            recovery_at=NOW,
+            exhaust_reason=ExhaustReason.RATE_LIMIT,
+        )
+        result = transition_to_idle(db, agent, now=NOW)
+        assert result.status == AgentStatus.IDLE
+        assert result.exhausted_at is None
+        assert result.recovery_at is None
+        assert result.exhaust_reason is None
+
+    def test_offline_to_idle(self, db):
+        agent = _make_agent(db, status=AgentStatus.OFFLINE)
+        result = transition_to_idle(db, agent, now=NOW)
+        assert result.status == AgentStatus.IDLE
+
+    def test_idle_to_idle_raises(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE)
+        with pytest.raises(AgentStatusError, match="idle"):
+            transition_to_idle(db, agent)
+
+
+# ---------------------------------------------------------------------------
+# * → Offline (heartbeat timeout)
+# ---------------------------------------------------------------------------
+
+class TestTransitionToOffline:
+    def test_idle_to_offline(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE)
+        result = transition_to_offline(db, agent)
+        assert result.status == AgentStatus.OFFLINE
+
+    def test_busy_to_offline(self, db):
+        agent = _make_agent(db, status=AgentStatus.BUSY)
+        result = transition_to_offline(db, agent)
+        assert result.status == AgentStatus.OFFLINE
+
+    def test_already_offline_noop(self, db):
+        agent = _make_agent(db, status=AgentStatus.OFFLINE)
+        result = transition_to_offline(db, agent)
+        assert result.status == AgentStatus.OFFLINE
+
+
+# ---------------------------------------------------------------------------
+# * → Exhausted (API quota)
+# ---------------------------------------------------------------------------
+
+class TestTransitionToExhausted:
+    def test_busy_to_exhausted_with_recovery(self, db):
+        recovery = NOW + timedelta(hours=1)
+        agent = _make_agent(db, status=AgentStatus.BUSY)
+        result = transition_to_exhausted(
+            db, agent,
+            reason=ExhaustReason.RATE_LIMIT,
+            recovery_at=recovery,
+            now=NOW,
+        )
+        assert result.status == AgentStatus.EXHAUSTED
+        assert result.exhausted_at == NOW
+        assert result.recovery_at == recovery
+        assert result.exhaust_reason == ExhaustReason.RATE_LIMIT
+
+    def test_exhausted_default_recovery(self, db):
+        agent = _make_agent(db, status=AgentStatus.BUSY)
+        result = transition_to_exhausted(
+            db, agent,
+            reason=ExhaustReason.BILLING,
+            now=NOW,
+        )
+        expected_recovery = NOW + timedelta(hours=DEFAULT_RECOVERY_HOURS)
+        assert result.recovery_at == expected_recovery
+        assert result.exhaust_reason == ExhaustReason.BILLING
+
+    def test_idle_to_exhausted(self, db):
+        """Edge case: agent gets a rate-limit before even starting work."""
+        agent = _make_agent(db, status=AgentStatus.IDLE)
+        result = transition_to_exhausted(
+            db, agent,
+            reason=ExhaustReason.RATE_LIMIT,
+            now=NOW,
+        )
+        assert result.status == AgentStatus.EXHAUSTED
+
+
+# ---------------------------------------------------------------------------
+# Heartbeat timeout check
+# ---------------------------------------------------------------------------
+
+class TestCheckHeartbeatTimeout:
+    def test_timeout_triggers_offline(self, db):
+        old_hb = NOW - timedelta(seconds=HEARTBEAT_TIMEOUT_SECONDS + 10)
+        agent = _make_agent(db, status=AgentStatus.IDLE, last_hb=old_hb)
+        changed = check_heartbeat_timeout(db, agent, now=NOW)
+        assert changed is True
+        assert agent.status == AgentStatus.OFFLINE
+
+    def test_recent_heartbeat_no_change(self, db):
+        recent_hb = NOW - timedelta(seconds=30)
+        agent = _make_agent(db, status=AgentStatus.BUSY, last_hb=recent_hb)
+        changed = check_heartbeat_timeout(db, agent, now=NOW)
+        assert changed is False
+        assert agent.status == AgentStatus.BUSY
+
+    def test_no_heartbeat_ever_goes_offline(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE, last_hb=None)
+        changed = check_heartbeat_timeout(db, agent, now=NOW)
+        assert changed is True
+        assert agent.status == AgentStatus.OFFLINE
+
+    def test_already_offline_returns_false(self, db):
+        agent = _make_agent(db, status=AgentStatus.OFFLINE, last_hb=None)
+        changed = check_heartbeat_timeout(db, agent, now=NOW)
+        assert changed is False
+
+    def test_naive_stored_heartbeat_read_as_utc(self, db):
+        old_hb = NOW - timedelta(seconds=HEARTBEAT_TIMEOUT_SECONDS + 10)
+        agent = _make_agent(
+            db, status=AgentStatus.IDLE, last_hb=old_hb.replace(tzinfo=None),
+        )
+        changed = check_heartbeat_timeout(db, agent, now=NOW)
+        assert changed is True
+        assert agent.status == AgentStatus.OFFLINE
+
+
+# ---------------------------------------------------------------------------
+# Exhausted recovery check
+# ---------------------------------------------------------------------------
+
+class TestCheckExhaustedRecovery:
+    def test_recovery_at_reached(self, db):
+        agent = _make_agent(
+            db,
+            status=AgentStatus.EXHAUSTED,
+            exhausted_at=NOW - timedelta(hours=5),
+            recovery_at=NOW - timedelta(minutes=1),
+            exhaust_reason=ExhaustReason.RATE_LIMIT,
+        )
+        recovered = check_exhausted_recovery(db, agent, now=NOW)
+        assert recovered is True
+        assert agent.status == AgentStatus.IDLE
+        assert agent.exhausted_at is None
+
+    def test_recovery_at_not_yet_reached(self, db):
+        agent = _make_agent(
+            db,
+            status=AgentStatus.EXHAUSTED,
+            exhausted_at=NOW,
+            recovery_at=NOW + timedelta(hours=1),
+            exhaust_reason=ExhaustReason.BILLING,
+        )
+        recovered = check_exhausted_recovery(db, agent, now=NOW)
+        assert recovered is False
+        assert agent.status == AgentStatus.EXHAUSTED
+
+    def test_non_exhausted_agent_returns_false(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE)
+        recovered = check_exhausted_recovery(db, agent, now=NOW)
+        assert recovered is False
+
+    def test_naive_recovery_at_read_as_utc(self, db):
+        agent = _make_agent(
+            db,
+            status=AgentStatus.EXHAUSTED,
+            exhausted_at=NOW - timedelta(hours=5),
+            recovery_at=(NOW - timedelta(minutes=1)).replace(tzinfo=None),
+            exhaust_reason=ExhaustReason.RATE_LIMIT,
+        )
+        recovered = check_exhausted_recovery(db, agent, now=NOW)
+        assert recovered is True
+        assert agent.status == AgentStatus.IDLE
+
+
+# ---------------------------------------------------------------------------
+# Record heartbeat
+# ---------------------------------------------------------------------------
+
+class TestRecordHeartbeat:
+    def test_updates_timestamp(self, db):
+        agent = _make_agent(db, status=AgentStatus.IDLE, last_hb=NOW - timedelta(minutes=1))
+        result = record_heartbeat(db, agent, now=NOW)
+        assert result.last_heartbeat == NOW
+
+    def test_offline_agent_recovers_to_idle(self, db):
+        agent = _make_agent(db, status=AgentStatus.OFFLINE)
+        result = record_heartbeat(db, agent, now=NOW)
+        assert result.status == AgentStatus.IDLE
+        assert result.last_heartbeat == NOW
+
+    def test_busy_agent_stays_busy(self, db):
+        agent = _make_agent(db, status=AgentStatus.BUSY, last_hb=NOW - timedelta(seconds=30))
+        result = record_heartbeat(db, agent, now=NOW)
+        assert result.status == AgentStatus.BUSY
+        assert result.last_heartbeat == NOW