Compare commits

...

22 Commits

Author SHA1 Message Date
zhi
e938507a24 test(P13.3): propose backend tests — 19 tests covering CRUD, accept/reject/reopen, code generation, feat_task_id protection, edit restrictions, permissions 2026-03-18 05:01:56 +00:00
zhi
c21e4ee335 test(P13.2): task state-machine tests — 34 tests covering transitions, assignee guards, comments, permissions, edit restrictions 2026-03-18 04:02:29 +00:00
zhi
011a2262ce test(P13.1): add milestone state machine tests — 17 tests covering freeze/start/close/auto-complete/preflight
New test infrastructure:
- tests/conftest.py: SQLite in-memory fixtures, TestClient wired to test DB,
  factory fixtures for User/Project/Milestone/Task/Roles/Permissions
- tests/test_milestone_actions.py: 17 tests covering:
  - freeze success/no-release-task/multiple-release-tasks/wrong-status
  - start success+started_at/deps-not-met/wrong-status
  - close from open/freeze/undergoing, rejected from completed/closed
  - auto-complete on release task finish, no auto-complete for non-release/wrong-status
  - preflight allowed/not-allowed
2026-03-18 03:07:30 +00:00
zhi
7bad57eb0e feat(P5): sync batch transition with P5.3-P5.6 guards — auth, assignee, comment, permission, deps, auto-complete 2026-03-18 01:01:59 +00:00
zhi
00a1786ec3 feat(P12.1): CLI — add propose subcommands, remove task_type=task, add milestone status filter, transition comment support 2026-03-18 00:01:52 +00:00
zhi
586e06f66a feat(P3.6): lock feature story task body edits when milestone is freeze/undergoing/completed/closed 2026-03-17 23:01:39 +00:00
zhi
ec91a15f65 fix(P7.1): remove TaskType.TASK from models.py + fix milestone task defaults (issue/pending) 2026-03-17 23:01:02 +00:00
zhi
8e38d4cf4d feat(P2.2): add default mgr/dev role seeds with preset permissions for milestone/task/propose actions 2026-03-17 19:02:44 +00:00
zhi
0c75045f6f feat(P4.3): wire task depend_on check into pending→open transition via reusable helper 2026-03-17 18:02:08 +00:00
zhi
c6b14ac25f P4.1: Extract reusable dependency check helper, deduplicate milestone_actions.py
- New app/services/dependency_check.py with check_milestone_deps()
- Replaces 3x duplicated JSON-parse + query + filter logic
- Supports both milestone and task dependency checking
- Returns structured DepCheckResult with ok/blockers/reason
- Refactored preflight and start endpoints to use shared helper
2026-03-17 17:03:45 +00:00
zhi
89e3bcdd0f feat(P7.1): remove task_type='task' — migrate to issue/defect, update defaults and DB migration 2026-03-17 16:05:32 +00:00
zhi
3afbbc2a88 feat(P2.1): register 9 new permissions (milestone/task/propose actions) + wire check_permission in all action endpoints
- Add milestone.freeze/start/close, task.close/reopen_closed/reopen_completed, propose.accept/reject/reopen to DEFAULT_PERMISSIONS
- Replace placeholder check_project_role with check_permission in proposes.py accept/reject/reopen
- Replace freeform permission strings with dotted names in milestone_actions.py
- Add task.close and task.reopen_* permission checks in tasks.py transition endpoint
- Admin role auto-inherits all new permissions via init_wizard
2026-03-17 15:03:48 +00:00
zhi
c18b8f3850 feat(P9.6): block story/feature and maintenance/release task creation via general create endpoints 2026-03-17 13:02:46 +00:00
zhi
7542f2d7c1 feat(P5.7): task edit restrictions — block body edits in undergoing/completed/closed, enforce assignee-only edit in open+assigned 2026-03-17 12:04:12 +00:00
zhi
ffb0fa6058 feat(P5.3+P5.4): enforce assignee identity on start/complete + require completion comment in transition endpoint 2026-03-17 11:02:19 +00:00
zhi
7a16639aac feat(P8.3): milestone preflight endpoint for freeze/start button pre-condition checks 2026-03-17 10:04:17 +00:00
zhi
314040cef5 feat(P3.6): milestone edit restrictions — block PATCH in terminal states, restrict scope fields in freeze/undergoing, protect delete 2026-03-17 09:01:40 +00:00
zhi
589b1cc8de feat(P5.1-P5.6): task state-machine validation — enforce legal transitions in transition/batch/update endpoints 2026-03-17 08:02:37 +00:00
zhi
7d8c448cb8 feat(P3.1): milestone action endpoints — freeze/start/close + auto-complete hook
- New milestone_actions router with POST freeze/start/close endpoints
- freeze: validates exactly 1 release maintenance task exists
- start: validates all milestone/task dependencies completed, records started_at
- close: allows from open/freeze/undergoing with reason
- try_auto_complete_milestone helper: auto-completes milestone when sole release task finishes
- Wired auto-complete into task transition and update endpoints
- Added freeze enforcement: no new feature story tasks after freeze
- Added started_at to milestone serializer
- All actions write activity logs
2026-03-17 04:03:05 +00:00
zhi
75ccbcb362 feat: propose CRUD router + accept/reject/reopen actions (P6.1-P6.4) 2026-03-17 03:01:49 +00:00
zhi
2bea75e843 feat: add Propose model/schema + DB enum migration scripts
- New Propose model (app/models/propose.py) with status enum (open/accepted/rejected)
- New Propose schemas (ProposeCreate/Update/Response) in schemas.py
- MySQL enum migration in main.py for milestone/task status columns
  - milestone: pending→open, deferred→closed, progressing→undergoing
  - task: progressing→undergoing
- Import propose model in startup for create_all
- Add started_at column migration for milestones
2026-03-17 02:04:42 +00:00
zhi
9e22c97ae8 refactor: update milestone/task status enums to new state machine values
Milestone: open/freeze/undergoing/completed/closed (was open/pending/deferred/progressing/closed)
Task: open/pending/undergoing/completed/closed (was open/pending/progressing/closed)

- Add MilestoneStatusEnum to schemas with typed validation
- Add started_at field to Milestone model
- Update all router/CLI references from progressing->undergoing
- Add completed status handling in task transition logic
2026-03-17 00:04:29 +00:00
19 changed files with 3221 additions and 108 deletions

View File

@@ -0,0 +1,336 @@
"""Milestone status-machine action endpoints (P3.1).
Provides freeze / start / close actions on milestones.
completed is triggered automatically when the sole release maintenance task finishes.
"""
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, check_permission
from app.models import models
from app.models.milestone import Milestone, MilestoneStatus
from app.models.task import Task, TaskStatus
from app.services.activity import log_activity
from app.services.dependency_check import check_milestone_deps
router = APIRouter(
prefix="/projects/{project_id}/milestones/{milestone_id}/actions",
tags=["Milestone Actions"],
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_milestone_or_404(db: Session, project_id: int, milestone_id: int) -> Milestone:
ms = (
db.query(Milestone)
.filter(Milestone.id == milestone_id, Milestone.project_id == project_id)
.first()
)
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
return ms
def _ms_status_value(ms: Milestone) -> str:
"""Return status as plain string regardless of enum wrapper."""
return ms.status.value if hasattr(ms.status, "value") else ms.status
# ---------------------------------------------------------------------------
# Request bodies
# ---------------------------------------------------------------------------
class CloseBody(BaseModel):
reason: Optional[str] = None
# ---------------------------------------------------------------------------
# GET /preflight — lightweight pre-condition check for UI button states
# ---------------------------------------------------------------------------
@router.get("/preflight", status_code=200)
def preflight_milestone_actions(
project_id: int,
milestone_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
"""Return pre-condition check results for freeze / start actions.
The frontend uses this to decide whether to *disable* buttons and what
hint text to show. This endpoint never mutates data.
"""
check_project_role(db, current_user.id, project_id, min_role="viewer")
ms = _get_milestone_or_404(db, project_id, milestone_id)
ms_status = _ms_status_value(ms)
result: dict = {"status": ms_status, "freeze": None, "start": None}
# --- freeze pre-check (only meaningful when status == open) ---
if ms_status == "open":
release_tasks = (
db.query(Task)
.filter(
Task.milestone_id == milestone_id,
Task.task_type == "maintenance",
Task.task_subtype == "release",
)
.all()
)
if len(release_tasks) == 0:
result["freeze"] = {
"allowed": False,
"reason": "No maintenance/release task found. Create one before freezing.",
}
elif len(release_tasks) > 1:
result["freeze"] = {
"allowed": False,
"reason": f"Found {len(release_tasks)} maintenance/release tasks — expected exactly 1.",
}
else:
result["freeze"] = {"allowed": True, "reason": None}
# --- start pre-check (only meaningful when status == freeze) ---
if ms_status == "freeze":
dep_result = check_milestone_deps(
db, ms.depend_on_milestones, ms.depend_on_tasks,
)
if dep_result.ok:
result["start"] = {"allowed": True, "reason": None}
else:
result["start"] = {"allowed": False, "reason": dep_result.reason}
return result
# ---------------------------------------------------------------------------
# POST /freeze
# ---------------------------------------------------------------------------
@router.post("/freeze", status_code=200)
def freeze_milestone(
project_id: int,
milestone_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
"""Freeze a milestone (open → freeze).
Pre-conditions:
- Milestone must be in ``open`` status.
- Milestone must have **exactly one** maintenance task with subtype ``release``.
- Caller must have ``freeze milestone`` permission.
"""
check_project_role(db, current_user.id, project_id, min_role="mgr")
check_permission(db, current_user.id, project_id, "milestone.freeze")
ms = _get_milestone_or_404(db, project_id, milestone_id)
if _ms_status_value(ms) != "open":
raise HTTPException(
status_code=400,
detail=f"Cannot freeze: milestone is '{_ms_status_value(ms)}', expected 'open'",
)
# Check: exactly one maintenance/release task
release_tasks = (
db.query(Task)
.filter(
Task.milestone_id == milestone_id,
Task.task_type == "maintenance",
Task.task_subtype == "release",
)
.all()
)
if len(release_tasks) == 0:
raise HTTPException(
status_code=400,
detail="Cannot freeze: milestone has no maintenance/release task. Create one first.",
)
if len(release_tasks) > 1:
raise HTTPException(
status_code=400,
detail=f"Cannot freeze: milestone has {len(release_tasks)} maintenance/release tasks, expected exactly 1.",
)
ms.status = MilestoneStatus.FREEZE
db.commit()
db.refresh(ms)
log_activity(
db,
action="freeze",
entity_type="milestone",
entity_id=ms.id,
user_id=current_user.id,
details={"from": "open", "to": "freeze"},
)
return {"detail": "Milestone frozen", "status": "freeze"}
# ---------------------------------------------------------------------------
# POST /start
# ---------------------------------------------------------------------------
@router.post("/start", status_code=200)
def start_milestone(
project_id: int,
milestone_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
"""Start a milestone (freeze → undergoing).
Pre-conditions:
- Milestone must be in ``freeze`` status.
- All milestone dependencies must be completed.
- Caller must have ``start milestone`` permission.
"""
check_project_role(db, current_user.id, project_id, min_role="mgr")
check_permission(db, current_user.id, project_id, "milestone.start")
ms = _get_milestone_or_404(db, project_id, milestone_id)
if _ms_status_value(ms) != "freeze":
raise HTTPException(
status_code=400,
detail=f"Cannot start: milestone is '{_ms_status_value(ms)}', expected 'freeze'",
)
# Dependency check (P4.1 — shared helper)
dep_result = check_milestone_deps(
db, ms.depend_on_milestones, ms.depend_on_tasks,
)
if not dep_result.ok:
raise HTTPException(
status_code=400,
detail=f"Cannot start: {dep_result.reason}",
)
ms.status = MilestoneStatus.UNDERGOING
ms.started_at = datetime.now(timezone.utc)
db.commit()
db.refresh(ms)
log_activity(
db,
action="start",
entity_type="milestone",
entity_id=ms.id,
user_id=current_user.id,
details={"from": "freeze", "to": "undergoing", "started_at": ms.started_at.isoformat()},
)
return {"detail": "Milestone started", "status": "undergoing", "started_at": ms.started_at.isoformat()}
# ---------------------------------------------------------------------------
# POST /close
# ---------------------------------------------------------------------------
@router.post("/close", status_code=200)
def close_milestone(
project_id: int,
milestone_id: int,
body: CloseBody = CloseBody(),
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
"""Close (abandon) a milestone (open/freeze/undergoing → closed).
Pre-conditions:
- Milestone must be in ``open``, ``freeze``, or ``undergoing`` status.
- Caller must have ``close milestone`` permission.
"""
check_project_role(db, current_user.id, project_id, min_role="mgr")
check_permission(db, current_user.id, project_id, "milestone.close")
ms = _get_milestone_or_404(db, project_id, milestone_id)
current = _ms_status_value(ms)
allowed_from = {"open", "freeze", "undergoing"}
if current not in allowed_from:
raise HTTPException(
status_code=400,
detail=f"Cannot close: milestone is '{current}', must be one of {sorted(allowed_from)}",
)
ms.status = MilestoneStatus.CLOSED
db.commit()
db.refresh(ms)
log_activity(
db,
action="close",
entity_type="milestone",
entity_id=ms.id,
user_id=current_user.id,
details={"from": current, "to": "closed", "reason": body.reason},
)
return {"detail": "Milestone closed", "status": "closed"}
# ---------------------------------------------------------------------------
# Auto-complete helper (called from task completion logic)
# ---------------------------------------------------------------------------
def try_auto_complete_milestone(db: Session, task: Task, user_id: int | None = None):
"""Check if a just-completed task is the sole release/maintenance task
of its milestone, and if so auto-complete the milestone.
This function is designed to be called from the task status transition
logic whenever a task reaches ``completed``.
"""
if task.task_type != "maintenance" or task.task_subtype != "release":
return # not a release task — nothing to do
milestone = (
db.query(Milestone)
.filter(Milestone.id == task.milestone_id)
.first()
)
if not milestone:
return
if _ms_status_value(milestone) != "undergoing":
return # only auto-complete from undergoing
# Verify this is the *only* release task under the milestone
release_count = (
db.query(Task)
.filter(
Task.milestone_id == milestone.id,
Task.task_type == "maintenance",
Task.task_subtype == "release",
)
.count()
)
if release_count != 1:
return # ambiguous — don't auto-complete
milestone.status = MilestoneStatus.COMPLETED
db.commit()
log_activity(
db,
action="auto_complete",
entity_type="milestone",
entity_id=milestone.id,
user_id=user_id,
details={
"trigger": f"release task #{task.id} completed",
"from": "undergoing",
"to": "completed",
},
)

View File

@@ -31,6 +31,7 @@ def _serialize_milestone(milestone):
"depend_on_tasks": json.loads(milestone.depend_on_tasks) if milestone.depend_on_tasks else [],
"project_id": milestone.project_id,
"created_by_id": milestone.created_by_id,
"started_at": milestone.started_at,
"created_at": milestone.created_at,
"updated_at": milestone.updated_at,
}
@@ -81,7 +82,36 @@ def update_milestone(project_id: int, milestone_id: int, milestone: schemas.Mile
if not db_milestone:
raise HTTPException(status_code=404, detail="Milestone not found")
ensure_can_edit_milestone(db, current_user.id, db_milestone)
# --- P3.6 Milestone edit restrictions based on status ---
ms_status = db_milestone.status.value if hasattr(db_milestone.status, 'value') else db_milestone.status
# Terminal states: no edits allowed
if ms_status in ("completed", "closed"):
raise HTTPException(
status_code=400,
detail=f"Cannot edit a milestone that is '{ms_status}'. No modifications are allowed in terminal state."
)
data = milestone.model_dump(exclude_unset=True)
# Never allow status changes via PATCH — use action endpoints instead
if "status" in data:
raise HTTPException(
status_code=400,
detail="Milestone status cannot be changed via PATCH. Use the action endpoints (freeze/start/close) instead."
)
# Freeze / undergoing: restrict scope-changing fields
SCOPE_FIELDS = {"title", "description", "due_date", "planned_release_date", "depend_on_milestones", "depend_on_tasks"}
if ms_status in ("freeze", "undergoing"):
blocked = SCOPE_FIELDS & set(data.keys())
if blocked:
raise HTTPException(
status_code=400,
detail=f"Cannot modify scope fields {sorted(blocked)} when milestone is '{ms_status}'. Scope changes are only allowed in 'open' status."
)
if "depend_on_milestones" in data:
data["depend_on_milestones"] = json.dumps(data["depend_on_milestones"]) if data["depend_on_milestones"] else None
if "depend_on_tasks" in data:
@@ -99,6 +129,9 @@ def delete_milestone(project_id: int, milestone_id: int, db: Session = Depends(g
db_milestone = db.query(Milestone).filter(Milestone.id == milestone_id, Milestone.project_id == project_id).first()
if not db_milestone:
raise HTTPException(status_code=404, detail="Milestone not found")
ms_status = db_milestone.status.value if hasattr(db_milestone.status, 'value') else db_milestone.status
if ms_status in ("undergoing", "completed"):
raise HTTPException(status_code=400, detail=f"Cannot delete a milestone that is '{ms_status}'")
db.delete(db_milestone)
db.commit()
return None
@@ -111,8 +144,17 @@ def create_milestone_task(project_id: int, milestone_id: int, task_data: schemas
if not milestone:
raise HTTPException(status_code=404, detail="Milestone not found")
if milestone.status and hasattr(milestone.status, 'value') and milestone.status.value == "progressing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is in_progress")
ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
if ms_status in ("undergoing", "completed", "closed"):
raise HTTPException(status_code=400, detail=f"Cannot add items to a milestone that is '{ms_status}'")
# P9.6: feature story tasks must come from propose accept, not direct creation
task_type = task_data.model_dump(exclude_unset=True).get("task_type", "")
task_subtype = task_data.model_dump(exclude_unset=True).get("task_subtype", "")
if task_type == "story" and task_subtype == "feature":
raise HTTPException(status_code=400, detail="Feature story tasks can only be created via propose accept, not direct creation")
# P3.6 / §5: freeze prevents adding new feature story tasks (redundant after P9.6 but kept as defense-in-depth)
if ms_status == "freeze" and task_type == "story" and task_subtype == "feature":
raise HTTPException(status_code=400, detail="Cannot add feature story tasks after milestone is frozen")
# Generate task_code
milestone_code = milestone.milestone_code or f"m{milestone.id}"
@@ -131,9 +173,9 @@ def create_milestone_task(project_id: int, milestone_id: int, task_data: schemas
task = Task(
title=data.get("title"),
description=data.get("description"),
task_type=data.get("task_type", "task"),
task_type=data.get("task_type", "issue"),
task_subtype=data.get("task_subtype"),
status=TaskStatus.OPEN,
status=TaskStatus.PENDING,
priority=TaskPriority.MEDIUM,
project_id=project_id,
milestone_id=milestone_id,

View File

@@ -425,8 +425,8 @@ def create_milestone_task(project_code: str, milestone_id: int, task_data: dict,
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "progressing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is in_progress")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")
milestone_code = ms.milestone_code or f"m{ms.id}"
max_task = db.query(Task).filter(Task.milestone_id == ms.id).order_by(Task.id.desc()).first()
@@ -445,7 +445,7 @@ def create_milestone_task(project_code: str, milestone_id: int, task_data: dict,
description=task_data.get("description"),
status=TaskStatus.OPEN,
priority=TaskPriority.MEDIUM,
task_type=task_data.get("task_type", "task"),
task_type=task_data.get("task_type", "issue"), # P7.1: default changed from 'task' to 'issue'
task_subtype=task_data.get("task_subtype"),
project_id=project.id,
milestone_id=milestone_id,
@@ -504,8 +504,8 @@ def create_support(project_code: str, milestone_id: int, support_data: dict, db:
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "progressing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is in_progress")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")
milestone_code = ms.milestone_code or f"m{ms.id}"
max_support = db.query(Support).filter(Support.milestone_id == milestone_id).order_by(Support.id.desc()).first()
@@ -563,8 +563,8 @@ def create_meeting(project_code: str, milestone_id: int, meeting_data: dict, db:
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "progressing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is in_progress")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")
milestone_code = ms.milestone_code or f"m{ms.id}"
max_meeting = db.query(Meeting).filter(Meeting.milestone_id == milestone_id).order_by(Meeting.id.desc()).first()

275
app/api/routers/proposes.py Normal file
View File

@@ -0,0 +1,275 @@
"""Proposes API router (project-scoped) — CRUD + accept/reject/reopen actions."""
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import func as sa_func
from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, check_permission, is_global_admin
from app.models import models
from app.models.propose import Propose, ProposeStatus
from app.models.milestone import Milestone, MilestoneStatus
from app.models.task import Task, TaskStatus, TaskPriority
from app.schemas import schemas
from app.services.activity import log_activity
router = APIRouter(prefix="/projects/{project_id}/proposes", tags=["Proposes"])
def _generate_propose_code(db: Session, project_id: int) -> str:
"""Generate next propose code: {proj_code}:P{i:05x}"""
project = db.query(models.Project).filter(models.Project.id == project_id).first()
project_code = project.project_code if project and project.project_code else f"P{project_id}"
max_propose = (
db.query(Propose)
.filter(Propose.project_id == project_id)
.order_by(Propose.id.desc())
.first()
)
next_num = (max_propose.id + 1) if max_propose else 1
return f"{project_code}:P{next_num:05x}"
def _can_edit_propose(db: Session, user_id: int, propose: Propose) -> bool:
"""Only creator, project admin, or global admin can edit an open propose."""
if is_global_admin(db, user_id):
return True
if propose.created_by_id == user_id:
return True
project = db.query(models.Project).filter(models.Project.id == propose.project_id).first()
if project and project.owner_id == user_id:
return True
return False
# ---- CRUD ----
@router.get("", response_model=List[schemas.ProposeResponse])
def list_proposes(
project_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
check_project_role(db, current_user.id, project_id, min_role="viewer")
proposes = (
db.query(Propose)
.filter(Propose.project_id == project_id)
.order_by(Propose.id.desc())
.all()
)
return proposes
@router.post("", response_model=schemas.ProposeResponse, status_code=status.HTTP_201_CREATED)
def create_propose(
project_id: int,
propose_in: schemas.ProposeCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
check_project_role(db, current_user.id, project_id, min_role="dev")
propose_code = _generate_propose_code(db, project_id)
propose = Propose(
title=propose_in.title,
description=propose_in.description,
status=ProposeStatus.OPEN,
project_id=project_id,
created_by_id=current_user.id,
propose_code=propose_code,
)
db.add(propose)
db.commit()
db.refresh(propose)
log_activity(db, "create", "propose", propose.id, user_id=current_user.id, details={"title": propose.title})
return propose
@router.get("/{propose_id}", response_model=schemas.ProposeResponse)
def get_propose(
project_id: int,
propose_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
check_project_role(db, current_user.id, project_id, min_role="viewer")
propose = db.query(Propose).filter(Propose.id == propose_id, Propose.project_id == project_id).first()
if not propose:
raise HTTPException(status_code=404, detail="Propose not found")
return propose
@router.patch("/{propose_id}", response_model=schemas.ProposeResponse)
def update_propose(
project_id: int,
propose_id: int,
propose_in: schemas.ProposeUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
propose = db.query(Propose).filter(Propose.id == propose_id, Propose.project_id == project_id).first()
if not propose:
raise HTTPException(status_code=404, detail="Propose not found")
# Only open proposes can be edited
propose_status = propose.status.value if hasattr(propose.status, "value") else propose.status
if propose_status != "open":
raise HTTPException(status_code=400, detail="Only open proposes can be edited")
if not _can_edit_propose(db, current_user.id, propose):
raise HTTPException(status_code=403, detail="Propose edit permission denied")
data = propose_in.model_dump(exclude_unset=True)
# Never allow client to set feat_task_id
data.pop("feat_task_id", None)
for key, value in data.items():
setattr(propose, key, value)
db.commit()
db.refresh(propose)
log_activity(db, "update", "propose", propose.id, user_id=current_user.id, details=data)
return propose
# ---- Actions ----
class AcceptRequest(schemas.BaseModel):
milestone_id: int
@router.post("/{propose_id}/accept", response_model=schemas.ProposeResponse)
def accept_propose(
project_id: int,
propose_id: int,
body: AcceptRequest,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
"""Accept a propose: create a feature story task in the chosen milestone."""
propose = db.query(Propose).filter(Propose.id == propose_id, Propose.project_id == project_id).first()
if not propose:
raise HTTPException(status_code=404, detail="Propose not found")
propose_status = propose.status.value if hasattr(propose.status, "value") else propose.status
if propose_status != "open":
raise HTTPException(status_code=400, detail="Only open proposes can be accepted")
check_permission(db, current_user.id, project_id, "propose.accept")
# Validate milestone
milestone = db.query(Milestone).filter(
Milestone.id == body.milestone_id,
Milestone.project_id == project_id,
).first()
if not milestone:
raise HTTPException(status_code=404, detail="Milestone not found in this project")
ms_status = milestone.status.value if hasattr(milestone.status, "value") else milestone.status
if ms_status != "open":
raise HTTPException(status_code=400, detail="Target milestone must be in 'open' status")
# Generate task code
milestone_code = milestone.milestone_code or f"m{milestone.id}"
max_task = db.query(Task).filter(Task.milestone_id == milestone.id).order_by(Task.id.desc()).first()
next_num = (max_task.id + 1) if max_task else 1
task_code = f"{milestone_code}:T{next_num:05x}"
# Create feature story task
task = Task(
title=propose.title,
description=propose.description,
task_type="story",
task_subtype="feature",
status=TaskStatus.PENDING,
priority=TaskPriority.MEDIUM,
project_id=project_id,
milestone_id=milestone.id,
reporter_id=propose.created_by_id or current_user.id,
created_by_id=propose.created_by_id or current_user.id,
task_code=task_code,
)
db.add(task)
db.flush() # get task.id
# Update propose
propose.status = ProposeStatus.ACCEPTED
propose.feat_task_id = str(task.id)
db.commit()
db.refresh(propose)
log_activity(db, "accept", "propose", propose.id, user_id=current_user.id, details={
"milestone_id": milestone.id,
"generated_task_id": task.id,
"task_code": task_code,
})
return propose
class RejectRequest(schemas.BaseModel):
reason: str | None = None
@router.post("/{propose_id}/reject", response_model=schemas.ProposeResponse)
def reject_propose(
project_id: int,
propose_id: int,
body: RejectRequest | None = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
"""Reject a propose."""
propose = db.query(Propose).filter(Propose.id == propose_id, Propose.project_id == project_id).first()
if not propose:
raise HTTPException(status_code=404, detail="Propose not found")
propose_status = propose.status.value if hasattr(propose.status, "value") else propose.status
if propose_status != "open":
raise HTTPException(status_code=400, detail="Only open proposes can be rejected")
check_permission(db, current_user.id, project_id, "propose.reject")
propose.status = ProposeStatus.REJECTED
db.commit()
db.refresh(propose)
log_activity(db, "reject", "propose", propose.id, user_id=current_user.id, details={
"reason": body.reason if body else None,
})
return propose
@router.post("/{propose_id}/reopen", response_model=schemas.ProposeResponse)
def reopen_propose(
project_id: int,
propose_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
"""Reopen a rejected propose back to open."""
propose = db.query(Propose).filter(Propose.id == propose_id, Propose.project_id == project_id).first()
if not propose:
raise HTTPException(status_code=404, detail="Propose not found")
propose_status = propose.status.value if hasattr(propose.status, "value") else propose.status
if propose_status != "rejected":
raise HTTPException(status_code=400, detail="Only rejected proposes can be reopened")
check_permission(db, current_user.id, project_id, "propose.reopen")
propose.status = ProposeStatus.OPEN
db.commit()
db.refresh(propose)
log_activity(db, "reopen", "propose", propose.id, user_id=current_user.id)
return propose

View File

@@ -1,6 +1,6 @@
"""Tasks router — replaces the old issues router. All CRUD operates on the `tasks` table."""
import math
from typing import List
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session
@@ -14,11 +14,31 @@ from app.schemas import schemas
from app.services.webhook import fire_webhooks_sync
from app.models.notification import Notification as NotificationModel
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, ensure_can_edit_task
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_task
from app.services.activity import log_activity
from app.services.dependency_check import check_task_deps
router = APIRouter(tags=["Tasks"])
# ---- State-machine: valid transitions (P5.1-P5.6) ----
VALID_TRANSITIONS: dict[str, set[str]] = {
"pending": {"open", "closed"},
"open": {"undergoing", "closed"},
"undergoing": {"completed", "closed"},
"completed": {"open"}, # reopen
"closed": {"open"}, # reopen
}
def _check_transition(old_status: str, new_status: str) -> None:
"""Raise 400 if the transition is not allowed by the state machine."""
allowed = VALID_TRANSITIONS.get(old_status, set())
if new_status not in allowed:
raise HTTPException(
status_code=400,
detail=f"Cannot transition from '{old_status}' to '{new_status}'. "
f"Allowed targets from '{old_status}': {sorted(allowed) if allowed else 'none'}",
)
# ---- Type / Subtype validation ----
TASK_SUBTYPE_MAP = {
'issue': {'infrastructure', 'performance', 'regression', 'security', 'user_experience', 'defect'},
@@ -27,13 +47,23 @@ TASK_SUBTYPE_MAP = {
'story': {'feature', 'improvement', 'refactor'},
'test': {'regression', 'security', 'smoke', 'stress'},
'research': set(),
'task': {'defect'},
# P7.1: 'task' type removed — defect subtype migrated to issue/defect
'resolution': set(),
}
ALLOWED_TASK_TYPES = set(TASK_SUBTYPE_MAP.keys())
def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None):
"""P9.6 — type+subtype combos that may NOT be created via general create endpoints.
feature story → must come from propose accept
release maintenance → must come from controlled milestone/release flow
"""
RESTRICTED_TYPE_SUBTYPES = {
("story", "feature"),
("maintenance", "release"),
}
def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None, *, allow_restricted: bool = False):
if task_type is None:
return
if task_type not in ALLOWED_TASK_TYPES:
@@ -41,6 +71,13 @@ def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None)
allowed = TASK_SUBTYPE_MAP.get(task_type, set())
if task_subtype and task_subtype not in allowed:
raise HTTPException(status_code=400, detail=f'Invalid task_subtype for {task_type}: {task_subtype}')
# P9.6: block restricted combos unless explicitly allowed (e.g. propose accept, internal create)
if not allow_restricted and (task_type, task_subtype) in RESTRICTED_TYPE_SUBTYPES:
raise HTTPException(
status_code=400,
detail=f"Cannot create {task_type}/{task_subtype} task via general create. "
f"Use the appropriate workflow (propose accept / milestone release setup)."
)
def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None):
@@ -162,20 +199,75 @@ def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Dep
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
ensure_can_edit_task(db, current_user.id, task)
# P5.7: status-based edit restrictions
current_status = task.status.value if hasattr(task.status, 'value') else task.status
update_data = task_update.model_dump(exclude_unset=True)
# Fields that are always allowed regardless of status (non-body edits)
_always_allowed = {"status"}
body_fields = {k for k in update_data.keys() if k not in _always_allowed}
if body_fields:
# P3.6 supplement: feature story tasks locked after milestone freeze
task_type = task.task_type.value if hasattr(task.task_type, 'value') else (task.task_type or "")
task_subtype = task.task_subtype or ""
if task_type == "story" and task_subtype == "feature" and task.milestone_id:
from app.models.milestone import Milestone
ms = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if ms:
ms_status = ms.status.value if hasattr(ms.status, 'value') else ms.status
if ms_status in ("freeze", "undergoing", "completed", "closed"):
raise HTTPException(
status_code=400,
detail=f"Feature story task cannot be edited: milestone is '{ms_status}'. "
f"Blocked fields: {sorted(body_fields)}",
)
# undergoing/completed/closed: body edits forbidden
if current_status in ("undergoing", "completed", "closed"):
raise HTTPException(
status_code=400,
detail=f"Cannot edit task body fields in '{current_status}' status. "
f"Blocked fields: {sorted(body_fields)}",
)
# open + assignee set: only assignee or admin can edit body
if current_status == "open" and task.assignee_id is not None:
from app.api.rbac import is_global_admin, has_project_admin_role
is_admin = (
is_global_admin(db, current_user.id)
or has_project_admin_role(db, current_user.id, task.project_id)
)
if current_user.id != task.assignee_id and not is_admin:
raise HTTPException(
status_code=403,
detail="Only the current assignee or an admin can edit this task",
)
# Legacy general permission check (covers project membership etc.)
ensure_can_edit_task(db, current_user.id, task)
if "status" in update_data:
new_status = update_data["status"]
if new_status == "progressing" and not task.started_on:
old_status = task.status.value if hasattr(task.status, 'value') else task.status
# P5.1: enforce state-machine even through PATCH
_check_transition(old_status, new_status)
if new_status == "open" and old_status in ("completed", "closed"):
task.finished_on = None
if new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if new_status == "closed" and not task.finished_on:
if new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
for field, value in update_data.items():
setattr(task, field, value)
db.commit()
db.refresh(task)
# P3.5: auto-complete milestone when release task reaches completed via update
if "status" in update_data and update_data["status"] == "completed":
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, task, user_id=current_user.id)
return task
@@ -193,8 +285,19 @@ def delete_task(task_id: int, db: Session = Depends(get_db), current_user: model
# ---- Transition ----
class TransitionBody(BaseModel):
comment: Optional[str] = None
@router.post("/tasks/{task_id}/transition", response_model=schemas.TaskResponse)
def transition_task(task_id: int, new_status: str, bg: BackgroundTasks, db: Session = Depends(get_db)):
def transition_task(
task_id: int,
new_status: str,
bg: BackgroundTasks,
body: TransitionBody = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
valid_statuses = [s.value for s in TaskStatus]
if new_status not in valid_statuses:
raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}")
@@ -202,13 +305,82 @@ def transition_task(task_id: int, new_status: str, bg: BackgroundTasks, db: Sess
if not task:
raise HTTPException(status_code=404, detail="Task not found")
old_status = task.status.value if hasattr(task.status, 'value') else task.status
if new_status == "progressing" and not task.started_on:
# P5.1: enforce state-machine
_check_transition(old_status, new_status)
# P5.2: pending -> open requires milestone to be undergoing + task deps satisfied
if old_status == "pending" and new_status == "open":
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if milestone:
ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
if ms_status != "undergoing":
raise HTTPException(
status_code=400,
detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'",
)
# P4.3: check task-level depend_on
dep_result = check_task_deps(db, task.depend_on)
if not dep_result.ok:
raise HTTPException(
status_code=400,
detail=f"Cannot open task: {dep_result.reason}",
)
# P5.3: open -> undergoing requires assignee AND operator must be the assignee
if old_status == "open" and new_status == "undergoing":
if not task.assignee_id:
raise HTTPException(status_code=400, detail="Cannot start task: assignee must be set first")
if current_user.id != task.assignee_id:
raise HTTPException(status_code=403, detail="Only the assigned user can start this task")
# P5.4: undergoing -> completed requires a completion comment
if old_status == "undergoing" and new_status == "completed":
comment_text = body.comment if body else None
if not comment_text or not comment_text.strip():
raise HTTPException(status_code=400, detail="A completion comment is required when finishing a task")
# P5.4: also only the assignee can complete
if task.assignee_id and current_user.id != task.assignee_id:
raise HTTPException(status_code=403, detail="Only the assigned user can complete this task")
# P5.5: closing a task requires 'task.close' permission
if new_status == "closed":
check_permission(db, current_user.id, task.project_id, "task.close")
# P5.6: reopen from completed/closed -> open
if new_status == "open" and old_status in ("completed", "closed"):
perm_name = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
check_permission(db, current_user.id, task.project_id, perm_name)
# Clear finished_on on reopen so lifecycle timestamps are accurate
task.finished_on = None
if new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if new_status == "closed" and not task.finished_on:
if new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
task.status = new_status
db.commit()
db.refresh(task)
# P5.4: auto-create completion comment
if old_status == "undergoing" and new_status == "completed" and body and body.comment:
db_comment = models.Comment(
content=body.comment.strip(),
task_id=task.id,
author_id=current_user.id,
)
db.add(db_comment)
db.commit()
# Log the transition activity
log_activity(db, f"task.transition.{new_status}", "task", task.id, current_user.id,
{"old_status": old_status, "new_status": new_status})
# P3.5: auto-complete milestone when its sole release task is completed
if new_status == "completed":
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, task, user_id=current_user.id)
event = "task.closed" if new_status == "closed" else "task.updated"
bg.add_task(fire_webhooks_sync, event,
{"task_id": task.id, "title": task.title, "old_status": old_status, "new_status": new_status},
@@ -279,32 +451,138 @@ def list_all_tags(project_id: int = None, db: Session = Depends(get_db)):
# ---- Batch ----
class BatchTransition(BaseModel):
task_ids: List[int]
new_status: str
class BatchAssign(BaseModel):
task_ids: List[int]
assignee_id: int
class BatchTransitionBody(BaseModel):
task_ids: List[int]
new_status: str
comment: Optional[str] = None
@router.post("/tasks/batch/transition")
def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)):
def batch_transition(
data: BatchTransitionBody,
bg: BackgroundTasks,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
valid_statuses = [s.value for s in TaskStatus]
if data.new_status not in valid_statuses:
raise HTTPException(status_code=400, detail="Invalid status")
updated = []
skipped = []
for task_id in data.task_ids:
task = db.query(Task).filter(Task.id == task_id).first()
if task:
old_status = task.status.value if hasattr(task.status, 'value') else task.status
task.status = data.new_status
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status})
if not task:
skipped.append({"id": task_id, "title": None, "old": None,
"reason": "Task not found"})
continue
old_status = task.status.value if hasattr(task.status, 'value') else task.status
# P5.1: state-machine check
allowed = VALID_TRANSITIONS.get(old_status, set())
if data.new_status not in allowed:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"})
continue
# P5.2: pending → open requires milestone undergoing + task deps
if old_status == "pending" and data.new_status == "open":
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if milestone:
ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
if ms_status != "undergoing":
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Milestone is '{ms_status}', must be 'undergoing'"})
continue
dep_result = check_task_deps(db, task.depend_on)
if not dep_result.ok:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": dep_result.reason})
continue
# P5.3: open → undergoing requires assignee == current_user
if old_status == "open" and data.new_status == "undergoing":
if not task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Assignee must be set before starting"})
continue
if current_user.id != task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Only the assigned user can start this task"})
continue
# P5.4: undergoing → completed requires comment + assignee check
if old_status == "undergoing" and data.new_status == "completed":
comment_text = data.comment
if not comment_text or not comment_text.strip():
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "A completion comment is required"})
continue
if task.assignee_id and current_user.id != task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Only the assigned user can complete this task"})
continue
# P5.5: close requires permission
if data.new_status == "closed":
try:
check_permission(db, current_user.id, task.project_id, "task.close")
except HTTPException:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Missing 'task.close' permission"})
continue
# P5.6: reopen requires permission
if data.new_status == "open" and old_status in ("completed", "closed"):
perm = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
try:
check_permission(db, current_user.id, task.project_id, perm)
except HTTPException:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Missing '{perm}' permission"})
continue
task.finished_on = None
if data.new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if data.new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
task.status = data.new_status
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status})
# Activity log per task
log_activity(db, f"task.transition.{data.new_status}", "task", task.id, current_user.id,
{"old_status": old_status, "new_status": data.new_status})
# P5.4: auto-create completion comment
if old_status == "undergoing" and data.new_status == "completed" and data.comment:
db_comment = models.Comment(
content=data.comment.strip(),
task_id=task.id,
author_id=current_user.id,
)
db.add(db_comment)
db.commit()
# P3.5: auto-complete milestone for any completed task
for u in updated:
if u["new"] == "completed":
t = db.query(Task).filter(Task.id == u["id"]).first()
if t:
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, t, user_id=current_user.id)
for u in updated:
event = "task.closed" if data.new_status == "closed" else "task.updated"
bg.add_task(fire_webhooks_sync, event, u, None, db)
return {"updated": len(updated), "tasks": updated}
result = {"updated": len(updated), "tasks": updated}
if skipped:
result["skipped"] = skipped
return result
@router.post("/tasks/batch/assign")

View File

@@ -109,6 +109,18 @@ DEFAULT_PERMISSIONS = [
("milestone.read", "View milestones", "milestone"),
("milestone.write", "Edit milestones", "milestone"),
("milestone.delete", "Delete milestones", "milestone"),
# Milestone actions
("milestone.freeze", "Freeze milestone scope", "milestone"),
("milestone.start", "Start milestone execution", "milestone"),
("milestone.close", "Close / abort milestone", "milestone"),
# Task actions
("task.close", "Close / cancel a task", "task"),
("task.reopen_closed", "Reopen a closed task", "task"),
("task.reopen_completed", "Reopen a completed task", "task"),
# Propose actions
("propose.accept", "Accept a propose into a milestone", "propose"),
("propose.reject", "Reject a propose", "propose"),
("propose.reopen", "Reopen a rejected propose", "propose"),
# Role/Permission management
("role.manage", "Manage roles and permissions", "admin"),
# User management
@@ -139,61 +151,93 @@ def init_default_permissions(db: Session) -> list[Permission]:
return db.query(Permission).all()
def init_admin_role(db: Session, admin_user: models.User) -> None:
"""Create admin role with all permissions and guest role with minimal permissions."""
# Check if admin role already exists
admin_role = db.query(Role).filter(Role.name == "admin").first()
if not admin_role:
admin_role = Role(
name="admin",
description="Administrator - full access to all features",
is_global=True
)
db.add(admin_role)
db.commit()
db.refresh(admin_role)
logger.info("Created admin role (id=%d)", admin_role.id)
# ---------------------------------------------------------------------------
# Default role → permission mapping
# ---------------------------------------------------------------------------
# Check if guest role already exists
guest_role = db.query(Role).filter(Role.name == "guest").first()
if not guest_role:
guest_role = Role(
name="guest",
description="Guest - read-only access",
is_global=True
)
db.add(guest_role)
db.commit()
db.refresh(guest_role)
logger.info("Created guest role (id=%d)", guest_role.id)
# mgr: project management + all milestone/task/propose actions
_MGR_PERMISSIONS = {
"project.read", "project.write", "project.manage_members",
"task.create", "task.read", "task.write", "task.delete",
"milestone.create", "milestone.read", "milestone.write", "milestone.delete",
"milestone.freeze", "milestone.start", "milestone.close",
"task.close", "task.reopen_closed", "task.reopen_completed",
"propose.accept", "propose.reject", "propose.reopen",
"monitor.read",
}
# Get all permissions
# dev: day-to-day development work — no freeze/start/close milestone, no accept/reject propose
_DEV_PERMISSIONS = {
"project.read",
"task.create", "task.read", "task.write",
"milestone.read",
"task.close", "task.reopen_closed", "task.reopen_completed",
"monitor.read",
}
# Role definitions: (name, description, permission_set)
_DEFAULT_ROLES = [
("admin", "Administrator - full access to all features", None), # None ⇒ all perms
("mgr", "Manager - project & milestone management", _MGR_PERMISSIONS),
("dev", "Developer - task execution & daily work", _DEV_PERMISSIONS),
("guest", "Guest - read-only access", None), # special: *.read only
]
def _ensure_role(db: Session, name: str, description: str, is_global: bool = True) -> Role:
"""Get or create a role by name."""
role = db.query(Role).filter(Role.name == name).first()
if not role:
role = Role(name=name, description=description, is_global=is_global)
db.add(role)
db.commit()
db.refresh(role)
logger.info("Created role '%s' (id=%d)", name, role.id)
return role
def _sync_role_permissions(db: Session, role: Role, target_perm_names: set[str] | None) -> None:
"""Ensure *role* has exactly the permissions in *target_perm_names*.
* ``None`` means **all** permissions (admin).
* The special sentinel ``"__read_only__"`` is handled by the caller passing
just the ``*.read`` names.
Only adds missing permissions; never removes manually-granted ones (additive).
"""
all_perms = db.query(Permission).all()
perm_by_name = {p.name: p for p in all_perms}
# Assign all permissions to admin role
existing_admin_perm_ids = {rp.permission_id for rp in admin_role.permissions}
for perm in all_perms:
if perm.id not in existing_admin_perm_ids:
rp = RolePermission(role_id=admin_role.id, permission_id=perm.id)
db.add(rp)
if target_perm_names is None:
wanted_ids = {p.id for p in all_perms}
else:
wanted_ids = {perm_by_name[n].id for n in target_perm_names if n in perm_by_name}
if all_perms:
existing_ids = {rp.permission_id for rp in role.permissions}
added = 0
for pid in wanted_ids - existing_ids:
db.add(RolePermission(role_id=role.id, permission_id=pid))
added += 1
if added:
db.commit()
logger.info("Assigned %d permissions to admin role", len(all_perms))
logger.info("Assigned %d new permissions to role '%s'", added, role.name)
# Assign only read permissions to guest role
read_perms = db.query(Permission).filter(Permission.name.like("%.read")).all()
existing_guest_perm_ids = {rp.permission_id for rp in guest_role.permissions}
for perm in read_perms:
if perm.id not in existing_guest_perm_ids:
rp = RolePermission(role_id=guest_role.id, permission_id=perm.id)
db.add(rp)
if read_perms:
db.commit()
logger.info("Assigned %d read permissions to guest role", len(read_perms))
def init_admin_role(db: Session, admin_user: models.User) -> None:
"""Create default roles (admin / mgr / dev / guest) with preset permissions."""
logger.info("Admin and guest roles setup complete")
all_perms = db.query(Permission).all()
read_perm_names = {p.name for p in all_perms if p.name.endswith(".read")}
for name, description, perm_set in _DEFAULT_ROLES:
role = _ensure_role(db, name, description)
if name == "guest":
_sync_role_permissions(db, role, read_perm_names)
else:
_sync_role_permissions(db, role, perm_set)
logger.info("Default roles setup complete (admin, mgr, dev, guest)")
def run_init(db: Session) -> None:

View File

@@ -37,6 +37,8 @@ from app.api.routers.misc import router as misc_router
from app.api.routers.monitor import router as monitor_router
from app.api.routers.milestones import router as milestones_router
from app.api.routers.roles import router as roles_router
from app.api.routers.proposes import router as proposes_router
from app.api.routers.milestone_actions import router as milestone_actions_router
app.include_router(auth_router)
app.include_router(tasks_router)
@@ -48,6 +50,8 @@ app.include_router(misc_router)
app.include_router(monitor_router)
app.include_router(milestones_router)
app.include_router(roles_router)
app.include_router(proposes_router)
app.include_router(milestone_actions_router)
# Auto schema migration for lightweight deployments
@@ -125,7 +129,7 @@ def _migrate_schema():
# tasks extra fields
result = db.execute(text("SHOW COLUMNS FROM tasks LIKE 'task_type'"))
if not result.fetchone():
db.execute(text("ALTER TABLE tasks ADD COLUMN task_type VARCHAR(32) DEFAULT 'task'"))
db.execute(text("ALTER TABLE tasks ADD COLUMN task_type VARCHAR(32) DEFAULT 'issue'"))
result = db.execute(text("SHOW COLUMNS FROM tasks LIKE 'task_subtype'"))
if not result.fetchone():
db.execute(text("ALTER TABLE tasks ADD COLUMN task_subtype VARCHAR(64) NULL"))
@@ -168,6 +172,49 @@ def _migrate_schema():
if _has_table(db, "issues"):
db.execute(text("DROP TABLE issues"))
# --- Milestone status enum migration (old -> new) ---
if _has_table(db, "milestones"):
# Alter enum column to accept new values
db.execute(text(
"ALTER TABLE milestones MODIFY COLUMN status "
"ENUM('open','pending','deferred','progressing','freeze','undergoing','completed','closed') "
"DEFAULT 'open'"
))
# Migrate old values
db.execute(text("UPDATE milestones SET status='open' WHERE status='pending'"))
db.execute(text("UPDATE milestones SET status='closed' WHERE status='deferred'"))
db.execute(text("UPDATE milestones SET status='undergoing' WHERE status='progressing'"))
# Shrink enum to new-only values
db.execute(text(
"ALTER TABLE milestones MODIFY COLUMN status "
"ENUM('open','freeze','undergoing','completed','closed') "
"DEFAULT 'open'"
))
# Add started_at if missing
if not _has_column(db, "milestones", "started_at"):
db.execute(text("ALTER TABLE milestones ADD COLUMN started_at DATETIME NULL"))
# --- P7.1: Migrate task_type='task' to 'issue' ---
if _has_table(db, "tasks") and _has_column(db, "tasks", "task_type"):
db.execute(text("UPDATE tasks SET task_type='issue' WHERE task_type='task'"))
# --- Task status enum migration (old -> new) ---
if _has_table(db, "tasks"):
# Widen enum first
db.execute(text(
"ALTER TABLE tasks MODIFY COLUMN status "
"ENUM('open','pending','progressing','undergoing','completed','closed') "
"DEFAULT 'open'"
))
# Migrate old values
db.execute(text("UPDATE tasks SET status='undergoing' WHERE status='progressing'"))
# Shrink enum to new-only values
db.execute(text(
"ALTER TABLE tasks MODIFY COLUMN status "
"ENUM('open','pending','undergoing','completed','closed') "
"DEFAULT 'open'"
))
db.commit()
except Exception as e:
db.rollback()
@@ -179,7 +226,7 @@ def _migrate_schema():
@app.on_event("startup")
def startup():
from app.core.config import Base, engine, SessionLocal
from app.models import models, webhook, apikey, activity, milestone, notification, worklog, monitor, role_permission, task, support, meeting
from app.models import models, webhook, apikey, activity, milestone, notification, worklog, monitor, role_permission, task, support, meeting, propose
Base.metadata.create_all(bind=engine)
_migrate_schema()

View File

@@ -6,9 +6,9 @@ import enum
class MilestoneStatus(str, enum.Enum):
OPEN = "open"
PENDING = "pending"
DEFERRED = "deferred"
PROGRESSING = "progressing"
FREEZE = "freeze"
UNDERGOING = "undergoing"
COMPLETED = "completed"
CLOSED = "closed"
class Milestone(Base):
@@ -25,6 +25,7 @@ class Milestone(Base):
depend_on_tasks = Column(Text, nullable=True)
project_id = Column(Integer, ForeignKey("projects.id"), nullable=False)
created_by_id = Column(Integer, ForeignKey("users.id"), nullable=True)
started_at = Column(DateTime(timezone=True), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

View File

@@ -7,7 +7,7 @@ import enum
class TaskType(str, enum.Enum):
"""Task type enum'issue' is a subtype of task, not the other way around."""
"""Task type enum."""
ISSUE = "issue"
MAINTENANCE = "maintenance"
RESEARCH = "research"
@@ -15,13 +15,13 @@ class TaskType(str, enum.Enum):
STORY = "story"
TEST = "test"
RESOLUTION = "resolution"
TASK = "task"
class TaskStatus(str, enum.Enum):
OPEN = "open"
PENDING = "pending"
PROGRESSING = "progressing"
UNDERGOING = "undergoing"
COMPLETED = "completed"
CLOSED = "closed"

29
app/models/propose.py Normal file
View File

@@ -0,0 +1,29 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Enum
from sqlalchemy.sql import func
from app.core.config import Base
import enum
class ProposeStatus(str, enum.Enum):
OPEN = "open"
ACCEPTED = "accepted"
REJECTED = "rejected"
class Propose(Base):
__tablename__ = "proposes"
id = Column(Integer, primary_key=True, index=True)
propose_code = Column(String(64), nullable=True, unique=True, index=True)
title = Column(String(255), nullable=False)
description = Column(Text, nullable=True)
status = Column(Enum(ProposeStatus), default=ProposeStatus.OPEN)
project_id = Column(Integer, ForeignKey("projects.id"), nullable=False)
created_by_id = Column(Integer, ForeignKey("users.id"), nullable=True)
# Populated server-side after accept; links to the generated feature story task
feat_task_id = Column(String(64), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

View File

@@ -7,7 +7,8 @@ import enum
class TaskStatus(str, enum.Enum):
OPEN = "open"
PENDING = "pending"
PROGRESSING = "progressing"
UNDERGOING = "undergoing"
COMPLETED = "completed"
CLOSED = "closed"
class TaskPriority(str, enum.Enum):
@@ -27,7 +28,7 @@ class Task(Base):
task_code = Column(String(64), nullable=True, unique=True, index=True)
# Task type/subtype (replaces old issue_type/issue_subtype)
task_type = Column(String(32), default="task")
task_type = Column(String(32), default="issue") # P7.1: default changed from 'task' to 'issue'
task_subtype = Column(String(64), nullable=True)
project_id = Column(Integer, ForeignKey("projects.id"), nullable=False)

View File

@@ -12,13 +12,14 @@ class TaskTypeEnum(str, Enum):
STORY = "story"
TEST = "test"
RESOLUTION = "resolution"
TASK = "task"
# P7.1: 'task' type removed — defect subtype migrated to issue/defect
class TaskStatusEnum(str, Enum):
OPEN = "open"
PENDING = "pending"
PROGRESSING = "progressing"
UNDERGOING = "undergoing"
COMPLETED = "completed"
CLOSED = "closed"
@@ -33,7 +34,7 @@ class TaskPriorityEnum(str, Enum):
class TaskBase(BaseModel):
title: str
description: Optional[str] = None
task_type: TaskTypeEnum = TaskTypeEnum.TASK
task_type: TaskTypeEnum = TaskTypeEnum.ISSUE
task_subtype: Optional[str] = None
priority: TaskPriorityEnum = TaskPriorityEnum.MEDIUM
tags: Optional[str] = None
@@ -193,11 +194,19 @@ class ProjectMemberResponse(BaseModel):
from_attributes = True
class MilestoneStatusEnum(str, Enum):
OPEN = "open"
FREEZE = "freeze"
UNDERGOING = "undergoing"
COMPLETED = "completed"
CLOSED = "closed"
# Milestone schemas
class MilestoneBase(BaseModel):
title: str
description: Optional[str] = None
status: Optional[str] = "open"
status: Optional[MilestoneStatusEnum] = MilestoneStatusEnum.OPEN
due_date: Optional[datetime] = None
planned_release_date: Optional[datetime] = None
depend_on_milestones: Optional[List[str]] = None
@@ -212,7 +221,7 @@ class MilestoneCreate(MilestoneBase):
class MilestoneUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
status: Optional[str] = None
status: Optional[MilestoneStatusEnum] = None
due_date: Optional[datetime] = None
planned_release_date: Optional[datetime] = None
depend_on_milestones: Optional[List[str]] = None
@@ -223,6 +232,43 @@ class MilestoneResponse(MilestoneBase):
id: int
project_id: int
created_by_id: Optional[int] = None
started_at: Optional[datetime] = None
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# Propose schemas
class ProposeStatusEnum(str, Enum):
OPEN = "open"
ACCEPTED = "accepted"
REJECTED = "rejected"
class ProposeBase(BaseModel):
title: str
description: Optional[str] = None
class ProposeCreate(ProposeBase):
project_id: Optional[int] = None
class ProposeUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
class ProposeResponse(ProposeBase):
id: int
propose_code: Optional[str] = None
status: ProposeStatusEnum
project_id: int
created_by_id: Optional[int] = None
feat_task_id: Optional[str] = None
created_at: datetime
updated_at: Optional[datetime] = None

View File

@@ -0,0 +1,148 @@
"""P4.1 — Reusable dependency-check helpers.
Used by milestone start, milestone preflight, and (future) task pending→open
to verify that all declared dependencies are completed before allowing the
entity to proceed.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Sequence
from sqlalchemy.orm import Session
from app.models.milestone import Milestone
from app.models.task import Task
# ---------------------------------------------------------------------------
# Result type
# ---------------------------------------------------------------------------
@dataclass
class DepCheckResult:
"""Outcome of a dependency check."""
ok: bool = True
blockers: list[str] = field(default_factory=list)
@property
def reason(self) -> str | None:
return "; ".join(self.blockers) if self.blockers else None
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _parse_json_ids(raw: str | None) -> list[int]:
"""Safely parse a JSON-encoded list of integer IDs."""
if not raw:
return []
try:
ids = json.loads(raw)
if isinstance(ids, list):
return [int(i) for i in ids]
except (json.JSONDecodeError, TypeError, ValueError):
pass
return []
def _ms_status(ms: Milestone) -> str:
return ms.status.value if hasattr(ms.status, "value") else ms.status
def _task_status(t: Task) -> str:
return t.status.value if hasattr(t.status, "value") else t.status
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def check_milestone_deps(
db: Session,
depend_on_milestones: str | None,
depend_on_tasks: str | None,
*,
required_status: str = "completed",
) -> DepCheckResult:
"""Check whether all milestone + task dependencies are satisfied.
Parameters
----------
db:
Active DB session.
depend_on_milestones:
JSON-encoded list of milestone IDs (from the entity's field).
depend_on_tasks:
JSON-encoded list of task IDs (from the entity's field).
required_status:
The status that dependees must have reached (default ``"completed"``).
Returns
-------
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
with human-readable ``blockers``.
"""
result = DepCheckResult()
# --- milestone dependencies ---
ms_ids = _parse_json_ids(depend_on_milestones)
if ms_ids:
dep_milestones: Sequence[Milestone] = (
db.query(Milestone).filter(Milestone.id.in_(ms_ids)).all()
)
incomplete = [m.id for m in dep_milestones if _ms_status(m) != required_status]
if incomplete:
result.ok = False
result.blockers.append(f"Dependent milestones not {required_status}: {incomplete}")
# --- task dependencies ---
task_ids = _parse_json_ids(depend_on_tasks)
if task_ids:
dep_tasks: Sequence[Task] = (
db.query(Task).filter(Task.id.in_(task_ids)).all()
)
incomplete_tasks = [t.id for t in dep_tasks if _task_status(t) != required_status]
if incomplete_tasks:
result.ok = False
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete_tasks}")
return result
def check_task_deps(
db: Session,
depend_on: str | None,
*,
required_status: str = "completed",
) -> DepCheckResult:
"""Check whether a task's depend_on tasks are all satisfied.
Parameters
----------
db:
Active DB session.
depend_on:
JSON-encoded list of task IDs (from the task's ``depend_on`` field).
required_status:
The status dependees must have reached (default ``"completed"``).
Returns
-------
DepCheckResult with ``ok=True`` if all deps satisfied, else ``ok=False``
with human-readable ``blockers``.
"""
result = DepCheckResult()
task_ids = _parse_json_ids(depend_on)
if task_ids:
dep_tasks: Sequence[Task] = (
db.query(Task).filter(Task.id.in_(task_ids)).all()
)
incomplete = [t.id for t in dep_tasks if _task_status(t) != required_status]
if incomplete:
result.ok = False
result.blockers.append(f"Dependent tasks not {required_status}: {incomplete}")
return result

100
cli.py
View File

@@ -16,12 +16,13 @@ TOKEN = os.environ.get("HARBORFORGE_TOKEN", "")
STATUS_ICON = {
"open": "🟢",
"pending": "🟡",
"progressing": "🔵",
"freeze": "🧊",
"undergoing": "🔵",
"completed": "",
"closed": "",
}
TYPE_ICON = {
"resolution": "⚖️",
"task": "📋",
"story": "📖",
"test": "🧪",
"issue": "📌",
@@ -149,10 +150,56 @@ def cmd_search(args):
def cmd_transition(args):
result = _request("POST", f"/tasks/{args.task_id}/transition?new_status={args.status}")
body = {}
if args.comment:
body["comment"] = args.comment
result = _request("POST", f"/tasks/{args.task_id}/transition?new_status={args.status}", body or None)
print(f"Task #{result['id']} transitioned to: {result['status']}")
# ── Propose commands ──────────────────────────────────────────────
def cmd_proposes(args):
if not args.project:
print("Error: --project is required for proposes", file=sys.stderr)
sys.exit(1)
result = _request("GET", f"/projects/{args.project}/proposes")
items = result if isinstance(result, list) else result.get("items", [])
if not items:
print(" No proposes found.")
return
for p in items:
status_icon = STATUS_ICON.get(p["status"], "")
feat = f" → task {p['feat_task_id']}" if p.get("feat_task_id") else ""
print(f" {status_icon} 💡 {p['propose_code']} {p['title']}{feat}")
def cmd_propose_create(args):
data = {"title": args.title}
if args.description:
data["description"] = args.description
result = _request("POST", f"/projects/{args.project}/proposes", data)
print(f"Created propose {result['propose_code']}: {result['title']}")
def cmd_propose_accept(args):
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/accept?milestone_id={args.milestone}")
print(f"Propose #{args.propose_id} accepted → task {result.get('feat_task_id', '?')}")
def cmd_propose_reject(args):
data = {}
if args.reason:
data["reason"] = args.reason
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/reject", data or None)
print(f"Propose #{args.propose_id} rejected")
def cmd_propose_reopen(args):
result = _request("POST", f"/projects/{args.project}/proposes/{args.propose_id}/reopen")
print(f"Propose #{args.propose_id} reopened")
def cmd_stats(args):
params = f"?project_id={args.project}" if args.project else ""
stats = _request("GET", f"/dashboard/stats{params}")
@@ -168,8 +215,13 @@ def cmd_stats(args):
def cmd_milestones(args):
params = f"?project_id={args.project}" if args.project else ""
milestones = _request("GET", f"/milestones{params}")
params = []
if args.project:
params.append(f"project_id={args.project}")
if args.status:
params.append(f"status={args.status}")
qs = f"?{'&'.join(params)}" if params else ""
milestones = _request("GET", f"/milestones{qs}")
if not milestones:
print(" No milestones found.")
return
@@ -240,15 +292,15 @@ def main():
p_tasks = sub.add_parser("tasks", aliases=["issues"], help="List tasks")
p_tasks.add_argument("--project", "-p", type=int)
p_tasks.add_argument("--type", "-t", choices=["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_tasks.add_argument("--status", "-s", choices=["open", "pending", "progressing", "closed"])
p_tasks.add_argument("--type", "-t", choices=["story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_tasks.add_argument("--status", "-s", choices=["open", "pending", "undergoing", "completed", "closed"])
p_create = sub.add_parser("create-task", aliases=["create-issue"], help="Create a task")
p_create.add_argument("title")
p_create.add_argument("--project", "-p", type=int, required=True)
p_create.add_argument("--milestone", "-m", type=int, required=True)
p_create.add_argument("--reporter", "-r", type=int, required=True)
p_create.add_argument("--type", "-t", default="task", choices=["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_create.add_argument("--type", "-t", default="issue", choices=["story", "test", "resolution", "issue", "maintenance", "research", "review"])
p_create.add_argument("--subtype")
p_create.add_argument("--priority", choices=["low", "medium", "high", "critical"])
p_create.add_argument("--description", "-d")
@@ -268,13 +320,15 @@ def main():
p_trans = sub.add_parser("transition", help="Transition task status")
p_trans.add_argument("task_id", type=int)
p_trans.add_argument("status", choices=["open", "pending", "progressing", "closed"])
p_trans.add_argument("status", choices=["open", "pending", "undergoing", "completed", "closed"])
p_trans.add_argument("--comment", "-c", help="Comment (required for undergoing→completed)")
p_stats = sub.add_parser("stats", help="Dashboard stats")
p_stats.add_argument("--project", "-p", type=int)
p_ms = sub.add_parser("milestones", help="List milestones")
p_ms.add_argument("--project", "-p", type=int)
p_ms.add_argument("--status", "-s", choices=["open", "freeze", "undergoing", "completed", "closed"])
p_msp = sub.add_parser("milestone-progress", help="Show milestone progress")
p_msp.add_argument("milestone_id", type=int)
@@ -294,6 +348,29 @@ def main():
p_worklogs = sub.add_parser("worklogs", help="List work logs for a task")
p_worklogs.add_argument("task_id", type=int)
# ── Propose subcommands ──
p_proposes = sub.add_parser("proposes", help="List proposes for a project")
p_proposes.add_argument("--project", "-p", type=int, required=True)
p_pc = sub.add_parser("propose-create", help="Create a propose")
p_pc.add_argument("title")
p_pc.add_argument("--project", "-p", type=int, required=True)
p_pc.add_argument("--description", "-d")
p_pa = sub.add_parser("propose-accept", help="Accept a propose into a milestone")
p_pa.add_argument("propose_id", type=int)
p_pa.add_argument("--project", "-p", type=int, required=True)
p_pa.add_argument("--milestone", "-m", type=int, required=True)
p_pr = sub.add_parser("propose-reject", help="Reject a propose")
p_pr.add_argument("propose_id", type=int)
p_pr.add_argument("--project", "-p", type=int, required=True)
p_pr.add_argument("--reason", "-r")
p_pro = sub.add_parser("propose-reopen", help="Reopen a rejected propose")
p_pro.add_argument("propose_id", type=int)
p_pro.add_argument("--project", "-p", type=int, required=True)
args = parser.parse_args()
if not args.command:
parser.print_help()
@@ -318,6 +395,11 @@ def main():
"overdue": cmd_overdue,
"log-time": cmd_log_time,
"worklogs": cmd_worklogs,
"proposes": cmd_proposes,
"propose-create": cmd_propose_create,
"propose-accept": cmd_propose_accept,
"propose-reject": cmd_propose_reject,
"propose-reopen": cmd_propose_reopen,
}
cmds[args.command](args)

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# tests package

302
tests/conftest.py Normal file
View File

@@ -0,0 +1,302 @@
"""Shared test fixtures — SQLite in-memory DB + FastAPI TestClient.
This avoids needing MySQL for unit/integration tests.
All models are created fresh for every test function (function-scoped session).
"""
import sys, os
# Ensure the backend app package is importable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
# --- Override engine BEFORE any app import touches the real DB ---
from app.core.config import Base
# Force-import ALL model modules so Base.metadata knows every table
import app.models.models # noqa: F401 — User, Project, Comment, etc.
import app.models.milestone # noqa: F401
import app.models.task # noqa: F401
import app.models.role_permission # noqa: F401
import app.models.activity # noqa: F401
import app.models.propose # noqa: F401
try:
import app.models.apikey # noqa: F401
except ImportError:
pass
try:
import app.models.webhook # noqa: F401
except ImportError:
pass
TEST_DATABASE_URL = "sqlite://" # in-memory
engine = create_engine(
TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
# Use StaticPool so all sessions share the same in-memory connection
poolclass=__import__("sqlalchemy.pool", fromlist=["StaticPool"]).StaticPool,
)
# SQLite needs foreign keys enabled per-connection
@event.listens_for(engine, "connect")
def _set_sqlite_pragma(dbapi_conn, _):
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def setup_database():
"""Create all tables before each test, drop after."""
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
@pytest.fixture()
def db():
"""Yield a DB session for direct model manipulation."""
session = TestingSessionLocal()
try:
yield session
finally:
session.close()
@pytest.fixture()
def client(db):
"""FastAPI TestClient wired to the test DB + a default authenticated user."""
from fastapi.testclient import TestClient
from app.main import app
from app.core.config import get_db
# Override DB dependency
def _override_get_db():
try:
yield db
finally:
pass # caller's `db` fixture handles close
app.dependency_overrides[get_db] = _override_get_db
yield TestClient(app)
app.dependency_overrides.clear()
# ---------------------------------------------------------------------------
# Helper factories
# ---------------------------------------------------------------------------
@pytest.fixture()
def make_user(db):
"""Factory to create a User row."""
from app.models.models import User
_counter = [0]
# Pre-compute a bcrypt hash for "password" to avoid passlib/bcrypt version issues
_pwd_hash = "$2b$12$LJ3m4ys/Xz.l1PaOHHKN/uE7dQFnSm1AUBfEkL0C2dN9.3Oau4XG"
def _make(username=None, is_admin=False):
_counter[0] += 1
n = _counter[0]
u = User(
username=username or f"testuser{n}",
email=f"test{n}@example.com",
hashed_password=_pwd_hash,
is_active=True,
is_admin=is_admin,
)
db.add(u)
db.commit()
db.refresh(u)
return u
return _make
@pytest.fixture()
def make_project(db):
"""Factory to create a Project row."""
from app.models.models import Project
_counter = [0]
def _make(owner_id, name=None, project_code=None):
_counter[0] += 1
n = _counter[0]
p = Project(
name=name or f"TestProject{n}",
project_code=project_code or f"TP{n}",
owner_name="owner",
owner_id=owner_id,
)
db.add(p)
db.commit()
db.refresh(p)
return p
return _make
@pytest.fixture()
def make_milestone(db):
"""Factory to create a Milestone row."""
from app.models.milestone import Milestone, MilestoneStatus
_counter = [0]
def _make(project_id, created_by_id, status=MilestoneStatus.OPEN, **kw):
_counter[0] += 1
n = _counter[0]
ms = Milestone(
title=kw.pop("title", f"Milestone {n}"),
project_id=project_id,
created_by_id=created_by_id,
status=status,
milestone_code=kw.pop("milestone_code", f"M{n:04d}"),
**kw,
)
db.add(ms)
db.commit()
db.refresh(ms)
return ms
return _make
@pytest.fixture()
def make_task(db):
"""Factory to create a Task row."""
from app.models.task import Task, TaskStatus
_counter = [0]
def _make(project_id, milestone_id, reporter_id, status=TaskStatus.PENDING, **kw):
_counter[0] += 1
n = _counter[0]
t = Task(
title=kw.pop("title", f"Task {n}"),
project_id=project_id,
milestone_id=milestone_id,
reporter_id=reporter_id,
created_by_id=kw.pop("created_by_id", reporter_id),
status=status,
task_code=kw.pop("task_code", f"T{n:04d}"),
task_type=kw.pop("task_type", "issue"),
task_subtype=kw.pop("task_subtype", None),
**kw,
)
db.add(t)
db.commit()
db.refresh(t)
return t
return _make
@pytest.fixture()
def seed_roles_and_permissions(db):
"""Create the minimal role + permission setup needed by action endpoints.
Returns (admin_role, mgr_role, dev_role).
"""
from app.models.role_permission import Role, Permission, RolePermission
# --- roles ---
admin_role = Role(name="admin", is_global=True)
mgr_role = Role(name="mgr", is_global=False)
dev_role = Role(name="dev", is_global=False)
db.add_all([admin_role, mgr_role, dev_role])
db.commit()
# --- permissions ---
perm_names = [
("milestone.freeze", "milestone"),
("milestone.start", "milestone"),
("milestone.close", "milestone"),
("task.close", "task"),
("task.reopen_closed", "task"),
("task.reopen_completed", "task"),
("propose.accept", "propose"),
("propose.reject", "propose"),
("propose.reopen", "propose"),
# add broad perms for role checks
("project.read", "project"),
("project.write", "project"),
("milestone.read", "milestone"),
("milestone.write", "milestone"),
("milestone.create", "milestone"),
("task.read", "task"),
("task.write", "task"),
("task.create", "task"),
]
perm_objs = {}
for name, cat in perm_names:
p = Permission(name=name, category=cat, description=name)
db.add(p)
db.flush()
perm_objs[name] = p
# admin gets all
for p in perm_objs.values():
db.add(RolePermission(role_id=admin_role.id, permission_id=p.id))
# mgr gets milestone + propose + task management perms
mgr_perms = [
"milestone.freeze", "milestone.start", "milestone.close",
"task.close", "task.reopen_closed", "task.reopen_completed",
"propose.accept", "propose.reject", "propose.reopen",
"project.read", "project.write",
"milestone.read", "milestone.write", "milestone.create",
"task.read", "task.write", "task.create",
]
for name in mgr_perms:
db.add(RolePermission(role_id=mgr_role.id, permission_id=perm_objs[name].id))
# dev gets basic perms
dev_perms = [
"project.read", "task.read", "task.write", "task.create",
"milestone.read", "task.close", "task.reopen_closed", "task.reopen_completed",
]
for name in dev_perms:
db.add(RolePermission(role_id=dev_role.id, permission_id=perm_objs[name].id))
db.commit()
db.refresh(admin_role)
db.refresh(mgr_role)
db.refresh(dev_role)
return admin_role, mgr_role, dev_role
@pytest.fixture()
def make_member(db):
"""Factory to add a user as project member with a given role."""
from app.models.models import ProjectMember
def _make(project_id, user_id, role_id):
pm = ProjectMember(project_id=project_id, user_id=user_id, role_id=role_id)
db.add(pm)
db.commit()
return pm
return _make
@pytest.fixture()
def auth_header():
"""Generate a JWT auth header for a given user."""
from app.api.deps import create_access_token
def _make(user):
token = create_access_token({"sub": str(user.id)})
return {"Authorization": f"Bearer {token}"}
return _make

View File

@@ -0,0 +1,358 @@
"""P13.1 — Milestone state-machine action tests.
Covers:
- freeze: success, missing release task, multiple release tasks, wrong status
- start: success + started_at, deps not met, wrong status
- close: from open/freeze/undergoing, wrong status (completed/closed)
- auto-complete: release task completion triggers milestone completed
"""
import json
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# -----------------------------------------------------------------------
# Freeze
# -----------------------------------------------------------------------
class TestFreeze:
"""POST /projects/{pid}/milestones/{mid}/actions/freeze"""
def test_freeze_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, _ = seed_roles_and_permissions
user = make_user(is_admin=False)
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
# Create exactly 1 maintenance/release task
make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "freeze"
db.refresh(ms)
assert ms.status == MilestoneStatus.FREEZE
def test_freeze_no_release_task(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "no maintenance/release task" in resp.json()["detail"].lower()
def test_freeze_multiple_release_tasks(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected exactly 1" in resp.json()["detail"].lower()
def test_freeze_wrong_status(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.CLOSED)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/freeze",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected 'open'" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Start
# -----------------------------------------------------------------------
class TestStart:
"""POST /projects/{pid}/milestones/{mid}/actions/start"""
def _freeze_milestone(self, db, ms):
ms.status = MilestoneStatus.FREEZE
db.commit()
db.refresh(ms)
def test_start_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
self._freeze_milestone(db, ms)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 200, resp.text
data = resp.json()
assert data["status"] == "undergoing"
assert "started_at" in data
db.refresh(ms)
assert ms.status == MilestoneStatus.UNDERGOING
assert ms.started_at is not None
def test_start_deps_not_met(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
# Create a dependency milestone that is NOT completed
dep_ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
ms = make_milestone(
project.id, user.id,
depend_on_milestones=json.dumps([dep_ms.id]),
)
self._freeze_milestone(db, ms)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "cannot start" in resp.json()["detail"].lower()
def test_start_wrong_status(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/start",
headers=auth_header(user),
)
assert resp.status_code == 400
assert "expected 'freeze'" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Close
# -----------------------------------------------------------------------
class TestClose:
"""POST /projects/{pid}/milestones/{mid}/actions/close"""
@pytest.mark.parametrize("initial_status", [
MilestoneStatus.OPEN,
MilestoneStatus.FREEZE,
MilestoneStatus.UNDERGOING,
])
def test_close_from_allowed_statuses(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header, initial_status,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=initial_status)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/close",
headers=auth_header(user),
json={"reason": "no longer needed"},
)
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "closed"
db.refresh(ms)
assert ms.status == MilestoneStatus.CLOSED
@pytest.mark.parametrize("terminal_status", [
MilestoneStatus.COMPLETED,
MilestoneStatus.CLOSED,
])
def test_close_from_terminal_rejected(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header, terminal_status,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=terminal_status)
resp = client.post(
f"/projects/{project.id}/milestones/{ms.id}/actions/close",
headers=auth_header(user),
)
assert resp.status_code == 400
# -----------------------------------------------------------------------
# Auto-complete
# -----------------------------------------------------------------------
class TestAutoComplete:
"""When the sole release task is completed, milestone auto-completes."""
def test_auto_complete_on_release_task_finish(
self, db, make_user, make_project, make_milestone, make_task,
):
"""Direct unit test of try_auto_complete_milestone."""
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
release_task = make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, release_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.COMPLETED
def test_no_auto_complete_for_non_release_task(
self, db, make_user, make_project, make_milestone, make_task,
):
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
# Also add the required release task (still pending)
make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.PENDING,
)
normal_task = make_task(
project.id, ms.id, user.id,
task_type="issue", task_subtype="defect",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, normal_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.UNDERGOING # unchanged
def test_no_auto_complete_when_not_undergoing(
self, db, make_user, make_project, make_milestone, make_task,
):
from app.api.routers.milestone_actions import try_auto_complete_milestone
user = make_user()
project = make_project(owner_id=user.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.FREEZE)
release_task = make_task(
project.id, ms.id, user.id,
task_type="maintenance", task_subtype="release",
status=TaskStatus.COMPLETED,
)
try_auto_complete_milestone(db, release_task, user_id=user.id)
db.refresh(ms)
assert ms.status == MilestoneStatus.FREEZE # unchanged
# -----------------------------------------------------------------------
# Preflight
# -----------------------------------------------------------------------
class TestPreflight:
"""GET /projects/{pid}/milestones/{mid}/actions/preflight"""
def test_preflight_freeze_allowed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
make_task(project.id, ms.id, user.id, task_type="maintenance", task_subtype="release")
resp = client.get(
f"/projects/{project.id}/milestones/{ms.id}/actions/preflight",
headers=auth_header(user),
)
assert resp.status_code == 200
data = resp.json()
assert data["freeze"]["allowed"] is True
def test_preflight_freeze_not_allowed(
self, client, db, make_user, make_project, make_milestone,
seed_roles_and_permissions, make_member, auth_header,
):
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id)
# No release task
resp = client.get(
f"/projects/{project.id}/milestones/{ms.id}/actions/preflight",
headers=auth_header(user),
)
assert resp.status_code == 200
data = resp.json()
assert data["freeze"]["allowed"] is False

559
tests/test_propose.py Normal file
View File

@@ -0,0 +1,559 @@
"""P13.3 — Propose backend tests.
Covers:
- CRUD: create, list, get, update
- propose_code per-project incrementing
- accept → auto-generate feature story task + feat_task_id
- accept with non-open milestone → fail
- reject → status change
- rejected → reopen back to open
- feat_task_id cannot be set manually
- edit restrictions (only open proposes editable)
- permission checks for accept/reject/reopen
"""
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _propose_url(project_id: int, propose_id: int | None = None) -> str:
base = f"/projects/{project_id}/proposes"
return f"{base}/{propose_id}" if propose_id else base
# ===========================================================================
# CRUD
# ===========================================================================
class TestProposeCRUD:
"""Basic create / list / get / update."""
def test_create_propose(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id, project_code="PROJ")
make_member(project.id, user.id, dev_role.id)
resp = client.post(
_propose_url(project.id),
json={"title": "New Feature Idea", "description": "Some details"},
headers=auth_header(user),
)
assert resp.status_code == 201
data = resp.json()
assert data["title"] == "New Feature Idea"
assert data["status"] == "open"
assert data["propose_code"].startswith("PROJ:P")
assert data["feat_task_id"] is None
def test_list_proposes(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
# Create two proposes
client.post(_propose_url(project.id), json={"title": "P1"}, headers=auth_header(user))
client.post(_propose_url(project.id), json={"title": "P2"}, headers=auth_header(user))
resp = client.get(_propose_url(project.id), headers=auth_header(user))
assert resp.status_code == 200
assert len(resp.json()) == 2
def test_get_propose(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(_propose_url(project.id), json={"title": "P1"}, headers=auth_header(user))
propose_id = create_resp.json()["id"]
resp = client.get(_propose_url(project.id, propose_id), headers=auth_header(user))
assert resp.status_code == 200
assert resp.json()["title"] == "P1"
def test_update_propose_open(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(_propose_url(project.id), json={"title": "Old"}, headers=auth_header(user))
propose_id = create_resp.json()["id"]
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "New Title", "description": "Updated"},
headers=auth_header(user),
)
assert resp.status_code == 200
assert resp.json()["title"] == "New Title"
assert resp.json()["description"] == "Updated"
# ===========================================================================
# Propose Code
# ===========================================================================
class TestProposeCode:
"""P1.4 — propose_code increments per project independently."""
def test_code_increments_per_project(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
proj_a = make_project(owner_id=user.id, project_code="ALPHA")
proj_b = make_project(owner_id=user.id, project_code="BETA")
make_member(proj_a.id, user.id, dev_role.id)
make_member(proj_b.id, user.id, dev_role.id)
# Create 2 in ALPHA
r1 = client.post(_propose_url(proj_a.id), json={"title": "A1"}, headers=auth_header(user))
r2 = client.post(_propose_url(proj_a.id), json={"title": "A2"}, headers=auth_header(user))
# Create 1 in BETA
r3 = client.post(_propose_url(proj_b.id), json={"title": "B1"}, headers=auth_header(user))
code1 = r1.json()["propose_code"]
code2 = r2.json()["propose_code"]
code3 = r3.json()["propose_code"]
assert code1.startswith("ALPHA:P")
assert code2.startswith("ALPHA:P")
assert code3.startswith("BETA:P")
# They should be distinct
assert code1 != code2
# ===========================================================================
# Accept
# ===========================================================================
class TestAccept:
"""P6.2 — accept propose → create feature story task."""
def test_accept_success(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id),
json={"title": "Cool Feature", "description": "Do something cool"},
headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] == "accepted"
assert data["feat_task_id"] is not None
# Verify the generated task exists
from app.models.task import Task
task = db.query(Task).filter(Task.id == int(data["feat_task_id"])).first()
assert task is not None
assert task.title == "Cool Feature"
assert task.description == "Do something cool"
assert task.task_type == "story"
assert task.task_subtype == "feature"
task_status = task.status.value if hasattr(task.status, "value") else task.status
assert task_status == "pending"
assert task.milestone_id == ms.id
def test_accept_non_open_milestone_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.FREEZE)
create_resp = client.post(
_propose_url(project.id),
json={"title": "Feature X"},
headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 400
assert "open" in resp.json()["detail"].lower()
def test_accept_already_accepted_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# First accept
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Second accept should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_accept_auto_fills_feat_task_id(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
data = resp.json()
assert data["feat_task_id"] is not None
# Re-fetch to confirm persistence
get_resp = client.get(_propose_url(project.id, propose_id), headers=auth_header(mgr))
assert get_resp.json()["feat_task_id"] == data["feat_task_id"]
def test_accept_no_permission_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
"""dev role should not have propose.accept permission."""
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.OPEN)
# Dev creates the propose
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
# Dev tries to accept — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# Reject
# ===========================================================================
class TestReject:
"""P6.3 — reject propose."""
def test_reject_success(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "Not needed"},
headers=auth_header(mgr),
)
assert resp.status_code == 200
assert resp.json()["status"] == "rejected"
def test_reject_non_open_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Accept first
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Now reject should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "Changed mind"},
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_reject_no_permission_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
resp = client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "nah"},
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# Reopen
# ===========================================================================
class TestReopen:
"""P6.4 — reopen rejected propose."""
def test_reopen_success(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Reject first
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "wait"},
headers=auth_header(mgr),
)
# Reopen
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(mgr),
)
assert resp.status_code == 200
assert resp.json()["status"] == "open"
def test_reopen_non_rejected_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Try reopen on open propose — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(mgr),
)
assert resp.status_code == 400
def test_reopen_no_permission_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
owner = make_user()
dev_user = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, dev_user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(dev_user),
)
propose_id = create_resp.json()["id"]
# Owner rejects
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "nah"},
headers=auth_header(owner),
)
# Dev tries to reopen — should fail
resp = client.post(
_propose_url(project.id, propose_id) + "/reopen",
headers=auth_header(dev_user),
)
assert resp.status_code == 403
# ===========================================================================
# feat_task_id protection
# ===========================================================================
class TestFeatTaskIdProtection:
"""P6.5 — feat_task_id is server-side only, cannot be set by client."""
def test_update_cannot_set_feat_task_id(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, dev_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(user),
)
propose_id = create_resp.json()["id"]
# Try to set feat_task_id via PATCH
resp = client.patch(
_propose_url(project.id, propose_id),
json={"feat_task_id": "999"},
headers=auth_header(user),
)
assert resp.status_code == 200
# feat_task_id should still be None (server ignores it)
assert resp.json()["feat_task_id"] is None
# ===========================================================================
# Edit restrictions
# ===========================================================================
class TestEditRestrictions:
"""Propose editing is only allowed in open status."""
def test_edit_accepted_propose_fails(
self, client, db, make_user, make_project, make_milestone, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
ms = make_milestone(project.id, mgr.id, status=MilestoneStatus.OPEN)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Accept
client.post(
_propose_url(project.id, propose_id) + "/accept",
json={"milestone_id": ms.id},
headers=auth_header(mgr),
)
# Try to edit
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "Changed"},
headers=auth_header(mgr),
)
assert resp.status_code == 400
assert "open" in resp.json()["detail"].lower()
def test_edit_rejected_propose_fails(
self, client, db, make_user, make_project, seed_roles_and_permissions, make_member, auth_header,
):
admin_role, mgr_role, dev_role = seed_roles_and_permissions
mgr = make_user()
project = make_project(owner_id=mgr.id)
make_member(project.id, mgr.id, mgr_role.id)
create_resp = client.post(
_propose_url(project.id), json={"title": "F"}, headers=auth_header(mgr),
)
propose_id = create_resp.json()["id"]
# Reject
client.post(
_propose_url(project.id, propose_id) + "/reject",
json={"reason": "no"},
headers=auth_header(mgr),
)
# Try to edit
resp = client.patch(
_propose_url(project.id, propose_id),
json={"title": "Changed"},
headers=auth_header(mgr),
)
assert resp.status_code == 400

View File

@@ -0,0 +1,564 @@
"""P13.2 — Task state-machine transition tests.
Covers:
- pending → open: success, milestone not undergoing, deps not met
- open → undergoing: success, no assignee, non-assignee blocked
- undergoing → completed: success with comment, no comment fails, non-assignee blocked
- close from pending/open/undergoing: permission required
- reopen from completed/closed → open: distinct permissions
- invalid transitions: rejected by state machine
- edit restrictions: P5.7 body edit guards by status/assignee
"""
import json
import pytest
from app.models.milestone import MilestoneStatus
from app.models.task import TaskStatus
# -----------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------
def _transition(client, task_id, new_status, headers, comment=None):
"""POST /tasks/{id}/transition?new_status=..."""
body = {}
if comment is not None:
body["comment"] = comment
return client.post(
f"/tasks/{task_id}/transition?new_status={new_status}",
json=body,
headers=headers,
)
# -----------------------------------------------------------------------
# pending → open
# -----------------------------------------------------------------------
class TestPendingToOpen:
def test_success(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open succeeds when milestone is undergoing and no deps."""
admin_role, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.PENDING)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "open"
def test_milestone_not_undergoing(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open rejected when milestone is still open."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.OPEN)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.PENDING)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 400
assert "undergoing" in resp.json()["detail"].lower()
def test_deps_not_satisfied(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open rejected when depend_on tasks are not completed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
dep_task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.PENDING,
depend_on=json.dumps([dep_task.id]),
)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 400
assert "depend" in resp.json()["detail"].lower() or "block" in resp.json()["detail"].lower()
def test_deps_satisfied(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""pending→open succeeds when all depend_on tasks are completed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
dep_task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.PENDING,
depend_on=json.dumps([dep_task.id]),
)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
# -----------------------------------------------------------------------
# open → undergoing
# -----------------------------------------------------------------------
class TestOpenToUndergoing:
def test_success_assignee_starts(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Assignee can start their own task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.OPEN,
assignee_id=user.id,
)
resp = _transition(client, task.id, "undergoing", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "undergoing"
db.refresh(task)
assert task.started_on is not None
def test_no_assignee_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot start a task without an assignee."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
resp = _transition(client, task.id, "undergoing", auth_header(user))
assert resp.status_code == 400
assert "assignee" in resp.json()["detail"].lower()
def test_non_assignee_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""A different user cannot start someone else's task."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.OPEN,
assignee_id=owner.id,
)
resp = _transition(client, task.id, "undergoing", auth_header(other))
assert resp.status_code == 403
assert "assigned" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# undergoing → completed
# -----------------------------------------------------------------------
class TestUndergoingToCompleted:
def test_success_with_comment(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Assignee can complete a task with a completion comment."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user), comment="Done!")
assert resp.status_code == 200
assert resp.json()["status"] == "completed"
db.refresh(task)
assert task.finished_on is not None
def test_no_comment_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot complete without a comment."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user))
assert resp.status_code == 400
assert "comment" in resp.json()["detail"].lower()
def test_empty_comment_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Empty/whitespace comment is rejected."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.UNDERGOING,
assignee_id=user.id,
)
resp = _transition(client, task.id, "completed", auth_header(user), comment=" ")
assert resp.status_code == 400
assert "comment" in resp.json()["detail"].lower()
def test_non_assignee_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Non-assignee cannot complete the task."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.UNDERGOING,
assignee_id=owner.id,
)
resp = _transition(client, task.id, "completed", auth_header(other), comment="I finished it")
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Close task (from various states)
# -----------------------------------------------------------------------
class TestCloseTask:
@pytest.mark.parametrize("initial_status", [
TaskStatus.PENDING,
TaskStatus.OPEN,
TaskStatus.UNDERGOING,
])
def test_close_from_valid_states(
self, initial_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Close is allowed from pending/open/undergoing with permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=initial_status)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "closed"
@pytest.mark.parametrize("initial_status", [
TaskStatus.COMPLETED,
TaskStatus.CLOSED,
])
def test_close_from_terminal_states_fails(
self, initial_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot close from completed or already closed."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=initial_status)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 400
def test_close_without_permission_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""User without task.close permission cannot close."""
from app.models.role_permission import Role
_, _, dev_role = seed_roles_and_permissions
# Create a role with NO task.close permission
no_close_role = Role(name="viewer", is_global=False)
db.add(no_close_role)
db.commit()
# Give viewer only basic perms (project.read, task.read)
from app.models.role_permission import Permission, RolePermission
for pname in ("project.read", "task.read"):
p = db.query(Permission).filter(Permission.name == pname).first()
if p:
db.add(RolePermission(role_id=no_close_role.id, permission_id=p.id))
db.commit()
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, no_close_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.OPEN)
resp = _transition(client, task.id, "closed", auth_header(user))
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Reopen (completed → open, closed → open)
# -----------------------------------------------------------------------
class TestReopen:
def test_reopen_completed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Reopen from completed → open with task.reopen_completed permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
# finished_on should be cleared
db.refresh(task)
assert task.finished_on is None
def test_reopen_closed(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Reopen from closed → open with task.reopen_closed permission."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.CLOSED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 200
assert resp.json()["status"] == "open"
def test_reopen_without_permission_fails(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""User without reopen permission cannot reopen."""
from app.models.role_permission import Role, Permission, RolePermission
# Create a role with task.close but NO reopen permissions
limited_role = Role(name="limited", is_global=False)
db.add(limited_role)
db.commit()
for pname in ("project.read", "task.read", "task.write", "task.close"):
p = db.query(Permission).filter(Permission.name == pname).first()
if p:
db.add(RolePermission(role_id=limited_role.id, permission_id=p.id))
db.commit()
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, limited_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = _transition(client, task.id, "open", auth_header(user))
assert resp.status_code == 403
# -----------------------------------------------------------------------
# Invalid transitions
# -----------------------------------------------------------------------
class TestInvalidTransitions:
@pytest.mark.parametrize("from_status,to_status", [
(TaskStatus.PENDING, "undergoing"),
(TaskStatus.PENDING, "completed"),
(TaskStatus.OPEN, "completed"),
(TaskStatus.OPEN, "pending"),
(TaskStatus.UNDERGOING, "open"),
(TaskStatus.UNDERGOING, "pending"),
(TaskStatus.COMPLETED, "undergoing"),
(TaskStatus.COMPLETED, "closed"),
(TaskStatus.CLOSED, "undergoing"),
(TaskStatus.CLOSED, "completed"),
])
def test_disallowed_transition(
self, from_status, to_status,
client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""State machine rejects transitions not in VALID_TRANSITIONS."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=from_status,
assignee_id=user.id,
)
resp = _transition(client, task.id, to_status, auth_header(user))
assert resp.status_code == 400
assert "cannot transition" in resp.json()["detail"].lower()
# -----------------------------------------------------------------------
# Edit restrictions (PATCH)
# -----------------------------------------------------------------------
class TestEditRestrictions:
def test_undergoing_body_edit_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot PATCH body fields on an undergoing task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.UNDERGOING, assignee_id=user.id)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "New Title"},
headers=auth_header(user),
)
assert resp.status_code == 400
assert "undergoing" in resp.json()["detail"].lower()
def test_completed_body_edit_blocked(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Cannot PATCH body fields on a completed task."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(project.id, ms.id, user.id, status=TaskStatus.COMPLETED)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Changed"},
headers=auth_header(user),
)
assert resp.status_code == 400
def test_open_assignee_only_edit(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Open task with assignee: only assignee can edit body."""
_, mgr_role, _ = seed_roles_and_permissions
owner = make_user()
other = make_user()
project = make_project(owner_id=owner.id)
make_member(project.id, owner.id, mgr_role.id)
make_member(project.id, other.id, mgr_role.id)
ms = make_milestone(project.id, owner.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, owner.id,
status=TaskStatus.OPEN,
assignee_id=owner.id,
)
# Other user cannot edit
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Hijack"},
headers=auth_header(other),
)
assert resp.status_code == 403
# Assignee can edit
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "My Change"},
headers=auth_header(owner),
)
assert resp.status_code == 200
assert resp.json()["title"] == "My Change"
def test_open_no_assignee_anyone_edits(
self, client, db, make_user, make_project, make_milestone,
make_task, seed_roles_and_permissions, make_member, auth_header,
):
"""Open task without assignee: any project member can edit."""
_, mgr_role, _ = seed_roles_and_permissions
user = make_user()
project = make_project(owner_id=user.id)
make_member(project.id, user.id, mgr_role.id)
ms = make_milestone(project.id, user.id, status=MilestoneStatus.UNDERGOING)
task = make_task(
project.id, ms.id, user.id,
status=TaskStatus.OPEN,
assignee_id=None,
)
resp = client.patch(
f"/tasks/{task.id}",
json={"title": "Anyone's Change"},
headers=auth_header(user),
)
assert resp.status_code == 200
assert resp.json()["title"] == "Anyone's Change"