374 lines
14 KiB
Python
374 lines
14 KiB
Python
"""Tests for Agent status transition service — BE-AGT-002.
|
|
|
|
Covers:
|
|
- Idle → Busy / OnCall
|
|
- Busy / OnCall → Idle
|
|
- Heartbeat timeout → Offline
|
|
- API quota error → Exhausted
|
|
- Exhausted recovery → Idle
|
|
- Invalid transition errors
|
|
"""
|
|
|
|
import pytest
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from app.models.agent import Agent, AgentStatus, ExhaustReason
|
|
from app.models.calendar import SlotType
|
|
from app.services.agent_status import (
|
|
AgentStatusError,
|
|
HEARTBEAT_TIMEOUT_SECONDS,
|
|
DEFAULT_RECOVERY_HOURS,
|
|
parse_exhausted_recovery_at,
|
|
transition_to_busy,
|
|
transition_to_idle,
|
|
transition_to_offline,
|
|
transition_to_exhausted,
|
|
check_heartbeat_timeout,
|
|
check_exhausted_recovery,
|
|
record_heartbeat,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
NOW = datetime(2026, 4, 1, 12, 0, 0, tzinfo=timezone.utc)
|
|
|
|
|
|
def _make_agent(db, *, status=AgentStatus.IDLE, last_hb=None, **kwargs):
|
|
"""Insert and return an Agent row with a linked user."""
|
|
from app.models import models
|
|
from app.api.deps import get_password_hash
|
|
|
|
# Ensure we have a user
|
|
user = db.query(models.User).filter_by(id=99).first()
|
|
if user is None:
|
|
# Need a role first
|
|
from app.models.role_permission import Role
|
|
role = db.query(Role).filter_by(id=99).first()
|
|
if role is None:
|
|
role = Role(id=99, name="agent_test_role", is_global=False)
|
|
db.add(role)
|
|
db.flush()
|
|
user = models.User(
|
|
id=99, username="agent_user", email="agent@test.com",
|
|
hashed_password=get_password_hash("test123"),
|
|
is_admin=False, role_id=role.id,
|
|
)
|
|
db.add(user)
|
|
db.flush()
|
|
|
|
agent = Agent(
|
|
user_id=user.id,
|
|
agent_id=kwargs.get("agent_id", "test-agent-001"),
|
|
claw_identifier="test-claw",
|
|
status=status,
|
|
last_heartbeat=last_hb,
|
|
**{k: v for k, v in kwargs.items() if k not in ("agent_id",)},
|
|
)
|
|
db.add(agent)
|
|
db.flush()
|
|
return agent
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Idle → Busy / OnCall
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTransitionToBusy:
|
|
def test_idle_to_busy_for_work_slot(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE)
|
|
result = transition_to_busy(db, agent, slot_type=SlotType.WORK, now=NOW)
|
|
assert result.status == AgentStatus.BUSY
|
|
assert result.last_heartbeat == NOW
|
|
|
|
def test_idle_to_on_call_for_on_call_slot(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE)
|
|
result = transition_to_busy(db, agent, slot_type=SlotType.ON_CALL, now=NOW)
|
|
assert result.status == AgentStatus.ON_CALL
|
|
|
|
def test_idle_to_busy_for_system_slot(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE)
|
|
result = transition_to_busy(db, agent, slot_type=SlotType.SYSTEM, now=NOW)
|
|
assert result.status == AgentStatus.BUSY
|
|
|
|
def test_idle_to_busy_for_entertainment_slot(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE)
|
|
result = transition_to_busy(db, agent, slot_type=SlotType.ENTERTAINMENT, now=NOW)
|
|
assert result.status == AgentStatus.BUSY
|
|
|
|
def test_busy_to_busy_raises(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.BUSY)
|
|
with pytest.raises(AgentStatusError, match="busy"):
|
|
transition_to_busy(db, agent, slot_type=SlotType.WORK)
|
|
|
|
def test_exhausted_to_busy_raises(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.EXHAUSTED)
|
|
with pytest.raises(AgentStatusError):
|
|
transition_to_busy(db, agent, slot_type=SlotType.WORK)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Busy / OnCall → Idle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTransitionToIdle:
|
|
def test_busy_to_idle(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.BUSY)
|
|
result = transition_to_idle(db, agent, now=NOW)
|
|
assert result.status == AgentStatus.IDLE
|
|
assert result.last_heartbeat == NOW
|
|
|
|
def test_on_call_to_idle(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.ON_CALL)
|
|
result = transition_to_idle(db, agent, now=NOW)
|
|
assert result.status == AgentStatus.IDLE
|
|
|
|
def test_exhausted_to_idle_clears_metadata(self, db):
|
|
agent = _make_agent(
|
|
db,
|
|
status=AgentStatus.EXHAUSTED,
|
|
exhausted_at=NOW - timedelta(hours=1),
|
|
recovery_at=NOW,
|
|
exhaust_reason=ExhaustReason.RATE_LIMIT,
|
|
)
|
|
result = transition_to_idle(db, agent, now=NOW)
|
|
assert result.status == AgentStatus.IDLE
|
|
assert result.exhausted_at is None
|
|
assert result.recovery_at is None
|
|
assert result.exhaust_reason is None
|
|
|
|
def test_offline_to_idle(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.OFFLINE)
|
|
result = transition_to_idle(db, agent, now=NOW)
|
|
assert result.status == AgentStatus.IDLE
|
|
|
|
def test_idle_to_idle_raises(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE)
|
|
with pytest.raises(AgentStatusError, match="idle"):
|
|
transition_to_idle(db, agent)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# * → Offline (heartbeat timeout)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTransitionToOffline:
|
|
def test_idle_to_offline(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE)
|
|
result = transition_to_offline(db, agent)
|
|
assert result.status == AgentStatus.OFFLINE
|
|
|
|
def test_busy_to_offline(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.BUSY)
|
|
result = transition_to_offline(db, agent)
|
|
assert result.status == AgentStatus.OFFLINE
|
|
|
|
def test_already_offline_noop(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.OFFLINE)
|
|
result = transition_to_offline(db, agent)
|
|
assert result.status == AgentStatus.OFFLINE
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Recovery time parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestParseExhaustedRecoveryAt:
|
|
def test_parses_retry_after_seconds_header(self):
|
|
recovery = parse_exhausted_recovery_at(
|
|
now=NOW,
|
|
headers={"Retry-After": "120"},
|
|
)
|
|
assert recovery == NOW + timedelta(seconds=120)
|
|
|
|
def test_parses_retry_after_http_date_header(self):
|
|
recovery = parse_exhausted_recovery_at(
|
|
now=NOW,
|
|
headers={"Retry-After": "Wed, 01 Apr 2026 12:05:00 GMT"},
|
|
)
|
|
assert recovery == datetime(2026, 4, 1, 12, 5, 0, tzinfo=timezone.utc)
|
|
|
|
def test_parses_reset_in_minutes_from_message(self):
|
|
recovery = parse_exhausted_recovery_at(
|
|
now=NOW,
|
|
message="rate limit exceeded, reset in 7 mins",
|
|
)
|
|
assert recovery == NOW + timedelta(minutes=7)
|
|
|
|
def test_parses_retry_after_seconds_from_message(self):
|
|
recovery = parse_exhausted_recovery_at(
|
|
now=NOW,
|
|
message="429 too many requests; retry after 45 seconds",
|
|
)
|
|
assert recovery == NOW + timedelta(seconds=45)
|
|
|
|
def test_parses_resets_at_iso_timestamp_from_message(self):
|
|
recovery = parse_exhausted_recovery_at(
|
|
now=NOW,
|
|
message="quota exhausted, resets at 2026-04-01T14:30:00Z",
|
|
)
|
|
assert recovery == datetime(2026, 4, 1, 14, 30, 0, tzinfo=timezone.utc)
|
|
|
|
def test_falls_back_to_default_when_unparseable(self):
|
|
recovery = parse_exhausted_recovery_at(
|
|
now=NOW,
|
|
headers={"Retry-After": "not-a-date"},
|
|
message="please try later maybe soon",
|
|
)
|
|
assert recovery == NOW + timedelta(hours=DEFAULT_RECOVERY_HOURS)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# * → Exhausted (API quota)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTransitionToExhausted:
|
|
def test_busy_to_exhausted_with_recovery(self, db):
|
|
recovery = NOW + timedelta(hours=1)
|
|
agent = _make_agent(db, status=AgentStatus.BUSY)
|
|
result = transition_to_exhausted(
|
|
db, agent,
|
|
reason=ExhaustReason.RATE_LIMIT,
|
|
recovery_at=recovery,
|
|
now=NOW,
|
|
)
|
|
assert result.status == AgentStatus.EXHAUSTED
|
|
assert result.exhausted_at == NOW
|
|
assert result.recovery_at == recovery
|
|
assert result.exhaust_reason == ExhaustReason.RATE_LIMIT
|
|
|
|
def test_exhausted_default_recovery(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.BUSY)
|
|
result = transition_to_exhausted(
|
|
db, agent,
|
|
reason=ExhaustReason.BILLING,
|
|
now=NOW,
|
|
)
|
|
expected_recovery = NOW + timedelta(hours=DEFAULT_RECOVERY_HOURS)
|
|
assert result.recovery_at == expected_recovery
|
|
assert result.exhaust_reason == ExhaustReason.BILLING
|
|
|
|
def test_idle_to_exhausted(self, db):
|
|
"""Edge case: agent gets a rate-limit before even starting work."""
|
|
agent = _make_agent(db, status=AgentStatus.IDLE)
|
|
result = transition_to_exhausted(
|
|
db, agent,
|
|
reason=ExhaustReason.RATE_LIMIT,
|
|
now=NOW,
|
|
)
|
|
assert result.status == AgentStatus.EXHAUSTED
|
|
|
|
def test_parses_recovery_from_headers_when_timestamp_not_explicitly_provided(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.BUSY)
|
|
result = transition_to_exhausted(
|
|
db,
|
|
agent,
|
|
reason=ExhaustReason.RATE_LIMIT,
|
|
headers={"Retry-After": "90"},
|
|
now=NOW,
|
|
)
|
|
assert result.recovery_at == NOW + timedelta(seconds=90)
|
|
|
|
def test_parses_recovery_from_message_when_timestamp_not_explicitly_provided(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.BUSY)
|
|
result = transition_to_exhausted(
|
|
db,
|
|
agent,
|
|
reason=ExhaustReason.BILLING,
|
|
message="billing quota exhausted, resets at 2026-04-01T15:00:00Z",
|
|
now=NOW,
|
|
)
|
|
assert result.recovery_at == datetime(2026, 4, 1, 15, 0, 0, tzinfo=timezone.utc)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Heartbeat timeout check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCheckHeartbeatTimeout:
|
|
def test_timeout_triggers_offline(self, db):
|
|
old_hb = NOW - timedelta(seconds=HEARTBEAT_TIMEOUT_SECONDS + 10)
|
|
agent = _make_agent(db, status=AgentStatus.IDLE, last_hb=old_hb)
|
|
changed = check_heartbeat_timeout(db, agent, now=NOW)
|
|
assert changed is True
|
|
assert agent.status == AgentStatus.OFFLINE
|
|
|
|
def test_recent_heartbeat_no_change(self, db):
|
|
recent_hb = NOW - timedelta(seconds=30)
|
|
agent = _make_agent(db, status=AgentStatus.BUSY, last_hb=recent_hb)
|
|
changed = check_heartbeat_timeout(db, agent, now=NOW)
|
|
assert changed is False
|
|
assert agent.status == AgentStatus.BUSY
|
|
|
|
def test_no_heartbeat_ever_goes_offline(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE, last_hb=None)
|
|
changed = check_heartbeat_timeout(db, agent, now=NOW)
|
|
assert changed is True
|
|
assert agent.status == AgentStatus.OFFLINE
|
|
|
|
def test_already_offline_returns_false(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.OFFLINE, last_hb=None)
|
|
changed = check_heartbeat_timeout(db, agent, now=NOW)
|
|
assert changed is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Exhausted recovery check
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCheckExhaustedRecovery:
|
|
def test_recovery_at_reached(self, db):
|
|
agent = _make_agent(
|
|
db,
|
|
status=AgentStatus.EXHAUSTED,
|
|
exhausted_at=NOW - timedelta(hours=5),
|
|
recovery_at=NOW - timedelta(minutes=1),
|
|
exhaust_reason=ExhaustReason.RATE_LIMIT,
|
|
)
|
|
recovered = check_exhausted_recovery(db, agent, now=NOW)
|
|
assert recovered is True
|
|
assert agent.status == AgentStatus.IDLE
|
|
assert agent.exhausted_at is None
|
|
|
|
def test_recovery_at_not_yet_reached(self, db):
|
|
agent = _make_agent(
|
|
db,
|
|
status=AgentStatus.EXHAUSTED,
|
|
exhausted_at=NOW,
|
|
recovery_at=NOW + timedelta(hours=1),
|
|
exhaust_reason=ExhaustReason.BILLING,
|
|
)
|
|
recovered = check_exhausted_recovery(db, agent, now=NOW)
|
|
assert recovered is False
|
|
assert agent.status == AgentStatus.EXHAUSTED
|
|
|
|
def test_non_exhausted_agent_returns_false(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE)
|
|
recovered = check_exhausted_recovery(db, agent, now=NOW)
|
|
assert recovered is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Record heartbeat
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestRecordHeartbeat:
|
|
def test_updates_timestamp(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.IDLE, last_hb=NOW - timedelta(minutes=1))
|
|
result = record_heartbeat(db, agent, now=NOW)
|
|
assert result.last_heartbeat == NOW
|
|
|
|
def test_offline_agent_recovers_to_idle(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.OFFLINE)
|
|
result = record_heartbeat(db, agent, now=NOW)
|
|
assert result.status == AgentStatus.IDLE
|
|
assert result.last_heartbeat == NOW
|
|
|
|
def test_busy_agent_stays_busy(self, db):
|
|
agent = _make_agent(db, status=AgentStatus.BUSY, last_hb=NOW - timedelta(seconds=30))
|
|
result = record_heartbeat(db, agent, now=NOW)
|
|
assert result.status == AgentStatus.BUSY
|
|
assert result.last_heartbeat == NOW
|