"""Tasks router — replaces the old issues router. All CRUD operates on the `tasks` table.""" import math from typing import List, Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.orm import Session from pydantic import BaseModel from app.core.config import get_db from app.models import models from app.models.task import Task, TaskStatus, TaskPriority from app.models.milestone import Milestone from app.schemas import schemas from app.services.webhook import fire_webhooks_sync from app.models.notification import Notification as NotificationModel from app.api.deps import get_current_user_or_apikey from app.api.rbac import check_project_role, check_permission, ensure_can_edit_task from app.services.activity import log_activity from app.services.dependency_check import check_task_deps router = APIRouter(tags=["Tasks"]) def _resolve_task(db: Session, identifier: str) -> Task: """Resolve a task by numeric id or task_code string. Raises 404 if not found.""" try: task_id = int(identifier) task = db.query(Task).filter(Task.id == task_id).first() except (ValueError, TypeError): task = db.query(Task).filter(Task.task_code == identifier).first() if not task: raise HTTPException(status_code=404, detail="Task not found") return task # ---- State-machine: valid transitions (P5.1-P5.6) ---- VALID_TRANSITIONS: dict[str, set[str]] = { "pending": {"open", "closed"}, "open": {"undergoing", "closed"}, "undergoing": {"completed", "closed"}, "completed": {"open"}, # reopen "closed": {"open"}, # reopen } def _check_transition(old_status: str, new_status: str) -> None: """Raise 400 if the transition is not allowed by the state machine.""" allowed = VALID_TRANSITIONS.get(old_status, set()) if new_status not in allowed: raise HTTPException( status_code=400, detail=f"Cannot transition from '{old_status}' to '{new_status}'. " f"Allowed targets from '{old_status}': {sorted(allowed) if allowed else 'none'}", ) # ---- Type / Subtype validation ---- TASK_SUBTYPE_MAP = { 'issue': {'infrastructure', 'performance', 'regression', 'security', 'user_experience', 'defect'}, 'maintenance': {'deploy', 'release'}, 'review': {'code_review', 'decision_review', 'function_review'}, 'story': {'feature', 'improvement', 'refactor'}, 'test': {'regression', 'security', 'smoke', 'stress'}, 'research': set(), # P7.1: 'task' type removed — defect subtype migrated to issue/defect 'resolution': set(), } ALLOWED_TASK_TYPES = set(TASK_SUBTYPE_MAP.keys()) """P9.6 — type+subtype combos that may NOT be created via general create endpoints. feature story → must come from propose accept release maintenance → must come from controlled milestone/release flow """ RESTRICTED_TYPE_SUBTYPES = { ("story", "feature"), ("maintenance", "release"), } def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None, *, allow_restricted: bool = False): if task_type is None: return if task_type not in ALLOWED_TASK_TYPES: raise HTTPException(status_code=400, detail=f'Invalid task_type: {task_type}') allowed = TASK_SUBTYPE_MAP.get(task_type, set()) if task_subtype and task_subtype not in allowed: raise HTTPException(status_code=400, detail=f'Invalid task_subtype for {task_type}: {task_subtype}') # P9.6: block restricted combos unless explicitly allowed (e.g. propose accept, internal create) if not allow_restricted and (task_type, task_subtype) in RESTRICTED_TYPE_SUBTYPES: raise HTTPException( status_code=400, detail=f"Cannot create {task_type}/{task_subtype} task via general create. " f"Use the appropriate workflow (propose accept / milestone release setup)." ) def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None): n = NotificationModel(user_id=user_id, type=ntype, title=title, message=message, entity_type=entity_type, entity_id=entity_id) db.add(n) db.commit() return n # ---- CRUD ---- @router.post("/tasks", response_model=schemas.TaskResponse, status_code=status.HTTP_201_CREATED) def create_task(task_in: schemas.TaskCreate, bg: BackgroundTasks, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): _validate_task_type_subtype(task_in.task_type, task_in.task_subtype) data = task_in.model_dump(exclude_unset=True) data["reporter_id"] = data.get("reporter_id") or current_user.id data["created_by_id"] = current_user.id if not data.get("project_id"): raise HTTPException(status_code=400, detail="project_id is required") if not data.get("milestone_id"): raise HTTPException(status_code=400, detail="milestone_id is required") check_project_role(db, current_user.id, data["project_id"], min_role="dev") milestone = db.query(Milestone).filter( Milestone.id == data["milestone_id"], Milestone.project_id == data["project_id"], ).first() if not milestone: raise HTTPException(status_code=404, detail="Milestone not found") est_time = None if data.get("estimated_working_time"): try: est_time = datetime.strptime(data["estimated_working_time"], "%H:%M").time() except Exception: pass data["estimated_working_time"] = est_time milestone_code = milestone.milestone_code or f"m{milestone.id}" max_task = db.query(Task).filter(Task.milestone_id == milestone.id).order_by(Task.id.desc()).first() next_num = (max_task.id + 1) if max_task else 1 data["task_code"] = f"{milestone_code}:T{next_num:05x}" db_task = Task(**data) db.add(db_task) db.commit() db.refresh(db_task) event = "resolution.created" if db_task.task_type == "resolution" else "task.created" bg.add_task( fire_webhooks_sync, event, {"task_id": db_task.id, "title": db_task.title, "type": db_task.task_type, "status": db_task.status.value}, db_task.project_id, db, ) log_activity(db, "task.created", "task", db_task.id, current_user.id, {"title": db_task.title}) return db_task @router.get("/tasks") def list_tasks( project_id: int = None, task_status: str = None, task_type: str = None, task_subtype: str = None, assignee_id: int = None, tag: str = None, sort_by: str = "created_at", sort_order: str = "desc", page: int = 1, page_size: int = 50, db: Session = Depends(get_db) ): query = db.query(Task) if project_id: query = query.filter(Task.project_id == project_id) if task_status: query = query.filter(Task.status == task_status) if task_type: query = query.filter(Task.task_type == task_type) if task_subtype: query = query.filter(Task.task_subtype == task_subtype) if assignee_id: query = query.filter(Task.assignee_id == assignee_id) if tag: query = query.filter(Task.tags.contains(tag)) sort_fields = { "created_at": Task.created_at, "updated_at": Task.updated_at, "priority": Task.priority, "title": Task.title, } sort_col = sort_fields.get(sort_by, Task.created_at) query = query.order_by(sort_col.asc() if sort_order == "asc" else sort_col.desc()) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": [schemas.TaskResponse.model_validate(i) for i in items], "total": total, "total_tasks": total, "page": page, "page_size": page_size, "total_pages": total_pages, } @router.get("/tasks/{task_id}", response_model=schemas.TaskResponse) def get_task(task_id: str, db: Session = Depends(get_db)): return _resolve_task(db, task_id) @router.patch("/tasks/{task_id}", response_model=schemas.TaskResponse) def update_task(task_id: str, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): task = _resolve_task(db, task_id) # P5.7: status-based edit restrictions current_status = task.status.value if hasattr(task.status, 'value') else task.status update_data = task_update.model_dump(exclude_unset=True) # Fields that are always allowed regardless of status (non-body edits) _always_allowed = {"status"} body_fields = {k for k in update_data.keys() if k not in _always_allowed} if body_fields: # P3.6 supplement: feature story tasks locked after milestone freeze task_type = task.task_type.value if hasattr(task.task_type, 'value') else (task.task_type or "") task_subtype = task.task_subtype or "" if task_type == "story" and task_subtype == "feature" and task.milestone_id: from app.models.milestone import Milestone ms = db.query(Milestone).filter(Milestone.id == task.milestone_id).first() if ms: ms_status = ms.status.value if hasattr(ms.status, 'value') else ms.status if ms_status in ("freeze", "undergoing", "completed", "closed"): raise HTTPException( status_code=400, detail=f"Feature story task cannot be edited: milestone is '{ms_status}'. " f"Blocked fields: {sorted(body_fields)}", ) # undergoing/completed/closed: body edits forbidden if current_status in ("undergoing", "completed", "closed"): raise HTTPException( status_code=400, detail=f"Cannot edit task body fields in '{current_status}' status. " f"Blocked fields: {sorted(body_fields)}", ) # open + assignee set: only assignee or admin can edit body if current_status == "open" and task.assignee_id is not None: from app.api.rbac import is_global_admin, has_project_admin_role is_admin = ( is_global_admin(db, current_user.id) or has_project_admin_role(db, current_user.id, task.project_id) ) if current_user.id != task.assignee_id and not is_admin: raise HTTPException( status_code=403, detail="Only the current assignee or an admin can edit this task", ) # Legacy general permission check (covers project membership etc.) ensure_can_edit_task(db, current_user.id, task) if "status" in update_data: new_status = update_data["status"] old_status = task.status.value if hasattr(task.status, 'value') else task.status # P5.1: enforce state-machine even through PATCH _check_transition(old_status, new_status) if new_status == "open" and old_status in ("completed", "closed"): task.finished_on = None if new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() for field, value in update_data.items(): setattr(task, field, value) db.commit() db.refresh(task) # P3.5: auto-complete milestone when release task reaches completed via update if "status" in update_data and update_data["status"] == "completed": from app.api.routers.milestone_actions import try_auto_complete_milestone try_auto_complete_milestone(db, task, user_id=current_user.id) return task @router.delete("/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_task(task_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): task = _resolve_task(db, task_id) check_project_role(db, current_user.id, task.project_id, min_role="mgr") log_activity(db, "task.deleted", "task", task.id, current_user.id, {"title": task.title}) db.delete(task) db.commit() return None # ---- Transition ---- class TransitionBody(BaseModel): comment: Optional[str] = None @router.post("/tasks/{task_id}/transition", response_model=schemas.TaskResponse) def transition_task( task_id: str, new_status: str, bg: BackgroundTasks, body: TransitionBody = None, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey), ): valid_statuses = [s.value for s in TaskStatus] if new_status not in valid_statuses: raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}") task = _resolve_task(db, task_id) old_status = task.status.value if hasattr(task.status, 'value') else task.status # P5.1: enforce state-machine _check_transition(old_status, new_status) # P5.2: pending -> open requires milestone to be undergoing + task deps satisfied if old_status == "pending" and new_status == "open": milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first() if milestone: ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status if ms_status != "undergoing": raise HTTPException( status_code=400, detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'", ) # P4.3: check task-level depend_on dep_result = check_task_deps(db, task.depend_on) if not dep_result.ok: raise HTTPException( status_code=400, detail=f"Cannot open task: {dep_result.reason}", ) # P5.3: open -> undergoing requires assignee AND operator must be the assignee if old_status == "open" and new_status == "undergoing": if not task.assignee_id: raise HTTPException(status_code=400, detail="Cannot start task: assignee must be set first") if current_user.id != task.assignee_id: raise HTTPException(status_code=403, detail="Only the assigned user can start this task") # P5.4: undergoing -> completed requires a completion comment if old_status == "undergoing" and new_status == "completed": comment_text = body.comment if body else None if not comment_text or not comment_text.strip(): raise HTTPException(status_code=400, detail="A completion comment is required when finishing a task") # P5.4: also only the assignee can complete if task.assignee_id and current_user.id != task.assignee_id: raise HTTPException(status_code=403, detail="Only the assigned user can complete this task") # P5.5: closing a task requires 'task.close' permission if new_status == "closed": check_permission(db, current_user.id, task.project_id, "task.close") # P5.6: reopen from completed/closed -> open if new_status == "open" and old_status in ("completed", "closed"): perm_name = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed" check_permission(db, current_user.id, task.project_id, perm_name) # Clear finished_on on reopen so lifecycle timestamps are accurate task.finished_on = None if new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() task.status = new_status db.commit() db.refresh(task) # P5.4: auto-create completion comment if old_status == "undergoing" and new_status == "completed" and body and body.comment: db_comment = models.Comment( content=body.comment.strip(), task_id=task.id, author_id=current_user.id, ) db.add(db_comment) db.commit() # Log the transition activity log_activity(db, f"task.transition.{new_status}", "task", task.id, current_user.id, {"old_status": old_status, "new_status": new_status}) # P3.5: auto-complete milestone when its sole release task is completed if new_status == "completed": from app.api.routers.milestone_actions import try_auto_complete_milestone try_auto_complete_milestone(db, task, user_id=current_user.id) event = "task.closed" if new_status == "closed" else "task.updated" bg.add_task(fire_webhooks_sync, event, {"task_id": task.id, "title": task.title, "old_status": old_status, "new_status": new_status}, task.project_id, db) return task # ---- Assignment ---- @router.post("/tasks/{task_id}/assign") def assign_task(task_id: str, assignee_id: int, db: Session = Depends(get_db)): task = _resolve_task(db, task_id) user = db.query(models.User).filter(models.User.id == assignee_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") task.assignee_id = assignee_id db.commit() db.refresh(task) _notify_user(db, assignee_id, "task.assigned", f"Task #{task.id} assigned to you", f"'{task.title}' has been assigned to you.", "task", task.id) return {"task_id": task.id, "assignee_id": assignee_id, "title": task.title} # ---- Tags ---- @router.post("/tasks/{task_id}/tags") def add_tag(task_id: str, tag: str, db: Session = Depends(get_db)): task = _resolve_task(db, task_id) current = set(task.tags.split(",")) if task.tags else set() current.add(tag.strip()) current.discard("") task.tags = ",".join(sorted(current)) db.commit() return {"task_id": task_id, "tags": list(current)} @router.delete("/tasks/{task_id}/tags") def remove_tag(task_id: str, tag: str, db: Session = Depends(get_db)): task = _resolve_task(db, task_id) current = set(task.tags.split(",")) if task.tags else set() current.discard(tag.strip()) current.discard("") task.tags = ",".join(sorted(current)) if current else None db.commit() return {"task_id": task_id, "tags": list(current)} @router.get("/tags") def list_all_tags(project_id: int = None, db: Session = Depends(get_db)): query = db.query(Task.tags).filter(Task.tags != None) if project_id: query = query.filter(Task.project_id == project_id) all_tags = set() for (tags,) in query.all(): for t in tags.split(","): t = t.strip() if t: all_tags.add(t) return {"tags": sorted(all_tags)} # ---- Batch ---- class BatchAssign(BaseModel): task_ids: List[int] assignee_id: int class BatchTransitionBody(BaseModel): task_ids: List[int] new_status: str comment: Optional[str] = None @router.post("/tasks/batch/transition") def batch_transition( data: BatchTransitionBody, bg: BackgroundTasks, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey), ): valid_statuses = [s.value for s in TaskStatus] if data.new_status not in valid_statuses: raise HTTPException(status_code=400, detail="Invalid status") updated = [] skipped = [] for task_id in data.task_ids: task = db.query(Task).filter(Task.id == task_id).first() if not task: skipped.append({"id": task_id, "title": None, "old": None, "reason": "Task not found"}) continue old_status = task.status.value if hasattr(task.status, 'value') else task.status # P5.1: state-machine check allowed = VALID_TRANSITIONS.get(old_status, set()) if data.new_status not in allowed: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"}) continue # P5.2: pending → open requires milestone undergoing + task deps if old_status == "pending" and data.new_status == "open": milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first() if milestone: ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status if ms_status != "undergoing": skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": f"Milestone is '{ms_status}', must be 'undergoing'"}) continue dep_result = check_task_deps(db, task.depend_on) if not dep_result.ok: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": dep_result.reason}) continue # P5.3: open → undergoing requires assignee == current_user if old_status == "open" and data.new_status == "undergoing": if not task.assignee_id: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": "Assignee must be set before starting"}) continue if current_user.id != task.assignee_id: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": "Only the assigned user can start this task"}) continue # P5.4: undergoing → completed requires comment + assignee check if old_status == "undergoing" and data.new_status == "completed": comment_text = data.comment if not comment_text or not comment_text.strip(): skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": "A completion comment is required"}) continue if task.assignee_id and current_user.id != task.assignee_id: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": "Only the assigned user can complete this task"}) continue # P5.5: close requires permission if data.new_status == "closed": try: check_permission(db, current_user.id, task.project_id, "task.close") except HTTPException: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": "Missing 'task.close' permission"}) continue # P5.6: reopen requires permission if data.new_status == "open" and old_status in ("completed", "closed"): perm = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed" try: check_permission(db, current_user.id, task.project_id, perm) except HTTPException: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": f"Missing '{perm}' permission"}) continue task.finished_on = None if data.new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if data.new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() task.status = data.new_status updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status}) # Activity log per task log_activity(db, f"task.transition.{data.new_status}", "task", task.id, current_user.id, {"old_status": old_status, "new_status": data.new_status}) # P5.4: auto-create completion comment if old_status == "undergoing" and data.new_status == "completed" and data.comment: db_comment = models.Comment( content=data.comment.strip(), task_id=task.id, author_id=current_user.id, ) db.add(db_comment) db.commit() # P3.5: auto-complete milestone for any completed task for u in updated: if u["new"] == "completed": t = db.query(Task).filter(Task.id == u["id"]).first() if t: from app.api.routers.milestone_actions import try_auto_complete_milestone try_auto_complete_milestone(db, t, user_id=current_user.id) for u in updated: event = "task.closed" if data.new_status == "closed" else "task.updated" bg.add_task(fire_webhooks_sync, event, u, None, db) result = {"updated": len(updated), "tasks": updated} if skipped: result["skipped"] = skipped return result @router.post("/tasks/batch/assign") def batch_assign(data: BatchAssign, db: Session = Depends(get_db)): user = db.query(models.User).filter(models.User.id == data.assignee_id).first() if not user: raise HTTPException(status_code=404, detail="Assignee not found") updated = [] for task_id in data.task_ids: task = db.query(Task).filter(Task.id == task_id).first() if task: task.assignee_id = data.assignee_id updated.append(task_id) db.commit() return {"updated": len(updated), "task_ids": updated, "assignee_id": data.assignee_id} # ---- Search ---- @router.get("/search/tasks") def search_tasks(q: str, project_id: int = None, page: int = 1, page_size: int = 50, db: Session = Depends(get_db)): query = db.query(Task).filter( (Task.title.contains(q)) | (Task.description.contains(q)) ) if project_id: query = query.filter(Task.project_id == project_id) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": [schemas.TaskResponse.model_validate(i) for i in items], "total": total, "total_tasks": total, "page": page, "page_size": page_size, "total_pages": total_pages, }