From 6c0959f5bb5919fad1ed28dee6a0a8aae3ec5c41 Mon Sep 17 00:00:00 2001 From: zhi Date: Tue, 31 Mar 2026 23:01:47 +0000 Subject: [PATCH] BE-AGT-001: implement heartbeat pending-slot query service - New service: app/services/agent_heartbeat.py - get_pending_slots_for_agent(): queries today's NotStarted/Deferred slots where scheduled_at <= now, sorted by priority descending - get_pending_slot_count(): lightweight count-only variant - Auto-materializes plan virtual slots for today before querying - Supports injectable 'now' parameter for testing --- app/services/agent_heartbeat.py | 121 ++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 app/services/agent_heartbeat.py diff --git a/app/services/agent_heartbeat.py b/app/services/agent_heartbeat.py new file mode 100644 index 0000000..f2641bf --- /dev/null +++ b/app/services/agent_heartbeat.py @@ -0,0 +1,121 @@ +"""Agent heartbeat — query pending slots for execution. + +BE-AGT-001: Service layer that the plugin heartbeat endpoint calls to +discover which TimeSlots are ready to be executed by an agent. + +Design reference: NEXT_WAVE_DEV_DIRECTION.md §6.1 (Heartbeat flow) + +Filtering rules: + 1. Only slots for **today** are considered. + 2. Only slots with status ``NotStarted`` or ``Deferred``. + 3. Only slots whose ``scheduled_at`` time has already passed (i.e. the + slot's scheduled start is at or before the current time). + 4. Results are sorted by **priority descending** (higher = more urgent). + +The caller (heartbeat API endpoint) receives a list of actionable slots +and decides how to dispatch them to the agent based on agent status. +""" + +from __future__ import annotations + +from datetime import date, datetime, time, timezone +from typing import Sequence + +from sqlalchemy import and_, case +from sqlalchemy.orm import Session + +from app.models.calendar import SlotStatus, TimeSlot +from app.services.plan_slot import ( + get_virtual_slots_for_date, + materialize_all_for_date, +) + + +# Statuses that are eligible for heartbeat pickup +_ACTIONABLE_STATUSES = {SlotStatus.NOT_STARTED, SlotStatus.DEFERRED} + + +def get_pending_slots_for_agent( + db: Session, + user_id: int, + *, + now: datetime | None = None, +) -> list[TimeSlot]: + """Return today's actionable slots that are due for execution. + + Parameters + ---------- + db : Session + SQLAlchemy database session. + user_id : int + The HarborForge user id linked to the agent. + now : datetime, optional + Override "current time" for testing. Defaults to ``datetime.now(timezone.utc)``. + + Returns + ------- + list[TimeSlot] + Materialized TimeSlot rows sorted by priority descending (highest first). + Only includes slots where ``scheduled_at <= current_time`` and status + is ``NotStarted`` or ``Deferred``. + """ + if now is None: + now = datetime.now(timezone.utc) + + today = now.date() if isinstance(now, datetime) else now + current_time: time = now.time() if isinstance(now, datetime) else now + + # --- Step 1: Ensure today's plan-based slots are materialized ---------- + # The heartbeat is often the first touch of the day, so we materialize + # all plan-generated virtual slots for today before querying. This is + # idempotent — already-materialized plans are skipped. + materialize_all_for_date(db, user_id, today) + db.flush() + + # --- Step 2: Query real (materialized) slots --------------------------- + actionable_status_values = [s.value for s in _ACTIONABLE_STATUSES] + + slots: list[TimeSlot] = ( + db.query(TimeSlot) + .filter( + TimeSlot.user_id == user_id, + TimeSlot.date == today, + TimeSlot.status.in_(actionable_status_values), + TimeSlot.scheduled_at <= current_time, + ) + .order_by(TimeSlot.priority.desc()) + .all() + ) + + return slots + + +def get_pending_slot_count( + db: Session, + user_id: int, + *, + now: datetime | None = None, +) -> int: + """Return the count of today's actionable slots that are due. + + Lighter alternative to :func:`get_pending_slots_for_agent` when only + the count is needed (e.g. quick heartbeat status check). + """ + if now is None: + now = datetime.now(timezone.utc) + + today = now.date() if isinstance(now, datetime) else now + current_time: time = now.time() if isinstance(now, datetime) else now + + actionable_status_values = [s.value for s in _ACTIONABLE_STATUSES] + + return ( + db.query(TimeSlot.id) + .filter( + TimeSlot.user_id == user_id, + TimeSlot.date == today, + TimeSlot.status.in_(actionable_status_values), + TimeSlot.scheduled_at <= current_time, + ) + .count() + )