Compare commits
10 Commits
89e3bcdd0f
...
e938507a24
| Author | SHA1 | Date | |
|---|---|---|---|
| e938507a24 | |||
| c21e4ee335 | |||
| 011a2262ce | |||
| 7bad57eb0e | |||
| 00a1786ec3 | |||
| 586e06f66a | |||
| ec91a15f65 | |||
| 8e38d4cf4d | |||
| 0c75045f6f | |||
| c6b14ac25f |
@@ -3,7 +3,6 @@
|
||||
Provides freeze / start / close actions on milestones.
|
||||
completed is triggered automatically when the sole release maintenance task finishes.
|
||||
"""
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
@@ -18,6 +17,7 @@ from app.models import models
|
||||
from app.models.milestone import Milestone, MilestoneStatus
|
||||
from app.models.task import Task, TaskStatus
|
||||
from app.services.activity import log_activity
|
||||
from app.services.dependency_check import check_milestone_deps
|
||||
|
||||
router = APIRouter(
|
||||
prefix="/projects/{project_id}/milestones/{milestone_id}/actions",
|
||||
@@ -101,38 +101,13 @@ def preflight_milestone_actions(
|
||||
|
||||
# --- start pre-check (only meaningful when status == freeze) ---
|
||||
if ms_status == "freeze":
|
||||
blockers: list[str] = []
|
||||
|
||||
# milestone dependencies
|
||||
dep_ms_ids = []
|
||||
if ms.depend_on_milestones:
|
||||
try:
|
||||
dep_ms_ids = json.loads(ms.depend_on_milestones)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
dep_ms_ids = []
|
||||
if dep_ms_ids:
|
||||
dep_milestones = db.query(Milestone).filter(Milestone.id.in_(dep_ms_ids)).all()
|
||||
incomplete = [m.id for m in dep_milestones if _ms_status_value(m) != "completed"]
|
||||
if incomplete:
|
||||
blockers.append(f"Dependent milestones not completed: {incomplete}")
|
||||
|
||||
# task dependencies
|
||||
dep_task_ids = []
|
||||
if ms.depend_on_tasks:
|
||||
try:
|
||||
dep_task_ids = json.loads(ms.depend_on_tasks)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
dep_task_ids = []
|
||||
if dep_task_ids:
|
||||
dep_tasks = db.query(Task).filter(Task.id.in_(dep_task_ids)).all()
|
||||
incomplete_tasks = [t.id for t in dep_tasks if (t.status.value if hasattr(t.status, "value") else t.status) != "completed"]
|
||||
if incomplete_tasks:
|
||||
blockers.append(f"Dependent tasks not completed: {incomplete_tasks}")
|
||||
|
||||
if blockers:
|
||||
result["start"] = {"allowed": False, "reason": "; ".join(blockers)}
|
||||
else:
|
||||
dep_result = check_milestone_deps(
|
||||
db, ms.depend_on_milestones, ms.depend_on_tasks,
|
||||
)
|
||||
if dep_result.ok:
|
||||
result["start"] = {"allowed": True, "reason": None}
|
||||
else:
|
||||
result["start"] = {"allowed": False, "reason": dep_result.reason}
|
||||
|
||||
return result
|
||||
|
||||
@@ -232,51 +207,15 @@ def start_milestone(
|
||||
detail=f"Cannot start: milestone is '{_ms_status_value(ms)}', expected 'freeze'",
|
||||
)
|
||||
|
||||
# Dependency check — milestone dependencies
|
||||
dep_ms_ids = []
|
||||
if ms.depend_on_milestones:
|
||||
try:
|
||||
dep_ms_ids = json.loads(ms.depend_on_milestones)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
dep_ms_ids = []
|
||||
|
||||
if dep_ms_ids:
|
||||
dep_milestones = (
|
||||
db.query(Milestone)
|
||||
.filter(Milestone.id.in_(dep_ms_ids))
|
||||
.all()
|
||||
# Dependency check (P4.1 — shared helper)
|
||||
dep_result = check_milestone_deps(
|
||||
db, ms.depend_on_milestones, ms.depend_on_tasks,
|
||||
)
|
||||
if not dep_result.ok:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Cannot start: {dep_result.reason}",
|
||||
)
|
||||
incomplete = [
|
||||
m.id
|
||||
for m in dep_milestones
|
||||
if _ms_status_value(m) != "completed"
|
||||
]
|
||||
if incomplete:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Cannot start: dependent milestones not completed: {incomplete}",
|
||||
)
|
||||
|
||||
# Dependency check — task dependencies
|
||||
dep_task_ids = []
|
||||
if ms.depend_on_tasks:
|
||||
try:
|
||||
dep_task_ids = json.loads(ms.depend_on_tasks)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
dep_task_ids = []
|
||||
|
||||
if dep_task_ids:
|
||||
dep_tasks = db.query(Task).filter(Task.id.in_(dep_task_ids)).all()
|
||||
incomplete_tasks = [
|
||||
t.id
|
||||
for t in dep_tasks
|
||||
if (t.status.value if hasattr(t.status, "value") else t.status) != "completed"
|
||||
]
|
||||
if incomplete_tasks:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Cannot start: dependent tasks not completed: {incomplete_tasks}",
|
||||
)
|
||||
|
||||
ms.status = MilestoneStatus.UNDERGOING
|
||||
ms.started_at = datetime.now(timezone.utc)
|
||||
|
||||
@@ -173,9 +173,9 @@ def create_milestone_task(project_id: int, milestone_id: int, task_data: schemas
|
||||
task = Task(
|
||||
title=data.get("title"),
|
||||
description=data.get("description"),
|
||||
task_type=data.get("task_type", "task"),
|
||||
task_type=data.get("task_type", "issue"),
|
||||
task_subtype=data.get("task_subtype"),
|
||||
status=TaskStatus.OPEN,
|
||||
status=TaskStatus.PENDING,
|
||||
priority=TaskPriority.MEDIUM,
|
||||
project_id=project_id,
|
||||
milestone_id=milestone_id,
|
||||
|
||||
@@ -16,6 +16,7 @@ from app.models.notification import Notification as NotificationModel
|
||||
from app.api.deps import get_current_user_or_apikey
|
||||
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_task
|
||||
from app.services.activity import log_activity
|
||||
from app.services.dependency_check import check_task_deps
|
||||
|
||||
router = APIRouter(tags=["Tasks"])
|
||||
|
||||
@@ -208,6 +209,21 @@ def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Dep
|
||||
body_fields = {k for k in update_data.keys() if k not in _always_allowed}
|
||||
|
||||
if body_fields:
|
||||
# P3.6 supplement: feature story tasks locked after milestone freeze
|
||||
task_type = task.task_type.value if hasattr(task.task_type, 'value') else (task.task_type or "")
|
||||
task_subtype = task.task_subtype or ""
|
||||
if task_type == "story" and task_subtype == "feature" and task.milestone_id:
|
||||
from app.models.milestone import Milestone
|
||||
ms = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
|
||||
if ms:
|
||||
ms_status = ms.status.value if hasattr(ms.status, 'value') else ms.status
|
||||
if ms_status in ("freeze", "undergoing", "completed", "closed"):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Feature story task cannot be edited: milestone is '{ms_status}'. "
|
||||
f"Blocked fields: {sorted(body_fields)}",
|
||||
)
|
||||
|
||||
# undergoing/completed/closed: body edits forbidden
|
||||
if current_status in ("undergoing", "completed", "closed"):
|
||||
raise HTTPException(
|
||||
@@ -293,7 +309,7 @@ def transition_task(
|
||||
# P5.1: enforce state-machine
|
||||
_check_transition(old_status, new_status)
|
||||
|
||||
# P5.2: pending -> open requires milestone to be undergoing (dependencies checked later)
|
||||
# P5.2: pending -> open requires milestone to be undergoing + task deps satisfied
|
||||
if old_status == "pending" and new_status == "open":
|
||||
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
|
||||
if milestone:
|
||||
@@ -303,6 +319,13 @@ def transition_task(
|
||||
status_code=400,
|
||||
detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'",
|
||||
)
|
||||
# P4.3: check task-level depend_on
|
||||
dep_result = check_task_deps(db, task.depend_on)
|
||||
if not dep_result.ok:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Cannot open task: {dep_result.reason}",
|
||||
)
|
||||
|
||||
# P5.3: open -> undergoing requires assignee AND operator must be the assignee
|
||||
if old_status == "open" and new_status == "undergoing":
|
||||
@@ -428,17 +451,24 @@ def list_all_tags(project_id: int = None, db: Session = Depends(get_db)):
|
||||
|
||||
# ---- Batch ----
|
||||
|
||||
class BatchTransition(BaseModel):
|
||||
task_ids: List[int]
|
||||
new_status: str
|
||||
|
||||
class BatchAssign(BaseModel):
|
||||
task_ids: List[int]
|
||||
assignee_id: int
|
||||
|
||||
|
||||
class BatchTransitionBody(BaseModel):
|
||||
task_ids: List[int]
|
||||
new_status: str
|
||||
comment: Optional[str] = None
|
||||
|
||||
|
||||
@router.post("/tasks/batch/transition")
|
||||
def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)):
|
||||
def batch_transition(
|
||||
data: BatchTransitionBody,
|
||||
bg: BackgroundTasks,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: models.User = Depends(get_current_user_or_apikey),
|
||||
):
|
||||
valid_statuses = [s.value for s in TaskStatus]
|
||||
if data.new_status not in valid_statuses:
|
||||
raise HTTPException(status_code=400, detail="Invalid status")
|
||||
@@ -446,22 +476,106 @@ def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = D
|
||||
skipped = []
|
||||
for task_id in data.task_ids:
|
||||
task = db.query(Task).filter(Task.id == task_id).first()
|
||||
if task:
|
||||
old_status = task.status.value if hasattr(task.status, 'value') else task.status
|
||||
allowed = VALID_TRANSITIONS.get(old_status, set())
|
||||
if data.new_status not in allowed:
|
||||
if not task:
|
||||
skipped.append({"id": task_id, "title": None, "old": None,
|
||||
"reason": "Task not found"})
|
||||
continue
|
||||
old_status = task.status.value if hasattr(task.status, 'value') else task.status
|
||||
# P5.1: state-machine check
|
||||
allowed = VALID_TRANSITIONS.get(old_status, set())
|
||||
if data.new_status not in allowed:
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"})
|
||||
continue
|
||||
|
||||
# P5.2: pending → open requires milestone undergoing + task deps
|
||||
if old_status == "pending" and data.new_status == "open":
|
||||
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
|
||||
if milestone:
|
||||
ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
|
||||
if ms_status != "undergoing":
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": f"Milestone is '{ms_status}', must be 'undergoing'"})
|
||||
continue
|
||||
dep_result = check_task_deps(db, task.depend_on)
|
||||
if not dep_result.ok:
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"})
|
||||
"reason": dep_result.reason})
|
||||
continue
|
||||
if data.new_status == "undergoing" and not task.started_on:
|
||||
task.started_on = datetime.utcnow()
|
||||
if data.new_status in ("closed", "completed") and not task.finished_on:
|
||||
task.finished_on = datetime.utcnow()
|
||||
if data.new_status == "open" and old_status in ("completed", "closed"):
|
||||
task.finished_on = None
|
||||
task.status = data.new_status
|
||||
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status})
|
||||
|
||||
# P5.3: open → undergoing requires assignee == current_user
|
||||
if old_status == "open" and data.new_status == "undergoing":
|
||||
if not task.assignee_id:
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": "Assignee must be set before starting"})
|
||||
continue
|
||||
if current_user.id != task.assignee_id:
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": "Only the assigned user can start this task"})
|
||||
continue
|
||||
|
||||
# P5.4: undergoing → completed requires comment + assignee check
|
||||
if old_status == "undergoing" and data.new_status == "completed":
|
||||
comment_text = data.comment
|
||||
if not comment_text or not comment_text.strip():
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": "A completion comment is required"})
|
||||
continue
|
||||
if task.assignee_id and current_user.id != task.assignee_id:
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": "Only the assigned user can complete this task"})
|
||||
continue
|
||||
|
||||
# P5.5: close requires permission
|
||||
if data.new_status == "closed":
|
||||
try:
|
||||
check_permission(db, current_user.id, task.project_id, "task.close")
|
||||
except HTTPException:
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": "Missing 'task.close' permission"})
|
||||
continue
|
||||
|
||||
# P5.6: reopen requires permission
|
||||
if data.new_status == "open" and old_status in ("completed", "closed"):
|
||||
perm = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
|
||||
try:
|
||||
check_permission(db, current_user.id, task.project_id, perm)
|
||||
except HTTPException:
|
||||
skipped.append({"id": task.id, "title": task.title, "old": old_status,
|
||||
"reason": f"Missing '{perm}' permission"})
|
||||
continue
|
||||
task.finished_on = None
|
||||
|
||||
if data.new_status == "undergoing" and not task.started_on:
|
||||
task.started_on = datetime.utcnow()
|
||||
if data.new_status in ("closed", "completed") and not task.finished_on:
|
||||
task.finished_on = datetime.utcnow()
|
||||
task.status = data.new_status
|
||||
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status})
|
||||
|
||||
# Activity log per task
|
||||
log_activity(db, f"task.transition.{data.new_status}", "task", task.id, current_user.id,
|
||||
{"old_status": old_status, "new_status": data.new_status})
|
||||
|
||||
# P5.4: auto-create completion comment
|
||||
if old_status == "undergoing" and data.new_status == "completed" and data.comment:
|
||||
db_comment = models.Comment(
|
||||
content=data.comment.strip(),
|
||||
task_id=task.id,
|
||||
author_id=current_user.id,
|
||||
)
|
||||
db.add(db_comment)
|
||||
|
||||
db.commit()
|
||||
|
||||
# P3.5: auto-complete milestone for any completed task
|
||||
for u in updated:
|
||||
if u["new"] == "completed":
|
||||
t = db.query(Task).filter(Task.id == u["id"]).first()
|
||||
if t:
|
||||
from app.api.routers.milestone_actions import try_auto_complete_milestone
|
||||
try_auto_complete_milestone(db, t, user_id=current_user.id)
|
||||
|
||||
for u in updated:
|
||||
event = "task.closed" if data.new_status == "closed" else "task.updated"
|
||||
bg.add_task(fire_webhooks_sync, event, u, None, db)
|
||||
|
||||
@@ -151,61 +151,93 @@ def init_default_permissions(db: Session) -> list[Permission]:
|
||||
return db.query(Permission).all()
|
||||
|
||||
|
||||
def init_admin_role(db: Session, admin_user: models.User) -> None:
|
||||
"""Create admin role with all permissions and guest role with minimal permissions."""
|
||||
# Check if admin role already exists
|
||||
admin_role = db.query(Role).filter(Role.name == "admin").first()
|
||||
if not admin_role:
|
||||
admin_role = Role(
|
||||
name="admin",
|
||||
description="Administrator - full access to all features",
|
||||
is_global=True
|
||||
)
|
||||
db.add(admin_role)
|
||||
# ---------------------------------------------------------------------------
|
||||
# Default role → permission mapping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# mgr: project management + all milestone/task/propose actions
|
||||
_MGR_PERMISSIONS = {
|
||||
"project.read", "project.write", "project.manage_members",
|
||||
"task.create", "task.read", "task.write", "task.delete",
|
||||
"milestone.create", "milestone.read", "milestone.write", "milestone.delete",
|
||||
"milestone.freeze", "milestone.start", "milestone.close",
|
||||
"task.close", "task.reopen_closed", "task.reopen_completed",
|
||||
"propose.accept", "propose.reject", "propose.reopen",
|
||||
"monitor.read",
|
||||
}
|
||||
|
||||
# dev: day-to-day development work — no freeze/start/close milestone, no accept/reject propose
|
||||
_DEV_PERMISSIONS = {
|
||||
"project.read",
|
||||
"task.create", "task.read", "task.write",
|
||||
"milestone.read",
|
||||
"task.close", "task.reopen_closed", "task.reopen_completed",
|
||||
"monitor.read",
|
||||
}
|
||||
|
||||
# Role definitions: (name, description, permission_set)
|
||||
_DEFAULT_ROLES = [
|
||||
("admin", "Administrator - full access to all features", None), # None ⇒ all perms
|
||||
("mgr", "Manager - project & milestone management", _MGR_PERMISSIONS),
|
||||
("dev", "Developer - task execution & daily work", _DEV_PERMISSIONS),
|
||||
("guest", "Guest - read-only access", None), # special: *.read only
|
||||
]
|
||||
|
||||
|
||||
def _ensure_role(db: Session, name: str, description: str, is_global: bool = True) -> Role:
|
||||
"""Get or create a role by name."""
|
||||
role = db.query(Role).filter(Role.name == name).first()
|
||||
if not role:
|
||||
role = Role(name=name, description=description, is_global=is_global)
|
||||
db.add(role)
|
||||
db.commit()
|
||||
db.refresh(admin_role)
|
||||
logger.info("Created admin role (id=%d)", admin_role.id)
|
||||
|
||||
# Check if guest role already exists
|
||||
guest_role = db.query(Role).filter(Role.name == "guest").first()
|
||||
if not guest_role:
|
||||
guest_role = Role(
|
||||
name="guest",
|
||||
description="Guest - read-only access",
|
||||
is_global=True
|
||||
)
|
||||
db.add(guest_role)
|
||||
db.commit()
|
||||
db.refresh(guest_role)
|
||||
logger.info("Created guest role (id=%d)", guest_role.id)
|
||||
|
||||
# Get all permissions
|
||||
db.refresh(role)
|
||||
logger.info("Created role '%s' (id=%d)", name, role.id)
|
||||
return role
|
||||
|
||||
|
||||
def _sync_role_permissions(db: Session, role: Role, target_perm_names: set[str] | None) -> None:
|
||||
"""Ensure *role* has exactly the permissions in *target_perm_names*.
|
||||
|
||||
* ``None`` means **all** permissions (admin).
|
||||
* The special sentinel ``"__read_only__"`` is handled by the caller passing
|
||||
just the ``*.read`` names.
|
||||
Only adds missing permissions; never removes manually-granted ones (additive).
|
||||
"""
|
||||
all_perms = db.query(Permission).all()
|
||||
|
||||
# Assign all permissions to admin role
|
||||
existing_admin_perm_ids = {rp.permission_id for rp in admin_role.permissions}
|
||||
for perm in all_perms:
|
||||
if perm.id not in existing_admin_perm_ids:
|
||||
rp = RolePermission(role_id=admin_role.id, permission_id=perm.id)
|
||||
db.add(rp)
|
||||
|
||||
if all_perms:
|
||||
perm_by_name = {p.name: p for p in all_perms}
|
||||
|
||||
if target_perm_names is None:
|
||||
wanted_ids = {p.id for p in all_perms}
|
||||
else:
|
||||
wanted_ids = {perm_by_name[n].id for n in target_perm_names if n in perm_by_name}
|
||||
|
||||
existing_ids = {rp.permission_id for rp in role.permissions}
|
||||
added = 0
|
||||
for pid in wanted_ids - existing_ids:
|
||||
db.add(RolePermission(role_id=role.id, permission_id=pid))
|
||||
added += 1
|
||||
|
||||
if added:
|
||||
db.commit()
|
||||
logger.info("Assigned %d permissions to admin role", len(all_perms))
|
||||
|
||||
# Assign only read permissions to guest role
|
||||
read_perms = db.query(Permission).filter(Permission.name.like("%.read")).all()
|
||||
existing_guest_perm_ids = {rp.permission_id for rp in guest_role.permissions}
|
||||
for perm in read_perms:
|
||||
if perm.id not in existing_guest_perm_ids:
|
||||
rp = RolePermission(role_id=guest_role.id, permission_id=perm.id)
|
||||
db.add(rp)
|
||||
|
||||
if read_perms:
|
||||
db.commit()
|
||||
logger.info("Assigned %d read permissions to guest role", len(read_perms))
|
||||
|
||||
logger.info("Admin and guest roles setup complete")
|
||||
logger.info("Assigned %d new permissions to role '%s'", added, role.name)
|
||||
|
||||
|
||||
def init_admin_role(db: Session, admin_user: models.User) -> None:
|
||||
"""Create default roles (admin / mgr / dev / guest) with preset permissions."""
|
||||
|
||||
all_perms = db.query(Permission).all()
|
||||
read_perm_names = {p.name for p in all_perms if p.name.endswith(".read")}
|
||||
|
||||
for name, description, perm_set in _DEFAULT_ROLES:
|
||||
role = _ensure_role(db, name, description)
|
||||
|
||||
if name == "guest":
|
||||
_sync_role_permissions(db, role, read_perm_names)
|
||||
else:
|
||||
_sync_role_permissions(db, role, perm_set)
|
||||
|
||||
logger.info("Default roles setup complete (admin, mgr, dev, guest)")
|
||||
|
||||
|
||||
def run_init(db: Session) -> None:
|
||||
|
||||
@@ -7,7 +7,7 @@ import enum
|
||||
|
||||
|
||||
class TaskType(str, enum.Enum):
|
||||
"""Task type enum — 'issue' is a subtype of task, not the other way around."""
|
||||
"""Task type enum."""
|
||||
ISSUE = "issue"
|
||||
MAINTENANCE = "maintenance"
|
||||
RESEARCH = "research"
|
||||
@@ -15,7 +15,6 @@ class TaskType(str, enum.Enum):
|
||||
STORY = "story"
|
||||
TEST = "test"
|
||||
RESOLUTION = "resolution"
|
||||
TASK = "task"
|
||||
|
||||
|
||||
class TaskStatus(str, enum.Enum):
|
||||
|
||||
148
app/services/dependency_check.py
Normal file
148
app/services/dependency_check.py
Normal file
@@ -0,0 +1,148 @@
|
||||
"""P4.1 — Reusable dependency-check helpers.
|
||||
|
||||
Used by milestone start, milestone preflight, and (future) task pending→open
|
||||
to verify that all declared dependencies are completed before allowing the
|
||||
entity to proceed.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Sequence
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models.milestone import Milestone
|
||||
from app.models.task import Task
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Result type
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class DepCheckResult:
|
||||
"""Outcome of a dependency check."""
|
||||
|
||||
ok: bool = True
|
||||
blockers: list[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def reason(self) -> str | None:
|
||||
return "; ".join(self.blockers) if self.blockers else None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _parse_json_ids(raw: str | None) -> list[int]:
|
||||
"""Safely parse a JSON-encoded list of integer IDs."""
|
||||
if not raw:
|
||||
return []
|
||||
try:
|
||||
ids = json.loads(raw)
|
||||
if isinstance(ids, list):
|
||||
return [int(i) for i in ids]
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def _ms_status(ms: Milestone) -> str:
|
||||
return ms.status.value if hasattr(ms.status, "value") else ms.status
|
||||
|
||||
|
||||
def _task_status(t: Task) -> str:
|
||||
return t.status.value if hasattr(t.status, "value") else t.status
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_milestone_deps(
|
||||
db: Session,
|
||||
depend_on_milestones: str | None,
|
||||
depend_on_tasks: str | None,
|
||||
*,
|
||||
required_status: str = "completed",
|
||||
) -> DepCheckResult:
|
||||
"""Check whether all milestone + task dependencies are satisfied.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
db:
|
||||
Active DB session.
|
||||
depend_on_milestones:
|
||||
JSON-encoded list of milestone IDs (from the entity's field).
|
||||
depend_on_tasks:
|
||||
JSON-encoded list of task IDs (from the entity's field).
|
||||
required_status:
|
||||
The status that dependees must have reached (default ``"completed"``).
|
||||
|
||||
Returns
|
||||
-------
|
||||
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
|
||||
with human-readable ``blockers``.
|
||||
"""
|
||||
result = DepCheckResult()
|
||||
|
||||
# --- milestone dependencies ---
|
||||
ms_ids = _parse_json_ids(depend_on_milestones)
|
||||
if ms_ids:
|
||||
dep_milestones: Sequence[Milestone] = (
|
||||
db.query(Milestone).filter(Milestone.id.in_(ms_ids)).all()
|
||||
)
|
||||
incomplete = [m.id for m in dep_milestones if _ms_status(m) != required_status]
|
||||
if incomplete:
|
||||
result.ok = False
|
||||
result.blockers.append(f"Dependent milestones not {required_status}: {incomplete}")
|
||||
|
||||
# --- task dependencies ---
|
||||
task_ids = _parse_json_ids(depend_on_tasks)
|
||||
if task_ids:
|
||||
dep_tasks: Sequence[Task] = (
|
||||
db.query(Task).filter(Task.id.in_(task_ids)).all()
|
||||
)
|
||||
incomplete_tasks = [t.id for t in dep_tasks if _task_status(t) != required_status]
|
||||
if incomplete_tasks:
|
||||
result.ok = False
|
||||
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete_tasks}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def check_task_deps(
|
||||
db: Session,
|
||||
depend_on: str | None,
|
||||
*,
|
||||
required_status: str = "completed",
|
||||
) -> DepCheckResult:
|
||||
"""Check whether a task's depend_on tasks are all satisfied.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
db:
|
||||
Active DB session.
|
||||
depend_on:
|
||||
JSON-encoded list of task IDs (from the task's ``depend_on`` field).
|
||||
required_status:
|
||||
The status dependees must have reached (default ``"completed"``).
|
||||
|
||||
Returns
|
||||
-------
|
||||
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
|
||||
with human-readable ``blockers``.
|
||||
"""
|
||||
result = DepCheckResult()
|
||||
task_ids = _parse_json_ids(depend_on)
|
||||
if task_ids:
|
||||
dep_tasks: Sequence[Task] = (
|
||||
db.query(Task).filter(Task.id.in_(task_ids)).all()
|
||||
)
|
||||
incomplete = [t.id for t in dep_tasks if _task_status(t) != required_status]
|
||||
if incomplete:
|
||||
result.ok = False
|
||||
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete}")
|
||||
return result
|
||||
92
cli.py
92
cli.py
@@ -23,7 +23,6 @@ STATUS_ICON = {
|
||||
}
|
||||
TYPE_ICON = {
|
||||
"resolution": "⚖️",
|
||||
"task": "📋",
|
||||
"story": "📖",
|
||||
"test": "🧪",
|
||||
"issue": "📌",
|
||||
@@ -151,10 +150,56 @@ def cmd_search(args):
|
||||
|
||||
|
||||
def cmd_transition(args):
|
||||
result = _request("POST", f"/tasks/{args.task_id}/transition?new_status={args.status}")
|
||||
body = {}
|
||||
if args.comment:
|
||||
body["comment"] = args.comment
|
||||
result = _request("POST", f"/tasks/{args.task_id}/transition?new_status={args.status}", body or None)
|
||||
print(f"Task #{result['id']} transitioned to: {result['status']}")
|
||||
|
||||
|
||||
# ── Propose commands ──────────────────────────────────────────────
|
||||
|
||||
def cmd_proposes(args):
|
||||
if not args.project:
|
||||
print("Error: --project is required for proposes", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
result = _request("GET", f"/projects/{args.project}/proposes")
|
||||
items = result if isinstance(result, list) else result.get("items", [])
|
||||
if not items:
|
||||
print(" No proposes found.")
|
||||
return
|
||||
for p in items:
|
||||
status_icon = STATUS_ICON.get(p["status"], "❓")
|
||||
feat = f" → task {p['feat_task_id']}" if p.get("feat_task_id") else ""
|
||||
print(f" {status_icon} 💡 {p['propose_code']} {p['title']}{feat}")
|
||||
|
||||
|
||||
def cmd_propose_create(args):
|
||||
data = {"title": args.title}
|
||||
if args.description:
|
||||
data["description"] = args.description
|
||||
result = _request("POST", f"/projects/{args.project}/proposes", data)
|
||||
print(f"Created propose {result['propose_code']}: {result['title']}")
|
||||
|
||||
|
||||
def cmd_propose_accept(args):
|
||||
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/accept?milestone_id={args.milestone}")
|
||||
print(f"Propose #{args.propose_id} accepted → task {result.get('feat_task_id', '?')}")
|
||||
|
||||
|
||||
def cmd_propose_reject(args):
|
||||
data = {}
|
||||
if args.reason:
|
||||
data["reason"] = args.reason
|
||||
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/reject", data or None)
|
||||
print(f"Propose #{args.propose_id} rejected")
|
||||
|
||||
|
||||
def cmd_propose_reopen(args):
|
||||
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/reopen")
|
||||
print(f"Propose #{args.propose_id} reopened")
|
||||
|
||||
|
||||
def cmd_stats(args):
|
||||
params = f"?project_id={args.project}" if args.project else ""
|
||||
stats = _request("GET", f"/dashboard/stats{params}")
|
||||
@@ -170,8 +215,13 @@ def cmd_stats(args):
|
||||
|
||||
|
||||
def cmd_milestones(args):
|
||||
params = f"?project_id={args.project}" if args.project else ""
|
||||
milestones = _request("GET", f"/milestones{params}")
|
||||
params = []
|
||||
if args.project:
|
||||
params.append(f"project_id={args.project}")
|
||||
if args.status:
|
||||
params.append(f"status={args.status}")
|
||||
qs = f"?{'&'.join(params)}" if params else ""
|
||||
milestones = _request("GET", f"/milestones{qs}")
|
||||
if not milestones:
|
||||
print(" No milestones found.")
|
||||
return
|
||||
@@ -242,7 +292,7 @@ def main():
|
||||
|
||||
p_tasks = sub.add_parser("tasks", aliases=["issues"], help="List tasks")
|
||||
p_tasks.add_argument("--project", "-p", type=int)
|
||||
p_tasks.add_argument("--type", "-t", choices=["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"])
|
||||
p_tasks.add_argument("--type", "-t", choices=["story", "test", "resolution", "issue", "maintenance", "research", "review"])
|
||||
p_tasks.add_argument("--status", "-s", choices=["open", "pending", "undergoing", "completed", "closed"])
|
||||
|
||||
p_create = sub.add_parser("create-task", aliases=["create-issue"], help="Create a task")
|
||||
@@ -250,7 +300,7 @@ def main():
|
||||
p_create.add_argument("--project", "-p", type=int, required=True)
|
||||
p_create.add_argument("--milestone", "-m", type=int, required=True)
|
||||
p_create.add_argument("--reporter", "-r", type=int, required=True)
|
||||
p_create.add_argument("--type", "-t", default="task", choices=["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"])
|
||||
p_create.add_argument("--type", "-t", default="issue", choices=["story", "test", "resolution", "issue", "maintenance", "research", "review"])
|
||||
p_create.add_argument("--subtype")
|
||||
p_create.add_argument("--priority", choices=["low", "medium", "high", "critical"])
|
||||
p_create.add_argument("--description", "-d")
|
||||
@@ -271,12 +321,14 @@ def main():
|
||||
p_trans = sub.add_parser("transition", help="Transition task status")
|
||||
p_trans.add_argument("task_id", type=int)
|
||||
p_trans.add_argument("status", choices=["open", "pending", "undergoing", "completed", "closed"])
|
||||
p_trans.add_argument("--comment", "-c", help="Comment (required for undergoing→completed)")
|
||||
|
||||
p_stats = sub.add_parser("stats", help="Dashboard stats")
|
||||
p_stats.add_argument("--project", "-p", type=int)
|
||||
|
||||
p_ms = sub.add_parser("milestones", help="List milestones")
|
||||
p_ms.add_argument("--project", "-p", type=int)
|
||||
p_ms.add_argument("--status", "-s", choices=["open", "freeze", "undergoing", "completed", "closed"])
|
||||
|
||||
p_msp = sub.add_parser("milestone-progress", help="Show milestone progress")
|
||||
p_msp.add_argument("milestone_id", type=int)
|
||||
@@ -296,6 +348,29 @@ def main():
|
||||
p_worklogs = sub.add_parser("worklogs", help="List work logs for a task")
|
||||
p_worklogs.add_argument("task_id", type=int)
|
||||
|
||||
# ── Propose subcommands ──
|
||||
p_proposes = sub.add_parser("proposes", help="List proposes for a project")
|
||||
p_proposes.add_argument("--project", "-p", type=int, required=True)
|
||||
|
||||
p_pc = sub.add_parser("propose-create", help="Create a propose")
|
||||
p_pc.add_argument("title")
|
||||
p_pc.add_argument("--project", "-p", type=int, required=True)
|
||||
p_pc.add_argument("--description", "-d")
|
||||
|
||||
p_pa = sub.add_parser("propose-accept", help="Accept a propose into a milestone")
|
||||
p_pa.add_argument("propose_id", type=int)
|
||||
p_pa.add_argument("--project", "-p", type=int, required=True)
|
||||
p_pa.add_argument("--milestone", "-m", type=int, required=True)
|
||||
|
||||
p_pr = sub.add_parser("propose-reject", help="Reject a propose")
|
||||
p_pr.add_argument("propose_id", type=int)
|
||||
p_pr.add_argument("--project", "-p", type=int, required=True)
|
||||
p_pr.add_argument("--reason", "-r")
|
||||
|
||||
p_pro = sub.add_parser("propose-reopen", help="Reopen a rejected propose")
|
||||
p_pro.add_argument("propose_id", type=int)
|
||||
p_pro.add_argument("--project", "-p", type=int, required=True)
|
||||
|
||||
args = parser.parse_args()
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
@@ -320,6 +395,11 @@ def main():
|
||||
"overdue": cmd_overdue,
|
||||
"log-time": cmd_log_time,
|
||||
"worklogs": cmd_worklogs,
|
||||
"proposes": cmd_proposes,
|
||||
"propose-create": cmd_propose_create,
|
||||
"propose-accept": cmd_propose_accept,
|
||||
"propose-reject": cmd_propose_reject,
|
||||
"propose-reopen": cmd_propose_reopen,
|
||||
}
|
||||
cmds[args.command](args)
|
||||
|
||||
|
||||
1
tests/__init__.py
Normal file
1
tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# tests package
|
||||
302
tests/conftest.py
Normal file
302
tests/conftest.py
Normal file
@@ -0,0 +1,302 @@
|
||||
"""Shared test fixtures — SQLite in-memory DB + FastAPI TestClient.
|
||||
|
||||
This avoids needing MySQL for unit/integration tests.
|
||||
All models are created fresh for every test function (function-scoped session).
|
||||
"""
|
||||
import sys, os
|
||||
|
||||
# Ensure the backend app package is importable
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine, event
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
# --- Override engine BEFORE any app import touches the real DB ---
|
||||
from app.core.config import Base
|
||||
|
||||
# Force-import ALL model modules so Base.metadata knows every table
|
||||
import app.models.models # noqa: F401 — User, Project, Comment, etc.
|
||||
import app.models.milestone # noqa: F401
|
||||
import app.models.task # noqa: F401
|
||||
import app.models.role_permission # noqa: F401
|
||||
import app.models.activity # noqa: F401
|
||||
import app.models.propose # noqa: F401
|
||||
try:
|
||||
import app.models.apikey # noqa: F401
|
||||
except ImportError:
|
||||
pass
|
||||
try:
|
||||
import app.models.webhook # noqa: F401
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
TEST_DATABASE_URL = "sqlite://" # in-memory
|
||||
|
||||
engine = create_engine(
|
||||
TEST_DATABASE_URL,
|
||||
connect_args={"check_same_thread": False},
|
||||
# Use StaticPool so all sessions share the same in-memory connection
|
||||
poolclass=__import__("sqlalchemy.pool", fromlist=["StaticPool"]).StaticPool,
|
||||
)
|
||||
|
||||
# SQLite needs foreign keys enabled per-connection
|
||||
@event.listens_for(engine, "connect")
|
||||
def _set_sqlite_pragma(dbapi_conn, _):
|
||||
cursor = dbapi_conn.cursor()
|
||||
cursor.execute("PRAGMA foreign_keys=ON")
|
||||
cursor.close()
|
||||
|
||||
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_database():
|
||||
"""Create all tables before each test, drop after."""
|
||||
Base.metadata.create_all(bind=engine)
|
||||
yield
|
||||
Base.metadata.drop_all(bind=engine)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def db():
|
||||
"""Yield a DB session for direct model manipulation."""
|
||||
session = TestingSessionLocal()
|
||||
try:
|
||||
yield session
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def client(db):
|
||||
"""FastAPI TestClient wired to the test DB + a default authenticated user."""
|
||||
from fastapi.testclient import TestClient
|
||||
from app.main import app
|
||||
from app.core.config import get_db
|
||||
|
||||
# Override DB dependency
|
||||
def _override_get_db():
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
pass # caller's `db` fixture handles close
|
||||
|
||||
app.dependency_overrides[get_db] = _override_get_db
|
||||
yield TestClient(app)
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper factories
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture()
|
||||
def make_user(db):
|
||||
"""Factory to create a User row."""
|
||||
from app.models.models import User
|
||||
|
||||
_counter = [0]
|
||||
# Pre-compute a bcrypt hash for "password" to avoid passlib/bcrypt version issues
|
||||
_pwd_hash = "$2b$12$LJ3m4ys/Xz.l1PaOHHKN/uE7dQFnSm1AUBfEkL0C2dN9.3Oau4XG"
|
||||
|
||||
def _make(username=None, is_admin=False):
|
||||
_counter[0] += 1
|
||||
n = _counter[0]
|
||||
u = User(
|
||||
username=username or f"testuser{n}",
|
||||
email=f"test{n}@example.com",
|
||||
hashed_password=_pwd_hash,
|
||||
is_active=True,
|
||||
is_admin=is_admin,
|
||||
)
|
||||
db.add(u)
|
||||
db.commit()
|
||||
db.refresh(u)
|
||||
return u
|
||||
|
||||
return _make
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def make_project(db):
|
||||
"""Factory to create a Project row."""
|
||||
from app.models.models import Project
|
||||
|
||||
_counter = [0]
|
||||
|
||||
def _make(owner_id, name=None, project_code=None):
|
||||
_counter[0] += 1
|
||||
n = _counter[0]
|
||||
p = Project(
|
||||
name=name or f"TestProject{n}",
|
||||
project_code=project_code or f"TP{n}",
|
||||
owner_name="owner",
|
||||
owner_id=owner_id,
|
||||
)
|
||||
db.add(p)
|
||||
db.commit()
|
||||
db.refresh(p)
|
||||
return p
|
||||
|
||||
return _make
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def make_milestone(db):
|
||||
"""Factory to create a Milestone row."""
|
||||
from app.models.milestone import Milestone, MilestoneStatus
|
||||
|
||||
_counter = [0]
|
||||
|
||||
def _make(project_id, created_by_id, status=MilestoneStatus.OPEN, **kw):
|
||||
_counter[0] += 1
|
||||
n = _counter[0]
|
||||
ms = Milestone(
|
||||
title=kw.pop("title", f"Milestone {n}"),
|
||||
project_id=project_id,
|
||||
created_by_id=created_by_id,
|
||||
status=status,
|
||||
milestone_code=kw.pop("milestone_code", f"M{n:04d}"),
|
||||
**kw,
|
||||
)
|
||||
db.add(ms)
|
||||
db.commit()
|
||||
db.refresh(ms)
|
||||
return ms
|
||||
|
||||
return _make
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def make_task(db):
|
||||
"""Factory to create a Task row."""
|
||||
from app.models.task import Task, TaskStatus
|
||||
|
||||
_counter = [0]
|
||||
|
||||
def _make(project_id, milestone_id, reporter_id, status=TaskStatus.PENDING, **kw):
|
||||
_counter[0] += 1
|
||||
n = _counter[0]
|
||||
t = Task(
|
||||
title=kw.pop("title", f"Task {n}"),
|
||||
project_id=project_id,
|
||||
milestone_id=milestone_id,
|
||||
reporter_id=reporter_id,
|
||||
created_by_id=kw.pop("created_by_id", reporter_id),
|
||||
status=status,
|
||||
task_code=kw.pop("task_code", f"T{n:04d}"),
|
||||
task_type=kw.pop("task_type", "issue"),
|
||||
task_subtype=kw.pop("task_subtype", None),
|
||||
**kw,
|
||||
)
|
||||
db.add(t)
|
||||
db.commit()
|
||||
db.refresh(t)
|
||||
return t
|
||||
|
||||
return _make
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def seed_roles_and_permissions(db):
|
||||
"""Create the minimal role + permission setup needed by action endpoints.
|
||||
|
||||
Returns (admin_role, mgr_role, dev_role).
|
||||
"""
|
||||
from app.models.role_permission import Role, Permission, RolePermission
|
||||
|
||||
# --- roles ---
|
||||
admin_role = Role(name="admin", is_global=True)
|
||||
mgr_role = Role(name="mgr", is_global=False)
|
||||
dev_role = Role(name="dev", is_global=False)
|
||||
db.add_all([admin_role, mgr_role, dev_role])
|
||||
db.commit()
|
||||
|
||||
# --- permissions ---
|
||||
perm_names = [
|
||||
("milestone.freeze", "milestone"),
|
||||
("milestone.start", "milestone"),
|
||||
("milestone.close", "milestone"),
|
||||
("task.close", "task"),
|
||||
("task.reopen_closed", "task"),
|
||||
("task.reopen_completed", "task"),
|
||||
("propose.accept", "propose"),
|
||||
("propose.reject", "propose"),
|
||||
("propose.reopen", "propose"),
|
||||
# add broad perms for role checks
|
||||
("project.read", "project"),
|
||||
("project.write", "project"),
|
||||
("milestone.read", "milestone"),
|
||||
("milestone.write", "milestone"),
|
||||
("milestone.create", "milestone"),
|
||||
("task.read", "task"),
|
||||
("task.write", "task"),
|
||||
("task.create", "task"),
|
||||
]
|
||||
perm_objs = {}
|
||||
for name, cat in perm_names:
|
||||
p = Permission(name=name, category=cat, description=name)
|
||||
db.add(p)
|
||||
db.flush()
|
||||
perm_objs[name] = p
|
||||
|
||||
# admin gets all
|
||||
for p in perm_objs.values():
|
||||
db.add(RolePermission(role_id=admin_role.id, permission_id=p.id))
|
||||
|
||||
# mgr gets milestone + propose + task management perms
|
||||
mgr_perms = [
|
||||
"milestone.freeze", "milestone.start", "milestone.close",
|
||||
"task.close", "task.reopen_closed", "task.reopen_completed",
|
||||
"propose.accept", "propose.reject", "propose.reopen",
|
||||
"project.read", "project.write",
|
||||
"milestone.read", "milestone.write", "milestone.create",
|
||||
"task.read", "task.write", "task.create",
|
||||
]
|
||||
for name in mgr_perms:
|
||||
db.add(RolePermission(role_id=mgr_role.id, permission_id=perm_objs[name].id))
|
||||
|
||||
# dev gets basic perms
|
||||
dev_perms = [
|
||||
"project.read", "task.read", "task.write", "task.create",
|
||||
"milestone.read", "task.close", "task.reopen_closed", "task.reopen_completed",
|
||||
]
|
||||
for name in dev_perms:
|
||||
db.add(RolePermission(role_id=dev_role.id, permission_id=perm_objs[name].id))
|
||||
|
||||
db.commit()
|
||||
db.refresh(admin_role)
|
||||
db.refresh(mgr_role)
|
||||
db.refresh(dev_role)
|
||||
return admin_role, mgr_role, dev_role
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def make_member(db):
|
||||
"""Factory to add a user as project member with a given role."""
|
||||
from app.models.models import ProjectMember
|
||||
|
||||
def _make(project_id, user_id, role_id):
|
||||
pm = ProjectMember(project_id=project_id, user_id=user_id, role_id=role_id)
|
||||
db.add(pm)
|
||||
db.commit()
|
||||
return pm
|
||||
|
||||
return _make
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def auth_header():
|
||||
"""Generate a JWT auth header for a given user."""
|
||||
from app.api.deps import create_access_token
|
||||
|
||||
def _make(user):
|
||||
token = create_access_token({"sub": str(user.id)})
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
return _make
|
||||
358
tests/test_milestone_actions.py
Normal file
358
tests/test_milestone_actions.py
Normal file
@@ -0,0 +1,358 @@
|
||||
"""P13.1 — Milestone state-machine action tests.
|
||||
|
||||
Covers:
|
||||
- freeze: success, missing release task, multiple release tasks, wrong status
|
||||
- start: success + started_at, deps not met, wrong status
|
||||
- close: from open/freeze/undergoing, wrong status (completed/closed)
|
||||
- auto-complete: release task completion triggers milestone completed
|
||||
"""
|
||||
import json
|
||||
|
||||
import pytest
|
||||
from app.models.milestone import MilestoneStatus
|
||||
from app.models.task import TaskStatus
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Freeze
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestFreeze:
|
||||
"""POST /projects/{pid}/milestones/{mid}/actions/freeze"""
|
||||
|
||||
def test_freeze_success(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user(is_admin=False)
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id)
|
||||
|
||||
# Create exactly 1 maintenance/release task
|
||||
make_task(
|
||||
project.id, ms.id, user.id,
|
||||
task_type="maintenance", task_subtype="release",
|
||||
)
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 200, resp.text
|
||||
assert resp.json()["status"] == "freeze"
|
||||
|
||||
db.refresh(ms)
|
||||
assert ms.status == MilestoneStatus.FREEZE
|
||||
|
||||
def test_freeze_no_release_task(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id)
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "no maintenance/release task" in resp.json()["detail"].lower()
|
||||
|
||||
def test_freeze_multiple_release_tasks(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id)
|
||||
|
||||
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
|
||||
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "expected exactly 1" in resp.json()["detail"].lower()
|
||||
|
||||
def test_freeze_wrong_status(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.CLOSED)
|
||||
|
||||
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "expected 'open'" in resp.json()["detail"].lower()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Start
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestStart:
|
||||
"""POST /projects/{pid}/milestones/{mid}/actions/start"""
|
||||
|
||||
def _freeze_milestone(self, db, ms):
|
||||
ms.status = MilestoneStatus.FREEZE
|
||||
db.commit()
|
||||
db.refresh(ms)
|
||||
|
||||
def test_start_success(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id)
|
||||
self._freeze_milestone(db, ms)
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 200, resp.text
|
||||
data = resp.json()
|
||||
assert data["status"] == "undergoing"
|
||||
assert "started_at" in data
|
||||
|
||||
db.refresh(ms)
|
||||
assert ms.status == MilestoneStatus.UNDERGOING
|
||||
assert ms.started_at is not None
|
||||
|
||||
def test_start_deps_not_met(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
|
||||
# Create a dependency milestone that is NOT completed
|
||||
dep_ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
|
||||
|
||||
ms = make_milestone(
|
||||
project.id, user.id,
|
||||
depend_on_milestones=json.dumps([dep_ms.id]),
|
||||
)
|
||||
self._freeze_milestone(db, ms)
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "cannot start" in resp.json()["detail"].lower()
|
||||
|
||||
def test_start_wrong_status(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "expected 'freeze'" in resp.json()["detail"].lower()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Close
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestClose:
|
||||
"""POST /projects/{pid}/milestones/{mid}/actions/close"""
|
||||
|
||||
@pytest.mark.parametrize("initial_status", [
|
||||
MilestoneStatus.OPEN,
|
||||
MilestoneStatus.FREEZE,
|
||||
MilestoneStatus.UNDERGOING,
|
||||
])
|
||||
def test_close_from_allowed_statuses(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
seed_roles_and_permissions, make_member, auth_header, initial_status,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=initial_status)
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/close",
|
||||
headers=auth_header(user),
|
||||
json={"reason": "no longer needed"},
|
||||
)
|
||||
assert resp.status_code == 200, resp.text
|
||||
assert resp.json()["status"] == "closed"
|
||||
|
||||
db.refresh(ms)
|
||||
assert ms.status == MilestoneStatus.CLOSED
|
||||
|
||||
@pytest.mark.parametrize("terminal_status", [
|
||||
MilestoneStatus.COMPLETED,
|
||||
MilestoneStatus.CLOSED,
|
||||
])
|
||||
def test_close_from_terminal_rejected(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
seed_roles_and_permissions, make_member, auth_header, terminal_status,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=terminal_status)
|
||||
|
||||
resp = client.post(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/close",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Auto-complete
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestAutoComplete:
|
||||
"""When the sole release task is completed, milestone auto-completes."""
|
||||
|
||||
def test_auto_complete_on_release_task_finish(
|
||||
self, db, make_user, make_project, make_milestone, make_task,
|
||||
):
|
||||
"""Direct unit test of try_auto_complete_milestone."""
|
||||
from app.api.routers.milestone_actions import try_auto_complete_milestone
|
||||
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
|
||||
release_task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
task_type="maintenance", task_subtype="release",
|
||||
status=TaskStatus.COMPLETED,
|
||||
)
|
||||
|
||||
try_auto_complete_milestone(db, release_task, user_id=user.id)
|
||||
|
||||
db.refresh(ms)
|
||||
assert ms.status == MilestoneStatus.COMPLETED
|
||||
|
||||
def test_no_auto_complete_for_non_release_task(
|
||||
self, db, make_user, make_project, make_milestone, make_task,
|
||||
):
|
||||
from app.api.routers.milestone_actions import try_auto_complete_milestone
|
||||
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
|
||||
# Also add the required release task (still pending)
|
||||
make_task(
|
||||
project.id, ms.id, user.id,
|
||||
task_type="maintenance", task_subtype="release",
|
||||
status=TaskStatus.PENDING,
|
||||
)
|
||||
|
||||
normal_task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
task_type="issue", task_subtype="defect",
|
||||
status=TaskStatus.COMPLETED,
|
||||
)
|
||||
|
||||
try_auto_complete_milestone(db, normal_task, user_id=user.id)
|
||||
|
||||
db.refresh(ms)
|
||||
assert ms.status == MilestoneStatus.UNDERGOING # unchanged
|
||||
|
||||
def test_no_auto_complete_when_not_undergoing(
|
||||
self, db, make_user, make_project, make_milestone, make_task,
|
||||
):
|
||||
from app.api.routers.milestone_actions import try_auto_complete_milestone
|
||||
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.FREEZE)
|
||||
|
||||
release_task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
task_type="maintenance", task_subtype="release",
|
||||
status=TaskStatus.COMPLETED,
|
||||
)
|
||||
|
||||
try_auto_complete_milestone(db, release_task, user_id=user.id)
|
||||
|
||||
db.refresh(ms)
|
||||
assert ms.status == MilestoneStatus.FREEZE # unchanged
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Preflight
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestPreflight:
|
||||
"""GET /projects/{pid}/milestones/{mid}/actions/preflight"""
|
||||
|
||||
def test_preflight_freeze_allowed(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id)
|
||||
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
|
||||
|
||||
resp = client.get(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/preflight",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["freeze"]["allowed"] is True
|
||||
|
||||
def test_preflight_freeze_not_allowed(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id)
|
||||
# No release task
|
||||
|
||||
resp = client.get(
|
||||
f"/projects/{project.id}/milestones/{ms.id}/actions/preflight",
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["freeze"]["allowed"] is False
|
||||
559
tests/test_propose.py
Normal file
559
tests/test_propose.py
Normal file
@@ -0,0 +1,559 @@
|
||||
"""P13.3 — Propose backend tests.
|
||||
|
||||
Covers:
|
||||
- CRUD: create, list, get, update
|
||||
- propose_code per-project incrementing
|
||||
- accept → auto-generate feature story task + feat_task_id
|
||||
- accept with non-open milestone → fail
|
||||
- reject → status change
|
||||
- rejected → reopen back to open
|
||||
- feat_task_id cannot be set manually
|
||||
- edit restrictions (only open proposes editable)
|
||||
- permission checks for accept/reject/reopen
|
||||
"""
|
||||
import pytest
|
||||
from app.models.milestone import MilestoneStatus
|
||||
from app.models.task import TaskStatus
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _propose_url(project_id: int, propose_id: int | None = None) -> str:
|
||||
base = f"/projects/{project_id}/proposes"
|
||||
return f"{base}/{propose_id}" if propose_id else base
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# CRUD
|
||||
# ===========================================================================
|
||||
|
||||
class TestProposeCRUD:
|
||||
"""Basic create / list / get / update."""
|
||||
|
||||
def test_create_propose(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id, project_code="PROJ")
|
||||
make_member(project.id, user.id, dev_role.id)
|
||||
|
||||
resp = client.post(
|
||||
_propose_url(project.id),
|
||||
json={"title": "New Feature Idea", "description": "Some details"},
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 201
|
||||
data = resp.json()
|
||||
assert data["title"] == "New Feature Idea"
|
||||
assert data["status"] == "open"
|
||||
assert data["propose_code"].startswith("PROJ:P")
|
||||
assert data["feat_task_id"] is None
|
||||
|
||||
def test_list_proposes(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, dev_role.id)
|
||||
|
||||
# Create two proposes
|
||||
client.post(_propose_url(project.id), json={"title": "P1"}, headers=auth_header(user))
|
||||
client.post(_propose_url(project.id), json={"title": "P2"}, headers=auth_header(user))
|
||||
|
||||
resp = client.get(_propose_url(project.id), headers=auth_header(user))
|
||||
assert resp.status_code == 200
|
||||
assert len(resp.json()) == 2
|
||||
|
||||
def test_get_propose(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, dev_role.id)
|
||||
|
||||
create_resp = client.post(_propose_url(project.id), json={"title": "P1"}, headers=auth_header(user))
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
resp = client.get(_propose_url(project.id, propose_id), headers=auth_header(user))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["title"] == "P1"
|
||||
|
||||
def test_update_propose_open(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, dev_role.id)
|
||||
|
||||
create_resp = client.post(_propose_url(project.id), json={"title": "Old"}, headers=auth_header(user))
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
resp = client.patch(
|
||||
_propose_url(project.id, propose_id),
|
||||
json={"title": "New Title", "description": "Updated"},
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["title"] == "New Title"
|
||||
assert resp.json()["description"] == "Updated"
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Propose Code
|
||||
# ===========================================================================
|
||||
|
||||
class TestProposeCode:
|
||||
"""P1.4 — propose_code increments per project independently."""
|
||||
|
||||
def test_code_increments_per_project(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
proj_a = make_project(owner_id=user.id, project_code="ALPHA")
|
||||
proj_b = make_project(owner_id=user.id, project_code="BETA")
|
||||
make_member(proj_a.id, user.id, dev_role.id)
|
||||
make_member(proj_b.id, user.id, dev_role.id)
|
||||
|
||||
# Create 2 in ALPHA
|
||||
r1 = client.post(_propose_url(proj_a.id), json={"title": "A1"}, headers=auth_header(user))
|
||||
r2 = client.post(_propose_url(proj_a.id), json={"title": "A2"}, headers=auth_header(user))
|
||||
|
||||
# Create 1 in BETA
|
||||
r3 = client.post(_propose_url(proj_b.id), json={"title": "B1"}, headers=auth_header(user))
|
||||
|
||||
code1 = r1.json()["propose_code"]
|
||||
code2 = r2.json()["propose_code"]
|
||||
code3 = r3.json()["propose_code"]
|
||||
|
||||
assert code1.startswith("ALPHA:P")
|
||||
assert code2.startswith("ALPHA:P")
|
||||
assert code3.startswith("BETA:P")
|
||||
# They should be distinct
|
||||
assert code1 != code2
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Accept
|
||||
# ===========================================================================
|
||||
|
||||
class TestAccept:
|
||||
"""P6.2 — accept propose → create feature story task."""
|
||||
|
||||
def test_accept_success(
|
||||
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id),
|
||||
json={"title": "Cool Feature", "description": "Do something cool"},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/accept",
|
||||
json={"milestone_id": ms.id},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["status"] == "accepted"
|
||||
assert data["feat_task_id"] is not None
|
||||
|
||||
# Verify the generated task exists
|
||||
from app.models.task import Task
|
||||
task = db.query(Task).filter(Task.id == int(data["feat_task_id"])).first()
|
||||
assert task is not None
|
||||
assert task.title == "Cool Feature"
|
||||
assert task.description == "Do something cool"
|
||||
assert task.task_type == "story"
|
||||
assert task.task_subtype == "feature"
|
||||
task_status = task.status.value if hasattr(task.status, "value") else task.status
|
||||
assert task_status == "pending"
|
||||
assert task.milestone_id == ms.id
|
||||
|
||||
def test_accept_non_open_milestone_fails(
|
||||
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.FREEZE)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id),
|
||||
json={"title": "Feature X"},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/accept",
|
||||
json={"milestone_id": ms.id},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "open" in resp.json()["detail"].lower()
|
||||
|
||||
def test_accept_already_accepted_fails(
|
||||
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# First accept
|
||||
client.post(
|
||||
_propose_url(project.id, propose_id) + "/accept",
|
||||
json={"milestone_id": ms.id},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
|
||||
# Second accept should fail
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/accept",
|
||||
json={"milestone_id": ms.id},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_accept_auto_fills_feat_task_id(
|
||||
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/accept",
|
||||
json={"milestone_id": ms.id},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
data = resp.json()
|
||||
assert data["feat_task_id"] is not None
|
||||
|
||||
# Re-fetch to confirm persistence
|
||||
get_resp = client.get(_propose_url(project.id, propose_id), headers=auth_header(mgr))
|
||||
assert get_resp.json()["feat_task_id"] == data["feat_task_id"]
|
||||
|
||||
def test_accept_no_permission_fails(
|
||||
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""dev role should not have propose.accept permission."""
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
owner = make_user()
|
||||
dev_user = make_user()
|
||||
project = make_project(owner_id=owner.id)
|
||||
make_member(project.id, owner.id, mgr_role.id)
|
||||
make_member(project.id, dev_user.id, dev_role.id)
|
||||
|
||||
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.OPEN)
|
||||
|
||||
# Dev creates the propose
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# Dev tries to accept — should fail
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/accept",
|
||||
json={"milestone_id": ms.id},
|
||||
headers=auth_header(dev_user),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Reject
|
||||
# ===========================================================================
|
||||
|
||||
class TestReject:
|
||||
"""P6.3 — reject propose."""
|
||||
|
||||
def test_reject_success(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/reject",
|
||||
json={"reason": "Not needed"},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "rejected"
|
||||
|
||||
def test_reject_non_open_fails(
|
||||
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# Accept first
|
||||
client.post(
|
||||
_propose_url(project.id, propose_id) + "/accept",
|
||||
json={"milestone_id": ms.id},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
|
||||
# Now reject should fail
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/reject",
|
||||
json={"reason": "Changed mind"},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_reject_no_permission_fails(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
owner = make_user()
|
||||
dev_user = make_user()
|
||||
project = make_project(owner_id=owner.id)
|
||||
make_member(project.id, owner.id, mgr_role.id)
|
||||
make_member(project.id, dev_user.id, dev_role.id)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/reject",
|
||||
json={"reason": "nah"},
|
||||
headers=auth_header(dev_user),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Reopen
|
||||
# ===========================================================================
|
||||
|
||||
class TestReopen:
|
||||
"""P6.4 — reopen rejected propose."""
|
||||
|
||||
def test_reopen_success(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# Reject first
|
||||
client.post(
|
||||
_propose_url(project.id, propose_id) + "/reject",
|
||||
json={"reason": "wait"},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
|
||||
# Reopen
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/reopen",
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "open"
|
||||
|
||||
def test_reopen_non_rejected_fails(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# Try reopen on open propose — should fail
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/reopen",
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_reopen_no_permission_fails(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
owner = make_user()
|
||||
dev_user = make_user()
|
||||
project = make_project(owner_id=owner.id)
|
||||
make_member(project.id, owner.id, mgr_role.id)
|
||||
make_member(project.id, dev_user.id, dev_role.id)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# Owner rejects
|
||||
client.post(
|
||||
_propose_url(project.id, propose_id) + "/reject",
|
||||
json={"reason": "nah"},
|
||||
headers=auth_header(owner),
|
||||
)
|
||||
|
||||
# Dev tries to reopen — should fail
|
||||
resp = client.post(
|
||||
_propose_url(project.id, propose_id) + "/reopen",
|
||||
headers=auth_header(dev_user),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# feat_task_id protection
|
||||
# ===========================================================================
|
||||
|
||||
class TestFeatTaskIdProtection:
|
||||
"""P6.5 — feat_task_id is server-side only, cannot be set by client."""
|
||||
|
||||
def test_update_cannot_set_feat_task_id(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, dev_role.id)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(user),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# Try to set feat_task_id via PATCH
|
||||
resp = client.patch(
|
||||
_propose_url(project.id, propose_id),
|
||||
json={"feat_task_id": "999"},
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
# feat_task_id should still be None (server ignores it)
|
||||
assert resp.json()["feat_task_id"] is None
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Edit restrictions
|
||||
# ===========================================================================
|
||||
|
||||
class TestEditRestrictions:
|
||||
"""Propose editing is only allowed in open status."""
|
||||
|
||||
def test_edit_accepted_propose_fails(
|
||||
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# Accept
|
||||
client.post(
|
||||
_propose_url(project.id, propose_id) + "/accept",
|
||||
json={"milestone_id": ms.id},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
|
||||
# Try to edit
|
||||
resp = client.patch(
|
||||
_propose_url(project.id, propose_id),
|
||||
json={"title": "Changed"},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "open" in resp.json()["detail"].lower()
|
||||
|
||||
def test_edit_rejected_propose_fails(
|
||||
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
admin_role, mgr_role, dev_role = seed_roles_and_permissions
|
||||
mgr = make_user()
|
||||
project = make_project(owner_id=mgr.id)
|
||||
make_member(project.id, mgr.id, mgr_role.id)
|
||||
|
||||
create_resp = client.post(
|
||||
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
|
||||
)
|
||||
propose_id = create_resp.json()["id"]
|
||||
|
||||
# Reject
|
||||
client.post(
|
||||
_propose_url(project.id, propose_id) + "/reject",
|
||||
json={"reason": "no"},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
|
||||
# Try to edit
|
||||
resp = client.patch(
|
||||
_propose_url(project.id, propose_id),
|
||||
json={"title": "Changed"},
|
||||
headers=auth_header(mgr),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
564
tests/test_task_transitions.py
Normal file
564
tests/test_task_transitions.py
Normal file
@@ -0,0 +1,564 @@
|
||||
"""P13.2 — Task state-machine transition tests.
|
||||
|
||||
Covers:
|
||||
- pending → open: success, milestone not undergoing, deps not met
|
||||
- open → undergoing: success, no assignee, non-assignee blocked
|
||||
- undergoing → completed: success with comment, no comment fails, non-assignee blocked
|
||||
- close from pending/open/undergoing: permission required
|
||||
- reopen from completed/closed → open: distinct permissions
|
||||
- invalid transitions: rejected by state machine
|
||||
- edit restrictions: P5.7 body edit guards by status/assignee
|
||||
"""
|
||||
import json
|
||||
|
||||
import pytest
|
||||
from app.models.milestone import MilestoneStatus
|
||||
from app.models.task import TaskStatus
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Helpers
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
def _transition(client, task_id, new_status, headers, comment=None):
|
||||
"""POST /tasks/{id}/transition?new_status=..."""
|
||||
body = {}
|
||||
if comment is not None:
|
||||
body["comment"] = comment
|
||||
return client.post(
|
||||
f"/tasks/{task_id}/transition?new_status={new_status}",
|
||||
json=body,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# pending → open
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestPendingToOpen:
|
||||
|
||||
def test_success(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""pending→open succeeds when milestone is undergoing and no deps."""
|
||||
admin_role, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.PENDING)
|
||||
|
||||
resp = _transition(client, task.id, "open", auth_header(user))
|
||||
assert resp.status_code == 200, resp.text
|
||||
assert resp.json()["status"] == "open"
|
||||
|
||||
def test_milestone_not_undergoing(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""pending→open rejected when milestone is still open."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.PENDING)
|
||||
|
||||
resp = _transition(client, task.id, "open", auth_header(user))
|
||||
assert resp.status_code == 400
|
||||
assert "undergoing" in resp.json()["detail"].lower()
|
||||
|
||||
def test_deps_not_satisfied(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""pending→open rejected when depend_on tasks are not completed."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
|
||||
dep_task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
|
||||
task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
status=TaskStatus.PENDING,
|
||||
depend_on=json.dumps([dep_task.id]),
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, "open", auth_header(user))
|
||||
assert resp.status_code == 400
|
||||
assert "depend" in resp.json()["detail"].lower() or "block" in resp.json()["detail"].lower()
|
||||
|
||||
def test_deps_satisfied(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""pending→open succeeds when all depend_on tasks are completed."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
|
||||
dep_task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
|
||||
task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
status=TaskStatus.PENDING,
|
||||
depend_on=json.dumps([dep_task.id]),
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, "open", auth_header(user))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "open"
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# open → undergoing
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestOpenToUndergoing:
|
||||
|
||||
def test_success_assignee_starts(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Assignee can start their own task."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
status=TaskStatus.OPEN,
|
||||
assignee_id=user.id,
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, "undergoing", auth_header(user))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "undergoing"
|
||||
db.refresh(task)
|
||||
assert task.started_on is not None
|
||||
|
||||
def test_no_assignee_fails(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Cannot start a task without an assignee."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
|
||||
|
||||
resp = _transition(client, task.id, "undergoing", auth_header(user))
|
||||
assert resp.status_code == 400
|
||||
assert "assignee" in resp.json()["detail"].lower()
|
||||
|
||||
def test_non_assignee_blocked(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""A different user cannot start someone else's task."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
owner = make_user()
|
||||
other = make_user()
|
||||
project = make_project(owner_id=owner.id)
|
||||
make_member(project.id, owner.id, mgr_role.id)
|
||||
make_member(project.id, other.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, owner.id,
|
||||
status=TaskStatus.OPEN,
|
||||
assignee_id=owner.id,
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, "undergoing", auth_header(other))
|
||||
assert resp.status_code == 403
|
||||
assert "assigned" in resp.json()["detail"].lower()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# undergoing → completed
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestUndergoingToCompleted:
|
||||
|
||||
def test_success_with_comment(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Assignee can complete a task with a completion comment."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
status=TaskStatus.UNDERGOING,
|
||||
assignee_id=user.id,
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, "completed", auth_header(user), comment="Done!")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "completed"
|
||||
db.refresh(task)
|
||||
assert task.finished_on is not None
|
||||
|
||||
def test_no_comment_fails(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Cannot complete without a comment."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
status=TaskStatus.UNDERGOING,
|
||||
assignee_id=user.id,
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, "completed", auth_header(user))
|
||||
assert resp.status_code == 400
|
||||
assert "comment" in resp.json()["detail"].lower()
|
||||
|
||||
def test_empty_comment_fails(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Empty/whitespace comment is rejected."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
status=TaskStatus.UNDERGOING,
|
||||
assignee_id=user.id,
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, "completed", auth_header(user), comment=" ")
|
||||
assert resp.status_code == 400
|
||||
assert "comment" in resp.json()["detail"].lower()
|
||||
|
||||
def test_non_assignee_blocked(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Non-assignee cannot complete the task."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
owner = make_user()
|
||||
other = make_user()
|
||||
project = make_project(owner_id=owner.id)
|
||||
make_member(project.id, owner.id, mgr_role.id)
|
||||
make_member(project.id, other.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, owner.id,
|
||||
status=TaskStatus.UNDERGOING,
|
||||
assignee_id=owner.id,
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, "completed", auth_header(other), comment="I finished it")
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Close task (from various states)
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestCloseTask:
|
||||
|
||||
@pytest.mark.parametrize("initial_status", [
|
||||
TaskStatus.PENDING,
|
||||
TaskStatus.OPEN,
|
||||
TaskStatus.UNDERGOING,
|
||||
])
|
||||
def test_close_from_valid_states(
|
||||
self, initial_status,
|
||||
client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Close is allowed from pending/open/undergoing with permission."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=initial_status)
|
||||
|
||||
resp = _transition(client, task.id, "closed", auth_header(user))
|
||||
assert resp.status_code == 200, resp.text
|
||||
assert resp.json()["status"] == "closed"
|
||||
|
||||
@pytest.mark.parametrize("initial_status", [
|
||||
TaskStatus.COMPLETED,
|
||||
TaskStatus.CLOSED,
|
||||
])
|
||||
def test_close_from_terminal_states_fails(
|
||||
self, initial_status,
|
||||
client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Cannot close from completed or already closed."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=initial_status)
|
||||
|
||||
resp = _transition(client, task.id, "closed", auth_header(user))
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_close_without_permission_fails(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""User without task.close permission cannot close."""
|
||||
from app.models.role_permission import Role
|
||||
_, _, dev_role = seed_roles_and_permissions
|
||||
|
||||
# Create a role with NO task.close permission
|
||||
no_close_role = Role(name="viewer", is_global=False)
|
||||
db.add(no_close_role)
|
||||
db.commit()
|
||||
|
||||
# Give viewer only basic perms (project.read, task.read)
|
||||
from app.models.role_permission import Permission, RolePermission
|
||||
for pname in ("project.read", "task.read"):
|
||||
p = db.query(Permission).filter(Permission.name == pname).first()
|
||||
if p:
|
||||
db.add(RolePermission(role_id=no_close_role.id, permission_id=p.id))
|
||||
db.commit()
|
||||
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, no_close_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
|
||||
|
||||
resp = _transition(client, task.id, "closed", auth_header(user))
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Reopen (completed → open, closed → open)
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestReopen:
|
||||
|
||||
def test_reopen_completed(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Reopen from completed → open with task.reopen_completed permission."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
|
||||
|
||||
resp = _transition(client, task.id, "open", auth_header(user))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "open"
|
||||
# finished_on should be cleared
|
||||
db.refresh(task)
|
||||
assert task.finished_on is None
|
||||
|
||||
def test_reopen_closed(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Reopen from closed → open with task.reopen_closed permission."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.CLOSED)
|
||||
|
||||
resp = _transition(client, task.id, "open", auth_header(user))
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "open"
|
||||
|
||||
def test_reopen_without_permission_fails(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""User without reopen permission cannot reopen."""
|
||||
from app.models.role_permission import Role, Permission, RolePermission
|
||||
|
||||
# Create a role with task.close but NO reopen permissions
|
||||
limited_role = Role(name="limited", is_global=False)
|
||||
db.add(limited_role)
|
||||
db.commit()
|
||||
for pname in ("project.read", "task.read", "task.write", "task.close"):
|
||||
p = db.query(Permission).filter(Permission.name == pname).first()
|
||||
if p:
|
||||
db.add(RolePermission(role_id=limited_role.id, permission_id=p.id))
|
||||
db.commit()
|
||||
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, limited_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
|
||||
|
||||
resp = _transition(client, task.id, "open", auth_header(user))
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Invalid transitions
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestInvalidTransitions:
|
||||
|
||||
@pytest.mark.parametrize("from_status,to_status", [
|
||||
(TaskStatus.PENDING, "undergoing"),
|
||||
(TaskStatus.PENDING, "completed"),
|
||||
(TaskStatus.OPEN, "completed"),
|
||||
(TaskStatus.OPEN, "pending"),
|
||||
(TaskStatus.UNDERGOING, "open"),
|
||||
(TaskStatus.UNDERGOING, "pending"),
|
||||
(TaskStatus.COMPLETED, "undergoing"),
|
||||
(TaskStatus.COMPLETED, "closed"),
|
||||
(TaskStatus.CLOSED, "undergoing"),
|
||||
(TaskStatus.CLOSED, "completed"),
|
||||
])
|
||||
def test_disallowed_transition(
|
||||
self, from_status, to_status,
|
||||
client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""State machine rejects transitions not in VALID_TRANSITIONS."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
status=from_status,
|
||||
assignee_id=user.id,
|
||||
)
|
||||
|
||||
resp = _transition(client, task.id, to_status, auth_header(user))
|
||||
assert resp.status_code == 400
|
||||
assert "cannot transition" in resp.json()["detail"].lower()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------
|
||||
# Edit restrictions (PATCH)
|
||||
# -----------------------------------------------------------------------
|
||||
|
||||
class TestEditRestrictions:
|
||||
|
||||
def test_undergoing_body_edit_blocked(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Cannot PATCH body fields on an undergoing task."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.UNDERGOING, assignee_id=user.id)
|
||||
|
||||
resp = client.patch(
|
||||
f"/tasks/{task.id}",
|
||||
json={"title": "New Title"},
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
assert "undergoing" in resp.json()["detail"].lower()
|
||||
|
||||
def test_completed_body_edit_blocked(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Cannot PATCH body fields on a completed task."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
|
||||
|
||||
resp = client.patch(
|
||||
f"/tasks/{task.id}",
|
||||
json={"title": "Changed"},
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
def test_open_assignee_only_edit(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Open task with assignee: only assignee can edit body."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
owner = make_user()
|
||||
other = make_user()
|
||||
project = make_project(owner_id=owner.id)
|
||||
make_member(project.id, owner.id, mgr_role.id)
|
||||
make_member(project.id, other.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, owner.id,
|
||||
status=TaskStatus.OPEN,
|
||||
assignee_id=owner.id,
|
||||
)
|
||||
|
||||
# Other user cannot edit
|
||||
resp = client.patch(
|
||||
f"/tasks/{task.id}",
|
||||
json={"title": "Hijack"},
|
||||
headers=auth_header(other),
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
# Assignee can edit
|
||||
resp = client.patch(
|
||||
f"/tasks/{task.id}",
|
||||
json={"title": "My Change"},
|
||||
headers=auth_header(owner),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["title"] == "My Change"
|
||||
|
||||
def test_open_no_assignee_anyone_edits(
|
||||
self, client, db, make_user, make_project, make_milestone,
|
||||
make_task, seed_roles_and_permissions, make_member, auth_header,
|
||||
):
|
||||
"""Open task without assignee: any project member can edit."""
|
||||
_, mgr_role, _ = seed_roles_and_permissions
|
||||
user = make_user()
|
||||
project = make_project(owner_id=user.id)
|
||||
make_member(project.id, user.id, mgr_role.id)
|
||||
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
|
||||
task = make_task(
|
||||
project.id, ms.id, user.id,
|
||||
status=TaskStatus.OPEN,
|
||||
assignee_id=None,
|
||||
)
|
||||
|
||||
resp = client.patch(
|
||||
f"/tasks/{task.id}",
|
||||
json={"title": "Anyone's Change"},
|
||||
headers=auth_header(user),
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["title"] == "Anyone's Change"
|
||||
Reference in New Issue
Block a user