Files
HarborForge.Backend/app/api/routers/milestones.py

253 lines
13 KiB
Python

"""Milestones API router (project-scoped)."""
import json
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, ensure_can_edit_milestone
from app.models import models
from app.models.milestone import Milestone
from app.models.task import Task, TaskStatus, TaskPriority
from app.models.support import Support
from app.models.meeting import Meeting
from app.schemas import schemas
# Router for the milestone endpoints; every route below is mounted under
# /projects/{project_id}/milestones and tagged "Milestones".
router = APIRouter(prefix="/projects/{project_id}/milestones", tags=["Milestones"])
def _decode_json_list(raw):
    """Decode a JSON-encoded list column into a Python list.

    Returns an empty list when the value is empty/None, is not valid JSON,
    or decodes to something other than a list. A value that is already a
    list or tuple is returned as a new list without decoding.
    """
    if not raw:
        return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass. One bad row must
        # not make every endpoint that serializes milestones fail.
        return []
    return decoded if isinstance(decoded, list) else []


def _serialize_milestone(milestone):
    """Serialize a milestone into a plain dict, decoding its JSON fields.

    ``status`` is reduced to its ``.value`` when the attribute has one
    (e.g. an Enum member) and passed through unchanged otherwise.
    ``depend_on_milestones`` and ``depend_on_tasks`` are always returned as
    lists; missing or undecodable values become ``[]``.
    """
    status_value = milestone.status.value if hasattr(milestone.status, "value") else milestone.status
    return {
        "id": milestone.id,
        "title": milestone.title,
        "description": milestone.description,
        "status": status_value,
        "due_date": milestone.due_date,
        "planned_release_date": milestone.planned_release_date,
        "depend_on_milestones": _decode_json_list(milestone.depend_on_milestones),
        "depend_on_tasks": _decode_json_list(milestone.depend_on_tasks),
        "project_id": milestone.project_id,
        "created_by_id": milestone.created_by_id,
        "started_at": milestone.started_at,
        "created_at": milestone.created_at,
        "updated_at": milestone.updated_at,
    }
@router.get("", response_model=List[schemas.MilestoneResponse])
def list_milestones(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """List all milestones that belong to ``project_id``.

    Access is gated by ``check_project_role`` with ``min_role="viewer"``.
    Each row is converted with ``_serialize_milestone``.
    """
    check_project_role(db, current_user.id, project_id, min_role="viewer")
    project_rows = db.query(Milestone).filter(Milestone.project_id == project_id).all()
    return list(map(_serialize_milestone, project_rows))
@router.post("", response_model=schemas.MilestoneResponse, status_code=status.HTTP_201_CREATED)
def create_milestone(
    project_id: int,
    milestone: schemas.MilestoneCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Create a milestone inside ``project_id`` and return it serialized.

    Access is gated by ``check_project_role`` with ``min_role="mgr"``.
    The milestone code has the form ``<project_code>:<hex number>``, where
    the project code falls back to ``P<project_id>`` if the project row is
    not found.

    Dependency lists are stored as JSON text. Empty or missing lists are
    stored as ``None``, the same normalisation ``update_milestone`` applies.
    """
    check_project_role(db, current_user.id, project_id, min_role="mgr")

    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    project_code = project.project_code if project else f"P{project_id}"

    # NOTE(review): the numeric part is "highest milestone id in this project
    # plus one" (or 1 when the project has none), not the id the new row will
    # actually receive — confirm this is the intended numbering.
    max_ms = db.query(Milestone).filter(Milestone.project_id == project_id).order_by(Milestone.id.desc()).first()
    next_num = (max_ms.id + 1) if max_ms else 1
    milestone_code = f"{project_code}:{next_num:05x}"

    data = milestone.model_dump()
    # project_id comes from the URL, never from the request body.
    data.pop('project_id', None)

    # Encode dependency lists as JSON text. An empty list is normalised to
    # None rather than being handed to the model as a raw Python list.
    for field in ("depend_on_milestones", "depend_on_tasks"):
        if field in data:
            data[field] = json.dumps(data[field]) if data[field] else None

    db_milestone = Milestone(
        project_id=project_id,
        milestone_code=milestone_code,
        created_by_id=current_user.id,
        **data,
    )
    db.add(db_milestone)
    db.commit()
    db.refresh(db_milestone)
    return _serialize_milestone(db_milestone)
@router.get("/{milestone_id}", response_model=schemas.MilestoneResponse)
def get_milestone(
    project_id: int,
    milestone_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Fetch one milestone of ``project_id`` by id.

    Access is gated by ``check_project_role`` with ``min_role="viewer"``.

    Raises:
        HTTPException: 404 when no milestone with this id exists in the project.
    """
    check_project_role(db, current_user.id, project_id, min_role="viewer")
    lookup = db.query(Milestone).filter(
        Milestone.id == milestone_id,
        Milestone.project_id == project_id,
    )
    found = lookup.first()
    if found:
        return _serialize_milestone(found)
    raise HTTPException(status_code=404, detail="Milestone not found")
@router.patch("/{milestone_id}", response_model=schemas.MilestoneResponse)
def update_milestone(
    project_id: int,
    milestone_id: int,
    milestone: schemas.MilestoneUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Apply a partial update to a milestone, subject to status-based rules.

    Only fields explicitly set in the request body are considered. Checks
    run in this order:

    1. 404 if the milestone does not exist in the project.
    2. ``ensure_can_edit_milestone`` permission check.
    3. 400 if the milestone is "completed" or "closed".
    4. 400 if the body tries to set ``status``.
    5. 400 if the milestone is "freeze" or "undergoing" and the body touches
       a scope field.

    Dependency lists are stored as JSON text; empty ones are stored as None.
    """
    target = db.query(Milestone).filter(
        Milestone.id == milestone_id,
        Milestone.project_id == project_id,
    ).first()
    if not target:
        raise HTTPException(status_code=404, detail="Milestone not found")
    ensure_can_edit_milestone(db, current_user.id, target)

    # --- P3.6: edit restrictions depend on the milestone's current status ---
    raw_status = target.status
    ms_status = raw_status.value if hasattr(raw_status, 'value') else raw_status

    # Terminal states accept no edits at all.
    if ms_status in ("completed", "closed"):
        raise HTTPException(
            status_code=400,
            detail=f"Cannot edit a milestone that is '{ms_status}'. No modifications are allowed in terminal state."
        )

    changes = milestone.model_dump(exclude_unset=True)

    # Status transitions go through the dedicated action endpoints only.
    if "status" in changes:
        raise HTTPException(
            status_code=400,
            detail="Milestone status cannot be changed via PATCH. Use the action endpoints (freeze/start/close) instead."
        )

    # Frozen or in-progress milestones may not have their scope altered.
    if ms_status in ("freeze", "undergoing"):
        scope_fields = {
            "title",
            "description",
            "due_date",
            "planned_release_date",
            "depend_on_milestones",
            "depend_on_tasks",
        }
        blocked = sorted(scope_fields.intersection(changes))
        if blocked:
            raise HTTPException(
                status_code=400,
                detail=f"Cannot modify scope fields {blocked} when milestone is '{ms_status}'. Scope changes are only allowed in 'open' status."
            )

    # Dependency lists are persisted as JSON text (None when empty).
    for dep_field in ("depend_on_milestones", "depend_on_tasks"):
        if dep_field in changes:
            dep_value = changes[dep_field]
            changes[dep_field] = json.dumps(dep_value) if dep_value else None

    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    db.commit()
    db.refresh(target)
    return _serialize_milestone(target)
@router.delete("/{milestone_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_milestone(
    project_id: int,
    milestone_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Delete a milestone of ``project_id``.

    Access is gated by ``check_project_role`` with ``min_role="admin"``.

    Raises:
        HTTPException: 404 when the milestone does not exist in the project;
            400 when its status is "undergoing" or "completed".
    """
    check_project_role(db, current_user.id, project_id, min_role="admin")

    doomed = db.query(Milestone).filter(
        Milestone.id == milestone_id,
        Milestone.project_id == project_id,
    ).first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Milestone not found")

    # Reduce an enum-like status to its plain value before comparing.
    ms_status = doomed.status
    if hasattr(ms_status, 'value'):
        ms_status = ms_status.value

    if ms_status in ("undergoing", "completed"):
        raise HTTPException(status_code=400, detail=f"Cannot delete a milestone that is '{ms_status}'")

    db.delete(doomed)
    db.commit()
    return None
@router.post("/{milestone_id}/tasks", status_code=status.HTTP_201_CREATED, tags=["Milestones"])
def create_milestone_task(
    project_id: int,
    milestone_id: int,
    task_data: schemas.TaskCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Create a task attached to a milestone and return the new Task row.

    Access is gated by ``check_project_role`` with ``min_role="dev"``.
    The task is always created with status OPEN and priority MEDIUM, and the
    current user is recorded as both reporter and creator.

    ``estimated_working_time`` is parsed best-effort as "HH:MM"; a value that
    cannot be parsed is stored as None rather than rejecting the request.

    Raises:
        HTTPException: 404 when the milestone does not exist in the project;
            400 when the milestone is "undergoing", "completed" or "closed";
            400 when the request is a feature story (task_type "story" with
            task_subtype "feature").
    """
    check_project_role(db, current_user.id, project_id, min_role="dev")
    milestone = db.query(Milestone).filter(Milestone.id == milestone_id, Milestone.project_id == project_id).first()
    if not milestone:
        raise HTTPException(status_code=404, detail="Milestone not found")

    ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
    if ms_status in ("undergoing", "completed", "closed"):
        raise HTTPException(status_code=400, detail=f"Cannot add items to a milestone that is '{ms_status}'")

    # Dump the payload once; only explicitly provided fields are present.
    data = task_data.model_dump(exclude_unset=True)
    task_type = data.get("task_type", "")
    task_subtype = data.get("task_subtype", "")

    # P9.6: feature story tasks must come from propose accept, not direct creation
    if task_type == "story" and task_subtype == "feature":
        raise HTTPException(status_code=400, detail="Feature story tasks can only be created via propose accept, not direct creation")

    # P3.6 / §5: freeze prevents adding new feature story tasks (redundant after P9.6 but kept as defense-in-depth)
    if ms_status == "freeze" and task_type == "story" and task_subtype == "feature":
        raise HTTPException(status_code=400, detail="Cannot add feature story tasks after milestone is frozen")

    # Generate task_code as "<milestone_code>:T<hex number>".
    milestone_code = milestone.milestone_code or f"m{milestone.id}"
    max_task = db.query(Task).filter(Task.milestone_id == milestone.id).order_by(Task.id.desc()).first()
    next_num = (max_task.id + 1) if max_task else 1
    task_code = f"{milestone_code}:T{next_num:05x}"

    est_time = None
    raw_est_time = data.get("estimated_working_time")
    if raw_est_time:
        try:
            est_time = datetime.strptime(raw_est_time, "%H:%M").time()
        except (TypeError, ValueError):
            # Deliberately best-effort: an unparseable estimate is dropped.
            # Only parse failures are caught, not every exception.
            est_time = None

    task = Task(
        title=data.get("title"),
        description=data.get("description"),
        task_type=data.get("task_type", "task"),
        task_subtype=data.get("task_subtype"),
        status=TaskStatus.OPEN,
        priority=TaskPriority.MEDIUM,
        project_id=project_id,
        milestone_id=milestone_id,
        reporter_id=current_user.id,
        task_code=task_code,
        estimated_effort=data.get("estimated_effort"),
        estimated_working_time=est_time,
        created_by_id=current_user.id,
    )
    db.add(task)
    db.commit()
    db.refresh(task)
    return task
def _plain_value(field):
    """Return ``field.value`` when the attribute exists, else ``field`` itself."""
    return field.value if hasattr(field, 'value') else field


@router.get("/{milestone_id}/items")
def get_milestone_items(
    project_id: int,
    milestone_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Return the tasks, supports and meetings attached to a milestone.

    Access is gated by ``check_project_role`` with ``min_role="viewer"``.
    ``status`` and ``priority`` are reduced to their ``.value`` when they
    have one and passed through unchanged otherwise (e.g. None or a plain
    string), uniformly for all three item kinds.

    Raises:
        HTTPException: 404 when the milestone does not exist in the project.
    """
    check_project_role(db, current_user.id, project_id, min_role="viewer")
    milestone = db.query(Milestone).filter(Milestone.id == milestone_id, Milestone.project_id == project_id).first()
    if not milestone:
        raise HTTPException(status_code=404, detail="Milestone not found")

    tasks = db.query(Task).filter(Task.milestone_id == milestone_id).all()
    supports = db.query(Support).filter(Support.milestone_id == milestone_id).all()
    meetings = db.query(Meeting).filter(Meeting.milestone_id == milestone_id).all()

    return {
        "tasks": [{
            "id": t.id, "title": t.title, "description": t.description,
            "status": _plain_value(t.status),
            "priority": _plain_value(t.priority),
            "task_code": t.task_code, "created_at": t.created_at,
        } for t in tasks],
        "supports": [{
            "id": s.id, "title": s.title, "description": s.description,
            "status": _plain_value(s.status),
            "priority": _plain_value(s.priority),
            "created_at": s.created_at,
        } for s in supports],
        "meetings": [{
            "id": m.id, "title": m.title, "description": m.description,
            "status": _plain_value(m.status),
            "priority": _plain_value(m.priority),
            "created_at": m.created_at,
        } for m in meetings],
    }
@router.get("/{milestone_id}/progress")
def get_milestone_progress(
    project_id: int,
    milestone_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Report task-completion and elapsed-time progress for a milestone.

    Access is gated by ``check_project_role`` with ``min_role="viewer"``.

    ``progress_pct`` is the share of the milestone's tasks whose status is
    ``TaskStatus.CLOSED`` (0 when there are no tasks). ``time_progress_pct``
    is the share of the span from ``created_at`` to ``planned_release_date``
    that has elapsed, clamped to 0-100. It is None when either date is
    missing or the release date is not after creation.

    Raises:
        HTTPException: 404 when the milestone does not exist in the project.
    """
    check_project_role(db, current_user.id, project_id, min_role="viewer")
    milestone = db.query(Milestone).filter(Milestone.id == milestone_id, Milestone.project_id == project_id).first()
    if not milestone:
        raise HTTPException(status_code=404, detail="Milestone not found")

    all_tasks = db.query(Task).filter(Task.milestone_id == milestone_id).all()
    total = len(all_tasks)
    completed = sum(1 for t in all_tasks if t.status == TaskStatus.CLOSED)
    progress_pct = (completed / total * 100) if total > 0 else 0

    time_progress = None
    created_at = milestone.created_at
    planned = milestone.planned_release_date
    if planned and created_at:
        # Use created_at's timezone so the subtraction below never mixes naive
        # and aware datetimes. tz=None behaves exactly like datetime.now().
        now = datetime.now(tz=getattr(created_at, "tzinfo", None))
        if planned > created_at:
            total_duration = (planned - created_at).total_seconds()
            elapsed = (now - created_at).total_seconds()
            time_progress = min(100, max(0, (elapsed / total_duration * 100)))

    return {
        "milestone_id": milestone_id,
        "title": milestone.title,
        "total": total,
        "total_tasks": total,
        "completed": completed,
        "progress_pct": round(progress_pct, 1),
        # Compare against None explicitly: a clamped value of 0 is a valid
        # result and must not be reported as "unknown".
        "time_progress_pct": round(time_progress, 1) if time_progress is not None else None,
        "planned_release_date": milestone.planned_release_date,
    }