Compare commits

..

10 Commits

Author SHA1 Message Date
zhi
e938507a24 test(P13.3): propose backend tests — 19 tests covering CRUD, accept/reject/reopen, code generation, feat_task_id protection, edit restrictions, permissions 2026-03-18 05:01:56 +00:00
zhi
c21e4ee335 test(P13.2): task state-machine tests — 34 tests covering transitions, assignee guards, comments, permissions, edit restrictions 2026-03-18 04:02:29 +00:00
zhi
011a2262ce test(P13.1): add milestone state machine tests — 17 tests covering freeze/start/close/auto-complete/preflight
New test infrastructure:
- tests/conftest.py: SQLite in-memory fixtures, TestClient wired to test DB,
  factory fixtures for User/Project/Milestone/Task/Roles/Permissions
- tests/test_milestone_actions.py: 17 tests covering:
  - freeze success/no-release-task/multiple-release-tasks/wrong-status
  - start success+started_at/deps-not-met/wrong-status
  - close from open/freeze/undergoing, rejected from completed/closed
  - auto-complete on release task finish, no auto-complete for non-release/wrong-status
  - preflight allowed/not-allowed
2026-03-18 03:07:30 +00:00
zhi
7bad57eb0e feat(P5): sync batch transition with P5.3-P5.6 guards — auth, assignee, comment, permission, deps, auto-complete 2026-03-18 01:01:59 +00:00
zhi
00a1786ec3 feat(P12.1): CLI — add propose subcommands, remove task_type=task, add milestone status filter, transition comment support 2026-03-18 00:01:52 +00:00
zhi
586e06f66a feat(P3.6): lock feature story task body edits when milestone is freeze/undergoing/completed/closed 2026-03-17 23:01:39 +00:00
zhi
ec91a15f65 fix(P7.1): remove TaskType.TASK from models.py + fix milestone task defaults (issue/pending) 2026-03-17 23:01:02 +00:00
zhi
8e38d4cf4d feat(P2.2): add default mgr/dev role seeds with preset permissions for milestone/task/propose actions 2026-03-17 19:02:44 +00:00
zhi
0c75045f6f feat(P4.3): wire task depend_on check into pending→open transition via reusable helper 2026-03-17 18:02:08 +00:00
zhi
c6b14ac25f P4.1: Extract reusable dependency check helper, deduplicate milestone_actions.py
- New app/services/dependency_check.py with check_milestone_deps()
- Replaces 3x duplicated JSON-parse + query + filter logic
- Supports both milestone and task dependency checking
- Returns structured DepCheckResult with ok/blockers/reason
- Refactored preflight and start endpoints to use shared helper
2026-03-17 17:03:45 +00:00
12 changed files with 2253 additions and 157 deletions

View File

@@ -3,7 +3,6 @@
Provides freeze / start / close actions on milestones.
completed is triggered automatically when the sole release maintenance task finishes.
"""
import json
from datetime import datetime, timezone
from typing import Optional
@@ -18,6 +17,7 @@ from app.models import models
from app.models.milestone import Milestone, MilestoneStatus
from app.models.task import Task, TaskStatus
from app.services.activity import log_activity
from app.services.dependency_check import check_milestone_deps
router = APIRouter(
prefix="/projects/{project_id}/milestones/{milestone_id}/actions",
@@ -101,38 +101,13 @@ def preflight_milestone_actions(
# --- start pre-check (only meaningful when status == freeze) ---
if ms_status == "freeze":
blockers: list[str] = []
# milestone dependencies
dep_ms_ids = []
if ms.depend_on_milestones:
try:
dep_ms_ids = json.loads(ms.depend_on_milestones)
except (json.JSONDecodeError, TypeError):
dep_ms_ids = []
if dep_ms_ids:
dep_milestones = db.query(Milestone).filter(Milestone.id.in_(dep_ms_ids)).all()
incomplete = [m.id for m in dep_milestones if _ms_status_value(m) != "completed"]
if incomplete:
blockers.append(f"Dependent milestones not completed: {incomplete}")
# task dependencies
dep_task_ids = []
if ms.depend_on_tasks:
try:
dep_task_ids = json.loads(ms.depend_on_tasks)
except (json.JSONDecodeError, TypeError):
dep_task_ids = []
if dep_task_ids:
dep_tasks = db.query(Task).filter(Task.id.in_(dep_task_ids)).all()
incomplete_tasks = [t.id for t in dep_tasks if (t.status.value if hasattr(t.status, "value") else t.status) != "completed"]
if incomplete_tasks:
blockers.append(f"Dependent tasks not completed: {incomplete_tasks}")
if blockers:
result["start"] = {"allowed": False, "reason": "; ".join(blockers)}
else:
dep_result = check_milestone_deps(
db, ms.depend_on_milestones, ms.depend_on_tasks,
)
if dep_result.ok:
result["start"] = {"allowed": True, "reason": None}
else:
result["start"] = {"allowed": False, "reason": dep_result.reason}
return result
@@ -232,51 +207,15 @@ def start_milestone(
detail=f"Cannot start: milestone is '{_ms_status_value(ms)}', expected 'freeze'",
)
# Dependency check — milestone dependencies
dep_ms_ids = []
if ms.depend_on_milestones:
try:
dep_ms_ids = json.loads(ms.depend_on_milestones)
except (json.JSONDecodeError, TypeError):
dep_ms_ids = []
if dep_ms_ids:
dep_milestones = (
db.query(Milestone)
.filter(Milestone.id.in_(dep_ms_ids))
.all()
# Dependency check (P4.1 — shared helper)
dep_result = check_milestone_deps(
db, ms.depend_on_milestones, ms.depend_on_tasks,
)
if not dep_result.ok:
raise HTTPException(
status_code=400,
detail=f"Cannot start: {dep_result.reason}",
)
incomplete = [
m.id
for m in dep_milestones
if _ms_status_value(m) != "completed"
]
if incomplete:
raise HTTPException(
status_code=400,
detail=f"Cannot start: dependent milestones not completed: {incomplete}",
)
# Dependency check — task dependencies
dep_task_ids = []
if ms.depend_on_tasks:
try:
dep_task_ids = json.loads(ms.depend_on_tasks)
except (json.JSONDecodeError, TypeError):
dep_task_ids = []
if dep_task_ids:
dep_tasks = db.query(Task).filter(Task.id.in_(dep_task_ids)).all()
incomplete_tasks = [
t.id
for t in dep_tasks
if (t.status.value if hasattr(t.status, "value") else t.status) != "completed"
]
if incomplete_tasks:
raise HTTPException(
status_code=400,
detail=f"Cannot start: dependent tasks not completed: {incomplete_tasks}",
)
ms.status = MilestoneStatus.UNDERGOING
ms.started_at = datetime.now(timezone.utc)

View File

@@ -173,9 +173,9 @@ def create_milestone_task(project_id: int, milestone_id: int, task_data: schemas
task = Task(
title=data.get("title"),
description=data.get("description"),
task_type=data.get("task_type", "task"),
task_type=data.get("task_type", "issue"),
task_subtype=data.get("task_subtype"),
status=TaskStatus.OPEN,
status=TaskStatus.PENDING,
priority=TaskPriority.MEDIUM,
project_id=project_id,
milestone_id=milestone_id,

View File

@@ -16,6 +16,7 @@ from app.models.notification import Notification as NotificationModel
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_task
from app.services.activity import log_activity
from app.services.dependency_check import check_task_deps
router = APIRouter(tags=["Tasks"])
@@ -208,6 +209,21 @@ def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Dep
body_fields = {k for k in update_data.keys() if k not in _always_allowed}
if body_fields:
# P3.6 supplement: feature story tasks locked after milestone freeze
task_type = task.task_type.value if hasattr(task.task_type, 'value') else (task.task_type or "")
task_subtype = task.task_subtype or ""
if task_type == "story" and task_subtype == "feature" and task.milestone_id:
from app.models.milestone import Milestone
ms = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if ms:
ms_status = ms.status.value if hasattr(ms.status, 'value') else ms.status
if ms_status in ("freeze", "undergoing", "completed", "closed"):
raise HTTPException(
status_code=400,
detail=f"Feature story task cannot be edited: milestone is '{ms_status}'. "
f"Blocked fields: {sorted(body_fields)}",
)
# undergoing/completed/closed: body edits forbidden
if current_status in ("undergoing", "completed", "closed"):
raise HTTPException(
@@ -293,7 +309,7 @@ def transition_task(
# P5.1: enforce state-machine
_check_transition(old_status, new_status)
# P5.2: pending -> open requires milestone to be undergoing (dependencies checked later)
# P5.2: pending -> open requires milestone to be undergoing + task deps satisfied
if old_status == "pending" and new_status == "open":
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if milestone:
@@ -303,6 +319,13 @@ def transition_task(
status_code=400,
detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'",
)
# P4.3: check task-level depend_on
dep_result = check_task_deps(db, task.depend_on)
if not dep_result.ok:
raise HTTPException(
status_code=400,
detail=f"Cannot open task: {dep_result.reason}",
)
# P5.3: open -> undergoing requires assignee AND operator must be the assignee
if old_status == "open" and new_status == "undergoing":
@@ -428,17 +451,24 @@ def list_all_tags(project_id: int = None, db: Session = Depends(get_db)):
# ---- Batch ----
class BatchTransition(BaseModel):
task_ids: List[int]
new_status: str
class BatchAssign(BaseModel):
task_ids: List[int]
assignee_id: int
class BatchTransitionBody(BaseModel):
task_ids: List[int]
new_status: str
comment: Optional[str] = None
@router.post("/tasks/batch/transition")
def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)):
def batch_transition(
data: BatchTransitionBody,
bg: BackgroundTasks,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
valid_statuses = [s.value for s in TaskStatus]
if data.new_status not in valid_statuses:
raise HTTPException(status_code=400, detail="Invalid status")
@@ -446,22 +476,106 @@ def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = D
skipped = []
for task_id in data.task_ids:
task = db.query(Task).filter(Task.id == task_id).first()
if task:
old_status = task.status.value if hasattr(task.status, 'value') else task.status
allowed = VALID_TRANSITIONS.get(old_status, set())
if data.new_status not in allowed:
if not task:
skipped.append({"id": task_id, "title": None, "old": None,
"reason": "Task not found"})
continue
old_status = task.status.value if hasattr(task.status, 'value') else task.status
# P5.1: state-machine check
allowed = VALID_TRANSITIONS.get(old_status, set())
if data.new_status not in allowed:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"})
continue
# P5.2: pending → open requires milestone undergoing + task deps
if old_status == "pending" and data.new_status == "open":
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if milestone:
ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
if ms_status != "undergoing":
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Milestone is '{ms_status}', must be 'undergoing'"})
continue
dep_result = check_task_deps(db, task.depend_on)
if not dep_result.ok:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"})
"reason": dep_result.reason})
continue
if data.new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if data.new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
if data.new_status == "open" and old_status in ("completed", "closed"):
task.finished_on = None
task.status = data.new_status
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status})
# P5.3: open → undergoing requires assignee == current_user
if old_status == "open" and data.new_status == "undergoing":
if not task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Assignee must be set before starting"})
continue
if current_user.id != task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Only the assigned user can start this task"})
continue
# P5.4: undergoing → completed requires comment + assignee check
if old_status == "undergoing" and data.new_status == "completed":
comment_text = data.comment
if not comment_text or not comment_text.strip():
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "A completion comment is required"})
continue
if task.assignee_id and current_user.id != task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Only the assigned user can complete this task"})
continue
# P5.5: close requires permission
if data.new_status == "closed":
try:
check_permission(db, current_user.id, task.project_id, "task.close")
except HTTPException:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Missing 'task.close' permission"})
continue
# P5.6: reopen requires permission
if data.new_status == "open" and old_status in ("completed", "closed"):
perm = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
try:
check_permission(db, current_user.id, task.project_id, perm)
except HTTPException:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Missing '{perm}' permission"})
continue
task.finished_on = None
if data.new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if data.new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
task.status = data.new_status
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status})
# Activity log per task
log_activity(db, f"task.transition.{data.new_status}", "task", task.id, current_user.id,
{"old_status": old_status, "new_status": data.new_status})
# P5.4: auto-create completion comment
if old_status == "undergoing" and data.new_status == "completed" and data.comment:
db_comment = models.Comment(
content=data.comment.strip(),
task_id=task.id,
author_id=current_user.id,
)
db.add(db_comment)
db.commit()
# P3.5: auto-complete milestone for any completed task
for u in updated:
if u["new"] == "completed":
t = db.query(Task).filter(Task.id == u["id"]).first()
if t:
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, t, user_id=current_user.id)
for u in updated:
event = "task.closed" if data.new_status == "closed" else "task.updated"
bg.add_task(fire_webhooks_sync, event, u, None, db)

View File

@@ -151,61 +151,93 @@ def init_default_permissions(db: Session) -> list[Permission]:
return db.query(Permission).all()
def init_admin_role(db: Session, admin_user: models.User) -> None:
"""Create admin role with all permissions and guest role with minimal permissions."""
# Check if admin role already exists
admin_role = db.query(Role).filter(Role.name == "admin").first()
if not admin_role:
admin_role = Role(
name="admin",
description="Administrator - full access to all features",
is_global=True
)
db.add(admin_role)
# ---------------------------------------------------------------------------
# Default role → permission mapping
# ---------------------------------------------------------------------------
# mgr: project management + all milestone/task/propose actions
_MGR_PERMISSIONS = {
"project.read", "project.write", "project.manage_members",
"task.create", "task.read", "task.write", "task.delete",
"milestone.create", "milestone.read", "milestone.write", "milestone.delete",
"milestone.freeze", "milestone.start", "milestone.close",
"task.close", "task.reopen_closed", "task.reopen_completed",
"propose.accept", "propose.reject", "propose.reopen",
"monitor.read",
}
# dev: day-to-day development work — no freeze/start/close milestone, no accept/reject propose
_DEV_PERMISSIONS = {
"project.read",
"task.create", "task.read", "task.write",
"milestone.read",
"task.close", "task.reopen_closed", "task.reopen_completed",
"monitor.read",
}
# Role definitions: (name, description, permission_set)
_DEFAULT_ROLES = [
("admin", "Administrator - full access to all features", None), # None ⇒ all perms
("mgr", "Manager - project & milestone management", _MGR_PERMISSIONS),
("dev", "Developer - task execution & daily work", _DEV_PERMISSIONS),
("guest", "Guest - read-only access", None), # special: *.read only
]
def _ensure_role(db: Session, name: str, description: str, is_global: bool = True) -> Role:
"""Get or create a role by name."""
role = db.query(Role).filter(Role.name == name).first()
if not role:
role = Role(name=name, description=description, is_global=is_global)
db.add(role)
db.commit()
db.refresh(admin_role)
logger.info("Created admin role (id=%d)", admin_role.id)
# Check if guest role already exists
guest_role = db.query(Role).filter(Role.name == "guest").first()
if not guest_role:
guest_role = Role(
name="guest",
description="Guest - read-only access",
is_global=True
)
db.add(guest_role)
db.commit()
db.refresh(guest_role)
logger.info("Created guest role (id=%d)", guest_role.id)
# Get all permissions
db.refresh(role)
logger.info("Created role '%s' (id=%d)", name, role.id)
return role
def _sync_role_permissions(db: Session, role: Role, target_perm_names: set[str] | None) -> None:
"""Ensure *role* has exactly the permissions in *target_perm_names*.
* ``None`` means **all** permissions (admin).
* The special sentinel ``"__read_only__"`` is handled by the caller passing
just the ``*.read`` names.
Only adds missing permissions; never removes manually-granted ones (additive).
"""
all_perms = db.query(Permission).all()
# Assign all permissions to admin role
existing_admin_perm_ids = {rp.permission_id for rp in admin_role.permissions}
for perm in all_perms:
if perm.id not in existing_admin_perm_ids:
rp = RolePermission(role_id=admin_role.id, permission_id=perm.id)
db.add(rp)
if all_perms:
perm_by_name = {p.name: p for p in all_perms}
if target_perm_names is None:
wanted_ids = {p.id for p in all_perms}
else:
wanted_ids = {perm_by_name[n].id for n in target_perm_names if n in perm_by_name}
existing_ids = {rp.permission_id for rp in role.permissions}
added = 0
for pid in wanted_ids - existing_ids:
db.add(RolePermission(role_id=role.id, permission_id=pid))
added += 1
if added:
db.commit()
logger.info("Assigned %d permissions to admin role", len(all_perms))
# Assign only read permissions to guest role
read_perms = db.query(Permission).filter(Permission.name.like("%.read")).all()
existing_guest_perm_ids = {rp.permission_id for rp in guest_role.permissions}
for perm in read_perms:
if perm.id not in existing_guest_perm_ids:
rp = RolePermission(role_id=guest_role.id, permission_id=perm.id)
db.add(rp)
if read_perms:
db.commit()
logger.info("Assigned %d read permissions to guest role", len(read_perms))
logger.info("Admin and guest roles setup complete")
logger.info("Assigned %d new permissions to role '%s'", added, role.name)
def init_admin_role(db: Session, admin_user: models.User) -> None:
"""Create default roles (admin / mgr / dev / guest) with preset permissions."""
all_perms = db.query(Permission).all()
read_perm_names = {p.name for p in all_perms if p.name.endswith(".read")}
for name, description, perm_set in _DEFAULT_ROLES:
role = _ensure_role(db, name, description)
if name == "guest":
_sync_role_permissions(db, role, read_perm_names)
else:
_sync_role_permissions(db, role, perm_set)
logger.info("Default roles setup complete (admin, mgr, dev, guest)")
def run_init(db: Session) -> None:

View File

@@ -7,7 +7,7 @@ import enum
class TaskType(str, enum.Enum):
"""Task type enum'issue' is a subtype of task, not the other way around."""
"""Task type enum."""
ISSUE = "issue"
MAINTENANCE = "maintenance"
RESEARCH = "research"
@@ -15,7 +15,6 @@ class TaskType(str, enum.Enum):
STORY = "story"
TEST = "test"
RESOLUTION = "resolution"
TASK = "task"
class TaskStatus(str, enum.Enum):

View File

@@ -0,0 +1,148 @@
"""P4.1 — Reusable dependency-check helpers.
Used by milestone start, milestone preflight, and (future) task pending→open
to verify that all declared dependencies are completed before allowing the
entity to proceed.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Sequence
from sqlalchemy.orm import Session
from app.models.milestone import Milestone
from app.models.task import Task
# ---------------------------------------------------------------------------
# Result type
# ---------------------------------------------------------------------------
@dataclass
class DepCheckResult:
"""Outcome of a dependency check."""
ok: bool = True
blockers: list[str] = field(default_factory=list)
@property
def reason(self) -> str | None:
return "; ".join(self.blockers) if self.blockers else None
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _parse_json_ids(raw: str | None) -> list[int]:
"""Safely parse a JSON-encoded list of integer IDs."""
if not raw:
return []
try:
ids = json.loads(raw)
if isinstance(ids, list):
return [int(i) for i in ids]
except (json.JSONDecodeError, TypeError, ValueError):
pass
return []
def _ms_status(ms: Milestone) -> str:
return ms.status.value if hasattr(ms.status, "value") else ms.status
def _task_status(t: Task) -> str:
return t.status.value if hasattr(t.status, "value") else t.status
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def check_milestone_deps(
db: Session,
depend_on_milestones: str | None,
depend_on_tasks: str | None,
*,
required_status: str = "completed",
) -> DepCheckResult:
"""Check whether all milestone + task dependencies are satisfied.
Parameters
----------
db:
Active DB session.
depend_on_milestones:
JSON-encoded list of milestone IDs (from the entity's field).
depend_on_tasks:
JSON-encoded list of task IDs (from the entity's field).
required_status:
The status that dependees must have reached (default ``"completed"``).
Returns
-------
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
with human-readable ``blockers``.
"""
result = DepCheckResult()
# --- milestone dependencies ---
ms_ids = _parse_json_ids(depend_on_milestones)
if ms_ids:
dep_milestones: Sequence[Milestone] = (
db.query(Milestone).filter(Milestone.id.in_(ms_ids)).all()
)
incomplete = [m.id for m in dep_milestones if _ms_status(m) != required_status]
if incomplete:
result.ok = False
result.blockers.append(f"Dependent milestones not {required_status}: {incomplete}")
# --- task dependencies ---
task_ids = _parse_json_ids(depend_on_tasks)
if task_ids:
dep_tasks: Sequence[Task] = (
db.query(Task).filter(Task.id.in_(task_ids)).all()
)
incomplete_tasks = [t.id for t in dep_tasks if _task_status(t) != required_status]
if incomplete_tasks:
result.ok = False
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete_tasks}")
return result
def check_task_deps(
db: Session,
depend_on: str | None,
*,
required_status: str = "completed",
) -> DepCheckResult:
"""Check whether a task's depend_on tasks are all satisfied.
Parameters
----------
db:
Active DB session.
depend_on:
JSON-encoded list of task IDs (from the task's ``depend_on`` field).
required_status:
The status dependees must have reached (default ``"completed"``).
Returns
-------
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
with human-readable ``blockers``.
"""
result = DepCheckResult()
task_ids = _parse_json_ids(depend_on)
if task_ids:
dep_tasks: Sequence[Task] = (
db.query(Task).filter(Task.id.in_(task_ids)).all()
)
incomplete = [t.id for t in dep_tasks if _task_status(t) != required_status]
if incomplete:
result.ok = False
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete}")
return result

92
cli.py
View File

@@ -23,7 +23,6 @@ STATUS_ICON = {
}
TYPE_ICON = {
"resolution": "⚖️",
"task": "📋",
"story": "📖",
"test": "🧪",
"issue": "📌",
@@ -151,10 +150,56 @@ def cmd_search(args):
def cmd_transition(args):
result = _request("POST", f"/tasks/{args.task_id}/transition?new_status={args.status}")
body = {}
if args.comment:
body["comment"] = args.comment
result = _request("POST", f"/tasks/{args.task_id}/transition?new_status={args.status}", body or None)
print(f"Task #{result['id']} transitioned to: {result['status']}")
# ── Propose commands ──────────────────────────────────────────────
def cmd_proposes(args):
if not args.project:
print("Error: --project is required for proposes", file=sys.stderr)
sys.exit(1)
result = _request("GET", f"/projects/{args.project}/proposes")
items = result if isinstance(result, list) else result.get("items", [])
if not items:
print(" No proposes found.")
return
for p in items:
status_icon = STATUS_ICON.get(p["status"], "")
feat = f" → task {p['feat_task_id']}" if p.get("feat_task_id") else ""
print(f" {status_icon} 💡 {p['propose_code']} {p['title']}{feat}")
def cmd_propose_create(args):
data = {"title": args.title}
if args.description:
data["description"] = args.description
result = _request("POST", f"/projects/{args.project}/proposes", data)
print(f"Created propose {result['propose_code']}: {result['title']}")
def cmd_propose_accept(args):
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/accept?milestone_id={args.milestone}")
print(f"Propose #{args.propose_id} accepted → task {result.get('feat_task_id', '?')}")
def cmd_propose_reject(args):
data = {}
if args.reason:
data["reason"] = args.reason
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/reject", data or None)
print(f"Propose #{args.propose_id} rejected")
def cmd_propose_reopen(args):
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/reopen")
print(f"Propose #{args.propose_id} reopened")
def cmd_stats(args):
params = f"?project_id={args.project}" if args.project else ""
stats = _request("GET", f"/dashboard/stats{params}")
@@ -170,8 +215,13 @@ def cmd_stats(args):
def cmd_milestones(args):
params = f"?project_id={args.project}" if args.project else ""
milestones = _request("GET", f"/milestones{params}")
params = []
if args.project:
params.append(f"project_id={args.project}")
if args.status:
params.append(f"status={args.status}")
qs = f"?{'&'.join(params)}" if params else ""
milestones = _request("GET", f"/milestones{qs}")
if not milestones:
print(" No milestones found.")
return
@@ -242,7 +292,7 @@ def main():
p_tasks = sub.add_parser("tasks", aliases=["issues"], help="List tasks")
p_tasks.add_argument("--project", "-p", type=int)
p_tasks.add_argument("--type", "-t", choices=["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_tasks.add_argument("--type", "-t", choices=["story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_tasks.add_argument("--status", "-s", choices=["open", "pending", "undergoing", "completed", "closed"])
p_create = sub.add_parser("create-task", aliases=["create-issue"], help="Create a task")
@@ -250,7 +300,7 @@ def main():
p_create.add_argument("--project", "-p", type=int, required=True)
p_create.add_argument("--milestone", "-m", type=int, required=True)
p_create.add_argument("--reporter", "-r", type=int, required=True)
p_create.add_argument("--type", "-t", default="task", choices=["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_create.add_argument("--type", "-t", default="issue", choices=["story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_create.add_argument("--subtype")
p_create.add_argument("--priority", choices=["low", "medium", "high", "critical"])
p_create.add_argument("--description", "-d")
@@ -271,12 +321,14 @@ def main():
p_trans = sub.add_parser("transition", help="Transition task status")
p_trans.add_argument("task_id", type=int)
p_trans.add_argument("status", choices=["open", "pending", "undergoing", "completed", "closed"])
p_trans.add_argument("--comment", "-c", help="Comment (required for undergoing→completed)")
p_stats = sub.add_parser("stats", help="Dashboard stats")
p_stats.add_argument("--project", "-p", type=int)
p_ms = sub.add_parser("milestones", help="List milestones")
p_ms.add_argument("--project", "-p", type=int)
p_ms.add_argument("--status", "-s", choices=["open", "freeze", "undergoing", "completed", "closed"])
p_msp = sub.add_parser("milestone-progress", help="Show milestone progress")
p_msp.add_argument("milestone_id", type=int)
@@ -296,6 +348,29 @@ def main():
p_worklogs = sub.add_parser("worklogs", help="List work logs for a task")
p_worklogs.add_argument("task_id", type=int)
# ── Propose subcommands ──
p_proposes = sub.add_parser("proposes", help="List proposes for a project")
p_proposes.add_argument("--project", "-p", type=int, required=True)
p_pc = sub.add_parser("propose-create", help="Create a propose")
p_pc.add_argument("title")
p_pc.add_argument("--project", "-p", type=int, required=True)
p_pc.add_argument("--description", "-d")
p_pa = sub.add_parser("propose-accept", help="Accept a propose into a milestone")
p_pa.add_argument("propose_id", type=int)
p_pa.add_argument("--project", "-p", type=int, required=True)
p_pa.add_argument("--milestone", "-m", type=int, required=True)
p_pr = sub.add_parser("propose-reject", help="Reject a propose")
p_pr.add_argument("propose_id", type=int)
p_pr.add_argument("--project", "-p", type=int, required=True)
p_pr.add_argument("--reason", "-r")
p_pro = sub.add_parser("propose-reopen", help="Reopen a rejected propose")
p_pro.add_argument("propose_id", type=int)
p_pro.add_argument("--project", "-p", type=int, required=True)
args = parser.parse_args()
if not args.command:
parser.print_help()
@@ -320,6 +395,11 @@ def main():
"overdue": cmd_overdue,
"log-time": cmd_log_time,
"worklogs": cmd_worklogs,
"proposes": cmd_proposes,
"propose-create": cmd_propose_create,
"propose-accept": cmd_propose_accept,
"propose-reject": cmd_propose_reject,
"propose-reopen": cmd_propose_reopen,
}
cmds[args.command](args)

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# tests package

302
tests/conftest.py Normal file
View File

@@ -0,0 +1,302 @@
"""Shared test fixtures — SQLite in-memory DB + FastAPI TestClient.
This avoids needing MySQL for unit/integration tests.
All models are created fresh for every test function (function-scoped session).
"""
import sys, os
# Ensure the backend app package is importable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
# --- Override engine BEFORE any app import touches the real DB ---
from app.core.config import Base
# Force-import ALL model modules so Base.metadata knows every table
import app.models.models # noqa: F401 — User, Project, Comment, etc.
import app.models.milestone # noqa: F401
import app.models.task # noqa: F401
import app.models.role_permission # noqa: F401
import app.models.activity # noqa: F401
import app.models.propose # noqa: F401
try:
import app.models.apikey # noqa: F401
except ImportError:
pass
try:
import app.models.webhook # noqa: F401
except ImportError:
pass
TEST_DATABASE_URL = "sqlite://" # in-memory
engine = create_engine(
TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
# Use StaticPool so all sessions share the same in-memory connection
poolclass=__import__("sqlalchemy.pool", fromlist=["StaticPool"]).StaticPool,
)
# SQLite needs foreign keys enabled per-connection
@event.listens_for(engine, "connect")
def _set_sqlite_pragma(dbapi_conn, _):
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def setup_database():
"""Create all tables before each test, drop after."""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture()
def db():
"""Yield a DB session for direct model manipulation."""
session = TestingSessionLocal()
try:
yield session
finally:
session.close()
@pytest.fixture()
def client(db):
"""FastAPI TestClient wired to the test DB + a default authenticated user."""
from fastapi.testclient import TestClient
from app.main import app
from app.core.config import get_db
# Override DB dependency
def _override_get_db():
try:
yield db
finally:
pass # caller's `db` fixture handles close
app.dependency_overrides[get_db] = _override_get_db
yield TestClient(app)
app.dependency_overrides.clear()
# ---------------------------------------------------------------------------
# Helper factories
# ---------------------------------------------------------------------------
@pytest.fixture()
def make_user(db):
"""Factory to create a User row."""
from app.models.models import User
_counter = [0]
# Pre-compute a bcrypt hash for "password" to avoid passlib/bcrypt version issues
_pwd_hash = "$2b$12$LJ3m4ys/Xz.l1PaOHHKN/uE7dQFnSm1AUBfEkL0C2dN9.3Oau4XG"
def _make(username=None, is_admin=False):
_counter[0] += 1
n = _counter[0]
u = User(
username=username or f"testuser{n}",
email=f"test{n}@example.com",
hashed_password=_pwd_hash,
is_active=True,
is_admin=is_admin,
)
db.add(u)
db.commit()
db.refresh(u)
return u
return _make
@pytest.fixture()
def make_project(db):
"""Factory to create a Project row."""
from app.models.models import Project
_counter = [0]
def _make(owner_id, name=None, project_code=None):
_counter[0] += 1
n = _counter[0]
p = Project(
name=name or f"TestProject{n}",
project_code=project_code or f"TP{n}",
owner_name="owner",
owner_id=owner_id,
)
db.add(p)
db.commit()
db.refresh(p)
return p
return _make
@pytest.fixture()
def make_milestone(db):
"""Factory to create a Milestone row."""
from app.models.milestone import Milestone, MilestoneStatus
_counter = [0]
def _make(project_id, created_by_id, status=MilestoneStatus.OPEN, **kw):
_counter[0] += 1
n = _counter[0]
ms = Milestone(
title=kw.pop("title", f"Milestone {n}"),
project_id=project_id,
created_by_id=created_by_id,
status=status,
milestone_code=kw.pop("milestone_code", f"M{n:04d}"),
**kw,
)
db.add(ms)
db.commit()
db.refresh(ms)
return ms
return _make
@pytest.fixture()
def make_task(db):
"""Factory to create a Task row."""
from app.models.task import Task, TaskStatus
_counter = [0]
def _make(project_id, milestone_id, reporter_id, status=TaskStatus.PENDING, **kw):
_counter[0] += 1
n = _counter[0]
t = Task(
title=kw.pop("title", f"Task {n}"),
project_id=project_id,
milestone_id=milestone_id,
reporter_id=reporter_id,
created_by_id=kw.pop("created_by_id", reporter_id),
status=status,
task_code=kw.pop("task_code", f"T{n:04d}"),
task_type=kw.pop("task_type", "issue"),
task_subtype=kw.pop("task_subtype", None),
**kw,
)
db.add(t)
db.commit()
db.refresh(t)
return t
return _make
@pytest.fixture()
def seed_roles_and_permissions(db):
"""Create the minimal role + permission setup needed by action endpoints.
Returns (admin_role, mgr_role, dev_role).
"""
from app.models.role_permission import Role, Permission, RolePermission
# --- roles ---
admin_role = Role(name="admin", is_global=True)
mgr_role = Role(name="mgr", is_global=False)
dev_role = Role(name="dev", is_global=False)
db.add_all([admin_role, mgr_role, dev_role])
db.commit()
# --- permissions ---
perm_names = [
("milestone.freeze", "milestone"),
("milestone.start", "milestone"),
("milestone.close", "milestone"),
("task.close", "task"),
("task.reopen_closed", "task"),
("task.reopen_completed", "task"),
("propose.accept", "propose"),
("propose.reject", "propose"),
("propose.reopen", "propose"),
# add broad perms for role checks
("project.read", "project"),
("project.write", "project"),
("milestone.read", "milestone"),
("milestone.write", "milestone"),
("milestone.create", "milestone"),
("task.read", "task"),
("task.write", "task"),
("task.create", "task"),
]
perm_objs = {}
for name, cat in perm_names:
p = Permission(name=name, category=cat, description=name)
db.add(p)
db.flush()
perm_objs[name] = p
# admin gets all
for p in perm_objs.values():
db.add(RolePermission(role_id=admin_role.id, permission_id=p.id))
# mgr gets milestone + propose + task management perms
mgr_perms = [
"milestone.freeze", "milestone.start", "milestone.close",
"task.close", "task.reopen_closed", "task.reopen_completed",
"propose.accept", "propose.reject", "propose.reopen",
"project.read", "project.write",
"milestone.read", "milestone.write", "milestone.create",
"task.read", "task.write", "task.create",
]
for name in mgr_perms:
db.add(RolePermission(role_id=mgr_role.id, permission_id=perm_objs[name].id))
# dev gets basic perms
dev_perms = [
"project.read", "task.read", "task.write", "task.create",
"milestone.read", "task.close", "task.reopen_closed", "task.reopen_completed",
]
for name in dev_perms:
db.add(RolePermission(role_id=dev_role.id, permission_id=perm_objs[name].id))
db.commit()
db.refresh(admin_role)
db.refresh(mgr_role)
db.refresh(dev_role)
return admin_role, mgr_role, dev_role
@pytest.fixture()
def make_member(db):
"""Factory to add a user as project member with a given role."""
from app.models.models import ProjectMember
def _make(project_id, user_id, role_id):
pm = ProjectMember(project_id=project_id, user_id=user_id, role_id=role_id)
db.add(pm)
db.commit()
return pm
return _make
@pytest.fixture()
def auth_header():
"""Generate a JWT auth header for a given user."""
from app.api.deps import create_access_token
def _make(user):
token = create_access_token({"sub": str(user.id)})
return {"Authorization": f"Bearer {token}"}
return _make

View File

@@ -0,0 +1,358 @@
"""P13.1 — Milestone state-machine action tests.
Covers:
- freeze: success, missing release task, multiple release tasks, wrong status
- start: success + started_at, deps not met, wrong status
- close: from open/freeze/undergoing, wrong status (completed/closed)
- auto-complete: release task completion triggers milestone completed
"""
import json
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# -----------------------------------------------------------------------
# Freeze
# -----------------------------------------------------------------------
class TestFreeze:
"""POST /projects/{pid}/milestones/{mid}/actions/freeze"""
def test_freeze_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, _ = seed_roles_and_permissions
user = make_user(is_admin=False)
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
# Create exactly 1 maintenance/release task
make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "freeze"
db.refresh(ms)
assert ms.status == MilestoneStatus.FREEZE
def test_freeze_no_release_task(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "no maintenance/release task" in resp.json()["detail"].lower()
def test_freeze_multiple_release_tasks(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected exactly 1" in resp.json()["detail"].lower()
def test_freeze_wrong_status(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.CLOSED)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected 'open'" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Start
# -----------------------------------------------------------------------
class TestStart:
"""POST /projects/{pid}/milestones/{mid}/actions/start"""
def _freeze_milestone(self, db, ms):
ms.status = MilestoneStatus.FREEZE
db.commit()
db.refresh(ms)
def test_start_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
self._freeze_milestone(db, ms)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 200, resp.text
data = resp.json()
assert data["status"] == "undergoing"
assert "started_at" in data
db.refresh(ms)
assert ms.status == MilestoneStatus.UNDERGOING
assert ms.started_at is not None
def test_start_deps_not_met(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
# Create a dependency milestone that is NOT completed
dep_ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
ms = make_milestone(
project.id, user.id,
depend_on_milestones=json.dumps([dep_ms.id]),
)
self._freeze_milestone(db, ms)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "cannot start" in resp.json()["detail"].lower()
def test_start_wrong_status(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected 'freeze'" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Close
# -----------------------------------------------------------------------
class TestClose:
"""POST /projects/{pid}/milestones/{mid}/actions/close"""
@pytest.mark.parametrize("initial_status", [
MilestoneStatus.OPEN,
MilestoneStatus.FREEZE,
MilestoneStatus.UNDERGOING,
])
def test_close_from_allowed_statuses(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header, initial_status,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=initial_status)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/close",
headers=auth_header(user),
json={"reason": "no longer needed"},
)
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "closed"
db.refresh(ms)
assert ms.status == MilestoneStatus.CLOSED
@pytest.mark.parametrize("terminal_status", [
MilestoneStatus.COMPLETED,
MilestoneStatus.CLOSED,
])
def test_close_from_terminal_rejected(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header, terminal_status,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=terminal_status)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/close",
headers=auth_header(user),
)
assert resp.status_code == 400
# -----------------------------------------------------------------------
# Auto-complete
# -----------------------------------------------------------------------
class TestAutoComplete:
"""When the sole release task is completed, milestone auto-completes."""
def test_auto_complete_on_release_task_finish(
self, db, make_user, make_project, make_milestone, make_task,
):
"""Direct unit test of try_auto_complete_milestone."""
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
release_task = make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, release_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.COMPLETED
def test_no_auto_complete_for_non_release_task(
self, db, make_user, make_project, make_milestone, make_task,
):
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
# Also add the required release task (still pending)
make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.PENDING,
)
normal_task = make_task(
project.id, ms.id, user.id,
task_type="issue", task_subtype="defect",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, normal_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.UNDERGOING # unchanged
def test_no_auto_complete_when_not_undergoing(
self, db, make_user, make_project, make_milestone, make_task,
):
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.FREEZE)
release_task = make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, release_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.FREEZE # unchanged
# -----------------------------------------------------------------------
# Preflight
# -----------------------------------------------------------------------
class TestPreflight:
"""GET /projects/{pid}/milestones/{mid}/actions/preflight"""
def test_preflight_freeze_allowed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.get(
f"/projects/{project.id}/milestones/{ms.id}/actions/preflight",
headers=auth_header(user),
)
assert resp.status_code == 200
data = resp.json()
assert data["freeze"]["allowed"] is True
def test_preflight_freeze_not_allowed(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
# No release task
resp = client.get(
f"/projects/{project.id}/milestones/{ms.id}/actions/preflight",
headers=auth_header(user),
)
assert resp.status_code == 200
data = resp.json()
assert data["freeze"]["allowed"] is False

559
tests/test_propose.py Normal file
View File

@@ -0,0 +1,559 @@
"""P13.3 — Propose backend tests.
Covers:
- CRUD: create, list, get, update
- propose_code per-project incrementing
- accept → auto-generate feature story task + feat_task_id
- accept with non-open milestone → fail
- reject → status change
- rejected → reopen back to open
- feat_task_id cannot be set manually
- edit restrictions (only open proposes editable)
- permission checks for accept/reject/reopen
"""
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _propose_url(project_id: int, propose_id: int | None = None) -> str:
base = f"/projects/{project_id}/proposes"
return f"{base}/{propose_id}" if propose_id else base
# ===========================================================================
# CRUD
# ===========================================================================
class TestProposeCRUD:
"""Basic create / list / get / update."""
def test_create_propose(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id, project_code="PROJ")
make_member(project.id, user.id, dev_role.id)
resp = client.post(
_propose_url(project.id),
json={"title": "New Feature Idea", "description": "Some details"},
headers=auth_header(user),
)
assert resp.status_code == 201
data = resp.json()
assert data["title"] == "New Feature Idea"
assert data["status"] == "open"
assert data["propose_code"].startswith("PROJ:P")
assert data["feat_task_id"] is None
def test_list_proposes(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
# Create two proposes
client.post(_propose_url(project.id), json={"title": "P1"}, headers=auth_header(user))
client.post(_propose_url(project.id), json={"title": "P2"}, headers=auth_header(user))
resp = client.get(_propose_url(project.id), headers=auth_header(user))
assert resp.status_code == 200
assert len(resp.json()) == 2
def test_get_propose(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(_propose_url(project.id), json={"title": "P1"}, headers=auth_header(user))
propose_id = create_resp.json()["id"]
resp = client.get(_propose_url(project.id, propose_id), headers=auth_header(user))
assert resp.status_code == 200
assert resp.json()["title"] == "P1"
def test_update_propose_open(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(_propose_url(project.id), json={"title": "Old"}, headers=auth_header(user))
propose_id = create_resp.json()["id"]
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "New Title", "description": "Updated"},
headers=auth_header(user),
)
assert resp.status_code == 200
assert resp.json()["title"] == "New Title"
assert resp.json()["description"] == "Updated"
# ===========================================================================
# Propose Code
# ===========================================================================
class TestProposeCode:
"""P1.4 — propose_code increments per project independently."""
def test_code_increments_per_project(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
proj_a = make_project(owner_id=user.id, project_code="ALPHA")
proj_b = make_project(owner_id=user.id, project_code="BETA")
make_member(proj_a.id, user.id, dev_role.id)
make_member(proj_b.id, user.id, dev_role.id)
# Create 2 in ALPHA
r1 = client.post(_propose_url(proj_a.id), json={"title": "A1"}, headers=auth_header(user))
r2 = client.post(_propose_url(proj_a.id), json={"title": "A2"}, headers=auth_header(user))
# Create 1 in BETA
r3 = client.post(_propose_url(proj_b.id), json={"title": "B1"}, headers=auth_header(user))
code1 = r1.json()["propose_code"]
code2 = r2.json()["propose_code"]
code3 = r3.json()["propose_code"]
assert code1.startswith("ALPHA:P")
assert code2.startswith("ALPHA:P")
assert code3.startswith("BETA:P")
# They should be distinct
assert code1 != code2
# ===========================================================================
# Accept
# ===========================================================================
class TestAccept:
"""P6.2 — accept propose → create feature story task."""
def test_accept_success(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id),
json={"title": "Cool Feature", "description": "Do something cool"},
headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "accepted"
assert data["feat_task_id"] is not None
# Verify the generated task exists
from app.models.task import Task
task = db.query(Task).filter(Task.id == int(data["feat_task_id"])).first()
assert task is not None
assert task.title == "Cool Feature"
assert task.description == "Do something cool"
assert task.task_type == "story"
assert task.task_subtype == "feature"
task_status = task.status.value if hasattr(task.status, "value") else task.status
assert task_status == "pending"
assert task.milestone_id == ms.id
def test_accept_non_open_milestone_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.FREEZE)
create_resp = client.post(
_propose_url(project.id),
json={"title": "Feature X"},
headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 400
assert "open" in resp.json()["detail"].lower()
def test_accept_already_accepted_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# First accept
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Second accept should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_accept_auto_fills_feat_task_id(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
data = resp.json()
assert data["feat_task_id"] is not None
# Re-fetch to confirm persistence
get_resp = client.get(_propose_url(project.id, propose_id), headers=auth_header(mgr))
assert get_resp.json()["feat_task_id"] == data["feat_task_id"]
def test_accept_no_permission_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
"""dev role should not have propose.accept permission."""
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.OPEN)
# Dev creates the propose
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
# Dev tries to accept — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# Reject
# ===========================================================================
class TestReject:
"""P6.3 — reject propose."""
def test_reject_success(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "Not needed"},
headers=auth_header(mgr),
)
assert resp.status_code == 200
assert resp.json()["status"] == "rejected"
def test_reject_non_open_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Accept first
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Now reject should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "Changed mind"},
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_reject_no_permission_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "nah"},
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# Reopen
# ===========================================================================
class TestReopen:
"""P6.4 — reopen rejected propose."""
def test_reopen_success(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Reject first
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "wait"},
headers=auth_header(mgr),
)
# Reopen
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(mgr),
)
assert resp.status_code == 200
assert resp.json()["status"] == "open"
def test_reopen_non_rejected_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Try reopen on open propose — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_reopen_no_permission_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
# Owner rejects
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "nah"},
headers=auth_header(owner),
)
# Dev tries to reopen — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# feat_task_id protection
# ===========================================================================
class TestFeatTaskIdProtection:
"""P6.5 — feat_task_id is server-side only, cannot be set by client."""
def test_update_cannot_set_feat_task_id(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(user),
)
propose_id = create_resp.json()["id"]
# Try to set feat_task_id via PATCH
resp = client.patch(
_propose_url(project.id, propose_id),
json={"feat_task_id": "999"},
headers=auth_header(user),
)
assert resp.status_code == 200
# feat_task_id should still be None (server ignores it)
assert resp.json()["feat_task_id"] is None
# ===========================================================================
# Edit restrictions
# ===========================================================================
class TestEditRestrictions:
"""Propose editing is only allowed in open status."""
def test_edit_accepted_propose_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Accept
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Try to edit
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "Changed"},
headers=auth_header(mgr),
)
assert resp.status_code == 400
assert "open" in resp.json()["detail"].lower()
def test_edit_rejected_propose_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Reject
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "no"},
headers=auth_header(mgr),
)
# Try to edit
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "Changed"},
headers=auth_header(mgr),
)
assert resp.status_code == 400

View File

@@ -0,0 +1,564 @@
"""P13.2 — Task state-machine transition tests.
Covers:
- pending → open: success, milestone not undergoing, deps not met
- open → undergoing: success, no assignee, non-assignee blocked
- undergoing → completed: success with comment, no comment fails, non-assignee blocked
- close from pending/open/undergoing: permission required
- reopen from completed/closed → open: distinct permissions
- invalid transitions: rejected by state machine
- edit restrictions: P5.7 body edit guards by status/assignee
"""
import json
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# -----------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------
def _transition(client, task_id, new_status, headers, comment=None):
"""POST /tasks/{id}/transition?new_status=..."""
body = {}
if comment is not None:
body["comment"] = comment
return client.post(
f"/tasks/{task_id}/transition?new_status={new_status}",
json=body,
headers=headers,
)
# -----------------------------------------------------------------------
# pending → open
# -----------------------------------------------------------------------
class TestPendingToOpen:
def test_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open succeeds when milestone is undergoing and no deps."""
admin_role, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.PENDING)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "open"
def test_milestone_not_undergoing(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open rejected when milestone is still open."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.PENDING)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 400
assert "undergoing" in resp.json()["detail"].lower()
def test_deps_not_satisfied(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open rejected when depend_on tasks are not completed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
dep_task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.PENDING,
depend_on=json.dumps([dep_task.id]),
)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 400
assert "depend" in resp.json()["detail"].lower() or "block" in resp.json()["detail"].lower()
def test_deps_satisfied(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open succeeds when all depend_on tasks are completed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
dep_task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.PENDING,
depend_on=json.dumps([dep_task.id]),
)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
# -----------------------------------------------------------------------
# open → undergoing
# -----------------------------------------------------------------------
class TestOpenToUndergoing:
def test_success_assignee_starts(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Assignee can start their own task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.OPEN,
assignee_id=user.id,
)
resp = _transition(client, task.id, "undergoing", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "undergoing"
db.refresh(task)
assert task.started_on is not None
def test_no_assignee_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot start a task without an assignee."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
resp = _transition(client, task.id, "undergoing", auth_header(user))
assert resp.status_code == 400
assert "assignee" in resp.json()["detail"].lower()
def test_non_assignee_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""A different user cannot start someone else's task."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.OPEN,
assignee_id=owner.id,
)
resp = _transition(client, task.id, "undergoing", auth_header(other))
assert resp.status_code == 403
assert "assigned" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# undergoing → completed
# -----------------------------------------------------------------------
class TestUndergoingToCompleted:
def test_success_with_comment(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Assignee can complete a task with a completion comment."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user), comment="Done!")
assert resp.status_code == 200
assert resp.json()["status"] == "completed"
db.refresh(task)
assert task.finished_on is not None
def test_no_comment_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot complete without a comment."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user))
assert resp.status_code == 400
assert "comment" in resp.json()["detail"].lower()
def test_empty_comment_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Empty/whitespace comment is rejected."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user), comment=" ")
assert resp.status_code == 400
assert "comment" in resp.json()["detail"].lower()
def test_non_assignee_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Non-assignee cannot complete the task."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.UNDERGOING,
assignee_id=owner.id,
)
resp = _transition(client, task.id, "completed", auth_header(other), comment="I finished it")
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Close task (from various states)
# -----------------------------------------------------------------------
class TestCloseTask:
@pytest.mark.parametrize("initial_status", [
TaskStatus.PENDING,
TaskStatus.OPEN,
TaskStatus.UNDERGOING,
])
def test_close_from_valid_states(
self, initial_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Close is allowed from pending/open/undergoing with permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=initial_status)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "closed"
@pytest.mark.parametrize("initial_status", [
TaskStatus.COMPLETED,
TaskStatus.CLOSED,
])
def test_close_from_terminal_states_fails(
self, initial_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot close from completed or already closed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=initial_status)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 400
def test_close_without_permission_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""User without task.close permission cannot close."""
from app.models.role_permission import Role
_, _, dev_role = seed_roles_and_permissions
# Create a role with NO task.close permission
no_close_role = Role(name="viewer", is_global=False)
db.add(no_close_role)
db.commit()
# Give viewer only basic perms (project.read, task.read)
from app.models.role_permission import Permission, RolePermission
for pname in ("project.read", "task.read"):
p = db.query(Permission).filter(Permission.name == pname).first()
if p:
db.add(RolePermission(role_id=no_close_role.id, permission_id=p.id))
db.commit()
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, no_close_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Reopen (completed → open, closed → open)
# -----------------------------------------------------------------------
class TestReopen:
def test_reopen_completed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Reopen from completed → open with task.reopen_completed permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
# finished_on should be cleared
db.refresh(task)
assert task.finished_on is None
def test_reopen_closed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Reopen from closed → open with task.reopen_closed permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.CLOSED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
def test_reopen_without_permission_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""User without reopen permission cannot reopen."""
from app.models.role_permission import Role, Permission, RolePermission
# Create a role with task.close but NO reopen permissions
limited_role = Role(name="limited", is_global=False)
db.add(limited_role)
db.commit()
for pname in ("project.read", "task.read", "task.write", "task.close"):
p = db.query(Permission).filter(Permission.name == pname).first()
if p:
db.add(RolePermission(role_id=limited_role.id, permission_id=p.id))
db.commit()
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, limited_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Invalid transitions
# -----------------------------------------------------------------------
class TestInvalidTransitions:
@pytest.mark.parametrize("from_status,to_status", [
(TaskStatus.PENDING, "undergoing"),
(TaskStatus.PENDING, "completed"),
(TaskStatus.OPEN, "completed"),
(TaskStatus.OPEN, "pending"),
(TaskStatus.UNDERGOING, "open"),
(TaskStatus.UNDERGOING, "pending"),
(TaskStatus.COMPLETED, "undergoing"),
(TaskStatus.COMPLETED, "closed"),
(TaskStatus.CLOSED, "undergoing"),
(TaskStatus.CLOSED, "completed"),
])
def test_disallowed_transition(
self, from_status, to_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""State machine rejects transitions not in VALID_TRANSITIONS."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=from_status,
assignee_id=user.id,
)
resp = _transition(client, task.id, to_status, auth_header(user))
assert resp.status_code == 400
assert "cannot transition" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Edit restrictions (PATCH)
# -----------------------------------------------------------------------
class TestEditRestrictions:
def test_undergoing_body_edit_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot PATCH body fields on an undergoing task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.UNDERGOING, assignee_id=user.id)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "New Title"},
headers=auth_header(user),
)
assert resp.status_code == 400
assert "undergoing" in resp.json()["detail"].lower()
def test_completed_body_edit_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot PATCH body fields on a completed task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Changed"},
headers=auth_header(user),
)
assert resp.status_code == 400
def test_open_assignee_only_edit(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Open task with assignee: only assignee can edit body."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.OPEN,
assignee_id=owner.id,
)
# Other user cannot edit
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Hijack"},
headers=auth_header(other),
)
assert resp.status_code == 403
# Assignee can edit
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "My Change"},
headers=auth_header(owner),
)
assert resp.status_code == 200
assert resp.json()["title"] == "My Change"
def test_open_no_assignee_anyone_edits(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Open task without assignee: any project member can edit."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.OPEN,
assignee_id=None,
)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Anyone's Change"},
headers=auth_header(user),
)
assert resp.status_code == 200
assert resp.json()["title"] == "Anyone's Change"