Compare commits
1 Commits
22a0097a5d
...
6c0959f5bb
| Author | SHA1 | Date | |
|---|---|---|---|
| 6c0959f5bb |
121
app/services/agent_heartbeat.py
Normal file
121
app/services/agent_heartbeat.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Agent heartbeat — query pending slots for execution.
|
||||
|
||||
BE-AGT-001: Service layer that the plugin heartbeat endpoint calls to
|
||||
discover which TimeSlots are ready to be executed by an agent.
|
||||
|
||||
Design reference: NEXT_WAVE_DEV_DIRECTION.md §6.1 (Heartbeat flow)
|
||||
|
||||
Filtering rules:
|
||||
1. Only slots for **today** are considered.
|
||||
2. Only slots with status ``NotStarted`` or ``Deferred``.
|
||||
3. Only slots whose ``scheduled_at`` time has already passed (i.e. the
|
||||
slot's scheduled start is at or before the current time).
|
||||
4. Results are sorted by **priority descending** (higher = more urgent).
|
||||
|
||||
The caller (heartbeat API endpoint) receives a list of actionable slots
|
||||
and decides how to dispatch them to the agent based on agent status.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date, datetime, time, timezone
|
||||
from typing import Sequence
|
||||
|
||||
from sqlalchemy import and_, case
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models.calendar import SlotStatus, TimeSlot
|
||||
from app.services.plan_slot import (
|
||||
get_virtual_slots_for_date,
|
||||
materialize_all_for_date,
|
||||
)
|
||||
|
||||
|
||||
# Statuses that are eligible for heartbeat pickup
|
||||
_ACTIONABLE_STATUSES = {SlotStatus.NOT_STARTED, SlotStatus.DEFERRED}
|
||||
|
||||
|
||||
def get_pending_slots_for_agent(
|
||||
db: Session,
|
||||
user_id: int,
|
||||
*,
|
||||
now: datetime | None = None,
|
||||
) -> list[TimeSlot]:
|
||||
"""Return today's actionable slots that are due for execution.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
db : Session
|
||||
SQLAlchemy database session.
|
||||
user_id : int
|
||||
The HarborForge user id linked to the agent.
|
||||
now : datetime, optional
|
||||
Override "current time" for testing. Defaults to ``datetime.now(timezone.utc)``.
|
||||
|
||||
Returns
|
||||
-------
|
||||
list[TimeSlot]
|
||||
Materialized TimeSlot rows sorted by priority descending (highest first).
|
||||
Only includes slots where ``scheduled_at <= current_time`` and status
|
||||
is ``NotStarted`` or ``Deferred``.
|
||||
"""
|
||||
if now is None:
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
today = now.date() if isinstance(now, datetime) else now
|
||||
current_time: time = now.time() if isinstance(now, datetime) else now
|
||||
|
||||
# --- Step 1: Ensure today's plan-based slots are materialized ----------
|
||||
# The heartbeat is often the first touch of the day, so we materialize
|
||||
# all plan-generated virtual slots for today before querying. This is
|
||||
# idempotent — already-materialized plans are skipped.
|
||||
materialize_all_for_date(db, user_id, today)
|
||||
db.flush()
|
||||
|
||||
# --- Step 2: Query real (materialized) slots ---------------------------
|
||||
actionable_status_values = [s.value for s in _ACTIONABLE_STATUSES]
|
||||
|
||||
slots: list[TimeSlot] = (
|
||||
db.query(TimeSlot)
|
||||
.filter(
|
||||
TimeSlot.user_id == user_id,
|
||||
TimeSlot.date == today,
|
||||
TimeSlot.status.in_(actionable_status_values),
|
||||
TimeSlot.scheduled_at <= current_time,
|
||||
)
|
||||
.order_by(TimeSlot.priority.desc())
|
||||
.all()
|
||||
)
|
||||
|
||||
return slots
|
||||
|
||||
|
||||
def get_pending_slot_count(
|
||||
db: Session,
|
||||
user_id: int,
|
||||
*,
|
||||
now: datetime | None = None,
|
||||
) -> int:
|
||||
"""Return the count of today's actionable slots that are due.
|
||||
|
||||
Lighter alternative to :func:`get_pending_slots_for_agent` when only
|
||||
the count is needed (e.g. quick heartbeat status check).
|
||||
"""
|
||||
if now is None:
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
today = now.date() if isinstance(now, datetime) else now
|
||||
current_time: time = now.time() if isinstance(now, datetime) else now
|
||||
|
||||
actionable_status_values = [s.value for s in _ACTIONABLE_STATUSES]
|
||||
|
||||
return (
|
||||
db.query(TimeSlot.id)
|
||||
.filter(
|
||||
TimeSlot.user_id == user_id,
|
||||
TimeSlot.date == today,
|
||||
TimeSlot.status.in_(actionable_status_values),
|
||||
TimeSlot.scheduled_at <= current_time,
|
||||
)
|
||||
.count()
|
||||
)
|
||||
Reference in New Issue
Block a user