P4.1: Extract reusable dependency check helper, deduplicate milestone_actions.py
- New app/services/dependency_check.py with check_milestone_deps() - Replaces 3x duplicated JSON-parse + query + filter logic - Supports both milestone and task dependency checking - Returns structured DepCheckResult with ok/blockers/reason - Refactored preflight and start endpoints to use shared helper
This commit is contained in:
113
app/services/dependency_check.py
Normal file
113
app/services/dependency_check.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""P4.1 — Reusable dependency-check helpers.
|
||||
|
||||
Used by milestone start, milestone preflight, and (future) task pending→open
|
||||
to verify that all declared dependencies are completed before allowing the
|
||||
entity to proceed.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Sequence
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models.milestone import Milestone
|
||||
from app.models.task import Task
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Result type
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class DepCheckResult:
|
||||
"""Outcome of a dependency check."""
|
||||
|
||||
ok: bool = True
|
||||
blockers: list[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def reason(self) -> str | None:
|
||||
return "; ".join(self.blockers) if self.blockers else None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _parse_json_ids(raw: str | None) -> list[int]:
|
||||
"""Safely parse a JSON-encoded list of integer IDs."""
|
||||
if not raw:
|
||||
return []
|
||||
try:
|
||||
ids = json.loads(raw)
|
||||
if isinstance(ids, list):
|
||||
return [int(i) for i in ids]
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def _ms_status(ms: Milestone) -> str:
|
||||
return ms.status.value if hasattr(ms.status, "value") else ms.status
|
||||
|
||||
|
||||
def _task_status(t: Task) -> str:
|
||||
return t.status.value if hasattr(t.status, "value") else t.status
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_milestone_deps(
|
||||
db: Session,
|
||||
depend_on_milestones: str | None,
|
||||
depend_on_tasks: str | None,
|
||||
*,
|
||||
required_status: str = "completed",
|
||||
) -> DepCheckResult:
|
||||
"""Check whether all milestone + task dependencies are satisfied.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
db:
|
||||
Active DB session.
|
||||
depend_on_milestones:
|
||||
JSON-encoded list of milestone IDs (from the entity's field).
|
||||
depend_on_tasks:
|
||||
JSON-encoded list of task IDs (from the entity's field).
|
||||
required_status:
|
||||
The status that dependees must have reached (default ``"completed"``).
|
||||
|
||||
Returns
|
||||
-------
|
||||
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
|
||||
with human-readable ``blockers``.
|
||||
"""
|
||||
result = DepCheckResult()
|
||||
|
||||
# --- milestone dependencies ---
|
||||
ms_ids = _parse_json_ids(depend_on_milestones)
|
||||
if ms_ids:
|
||||
dep_milestones: Sequence[Milestone] = (
|
||||
db.query(Milestone).filter(Milestone.id.in_(ms_ids)).all()
|
||||
)
|
||||
incomplete = [m.id for m in dep_milestones if _ms_status(m) != required_status]
|
||||
if incomplete:
|
||||
result.ok = False
|
||||
result.blockers.append(f"Dependent milestones not {required_status}: {incomplete}")
|
||||
|
||||
# --- task dependencies ---
|
||||
task_ids = _parse_json_ids(depend_on_tasks)
|
||||
if task_ids:
|
||||
dep_tasks: Sequence[Task] = (
|
||||
db.query(Task).filter(Task.id.in_(task_ids)).all()
|
||||
)
|
||||
incomplete_tasks = [t.id for t in dep_tasks if _task_status(t) != required_status]
|
||||
if incomplete_tasks:
|
||||
result.ok = False
|
||||
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete_tasks}")
|
||||
|
||||
return result
|
||||
Reference in New Issue
Block a user