"""Tasks router — replaces the old issues router. All CRUD operates on the `tasks` table.""" import math from typing import List, Optional from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.orm import Session from pydantic import BaseModel from app.core.config import get_db from app.models import models from app.models.task import Task, TaskStatus, TaskPriority from app.models.milestone import Milestone from app.schemas import schemas from app.services.webhook import fire_webhooks_sync from app.models.notification import Notification as NotificationModel from app.api.deps import get_current_user_or_apikey from app.api.rbac import check_project_role, ensure_can_edit_task from app.services.activity import log_activity router = APIRouter(tags=["Tasks"]) # ---- State-machine: valid transitions (P5.1-P5.6) ---- VALID_TRANSITIONS: dict[str, set[str]] = { "pending": {"open", "closed"}, "open": {"undergoing", "closed"}, "undergoing": {"completed", "closed"}, "completed": {"open"}, # reopen "closed": {"open"}, # reopen } def _check_transition(old_status: str, new_status: str) -> None: """Raise 400 if the transition is not allowed by the state machine.""" allowed = VALID_TRANSITIONS.get(old_status, set()) if new_status not in allowed: raise HTTPException( status_code=400, detail=f"Cannot transition from '{old_status}' to '{new_status}'. " f"Allowed targets from '{old_status}': {sorted(allowed) if allowed else 'none'}", ) # ---- Type / Subtype validation ---- TASK_SUBTYPE_MAP = { 'issue': {'infrastructure', 'performance', 'regression', 'security', 'user_experience', 'defect'}, 'maintenance': {'deploy', 'release'}, 'review': {'code_review', 'decision_review', 'function_review'}, 'story': {'feature', 'improvement', 'refactor'}, 'test': {'regression', 'security', 'smoke', 'stress'}, 'research': set(), 'task': {'defect'}, 'resolution': set(), } ALLOWED_TASK_TYPES = set(TASK_SUBTYPE_MAP.keys()) def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None): if task_type is None: return if task_type not in ALLOWED_TASK_TYPES: raise HTTPException(status_code=400, detail=f'Invalid task_type: {task_type}') allowed = TASK_SUBTYPE_MAP.get(task_type, set()) if task_subtype and task_subtype not in allowed: raise HTTPException(status_code=400, detail=f'Invalid task_subtype for {task_type}: {task_subtype}') def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None): n = NotificationModel(user_id=user_id, type=ntype, title=title, message=message, entity_type=entity_type, entity_id=entity_id) db.add(n) db.commit() return n # ---- CRUD ---- @router.post("/tasks", response_model=schemas.TaskResponse, status_code=status.HTTP_201_CREATED) def create_task(task_in: schemas.TaskCreate, bg: BackgroundTasks, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): _validate_task_type_subtype(task_in.task_type, task_in.task_subtype) data = task_in.model_dump(exclude_unset=True) data["reporter_id"] = data.get("reporter_id") or current_user.id data["created_by_id"] = current_user.id if not data.get("project_id"): raise HTTPException(status_code=400, detail="project_id is required") if not data.get("milestone_id"): raise HTTPException(status_code=400, detail="milestone_id is required") check_project_role(db, current_user.id, data["project_id"], min_role="dev") milestone = db.query(Milestone).filter( Milestone.id == data["milestone_id"], Milestone.project_id == data["project_id"], ).first() if not milestone: raise HTTPException(status_code=404, detail="Milestone not found") est_time = None if data.get("estimated_working_time"): try: est_time = datetime.strptime(data["estimated_working_time"], "%H:%M").time() except Exception: pass data["estimated_working_time"] = est_time milestone_code = milestone.milestone_code or f"m{milestone.id}" max_task = db.query(Task).filter(Task.milestone_id == milestone.id).order_by(Task.id.desc()).first() next_num = (max_task.id + 1) if max_task else 1 data["task_code"] = f"{milestone_code}:T{next_num:05x}" db_task = Task(**data) db.add(db_task) db.commit() db.refresh(db_task) event = "resolution.created" if db_task.task_type == "resolution" else "task.created" bg.add_task( fire_webhooks_sync, event, {"task_id": db_task.id, "title": db_task.title, "type": db_task.task_type, "status": db_task.status.value}, db_task.project_id, db, ) log_activity(db, "task.created", "task", db_task.id, current_user.id, {"title": db_task.title}) return db_task @router.get("/tasks") def list_tasks( project_id: int = None, task_status: str = None, task_type: str = None, task_subtype: str = None, assignee_id: int = None, tag: str = None, sort_by: str = "created_at", sort_order: str = "desc", page: int = 1, page_size: int = 50, db: Session = Depends(get_db) ): query = db.query(Task) if project_id: query = query.filter(Task.project_id == project_id) if task_status: query = query.filter(Task.status == task_status) if task_type: query = query.filter(Task.task_type == task_type) if task_subtype: query = query.filter(Task.task_subtype == task_subtype) if assignee_id: query = query.filter(Task.assignee_id == assignee_id) if tag: query = query.filter(Task.tags.contains(tag)) sort_fields = { "created_at": Task.created_at, "updated_at": Task.updated_at, "priority": Task.priority, "title": Task.title, } sort_col = sort_fields.get(sort_by, Task.created_at) query = query.order_by(sort_col.asc() if sort_order == "asc" else sort_col.desc()) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": [schemas.TaskResponse.model_validate(i) for i in items], "total": total, "total_tasks": total, "page": page, "page_size": page_size, "total_pages": total_pages, } @router.get("/tasks/{task_id}", response_model=schemas.TaskResponse) def get_task(task_id: int, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") return task @router.patch("/tasks/{task_id}", response_model=schemas.TaskResponse) def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") ensure_can_edit_task(db, current_user.id, task) update_data = task_update.model_dump(exclude_unset=True) if "status" in update_data: new_status = update_data["status"] old_status = task.status.value if hasattr(task.status, 'value') else task.status # P5.1: enforce state-machine even through PATCH _check_transition(old_status, new_status) if new_status == "open" and old_status in ("completed", "closed"): task.finished_on = None if new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() for field, value in update_data.items(): setattr(task, field, value) db.commit() db.refresh(task) # P3.5: auto-complete milestone when release task reaches completed via update if "status" in update_data and update_data["status"] == "completed": from app.api.routers.milestone_actions import try_auto_complete_milestone try_auto_complete_milestone(db, task, user_id=current_user.id) return task @router.delete("/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_task(task_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") check_project_role(db, current_user.id, task.project_id, min_role="mgr") log_activity(db, "task.deleted", "task", task.id, current_user.id, {"title": task.title}) db.delete(task) db.commit() return None # ---- Transition ---- class TransitionBody(BaseModel): comment: Optional[str] = None @router.post("/tasks/{task_id}/transition", response_model=schemas.TaskResponse) def transition_task( task_id: int, new_status: str, bg: BackgroundTasks, body: TransitionBody = None, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey), ): valid_statuses = [s.value for s in TaskStatus] if new_status not in valid_statuses: raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}") task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") old_status = task.status.value if hasattr(task.status, 'value') else task.status # P5.1: enforce state-machine _check_transition(old_status, new_status) # P5.2: pending -> open requires milestone to be undergoing (dependencies checked later) if old_status == "pending" and new_status == "open": milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first() if milestone: ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status if ms_status != "undergoing": raise HTTPException( status_code=400, detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'", ) # P5.3: open -> undergoing requires assignee AND operator must be the assignee if old_status == "open" and new_status == "undergoing": if not task.assignee_id: raise HTTPException(status_code=400, detail="Cannot start task: assignee must be set first") if current_user.id != task.assignee_id: raise HTTPException(status_code=403, detail="Only the assigned user can start this task") # P5.4: undergoing -> completed requires a completion comment if old_status == "undergoing" and new_status == "completed": comment_text = body.comment if body else None if not comment_text or not comment_text.strip(): raise HTTPException(status_code=400, detail="A completion comment is required when finishing a task") # P5.4: also only the assignee can complete if task.assignee_id and current_user.id != task.assignee_id: raise HTTPException(status_code=403, detail="Only the assigned user can complete this task") # P5.6: reopen from completed/closed -> open if new_status == "open" and old_status in ("completed", "closed"): # Clear finished_on on reopen so lifecycle timestamps are accurate task.finished_on = None if new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() task.status = new_status db.commit() db.refresh(task) # P5.4: auto-create completion comment if old_status == "undergoing" and new_status == "completed" and body and body.comment: db_comment = models.Comment( content=body.comment.strip(), task_id=task.id, author_id=current_user.id, ) db.add(db_comment) db.commit() # Log the transition activity log_activity(db, f"task.transition.{new_status}", "task", task.id, current_user.id, {"old_status": old_status, "new_status": new_status}) # P3.5: auto-complete milestone when its sole release task is completed if new_status == "completed": from app.api.routers.milestone_actions import try_auto_complete_milestone try_auto_complete_milestone(db, task, user_id=current_user.id) event = "task.closed" if new_status == "closed" else "task.updated" bg.add_task(fire_webhooks_sync, event, {"task_id": task.id, "title": task.title, "old_status": old_status, "new_status": new_status}, task.project_id, db) return task # ---- Assignment ---- @router.post("/tasks/{task_id}/assign") def assign_task(task_id: int, assignee_id: int, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") user = db.query(models.User).filter(models.User.id == assignee_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") task.assignee_id = assignee_id db.commit() db.refresh(task) _notify_user(db, assignee_id, "task.assigned", f"Task #{task.id} assigned to you", f"'{task.title}' has been assigned to you.", "task", task.id) return {"task_id": task.id, "assignee_id": assignee_id, "title": task.title} # ---- Tags ---- @router.post("/tasks/{task_id}/tags") def add_tag(task_id: int, tag: str, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") current = set(task.tags.split(",")) if task.tags else set() current.add(tag.strip()) current.discard("") task.tags = ",".join(sorted(current)) db.commit() return {"task_id": task_id, "tags": list(current)} @router.delete("/tasks/{task_id}/tags") def remove_tag(task_id: int, tag: str, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") current = set(task.tags.split(",")) if task.tags else set() current.discard(tag.strip()) current.discard("") task.tags = ",".join(sorted(current)) if current else None db.commit() return {"task_id": task_id, "tags": list(current)} @router.get("/tags") def list_all_tags(project_id: int = None, db: Session = Depends(get_db)): query = db.query(Task.tags).filter(Task.tags != None) if project_id: query = query.filter(Task.project_id == project_id) all_tags = set() for (tags,) in query.all(): for t in tags.split(","): t = t.strip() if t: all_tags.add(t) return {"tags": sorted(all_tags)} # ---- Batch ---- class BatchTransition(BaseModel): task_ids: List[int] new_status: str class BatchAssign(BaseModel): task_ids: List[int] assignee_id: int @router.post("/tasks/batch/transition") def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)): valid_statuses = [s.value for s in TaskStatus] if data.new_status not in valid_statuses: raise HTTPException(status_code=400, detail="Invalid status") updated = [] skipped = [] for task_id in data.task_ids: task = db.query(Task).filter(Task.id == task_id).first() if task: old_status = task.status.value if hasattr(task.status, 'value') else task.status allowed = VALID_TRANSITIONS.get(old_status, set()) if data.new_status not in allowed: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"}) continue if data.new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if data.new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() if data.new_status == "open" and old_status in ("completed", "closed"): task.finished_on = None task.status = data.new_status updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status}) db.commit() for u in updated: event = "task.closed" if data.new_status == "closed" else "task.updated" bg.add_task(fire_webhooks_sync, event, u, None, db) result = {"updated": len(updated), "tasks": updated} if skipped: result["skipped"] = skipped return result @router.post("/tasks/batch/assign") def batch_assign(data: BatchAssign, db: Session = Depends(get_db)): user = db.query(models.User).filter(models.User.id == data.assignee_id).first() if not user: raise HTTPException(status_code=404, detail="Assignee not found") updated = [] for task_id in data.task_ids: task = db.query(Task).filter(Task.id == task_id).first() if task: task.assignee_id = data.assignee_id updated.append(task_id) db.commit() return {"updated": len(updated), "task_ids": updated, "assignee_id": data.assignee_id} # ---- Search ---- @router.get("/search/tasks") def search_tasks(q: str, project_id: int = None, page: int = 1, page_size: int = 50, db: Session = Depends(get_db)): query = db.query(Task).filter( (Task.title.contains(q)) | (Task.description.contains(q)) ) if project_id: query = query.filter(Task.project_id == project_id) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": [schemas.TaskResponse.model_validate(i) for i in items], "total": total, "total_tasks": total, "page": page, "page_size": page_size, "total_pages": total_pages, }