Compare commits

...

10 Commits

Author SHA1 Message Date
zhi
e938507a24 test(P13.3): propose backend tests — 19 tests covering CRUD, accept/reject/reopen, code generation, feat_task_id protection, edit restrictions, permissions 2026-03-18 05:01:56 +00:00
zhi
c21e4ee335 test(P13.2): task state-machine tests — 34 tests covering transitions, assignee guards, comments, permissions, edit restrictions 2026-03-18 04:02:29 +00:00
zhi
011a2262ce test(P13.1): add milestone state machine tests — 17 tests covering freeze/start/close/auto-complete/preflight
New test infrastructure:
- tests/conftest.py: SQLite in-memory fixtures, TestClient wired to test DB,
  factory fixtures for User/Project/Milestone/Task/Roles/Permissions
- tests/test_milestone_actions.py: 17 tests covering:
  - freeze success/no-release-task/multiple-release-tasks/wrong-status
  - start success+started_at/deps-not-met/wrong-status
  - close from open/freeze/undergoing, rejected from completed/closed
  - auto-complete on release task finish, no auto-complete for non-release/wrong-status
  - preflight allowed/not-allowed
2026-03-18 03:07:30 +00:00
zhi
7bad57eb0e feat(P5): sync batch transition with P5.3-P5.6 guards — auth, assignee, comment, permission, deps, auto-complete 2026-03-18 01:01:59 +00:00
zhi
00a1786ec3 feat(P12.1): CLI — add propose subcommands, remove task_type=task, add milestone status filter, transition comment support 2026-03-18 00:01:52 +00:00
zhi
586e06f66a feat(P3.6): lock feature story task body edits when milestone is freeze/undergoing/completed/closed 2026-03-17 23:01:39 +00:00
zhi
ec91a15f65 fix(P7.1): remove TaskType.TASK from models.py + fix milestone task defaults (issue/pending) 2026-03-17 23:01:02 +00:00
zhi
8e38d4cf4d feat(P2.2): add default mgr/dev role seeds with preset permissions for milestone/task/propose actions 2026-03-17 19:02:44 +00:00
zhi
0c75045f6f feat(P4.3): wire task depend_on check into pending→open transition via reusable helper 2026-03-17 18:02:08 +00:00
zhi
c6b14ac25f P4.1: Extract reusable dependency check helper, deduplicate milestone_actions.py
- New app/services/dependency_check.py with check_milestone_deps()
- Replaces 3x duplicated JSON-parse + query + filter logic
- Supports both milestone and task dependency checking
- Returns structured DepCheckResult with ok/blockers/reason
- Refactored preflight and start endpoints to use shared helper
2026-03-17 17:03:45 +00:00
12 changed files with 2253 additions and 157 deletions

View File

@@ -3,7 +3,6 @@
Provides freeze / start / close actions on milestones. Provides freeze / start / close actions on milestones.
completed is triggered automatically when the sole release maintenance task finishes. completed is triggered automatically when the sole release maintenance task finishes.
""" """
import json
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Optional
@@ -18,6 +17,7 @@ from app.models import models
from app.models.milestone import Milestone, MilestoneStatus from app.models.milestone import Milestone, MilestoneStatus
from app.models.task import Task, TaskStatus from app.models.task import Task, TaskStatus
from app.services.activity import log_activity from app.services.activity import log_activity
from app.services.dependency_check import check_milestone_deps
router = APIRouter( router = APIRouter(
prefix="/projects/{project_id}/milestones/{milestone_id}/actions", prefix="/projects/{project_id}/milestones/{milestone_id}/actions",
@@ -101,38 +101,13 @@ def preflight_milestone_actions(
# --- start pre-check (only meaningful when status == freeze) --- # --- start pre-check (only meaningful when status == freeze) ---
if ms_status == "freeze": if ms_status == "freeze":
blockers: list[str] = [] dep_result = check_milestone_deps(
db, ms.depend_on_milestones, ms.depend_on_tasks,
# milestone dependencies )
dep_ms_ids = [] if dep_result.ok:
if ms.depend_on_milestones:
try:
dep_ms_ids = json.loads(ms.depend_on_milestones)
except (json.JSONDecodeError, TypeError):
dep_ms_ids = []
if dep_ms_ids:
dep_milestones = db.query(Milestone).filter(Milestone.id.in_(dep_ms_ids)).all()
incomplete = [m.id for m in dep_milestones if _ms_status_value(m) != "completed"]
if incomplete:
blockers.append(f"Dependent milestones not completed: {incomplete}")
# task dependencies
dep_task_ids = []
if ms.depend_on_tasks:
try:
dep_task_ids = json.loads(ms.depend_on_tasks)
except (json.JSONDecodeError, TypeError):
dep_task_ids = []
if dep_task_ids:
dep_tasks = db.query(Task).filter(Task.id.in_(dep_task_ids)).all()
incomplete_tasks = [t.id for t in dep_tasks if (t.status.value if hasattr(t.status, "value") else t.status) != "completed"]
if incomplete_tasks:
blockers.append(f"Dependent tasks not completed: {incomplete_tasks}")
if blockers:
result["start"] = {"allowed": False, "reason": "; ".join(blockers)}
else:
result["start"] = {"allowed": True, "reason": None} result["start"] = {"allowed": True, "reason": None}
else:
result["start"] = {"allowed": False, "reason": dep_result.reason}
return result return result
@@ -232,51 +207,15 @@ def start_milestone(
detail=f"Cannot start: milestone is '{_ms_status_value(ms)}', expected 'freeze'", detail=f"Cannot start: milestone is '{_ms_status_value(ms)}', expected 'freeze'",
) )
# Dependency check — milestone dependencies # Dependency check (P4.1 — shared helper)
dep_ms_ids = [] dep_result = check_milestone_deps(
if ms.depend_on_milestones: db, ms.depend_on_milestones, ms.depend_on_tasks,
try: )
dep_ms_ids = json.loads(ms.depend_on_milestones) if not dep_result.ok:
except (json.JSONDecodeError, TypeError): raise HTTPException(
dep_ms_ids = [] status_code=400,
detail=f"Cannot start: {dep_result.reason}",
if dep_ms_ids:
dep_milestones = (
db.query(Milestone)
.filter(Milestone.id.in_(dep_ms_ids))
.all()
) )
incomplete = [
m.id
for m in dep_milestones
if _ms_status_value(m) != "completed"
]
if incomplete:
raise HTTPException(
status_code=400,
detail=f"Cannot start: dependent milestones not completed: {incomplete}",
)
# Dependency check — task dependencies
dep_task_ids = []
if ms.depend_on_tasks:
try:
dep_task_ids = json.loads(ms.depend_on_tasks)
except (json.JSONDecodeError, TypeError):
dep_task_ids = []
if dep_task_ids:
dep_tasks = db.query(Task).filter(Task.id.in_(dep_task_ids)).all()
incomplete_tasks = [
t.id
for t in dep_tasks
if (t.status.value if hasattr(t.status, "value") else t.status) != "completed"
]
if incomplete_tasks:
raise HTTPException(
status_code=400,
detail=f"Cannot start: dependent tasks not completed: {incomplete_tasks}",
)
ms.status = MilestoneStatus.UNDERGOING ms.status = MilestoneStatus.UNDERGOING
ms.started_at = datetime.now(timezone.utc) ms.started_at = datetime.now(timezone.utc)

View File

@@ -173,9 +173,9 @@ def create_milestone_task(project_id: int, milestone_id: int, task_data: schemas
task = Task( task = Task(
title=data.get("title"), title=data.get("title"),
description=data.get("description"), description=data.get("description"),
task_type=data.get("task_type", "task"), task_type=data.get("task_type", "issue"),
task_subtype=data.get("task_subtype"), task_subtype=data.get("task_subtype"),
status=TaskStatus.OPEN, status=TaskStatus.PENDING,
priority=TaskPriority.MEDIUM, priority=TaskPriority.MEDIUM,
project_id=project_id, project_id=project_id,
milestone_id=milestone_id, milestone_id=milestone_id,

View File

@@ -16,6 +16,7 @@ from app.models.notification import Notification as NotificationModel
from app.api.deps import get_current_user_or_apikey from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_task from app.api.rbac import check_project_role, check_permission, ensure_can_edit_task
from app.services.activity import log_activity from app.services.activity import log_activity
from app.services.dependency_check import check_task_deps
router = APIRouter(tags=["Tasks"]) router = APIRouter(tags=["Tasks"])
@@ -208,6 +209,21 @@ def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Dep
body_fields = {k for k in update_data.keys() if k not in _always_allowed} body_fields = {k for k in update_data.keys() if k not in _always_allowed}
if body_fields: if body_fields:
# P3.6 supplement: feature story tasks locked after milestone freeze
task_type = task.task_type.value if hasattr(task.task_type, 'value') else (task.task_type or "")
task_subtype = task.task_subtype or ""
if task_type == "story" and task_subtype == "feature" and task.milestone_id:
from app.models.milestone import Milestone
ms = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if ms:
ms_status = ms.status.value if hasattr(ms.status, 'value') else ms.status
if ms_status in ("freeze", "undergoing", "completed", "closed"):
raise HTTPException(
status_code=400,
detail=f"Feature story task cannot be edited: milestone is '{ms_status}'. "
f"Blocked fields: {sorted(body_fields)}",
)
# undergoing/completed/closed: body edits forbidden # undergoing/completed/closed: body edits forbidden
if current_status in ("undergoing", "completed", "closed"): if current_status in ("undergoing", "completed", "closed"):
raise HTTPException( raise HTTPException(
@@ -293,7 +309,7 @@ def transition_task(
# P5.1: enforce state-machine # P5.1: enforce state-machine
_check_transition(old_status, new_status) _check_transition(old_status, new_status)
# P5.2: pending -> open requires milestone to be undergoing (dependencies checked later) # P5.2: pending -> open requires milestone to be undergoing + task deps satisfied
if old_status == "pending" and new_status == "open": if old_status == "pending" and new_status == "open":
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first() milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if milestone: if milestone:
@@ -303,6 +319,13 @@ def transition_task(
status_code=400, status_code=400,
detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'", detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'",
) )
# P4.3: check task-level depend_on
dep_result = check_task_deps(db, task.depend_on)
if not dep_result.ok:
raise HTTPException(
status_code=400,
detail=f"Cannot open task: {dep_result.reason}",
)
# P5.3: open -> undergoing requires assignee AND operator must be the assignee # P5.3: open -> undergoing requires assignee AND operator must be the assignee
if old_status == "open" and new_status == "undergoing": if old_status == "open" and new_status == "undergoing":
@@ -428,17 +451,24 @@ def list_all_tags(project_id: int = None, db: Session = Depends(get_db)):
# ---- Batch ---- # ---- Batch ----
class BatchTransition(BaseModel):
task_ids: List[int]
new_status: str
class BatchAssign(BaseModel): class BatchAssign(BaseModel):
task_ids: List[int] task_ids: List[int]
assignee_id: int assignee_id: int
class BatchTransitionBody(BaseModel):
task_ids: List[int]
new_status: str
comment: Optional[str] = None
@router.post("/tasks/batch/transition") @router.post("/tasks/batch/transition")
def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)): def batch_transition(
data: BatchTransitionBody,
bg: BackgroundTasks,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
valid_statuses = [s.value for s in TaskStatus] valid_statuses = [s.value for s in TaskStatus]
if data.new_status not in valid_statuses: if data.new_status not in valid_statuses:
raise HTTPException(status_code=400, detail="Invalid status") raise HTTPException(status_code=400, detail="Invalid status")
@@ -446,22 +476,106 @@ def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = D
skipped = [] skipped = []
for task_id in data.task_ids: for task_id in data.task_ids:
task = db.query(Task).filter(Task.id == task_id).first() task = db.query(Task).filter(Task.id == task_id).first()
if task: if not task:
old_status = task.status.value if hasattr(task.status, 'value') else task.status skipped.append({"id": task_id, "title": None, "old": None,
allowed = VALID_TRANSITIONS.get(old_status, set()) "reason": "Task not found"})
if data.new_status not in allowed: continue
old_status = task.status.value if hasattr(task.status, 'value') else task.status
# P5.1: state-machine check
allowed = VALID_TRANSITIONS.get(old_status, set())
if data.new_status not in allowed:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"})
continue
# P5.2: pending → open requires milestone undergoing + task deps
if old_status == "pending" and data.new_status == "open":
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if milestone:
ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
if ms_status != "undergoing":
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Milestone is '{ms_status}', must be 'undergoing'"})
continue
dep_result = check_task_deps(db, task.depend_on)
if not dep_result.ok:
skipped.append({"id": task.id, "title": task.title, "old": old_status, skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"}) "reason": dep_result.reason})
continue continue
if data.new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow() # P5.3: open → undergoing requires assignee == current_user
if data.new_status in ("closed", "completed") and not task.finished_on: if old_status == "open" and data.new_status == "undergoing":
task.finished_on = datetime.utcnow() if not task.assignee_id:
if data.new_status == "open" and old_status in ("completed", "closed"): skipped.append({"id": task.id, "title": task.title, "old": old_status,
task.finished_on = None "reason": "Assignee must be set before starting"})
task.status = data.new_status continue
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status}) if current_user.id != task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Only the assigned user can start this task"})
continue
# P5.4: undergoing → completed requires comment + assignee check
if old_status == "undergoing" and data.new_status == "completed":
comment_text = data.comment
if not comment_text or not comment_text.strip():
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "A completion comment is required"})
continue
if task.assignee_id and current_user.id != task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Only the assigned user can complete this task"})
continue
# P5.5: close requires permission
if data.new_status == "closed":
try:
check_permission(db, current_user.id, task.project_id, "task.close")
except HTTPException:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Missing 'task.close' permission"})
continue
# P5.6: reopen requires permission
if data.new_status == "open" and old_status in ("completed", "closed"):
perm = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
try:
check_permission(db, current_user.id, task.project_id, perm)
except HTTPException:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Missing '{perm}' permission"})
continue
task.finished_on = None
if data.new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if data.new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
task.status = data.new_status
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status})
# Activity log per task
log_activity(db, f"task.transition.{data.new_status}", "task", task.id, current_user.id,
{"old_status": old_status, "new_status": data.new_status})
# P5.4: auto-create completion comment
if old_status == "undergoing" and data.new_status == "completed" and data.comment:
db_comment = models.Comment(
content=data.comment.strip(),
task_id=task.id,
author_id=current_user.id,
)
db.add(db_comment)
db.commit() db.commit()
# P3.5: auto-complete milestone for any completed task
for u in updated:
if u["new"] == "completed":
t = db.query(Task).filter(Task.id == u["id"]).first()
if t:
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, t, user_id=current_user.id)
for u in updated: for u in updated:
event = "task.closed" if data.new_status == "closed" else "task.updated" event = "task.closed" if data.new_status == "closed" else "task.updated"
bg.add_task(fire_webhooks_sync, event, u, None, db) bg.add_task(fire_webhooks_sync, event, u, None, db)

View File

@@ -151,61 +151,93 @@ def init_default_permissions(db: Session) -> list[Permission]:
return db.query(Permission).all() return db.query(Permission).all()
def init_admin_role(db: Session, admin_user: models.User) -> None: # ---------------------------------------------------------------------------
"""Create admin role with all permissions and guest role with minimal permissions.""" # Default role → permission mapping
# Check if admin role already exists # ---------------------------------------------------------------------------
admin_role = db.query(Role).filter(Role.name == "admin").first()
if not admin_role:
admin_role = Role(
name="admin",
description="Administrator - full access to all features",
is_global=True
)
db.add(admin_role)
db.commit()
db.refresh(admin_role)
logger.info("Created admin role (id=%d)", admin_role.id)
# Check if guest role already exists # mgr: project management + all milestone/task/propose actions
guest_role = db.query(Role).filter(Role.name == "guest").first() _MGR_PERMISSIONS = {
if not guest_role: "project.read", "project.write", "project.manage_members",
guest_role = Role( "task.create", "task.read", "task.write", "task.delete",
name="guest", "milestone.create", "milestone.read", "milestone.write", "milestone.delete",
description="Guest - read-only access", "milestone.freeze", "milestone.start", "milestone.close",
is_global=True "task.close", "task.reopen_closed", "task.reopen_completed",
) "propose.accept", "propose.reject", "propose.reopen",
db.add(guest_role) "monitor.read",
db.commit() }
db.refresh(guest_role)
logger.info("Created guest role (id=%d)", guest_role.id)
# Get all permissions # dev: day-to-day development work — no freeze/start/close milestone, no accept/reject propose
_DEV_PERMISSIONS = {
"project.read",
"task.create", "task.read", "task.write",
"milestone.read",
"task.close", "task.reopen_closed", "task.reopen_completed",
"monitor.read",
}
# Role definitions: (name, description, permission_set)
_DEFAULT_ROLES = [
("admin", "Administrator - full access to all features", None), # None ⇒ all perms
("mgr", "Manager - project & milestone management", _MGR_PERMISSIONS),
("dev", "Developer - task execution & daily work", _DEV_PERMISSIONS),
("guest", "Guest - read-only access", None), # special: *.read only
]
def _ensure_role(db: Session, name: str, description: str, is_global: bool = True) -> Role:
"""Get or create a role by name."""
role = db.query(Role).filter(Role.name == name).first()
if not role:
role = Role(name=name, description=description, is_global=is_global)
db.add(role)
db.commit()
db.refresh(role)
logger.info("Created role '%s' (id=%d)", name, role.id)
return role
def _sync_role_permissions(db: Session, role: Role, target_perm_names: set[str] | None) -> None:
"""Ensure *role* has exactly the permissions in *target_perm_names*.
* ``None`` means **all** permissions (admin).
* The special sentinel ``"__read_only__"`` is handled by the caller passing
just the ``*.read`` names.
Only adds missing permissions; never removes manually-granted ones (additive).
"""
all_perms = db.query(Permission).all() all_perms = db.query(Permission).all()
perm_by_name = {p.name: p for p in all_perms}
# Assign all permissions to admin role if target_perm_names is None:
existing_admin_perm_ids = {rp.permission_id for rp in admin_role.permissions} wanted_ids = {p.id for p in all_perms}
for perm in all_perms: else:
if perm.id not in existing_admin_perm_ids: wanted_ids = {perm_by_name[n].id for n in target_perm_names if n in perm_by_name}
rp = RolePermission(role_id=admin_role.id, permission_id=perm.id)
db.add(rp)
if all_perms: existing_ids = {rp.permission_id for rp in role.permissions}
added = 0
for pid in wanted_ids - existing_ids:
db.add(RolePermission(role_id=role.id, permission_id=pid))
added += 1
if added:
db.commit() db.commit()
logger.info("Assigned %d permissions to admin role", len(all_perms)) logger.info("Assigned %d new permissions to role '%s'", added, role.name)
# Assign only read permissions to guest role
read_perms = db.query(Permission).filter(Permission.name.like("%.read")).all()
existing_guest_perm_ids = {rp.permission_id for rp in guest_role.permissions}
for perm in read_perms:
if perm.id not in existing_guest_perm_ids:
rp = RolePermission(role_id=guest_role.id, permission_id=perm.id)
db.add(rp)
if read_perms: def init_admin_role(db: Session, admin_user: models.User) -> None:
db.commit() """Create default roles (admin / mgr / dev / guest) with preset permissions."""
logger.info("Assigned %d read permissions to guest role", len(read_perms))
logger.info("Admin and guest roles setup complete") all_perms = db.query(Permission).all()
read_perm_names = {p.name for p in all_perms if p.name.endswith(".read")}
for name, description, perm_set in _DEFAULT_ROLES:
role = _ensure_role(db, name, description)
if name == "guest":
_sync_role_permissions(db, role, read_perm_names)
else:
_sync_role_permissions(db, role, perm_set)
logger.info("Default roles setup complete (admin, mgr, dev, guest)")
def run_init(db: Session) -> None: def run_init(db: Session) -> None:

View File

@@ -7,7 +7,7 @@ import enum
class TaskType(str, enum.Enum): class TaskType(str, enum.Enum):
"""Task type enum'issue' is a subtype of task, not the other way around.""" """Task type enum."""
ISSUE = "issue" ISSUE = "issue"
MAINTENANCE = "maintenance" MAINTENANCE = "maintenance"
RESEARCH = "research" RESEARCH = "research"
@@ -15,7 +15,6 @@ class TaskType(str, enum.Enum):
STORY = "story" STORY = "story"
TEST = "test" TEST = "test"
RESOLUTION = "resolution" RESOLUTION = "resolution"
TASK = "task"
class TaskStatus(str, enum.Enum): class TaskStatus(str, enum.Enum):

View File

@@ -0,0 +1,148 @@
"""P4.1 — Reusable dependency-check helpers.
Used by milestone start, milestone preflight, and (future) task pending→open
to verify that all declared dependencies are completed before allowing the
entity to proceed.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Sequence
from sqlalchemy.orm import Session
from app.models.milestone import Milestone
from app.models.task import Task
# ---------------------------------------------------------------------------
# Result type
# ---------------------------------------------------------------------------
@dataclass
class DepCheckResult:
"""Outcome of a dependency check."""
ok: bool = True
blockers: list[str] = field(default_factory=list)
@property
def reason(self) -> str | None:
return "; ".join(self.blockers) if self.blockers else None
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _parse_json_ids(raw: str | None) -> list[int]:
"""Safely parse a JSON-encoded list of integer IDs."""
if not raw:
return []
try:
ids = json.loads(raw)
if isinstance(ids, list):
return [int(i) for i in ids]
except (json.JSONDecodeError, TypeError, ValueError):
pass
return []
def _ms_status(ms: Milestone) -> str:
return ms.status.value if hasattr(ms.status, "value") else ms.status
def _task_status(t: Task) -> str:
return t.status.value if hasattr(t.status, "value") else t.status
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def check_milestone_deps(
db: Session,
depend_on_milestones: str | None,
depend_on_tasks: str | None,
*,
required_status: str = "completed",
) -> DepCheckResult:
"""Check whether all milestone + task dependencies are satisfied.
Parameters
----------
db:
Active DB session.
depend_on_milestones:
JSON-encoded list of milestone IDs (from the entity's field).
depend_on_tasks:
JSON-encoded list of task IDs (from the entity's field).
required_status:
The status that dependees must have reached (default ``"completed"``).
Returns
-------
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
with human-readable ``blockers``.
"""
result = DepCheckResult()
# --- milestone dependencies ---
ms_ids = _parse_json_ids(depend_on_milestones)
if ms_ids:
dep_milestones: Sequence[Milestone] = (
db.query(Milestone).filter(Milestone.id.in_(ms_ids)).all()
)
incomplete = [m.id for m in dep_milestones if _ms_status(m) != required_status]
if incomplete:
result.ok = False
result.blockers.append(f"Dependent milestones not {required_status}: {incomplete}")
# --- task dependencies ---
task_ids = _parse_json_ids(depend_on_tasks)
if task_ids:
dep_tasks: Sequence[Task] = (
db.query(Task).filter(Task.id.in_(task_ids)).all()
)
incomplete_tasks = [t.id for t in dep_tasks if _task_status(t) != required_status]
if incomplete_tasks:
result.ok = False
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete_tasks}")
return result
def check_task_deps(
db: Session,
depend_on: str | None,
*,
required_status: str = "completed",
) -> DepCheckResult:
"""Check whether a task's depend_on tasks are all satisfied.
Parameters
----------
db:
Active DB session.
depend_on:
JSON-encoded list of task IDs (from the task's ``depend_on`` field).
required_status:
The status dependees must have reached (default ``"completed"``).
Returns
-------
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
with human-readable ``blockers``.
"""
result = DepCheckResult()
task_ids = _parse_json_ids(depend_on)
if task_ids:
dep_tasks: Sequence[Task] = (
db.query(Task).filter(Task.id.in_(task_ids)).all()
)
incomplete = [t.id for t in dep_tasks if _task_status(t) != required_status]
if incomplete:
result.ok = False
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete}")
return result

92
cli.py
View File

@@ -23,7 +23,6 @@ STATUS_ICON = {
} }
TYPE_ICON = { TYPE_ICON = {
"resolution": "⚖️", "resolution": "⚖️",
"task": "📋",
"story": "📖", "story": "📖",
"test": "🧪", "test": "🧪",
"issue": "📌", "issue": "📌",
@@ -151,10 +150,56 @@ def cmd_search(args):
def cmd_transition(args): def cmd_transition(args):
result = _request("POST", f"/tasks/{args.task_id}/transition?new_status={args.status}") body = {}
if args.comment:
body["comment"] = args.comment
result = _request("POST", f"/tasks/{args.task_id}/transition?new_status={args.status}", body or None)
print(f"Task #{result['id']} transitioned to: {result['status']}") print(f"Task #{result['id']} transitioned to: {result['status']}")
# ── Propose commands ──────────────────────────────────────────────
def cmd_proposes(args):
if not args.project:
print("Error: --project is required for proposes", file=sys.stderr)
sys.exit(1)
result = _request("GET", f"/projects/{args.project}/proposes")
items = result if isinstance(result, list) else result.get("items", [])
if not items:
print(" No proposes found.")
return
for p in items:
status_icon = STATUS_ICON.get(p["status"], "")
feat = f" → task {p['feat_task_id']}" if p.get("feat_task_id") else ""
print(f" {status_icon} 💡 {p['propose_code']} {p['title']}{feat}")
def cmd_propose_create(args):
data = {"title": args.title}
if args.description:
data["description"] = args.description
result = _request("POST", f"/projects/{args.project}/proposes", data)
print(f"Created propose {result['propose_code']}: {result['title']}")
def cmd_propose_accept(args):
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/accept?milestone_id={args.milestone}")
print(f"Propose #{args.propose_id} accepted → task {result.get('feat_task_id', '?')}")
def cmd_propose_reject(args):
data = {}
if args.reason:
data["reason"] = args.reason
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/reject", data or None)
print(f"Propose #{args.propose_id} rejected")
def cmd_propose_reopen(args):
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/reopen")
print(f"Propose #{args.propose_id} reopened")
def cmd_stats(args): def cmd_stats(args):
params = f"?project_id={args.project}" if args.project else "" params = f"?project_id={args.project}" if args.project else ""
stats = _request("GET", f"/dashboard/stats{params}") stats = _request("GET", f"/dashboard/stats{params}")
@@ -170,8 +215,13 @@ def cmd_stats(args):
def cmd_milestones(args): def cmd_milestones(args):
params = f"?project_id={args.project}" if args.project else "" params = []
milestones = _request("GET", f"/milestones{params}") if args.project:
params.append(f"project_id={args.project}")
if args.status:
params.append(f"status={args.status}")
qs = f"?{'&'.join(params)}" if params else ""
milestones = _request("GET", f"/milestones{qs}")
if not milestones: if not milestones:
print(" No milestones found.") print(" No milestones found.")
return return
@@ -242,7 +292,7 @@ def main():
p_tasks = sub.add_parser("tasks", aliases=["issues"], help="List tasks") p_tasks = sub.add_parser("tasks", aliases=["issues"], help="List tasks")
p_tasks.add_argument("--project", "-p", type=int) p_tasks.add_argument("--project", "-p", type=int)
p_tasks.add_argument("--type", "-t", choices=["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"]) p_tasks.add_argument("--type", "-t", choices=["story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_tasks.add_argument("--status", "-s", choices=["open", "pending", "undergoing", "completed", "closed"]) p_tasks.add_argument("--status", "-s", choices=["open", "pending", "undergoing", "completed", "closed"])
p_create = sub.add_parser("create-task", aliases=["create-issue"], help="Create a task") p_create = sub.add_parser("create-task", aliases=["create-issue"], help="Create a task")
@@ -250,7 +300,7 @@ def main():
p_create.add_argument("--project", "-p", type=int, required=True) p_create.add_argument("--project", "-p", type=int, required=True)
p_create.add_argument("--milestone", "-m", type=int, required=True) p_create.add_argument("--milestone", "-m", type=int, required=True)
p_create.add_argument("--reporter", "-r", type=int, required=True) p_create.add_argument("--reporter", "-r", type=int, required=True)
p_create.add_argument("--type", "-t", default="task", choices=["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"]) p_create.add_argument("--type", "-t", default="issue", choices=["story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_create.add_argument("--subtype") p_create.add_argument("--subtype")
p_create.add_argument("--priority", choices=["low", "medium", "high", "critical"]) p_create.add_argument("--priority", choices=["low", "medium", "high", "critical"])
p_create.add_argument("--description", "-d") p_create.add_argument("--description", "-d")
@@ -271,12 +321,14 @@ def main():
p_trans = sub.add_parser("transition", help="Transition task status") p_trans = sub.add_parser("transition", help="Transition task status")
p_trans.add_argument("task_id", type=int) p_trans.add_argument("task_id", type=int)
p_trans.add_argument("status", choices=["open", "pending", "undergoing", "completed", "closed"]) p_trans.add_argument("status", choices=["open", "pending", "undergoing", "completed", "closed"])
p_trans.add_argument("--comment", "-c", help="Comment (required for undergoing→completed)")
p_stats = sub.add_parser("stats", help="Dashboard stats") p_stats = sub.add_parser("stats", help="Dashboard stats")
p_stats.add_argument("--project", "-p", type=int) p_stats.add_argument("--project", "-p", type=int)
p_ms = sub.add_parser("milestones", help="List milestones") p_ms = sub.add_parser("milestones", help="List milestones")
p_ms.add_argument("--project", "-p", type=int) p_ms.add_argument("--project", "-p", type=int)
p_ms.add_argument("--status", "-s", choices=["open", "freeze", "undergoing", "completed", "closed"])
p_msp = sub.add_parser("milestone-progress", help="Show milestone progress") p_msp = sub.add_parser("milestone-progress", help="Show milestone progress")
p_msp.add_argument("milestone_id", type=int) p_msp.add_argument("milestone_id", type=int)
@@ -296,6 +348,29 @@ def main():
p_worklogs = sub.add_parser("worklogs", help="List work logs for a task") p_worklogs = sub.add_parser("worklogs", help="List work logs for a task")
p_worklogs.add_argument("task_id", type=int) p_worklogs.add_argument("task_id", type=int)
# ── Propose subcommands ──
p_proposes = sub.add_parser("proposes", help="List proposes for a project")
p_proposes.add_argument("--project", "-p", type=int, required=True)
p_pc = sub.add_parser("propose-create", help="Create a propose")
p_pc.add_argument("title")
p_pc.add_argument("--project", "-p", type=int, required=True)
p_pc.add_argument("--description", "-d")
p_pa = sub.add_parser("propose-accept", help="Accept a propose into a milestone")
p_pa.add_argument("propose_id", type=int)
p_pa.add_argument("--project", "-p", type=int, required=True)
p_pa.add_argument("--milestone", "-m", type=int, required=True)
p_pr = sub.add_parser("propose-reject", help="Reject a propose")
p_pr.add_argument("propose_id", type=int)
p_pr.add_argument("--project", "-p", type=int, required=True)
p_pr.add_argument("--reason", "-r")
p_pro = sub.add_parser("propose-reopen", help="Reopen a rejected propose")
p_pro.add_argument("propose_id", type=int)
p_pro.add_argument("--project", "-p", type=int, required=True)
args = parser.parse_args() args = parser.parse_args()
if not args.command: if not args.command:
parser.print_help() parser.print_help()
@@ -320,6 +395,11 @@ def main():
"overdue": cmd_overdue, "overdue": cmd_overdue,
"log-time": cmd_log_time, "log-time": cmd_log_time,
"worklogs": cmd_worklogs, "worklogs": cmd_worklogs,
"proposes": cmd_proposes,
"propose-create": cmd_propose_create,
"propose-accept": cmd_propose_accept,
"propose-reject": cmd_propose_reject,
"propose-reopen": cmd_propose_reopen,
} }
cmds[args.command](args) cmds[args.command](args)

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# tests package

302
tests/conftest.py Normal file
View File

@@ -0,0 +1,302 @@
"""Shared test fixtures — SQLite in-memory DB + FastAPI TestClient.
This avoids needing MySQL for unit/integration tests.
All models are created fresh for every test function (function-scoped session).
"""
import sys, os
# Ensure the backend app package is importable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
# --- Override engine BEFORE any app import touches the real DB ---
from app.core.config import Base
# Force-import ALL model modules so Base.metadata knows every table
import app.models.models # noqa: F401 — User, Project, Comment, etc.
import app.models.milestone # noqa: F401
import app.models.task # noqa: F401
import app.models.role_permission # noqa: F401
import app.models.activity # noqa: F401
import app.models.propose # noqa: F401
try:
import app.models.apikey # noqa: F401
except ImportError:
pass
try:
import app.models.webhook # noqa: F401
except ImportError:
pass
TEST_DATABASE_URL = "sqlite://" # in-memory
engine = create_engine(
TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
# Use StaticPool so all sessions share the same in-memory connection
poolclass=__import__("sqlalchemy.pool", fromlist=["StaticPool"]).StaticPool,
)
# SQLite needs foreign keys enabled per-connection
@event.listens_for(engine, "connect")
def _set_sqlite_pragma(dbapi_conn, _):
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def setup_database():
"""Create all tables before each test, drop after."""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture()
def db():
"""Yield a DB session for direct model manipulation."""
session = TestingSessionLocal()
try:
yield session
finally:
session.close()
@pytest.fixture()
def client(db):
"""FastAPI TestClient wired to the test DB + a default authenticated user."""
from fastapi.testclient import TestClient
from app.main import app
from app.core.config import get_db
# Override DB dependency
def _override_get_db():
try:
yield db
finally:
pass # caller's `db` fixture handles close
app.dependency_overrides[get_db] = _override_get_db
yield TestClient(app)
app.dependency_overrides.clear()
# ---------------------------------------------------------------------------
# Helper factories
# ---------------------------------------------------------------------------
@pytest.fixture()
def make_user(db):
"""Factory to create a User row."""
from app.models.models import User
_counter = [0]
# Pre-compute a bcrypt hash for "password" to avoid passlib/bcrypt version issues
_pwd_hash = "$2b$12$LJ3m4ys/Xz.l1PaOHHKN/uE7dQFnSm1AUBfEkL0C2dN9.3Oau4XG"
def _make(username=None, is_admin=False):
_counter[0] += 1
n = _counter[0]
u = User(
username=username or f"testuser{n}",
email=f"test{n}@example.com",
hashed_password=_pwd_hash,
is_active=True,
is_admin=is_admin,
)
db.add(u)
db.commit()
db.refresh(u)
return u
return _make
@pytest.fixture()
def make_project(db):
"""Factory to create a Project row."""
from app.models.models import Project
_counter = [0]
def _make(owner_id, name=None, project_code=None):
_counter[0] += 1
n = _counter[0]
p = Project(
name=name or f"TestProject{n}",
project_code=project_code or f"TP{n}",
owner_name="owner",
owner_id=owner_id,
)
db.add(p)
db.commit()
db.refresh(p)
return p
return _make
@pytest.fixture()
def make_milestone(db):
"""Factory to create a Milestone row."""
from app.models.milestone import Milestone, MilestoneStatus
_counter = [0]
def _make(project_id, created_by_id, status=MilestoneStatus.OPEN, **kw):
_counter[0] += 1
n = _counter[0]
ms = Milestone(
title=kw.pop("title", f"Milestone {n}"),
project_id=project_id,
created_by_id=created_by_id,
status=status,
milestone_code=kw.pop("milestone_code", f"M{n:04d}"),
**kw,
)
db.add(ms)
db.commit()
db.refresh(ms)
return ms
return _make
@pytest.fixture()
def make_task(db):
"""Factory to create a Task row."""
from app.models.task import Task, TaskStatus
_counter = [0]
def _make(project_id, milestone_id, reporter_id, status=TaskStatus.PENDING, **kw):
_counter[0] += 1
n = _counter[0]
t = Task(
title=kw.pop("title", f"Task {n}"),
project_id=project_id,
milestone_id=milestone_id,
reporter_id=reporter_id,
created_by_id=kw.pop("created_by_id", reporter_id),
status=status,
task_code=kw.pop("task_code", f"T{n:04d}"),
task_type=kw.pop("task_type", "issue"),
task_subtype=kw.pop("task_subtype", None),
**kw,
)
db.add(t)
db.commit()
db.refresh(t)
return t
return _make
@pytest.fixture()
def seed_roles_and_permissions(db):
"""Create the minimal role + permission setup needed by action endpoints.
Returns (admin_role, mgr_role, dev_role).
"""
from app.models.role_permission import Role, Permission, RolePermission
# --- roles ---
admin_role = Role(name="admin", is_global=True)
mgr_role = Role(name="mgr", is_global=False)
dev_role = Role(name="dev", is_global=False)
db.add_all([admin_role, mgr_role, dev_role])
db.commit()
# --- permissions ---
perm_names = [
("milestone.freeze", "milestone"),
("milestone.start", "milestone"),
("milestone.close", "milestone"),
("task.close", "task"),
("task.reopen_closed", "task"),
("task.reopen_completed", "task"),
("propose.accept", "propose"),
("propose.reject", "propose"),
("propose.reopen", "propose"),
# add broad perms for role checks
("project.read", "project"),
("project.write", "project"),
("milestone.read", "milestone"),
("milestone.write", "milestone"),
("milestone.create", "milestone"),
("task.read", "task"),
("task.write", "task"),
("task.create", "task"),
]
perm_objs = {}
for name, cat in perm_names:
p = Permission(name=name, category=cat, description=name)
db.add(p)
db.flush()
perm_objs[name] = p
# admin gets all
for p in perm_objs.values():
db.add(RolePermission(role_id=admin_role.id, permission_id=p.id))
# mgr gets milestone + propose + task management perms
mgr_perms = [
"milestone.freeze", "milestone.start", "milestone.close",
"task.close", "task.reopen_closed", "task.reopen_completed",
"propose.accept", "propose.reject", "propose.reopen",
"project.read", "project.write",
"milestone.read", "milestone.write", "milestone.create",
"task.read", "task.write", "task.create",
]
for name in mgr_perms:
db.add(RolePermission(role_id=mgr_role.id, permission_id=perm_objs[name].id))
# dev gets basic perms
dev_perms = [
"project.read", "task.read", "task.write", "task.create",
"milestone.read", "task.close", "task.reopen_closed", "task.reopen_completed",
]
for name in dev_perms:
db.add(RolePermission(role_id=dev_role.id, permission_id=perm_objs[name].id))
db.commit()
db.refresh(admin_role)
db.refresh(mgr_role)
db.refresh(dev_role)
return admin_role, mgr_role, dev_role
@pytest.fixture()
def make_member(db):
"""Factory to add a user as project member with a given role."""
from app.models.models import ProjectMember
def _make(project_id, user_id, role_id):
pm = ProjectMember(project_id=project_id, user_id=user_id, role_id=role_id)
db.add(pm)
db.commit()
return pm
return _make
@pytest.fixture()
def auth_header():
"""Generate a JWT auth header for a given user."""
from app.api.deps import create_access_token
def _make(user):
token = create_access_token({"sub": str(user.id)})
return {"Authorization": f"Bearer {token}"}
return _make

View File

@@ -0,0 +1,358 @@
"""P13.1 — Milestone state-machine action tests.
Covers:
- freeze: success, missing release task, multiple release tasks, wrong status
- start: success + started_at, deps not met, wrong status
- close: from open/freeze/undergoing, wrong status (completed/closed)
- auto-complete: release task completion triggers milestone completed
"""
import json
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# -----------------------------------------------------------------------
# Freeze
# -----------------------------------------------------------------------
class TestFreeze:
"""POST /projects/{pid}/milestones/{mid}/actions/freeze"""
def test_freeze_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, _ = seed_roles_and_permissions
user = make_user(is_admin=False)
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
# Create exactly 1 maintenance/release task
make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "freeze"
db.refresh(ms)
assert ms.status == MilestoneStatus.FREEZE
def test_freeze_no_release_task(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "no maintenance/release task" in resp.json()["detail"].lower()
def test_freeze_multiple_release_tasks(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected exactly 1" in resp.json()["detail"].lower()
def test_freeze_wrong_status(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.CLOSED)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected 'open'" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Start
# -----------------------------------------------------------------------
class TestStart:
"""POST /projects/{pid}/milestones/{mid}/actions/start"""
def _freeze_milestone(self, db, ms):
ms.status = MilestoneStatus.FREEZE
db.commit()
db.refresh(ms)
def test_start_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
self._freeze_milestone(db, ms)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 200, resp.text
data = resp.json()
assert data["status"] == "undergoing"
assert "started_at" in data
db.refresh(ms)
assert ms.status == MilestoneStatus.UNDERGOING
assert ms.started_at is not None
def test_start_deps_not_met(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
# Create a dependency milestone that is NOT completed
dep_ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
ms = make_milestone(
project.id, user.id,
depend_on_milestones=json.dumps([dep_ms.id]),
)
self._freeze_milestone(db, ms)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "cannot start" in resp.json()["detail"].lower()
def test_start_wrong_status(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected 'freeze'" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Close
# -----------------------------------------------------------------------
class TestClose:
"""POST /projects/{pid}/milestones/{mid}/actions/close"""
@pytest.mark.parametrize("initial_status", [
MilestoneStatus.OPEN,
MilestoneStatus.FREEZE,
MilestoneStatus.UNDERGOING,
])
def test_close_from_allowed_statuses(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header, initial_status,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=initial_status)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/close",
headers=auth_header(user),
json={"reason": "no longer needed"},
)
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "closed"
db.refresh(ms)
assert ms.status == MilestoneStatus.CLOSED
@pytest.mark.parametrize("terminal_status", [
MilestoneStatus.COMPLETED,
MilestoneStatus.CLOSED,
])
def test_close_from_terminal_rejected(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header, terminal_status,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=terminal_status)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/close",
headers=auth_header(user),
)
assert resp.status_code == 400
# -----------------------------------------------------------------------
# Auto-complete
# -----------------------------------------------------------------------
class TestAutoComplete:
"""When the sole release task is completed, milestone auto-completes."""
def test_auto_complete_on_release_task_finish(
self, db, make_user, make_project, make_milestone, make_task,
):
"""Direct unit test of try_auto_complete_milestone."""
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
release_task = make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, release_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.COMPLETED
def test_no_auto_complete_for_non_release_task(
self, db, make_user, make_project, make_milestone, make_task,
):
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
# Also add the required release task (still pending)
make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.PENDING,
)
normal_task = make_task(
project.id, ms.id, user.id,
task_type="issue", task_subtype="defect",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, normal_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.UNDERGOING # unchanged
def test_no_auto_complete_when_not_undergoing(
self, db, make_user, make_project, make_milestone, make_task,
):
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.FREEZE)
release_task = make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, release_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.FREEZE # unchanged
# -----------------------------------------------------------------------
# Preflight
# -----------------------------------------------------------------------
class TestPreflight:
"""GET /projects/{pid}/milestones/{mid}/actions/preflight"""
def test_preflight_freeze_allowed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.get(
f"/projects/{project.id}/milestones/{ms.id}/actions/preflight",
headers=auth_header(user),
)
assert resp.status_code == 200
data = resp.json()
assert data["freeze"]["allowed"] is True
def test_preflight_freeze_not_allowed(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
# No release task
resp = client.get(
f"/projects/{project.id}/milestones/{ms.id}/actions/preflight",
headers=auth_header(user),
)
assert resp.status_code == 200
data = resp.json()
assert data["freeze"]["allowed"] is False

559
tests/test_propose.py Normal file
View File

@@ -0,0 +1,559 @@
"""P13.3 — Propose backend tests.
Covers:
- CRUD: create, list, get, update
- propose_code per-project incrementing
- accept → auto-generate feature story task + feat_task_id
- accept with non-open milestone → fail
- reject → status change
- rejected → reopen back to open
- feat_task_id cannot be set manually
- edit restrictions (only open proposes editable)
- permission checks for accept/reject/reopen
"""
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _propose_url(project_id: int, propose_id: int | None = None) -> str:
base = f"/projects/{project_id}/proposes"
return f"{base}/{propose_id}" if propose_id else base
# ===========================================================================
# CRUD
# ===========================================================================
class TestProposeCRUD:
"""Basic create / list / get / update."""
def test_create_propose(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id, project_code="PROJ")
make_member(project.id, user.id, dev_role.id)
resp = client.post(
_propose_url(project.id),
json={"title": "New Feature Idea", "description": "Some details"},
headers=auth_header(user),
)
assert resp.status_code == 201
data = resp.json()
assert data["title"] == "New Feature Idea"
assert data["status"] == "open"
assert data["propose_code"].startswith("PROJ:P")
assert data["feat_task_id"] is None
def test_list_proposes(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
# Create two proposes
client.post(_propose_url(project.id), json={"title": "P1"}, headers=auth_header(user))
client.post(_propose_url(project.id), json={"title": "P2"}, headers=auth_header(user))
resp = client.get(_propose_url(project.id), headers=auth_header(user))
assert resp.status_code == 200
assert len(resp.json()) == 2
def test_get_propose(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(_propose_url(project.id), json={"title": "P1"}, headers=auth_header(user))
propose_id = create_resp.json()["id"]
resp = client.get(_propose_url(project.id, propose_id), headers=auth_header(user))
assert resp.status_code == 200
assert resp.json()["title"] == "P1"
def test_update_propose_open(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(_propose_url(project.id), json={"title": "Old"}, headers=auth_header(user))
propose_id = create_resp.json()["id"]
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "New Title", "description": "Updated"},
headers=auth_header(user),
)
assert resp.status_code == 200
assert resp.json()["title"] == "New Title"
assert resp.json()["description"] == "Updated"
# ===========================================================================
# Propose Code
# ===========================================================================
class TestProposeCode:
"""P1.4 — propose_code increments per project independently."""
def test_code_increments_per_project(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
proj_a = make_project(owner_id=user.id, project_code="ALPHA")
proj_b = make_project(owner_id=user.id, project_code="BETA")
make_member(proj_a.id, user.id, dev_role.id)
make_member(proj_b.id, user.id, dev_role.id)
# Create 2 in ALPHA
r1 = client.post(_propose_url(proj_a.id), json={"title": "A1"}, headers=auth_header(user))
r2 = client.post(_propose_url(proj_a.id), json={"title": "A2"}, headers=auth_header(user))
# Create 1 in BETA
r3 = client.post(_propose_url(proj_b.id), json={"title": "B1"}, headers=auth_header(user))
code1 = r1.json()["propose_code"]
code2 = r2.json()["propose_code"]
code3 = r3.json()["propose_code"]
assert code1.startswith("ALPHA:P")
assert code2.startswith("ALPHA:P")
assert code3.startswith("BETA:P")
# They should be distinct
assert code1 != code2
# ===========================================================================
# Accept
# ===========================================================================
class TestAccept:
"""P6.2 — accept propose → create feature story task."""
def test_accept_success(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id),
json={"title": "Cool Feature", "description": "Do something cool"},
headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "accepted"
assert data["feat_task_id"] is not None
# Verify the generated task exists
from app.models.task import Task
task = db.query(Task).filter(Task.id == int(data["feat_task_id"])).first()
assert task is not None
assert task.title == "Cool Feature"
assert task.description == "Do something cool"
assert task.task_type == "story"
assert task.task_subtype == "feature"
task_status = task.status.value if hasattr(task.status, "value") else task.status
assert task_status == "pending"
assert task.milestone_id == ms.id
def test_accept_non_open_milestone_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.FREEZE)
create_resp = client.post(
_propose_url(project.id),
json={"title": "Feature X"},
headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 400
assert "open" in resp.json()["detail"].lower()
def test_accept_already_accepted_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# First accept
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Second accept should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_accept_auto_fills_feat_task_id(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
data = resp.json()
assert data["feat_task_id"] is not None
# Re-fetch to confirm persistence
get_resp = client.get(_propose_url(project.id, propose_id), headers=auth_header(mgr))
assert get_resp.json()["feat_task_id"] == data["feat_task_id"]
def test_accept_no_permission_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
"""dev role should not have propose.accept permission."""
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.OPEN)
# Dev creates the propose
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
# Dev tries to accept — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# Reject
# ===========================================================================
class TestReject:
"""P6.3 — reject propose."""
def test_reject_success(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "Not needed"},
headers=auth_header(mgr),
)
assert resp.status_code == 200
assert resp.json()["status"] == "rejected"
def test_reject_non_open_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Accept first
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Now reject should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "Changed mind"},
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_reject_no_permission_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "nah"},
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# Reopen
# ===========================================================================
class TestReopen:
"""P6.4 — reopen rejected propose."""
def test_reopen_success(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Reject first
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "wait"},
headers=auth_header(mgr),
)
# Reopen
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(mgr),
)
assert resp.status_code == 200
assert resp.json()["status"] == "open"
def test_reopen_non_rejected_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Try reopen on open propose — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_reopen_no_permission_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
# Owner rejects
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "nah"},
headers=auth_header(owner),
)
# Dev tries to reopen — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# feat_task_id protection
# ===========================================================================
class TestFeatTaskIdProtection:
"""P6.5 — feat_task_id is server-side only, cannot be set by client."""
def test_update_cannot_set_feat_task_id(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(user),
)
propose_id = create_resp.json()["id"]
# Try to set feat_task_id via PATCH
resp = client.patch(
_propose_url(project.id, propose_id),
json={"feat_task_id": "999"},
headers=auth_header(user),
)
assert resp.status_code == 200
# feat_task_id should still be None (server ignores it)
assert resp.json()["feat_task_id"] is None
# ===========================================================================
# Edit restrictions
# ===========================================================================
class TestEditRestrictions:
"""Propose editing is only allowed in open status."""
def test_edit_accepted_propose_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Accept
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Try to edit
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "Changed"},
headers=auth_header(mgr),
)
assert resp.status_code == 400
assert "open" in resp.json()["detail"].lower()
def test_edit_rejected_propose_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Reject
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "no"},
headers=auth_header(mgr),
)
# Try to edit
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "Changed"},
headers=auth_header(mgr),
)
assert resp.status_code == 400

View File

@@ -0,0 +1,564 @@
"""P13.2 — Task state-machine transition tests.
Covers:
- pending → open: success, milestone not undergoing, deps not met
- open → undergoing: success, no assignee, non-assignee blocked
- undergoing → completed: success with comment, no comment fails, non-assignee blocked
- close from pending/open/undergoing: permission required
- reopen from completed/closed → open: distinct permissions
- invalid transitions: rejected by state machine
- edit restrictions: P5.7 body edit guards by status/assignee
"""
import json
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# -----------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------
def _transition(client, task_id, new_status, headers, comment=None):
"""POST /tasks/{id}/transition?new_status=..."""
body = {}
if comment is not None:
body["comment"] = comment
return client.post(
f"/tasks/{task_id}/transition?new_status={new_status}",
json=body,
headers=headers,
)
# -----------------------------------------------------------------------
# pending → open
# -----------------------------------------------------------------------
class TestPendingToOpen:
def test_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open succeeds when milestone is undergoing and no deps."""
admin_role, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.PENDING)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "open"
def test_milestone_not_undergoing(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open rejected when milestone is still open."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.PENDING)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 400
assert "undergoing" in resp.json()["detail"].lower()
def test_deps_not_satisfied(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open rejected when depend_on tasks are not completed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
dep_task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.PENDING,
depend_on=json.dumps([dep_task.id]),
)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 400
assert "depend" in resp.json()["detail"].lower() or "block" in resp.json()["detail"].lower()
def test_deps_satisfied(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open succeeds when all depend_on tasks are completed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
dep_task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.PENDING,
depend_on=json.dumps([dep_task.id]),
)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
# -----------------------------------------------------------------------
# open → undergoing
# -----------------------------------------------------------------------
class TestOpenToUndergoing:
def test_success_assignee_starts(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Assignee can start their own task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.OPEN,
assignee_id=user.id,
)
resp = _transition(client, task.id, "undergoing", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "undergoing"
db.refresh(task)
assert task.started_on is not None
def test_no_assignee_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot start a task without an assignee."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
resp = _transition(client, task.id, "undergoing", auth_header(user))
assert resp.status_code == 400
assert "assignee" in resp.json()["detail"].lower()
def test_non_assignee_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""A different user cannot start someone else's task."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.OPEN,
assignee_id=owner.id,
)
resp = _transition(client, task.id, "undergoing", auth_header(other))
assert resp.status_code == 403
assert "assigned" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# undergoing → completed
# -----------------------------------------------------------------------
class TestUndergoingToCompleted:
def test_success_with_comment(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Assignee can complete a task with a completion comment."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user), comment="Done!")
assert resp.status_code == 200
assert resp.json()["status"] == "completed"
db.refresh(task)
assert task.finished_on is not None
def test_no_comment_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot complete without a comment."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user))
assert resp.status_code == 400
assert "comment" in resp.json()["detail"].lower()
def test_empty_comment_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Empty/whitespace comment is rejected."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user), comment=" ")
assert resp.status_code == 400
assert "comment" in resp.json()["detail"].lower()
def test_non_assignee_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Non-assignee cannot complete the task."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.UNDERGOING,
assignee_id=owner.id,
)
resp = _transition(client, task.id, "completed", auth_header(other), comment="I finished it")
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Close task (from various states)
# -----------------------------------------------------------------------
class TestCloseTask:
@pytest.mark.parametrize("initial_status", [
TaskStatus.PENDING,
TaskStatus.OPEN,
TaskStatus.UNDERGOING,
])
def test_close_from_valid_states(
self, initial_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Close is allowed from pending/open/undergoing with permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=initial_status)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "closed"
@pytest.mark.parametrize("initial_status", [
TaskStatus.COMPLETED,
TaskStatus.CLOSED,
])
def test_close_from_terminal_states_fails(
self, initial_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot close from completed or already closed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=initial_status)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 400
def test_close_without_permission_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""User without task.close permission cannot close."""
from app.models.role_permission import Role
_, _, dev_role = seed_roles_and_permissions
# Create a role with NO task.close permission
no_close_role = Role(name="viewer", is_global=False)
db.add(no_close_role)
db.commit()
# Give viewer only basic perms (project.read, task.read)
from app.models.role_permission import Permission, RolePermission
for pname in ("project.read", "task.read"):
p = db.query(Permission).filter(Permission.name == pname).first()
if p:
db.add(RolePermission(role_id=no_close_role.id, permission_id=p.id))
db.commit()
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, no_close_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Reopen (completed → open, closed → open)
# -----------------------------------------------------------------------
class TestReopen:
def test_reopen_completed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Reopen from completed → open with task.reopen_completed permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
# finished_on should be cleared
db.refresh(task)
assert task.finished_on is None
def test_reopen_closed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Reopen from closed → open with task.reopen_closed permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.CLOSED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
def test_reopen_without_permission_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""User without reopen permission cannot reopen."""
from app.models.role_permission import Role, Permission, RolePermission
# Create a role with task.close but NO reopen permissions
limited_role = Role(name="limited", is_global=False)
db.add(limited_role)
db.commit()
for pname in ("project.read", "task.read", "task.write", "task.close"):
p = db.query(Permission).filter(Permission.name == pname).first()
if p:
db.add(RolePermission(role_id=limited_role.id, permission_id=p.id))
db.commit()
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, limited_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Invalid transitions
# -----------------------------------------------------------------------
class TestInvalidTransitions:
@pytest.mark.parametrize("from_status,to_status", [
(TaskStatus.PENDING, "undergoing"),
(TaskStatus.PENDING, "completed"),
(TaskStatus.OPEN, "completed"),
(TaskStatus.OPEN, "pending"),
(TaskStatus.UNDERGOING, "open"),
(TaskStatus.UNDERGOING, "pending"),
(TaskStatus.COMPLETED, "undergoing"),
(TaskStatus.COMPLETED, "closed"),
(TaskStatus.CLOSED, "undergoing"),
(TaskStatus.CLOSED, "completed"),
])
def test_disallowed_transition(
self, from_status, to_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""State machine rejects transitions not in VALID_TRANSITIONS."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=from_status,
assignee_id=user.id,
)
resp = _transition(client, task.id, to_status, auth_header(user))
assert resp.status_code == 400
assert "cannot transition" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Edit restrictions (PATCH)
# -----------------------------------------------------------------------
class TestEditRestrictions:
def test_undergoing_body_edit_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot PATCH body fields on an undergoing task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.UNDERGOING, assignee_id=user.id)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "New Title"},
headers=auth_header(user),
)
assert resp.status_code == 400
assert "undergoing" in resp.json()["detail"].lower()
def test_completed_body_edit_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot PATCH body fields on a completed task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Changed"},
headers=auth_header(user),
)
assert resp.status_code == 400
def test_open_assignee_only_edit(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Open task with assignee: only assignee can edit body."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.OPEN,
assignee_id=owner.id,
)
# Other user cannot edit
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Hijack"},
headers=auth_header(other),
)
assert resp.status_code == 403
# Assignee can edit
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "My Change"},
headers=auth_header(owner),
)
assert resp.status_code == 200
assert resp.json()["title"] == "My Change"
def test_open_no_assignee_anyone_edits(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Open task without assignee: any project member can edit."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.OPEN,
assignee_id=None,
)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Anyone's Change"},
headers=auth_header(user),
)
assert resp.status_code == 200
assert resp.json()["title"] == "Anyone's Change"