"""Tasks router — replaces the old issues router. All CRUD operates on the `tasks` table.""" import math from typing import List from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.orm import Session from pydantic import BaseModel from app.core.config import get_db from app.models import models from app.models.task import Task, TaskStatus, TaskPriority from app.models.milestone import Milestone from app.schemas import schemas from app.services.webhook import fire_webhooks_sync from app.models.notification import Notification as NotificationModel from app.api.deps import get_current_user_or_apikey from app.api.rbac import check_project_role, ensure_can_edit_task from app.services.activity import log_activity router = APIRouter(tags=["Tasks"]) # ---- State-machine: valid transitions (P5.1-P5.6) ---- VALID_TRANSITIONS: dict[str, set[str]] = { "pending": {"open", "closed"}, "open": {"undergoing", "closed"}, "undergoing": {"completed", "closed"}, "completed": {"open"}, # reopen "closed": {"open"}, # reopen } def _check_transition(old_status: str, new_status: str) -> None: """Raise 400 if the transition is not allowed by the state machine.""" allowed = VALID_TRANSITIONS.get(old_status, set()) if new_status not in allowed: raise HTTPException( status_code=400, detail=f"Cannot transition from '{old_status}' to '{new_status}'. " f"Allowed targets from '{old_status}': {sorted(allowed) if allowed else 'none'}", ) # ---- Type / Subtype validation ---- TASK_SUBTYPE_MAP = { 'issue': {'infrastructure', 'performance', 'regression', 'security', 'user_experience', 'defect'}, 'maintenance': {'deploy', 'release'}, 'review': {'code_review', 'decision_review', 'function_review'}, 'story': {'feature', 'improvement', 'refactor'}, 'test': {'regression', 'security', 'smoke', 'stress'}, 'research': set(), 'task': {'defect'}, 'resolution': set(), } ALLOWED_TASK_TYPES = set(TASK_SUBTYPE_MAP.keys()) def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None): if task_type is None: return if task_type not in ALLOWED_TASK_TYPES: raise HTTPException(status_code=400, detail=f'Invalid task_type: {task_type}') allowed = TASK_SUBTYPE_MAP.get(task_type, set()) if task_subtype and task_subtype not in allowed: raise HTTPException(status_code=400, detail=f'Invalid task_subtype for {task_type}: {task_subtype}') def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None): n = NotificationModel(user_id=user_id, type=ntype, title=title, message=message, entity_type=entity_type, entity_id=entity_id) db.add(n) db.commit() return n # ---- CRUD ---- @router.post("/tasks", response_model=schemas.TaskResponse, status_code=status.HTTP_201_CREATED) def create_task(task_in: schemas.TaskCreate, bg: BackgroundTasks, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): _validate_task_type_subtype(task_in.task_type, task_in.task_subtype) data = task_in.model_dump(exclude_unset=True) data["reporter_id"] = data.get("reporter_id") or current_user.id data["created_by_id"] = current_user.id if not data.get("project_id"): raise HTTPException(status_code=400, detail="project_id is required") if not data.get("milestone_id"): raise HTTPException(status_code=400, detail="milestone_id is required") check_project_role(db, current_user.id, data["project_id"], min_role="dev") milestone = db.query(Milestone).filter( Milestone.id == data["milestone_id"], Milestone.project_id == data["project_id"], ).first() if not milestone: raise HTTPException(status_code=404, detail="Milestone not found") est_time = None if data.get("estimated_working_time"): try: est_time = datetime.strptime(data["estimated_working_time"], "%H:%M").time() except Exception: pass data["estimated_working_time"] = est_time milestone_code = milestone.milestone_code or f"m{milestone.id}" max_task = db.query(Task).filter(Task.milestone_id == milestone.id).order_by(Task.id.desc()).first() next_num = (max_task.id + 1) if max_task else 1 data["task_code"] = f"{milestone_code}:T{next_num:05x}" db_task = Task(**data) db.add(db_task) db.commit() db.refresh(db_task) event = "resolution.created" if db_task.task_type == "resolution" else "task.created" bg.add_task( fire_webhooks_sync, event, {"task_id": db_task.id, "title": db_task.title, "type": db_task.task_type, "status": db_task.status.value}, db_task.project_id, db, ) log_activity(db, "task.created", "task", db_task.id, current_user.id, {"title": db_task.title}) return db_task @router.get("/tasks") def list_tasks( project_id: int = None, task_status: str = None, task_type: str = None, task_subtype: str = None, assignee_id: int = None, tag: str = None, sort_by: str = "created_at", sort_order: str = "desc", page: int = 1, page_size: int = 50, db: Session = Depends(get_db) ): query = db.query(Task) if project_id: query = query.filter(Task.project_id == project_id) if task_status: query = query.filter(Task.status == task_status) if task_type: query = query.filter(Task.task_type == task_type) if task_subtype: query = query.filter(Task.task_subtype == task_subtype) if assignee_id: query = query.filter(Task.assignee_id == assignee_id) if tag: query = query.filter(Task.tags.contains(tag)) sort_fields = { "created_at": Task.created_at, "updated_at": Task.updated_at, "priority": Task.priority, "title": Task.title, } sort_col = sort_fields.get(sort_by, Task.created_at) query = query.order_by(sort_col.asc() if sort_order == "asc" else sort_col.desc()) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": [schemas.TaskResponse.model_validate(i) for i in items], "total": total, "total_tasks": total, "page": page, "page_size": page_size, "total_pages": total_pages, } @router.get("/tasks/{task_id}", response_model=schemas.TaskResponse) def get_task(task_id: int, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") return task @router.patch("/tasks/{task_id}", response_model=schemas.TaskResponse) def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") ensure_can_edit_task(db, current_user.id, task) update_data = task_update.model_dump(exclude_unset=True) if "status" in update_data: new_status = update_data["status"] old_status = task.status.value if hasattr(task.status, 'value') else task.status # P5.1: enforce state-machine even through PATCH _check_transition(old_status, new_status) if new_status == "open" and old_status in ("completed", "closed"): task.finished_on = None if new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() for field, value in update_data.items(): setattr(task, field, value) db.commit() db.refresh(task) # P3.5: auto-complete milestone when release task reaches completed via update if "status" in update_data and update_data["status"] == "completed": from app.api.routers.milestone_actions import try_auto_complete_milestone try_auto_complete_milestone(db, task, user_id=current_user.id) return task @router.delete("/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_task(task_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") check_project_role(db, current_user.id, task.project_id, min_role="mgr") log_activity(db, "task.deleted", "task", task.id, current_user.id, {"title": task.title}) db.delete(task) db.commit() return None # ---- Transition ---- @router.post("/tasks/{task_id}/transition", response_model=schemas.TaskResponse) def transition_task(task_id: int, new_status: str, bg: BackgroundTasks, db: Session = Depends(get_db)): valid_statuses = [s.value for s in TaskStatus] if new_status not in valid_statuses: raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}") task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") old_status = task.status.value if hasattr(task.status, 'value') else task.status # P5.1: enforce state-machine _check_transition(old_status, new_status) # P5.2: pending -> open requires milestone to be undergoing (dependencies checked later) if old_status == "pending" and new_status == "open": milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first() if milestone: ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status if ms_status != "undergoing": raise HTTPException( status_code=400, detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'", ) # P5.3: open -> undergoing requires assignee if old_status == "open" and new_status == "undergoing": if not task.assignee_id: raise HTTPException(status_code=400, detail="Cannot start task: assignee must be set first") # P5.6: reopen from completed/closed -> open if new_status == "open" and old_status in ("completed", "closed"): # Clear finished_on on reopen so lifecycle timestamps are accurate task.finished_on = None if new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() task.status = new_status db.commit() db.refresh(task) # P3.5: auto-complete milestone when its sole release task is completed if new_status == "completed": from app.api.routers.milestone_actions import try_auto_complete_milestone try_auto_complete_milestone(db, task, user_id=None) event = "task.closed" if new_status == "closed" else "task.updated" bg.add_task(fire_webhooks_sync, event, {"task_id": task.id, "title": task.title, "old_status": old_status, "new_status": new_status}, task.project_id, db) return task # ---- Assignment ---- @router.post("/tasks/{task_id}/assign") def assign_task(task_id: int, assignee_id: int, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") user = db.query(models.User).filter(models.User.id == assignee_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") task.assignee_id = assignee_id db.commit() db.refresh(task) _notify_user(db, assignee_id, "task.assigned", f"Task #{task.id} assigned to you", f"'{task.title}' has been assigned to you.", "task", task.id) return {"task_id": task.id, "assignee_id": assignee_id, "title": task.title} # ---- Tags ---- @router.post("/tasks/{task_id}/tags") def add_tag(task_id: int, tag: str, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") current = set(task.tags.split(",")) if task.tags else set() current.add(tag.strip()) current.discard("") task.tags = ",".join(sorted(current)) db.commit() return {"task_id": task_id, "tags": list(current)} @router.delete("/tasks/{task_id}/tags") def remove_tag(task_id: int, tag: str, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") current = set(task.tags.split(",")) if task.tags else set() current.discard(tag.strip()) current.discard("") task.tags = ",".join(sorted(current)) if current else None db.commit() return {"task_id": task_id, "tags": list(current)} @router.get("/tags") def list_all_tags(project_id: int = None, db: Session = Depends(get_db)): query = db.query(Task.tags).filter(Task.tags != None) if project_id: query = query.filter(Task.project_id == project_id) all_tags = set() for (tags,) in query.all(): for t in tags.split(","): t = t.strip() if t: all_tags.add(t) return {"tags": sorted(all_tags)} # ---- Batch ---- class BatchTransition(BaseModel): task_ids: List[int] new_status: str class BatchAssign(BaseModel): task_ids: List[int] assignee_id: int @router.post("/tasks/batch/transition") def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)): valid_statuses = [s.value for s in TaskStatus] if data.new_status not in valid_statuses: raise HTTPException(status_code=400, detail="Invalid status") updated = [] skipped = [] for task_id in data.task_ids: task = db.query(Task).filter(Task.id == task_id).first() if task: old_status = task.status.value if hasattr(task.status, 'value') else task.status allowed = VALID_TRANSITIONS.get(old_status, set()) if data.new_status not in allowed: skipped.append({"id": task.id, "title": task.title, "old": old_status, "reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"}) continue if data.new_status == "undergoing" and not task.started_on: task.started_on = datetime.utcnow() if data.new_status in ("closed", "completed") and not task.finished_on: task.finished_on = datetime.utcnow() if data.new_status == "open" and old_status in ("completed", "closed"): task.finished_on = None task.status = data.new_status updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status}) db.commit() for u in updated: event = "task.closed" if data.new_status == "closed" else "task.updated" bg.add_task(fire_webhooks_sync, event, u, None, db) result = {"updated": len(updated), "tasks": updated} if skipped: result["skipped"] = skipped return result @router.post("/tasks/batch/assign") def batch_assign(data: BatchAssign, db: Session = Depends(get_db)): user = db.query(models.User).filter(models.User.id == data.assignee_id).first() if not user: raise HTTPException(status_code=404, detail="Assignee not found") updated = [] for task_id in data.task_ids: task = db.query(Task).filter(Task.id == task_id).first() if task: task.assignee_id = data.assignee_id updated.append(task_id) db.commit() return {"updated": len(updated), "task_ids": updated, "assignee_id": data.assignee_id} # ---- Search ---- @router.get("/search/tasks") def search_tasks(q: str, project_id: int = None, page: int = 1, page_size: int = 50, db: Session = Depends(get_db)): query = db.query(Task).filter( (Task.title.contains(q)) | (Task.description.contains(q)) ) if project_id: query = query.filter(Task.project_id == project_id) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": [schemas.TaskResponse.model_validate(i) for i in items], "total": total, "total_tasks": total, "page": page, "page_size": page_size, "total_pages": total_pages, }