"""Tasks router — replaces the old issues router. All CRUD operates on the `tasks` table.""" import math from typing import List from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.orm import Session from pydantic import BaseModel from app.core.config import get_db from app.models import models from app.models.task import Task, TaskStatus, TaskPriority from app.models.milestone import Milestone from app.schemas import schemas from app.services.webhook import fire_webhooks_sync from app.models.notification import Notification as NotificationModel from app.api.deps import get_current_user_or_apikey from app.api.rbac import check_project_role from app.services.activity import log_activity router = APIRouter(tags=["Tasks"]) # ---- Type / Subtype validation ---- TASK_SUBTYPE_MAP = { 'issue': {'infrastructure', 'performance', 'regression', 'security', 'user_experience', 'defect'}, 'maintenance': {'deploy', 'release'}, 'review': {'code_review', 'decision_review', 'function_review'}, 'story': {'feature', 'improvement', 'refactor'}, 'test': {'regression', 'security', 'smoke', 'stress'}, 'research': set(), 'task': {'defect'}, 'resolution': set(), } ALLOWED_TASK_TYPES = set(TASK_SUBTYPE_MAP.keys()) def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None): if task_type is None: return if task_type not in ALLOWED_TASK_TYPES: raise HTTPException(status_code=400, detail=f'Invalid task_type: {task_type}') allowed = TASK_SUBTYPE_MAP.get(task_type, set()) if task_subtype and task_subtype not in allowed: raise HTTPException(status_code=400, detail=f'Invalid task_subtype for {task_type}: {task_subtype}') def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None): n = NotificationModel(user_id=user_id, type=ntype, title=title, message=message, entity_type=entity_type, entity_id=entity_id) db.add(n) db.commit() return n # ---- CRUD ---- @router.post("/tasks", response_model=schemas.TaskResponse, status_code=status.HTTP_201_CREATED) def create_task(task_in: schemas.TaskCreate, bg: BackgroundTasks, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): _validate_task_type_subtype(task_in.task_type, task_in.task_subtype) data = task_in.model_dump(exclude_unset=True) data["reporter_id"] = data.get("reporter_id") or current_user.id data["created_by_id"] = current_user.id if not data.get("project_id"): raise HTTPException(status_code=400, detail="project_id is required") if not data.get("milestone_id"): raise HTTPException(status_code=400, detail="milestone_id is required") check_project_role(db, current_user.id, data["project_id"], min_role="dev") milestone = db.query(Milestone).filter( Milestone.id == data["milestone_id"], Milestone.project_id == data["project_id"], ).first() if not milestone: raise HTTPException(status_code=404, detail="Milestone not found") est_time = None if data.get("estimated_working_time"): try: est_time = datetime.strptime(data["estimated_working_time"], "%H:%M").time() except Exception: pass data["estimated_working_time"] = est_time milestone_code = milestone.milestone_code or f"m{milestone.id}" max_task = db.query(Task).filter(Task.milestone_id == milestone.id).order_by(Task.id.desc()).first() next_num = (max_task.id + 1) if max_task else 1 data["task_code"] = f"{milestone_code}:T{next_num:05x}" db_task = Task(**data) db.add(db_task) db.commit() db.refresh(db_task) event = "resolution.created" if db_task.task_type == "resolution" else "task.created" bg.add_task( fire_webhooks_sync, event, {"task_id": db_task.id, "title": db_task.title, "type": db_task.task_type, "status": db_task.status.value}, db_task.project_id, db, ) log_activity(db, "task.created", "task", db_task.id, current_user.id, {"title": db_task.title}) return db_task @router.get("/tasks") def list_tasks( project_id: int = None, task_status: str = None, task_type: str = None, task_subtype: str = None, assignee_id: int = None, tag: str = None, sort_by: str = "created_at", sort_order: str = "desc", page: int = 1, page_size: int = 50, db: Session = Depends(get_db) ): query = db.query(Task) if project_id: query = query.filter(Task.project_id == project_id) if task_status: query = query.filter(Task.status == task_status) if task_type: query = query.filter(Task.task_type == task_type) if task_subtype: query = query.filter(Task.task_subtype == task_subtype) if assignee_id: query = query.filter(Task.assignee_id == assignee_id) if tag: query = query.filter(Task.tags.contains(tag)) sort_fields = { "created_at": Task.created_at, "updated_at": Task.updated_at, "priority": Task.priority, "title": Task.title, } sort_col = sort_fields.get(sort_by, Task.created_at) query = query.order_by(sort_col.asc() if sort_order == "asc" else sort_col.desc()) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": [schemas.TaskResponse.model_validate(i) for i in items], "total": total, "total_tasks": total, "page": page, "page_size": page_size, "total_pages": total_pages, } @router.get("/tasks/{task_id}", response_model=schemas.TaskResponse) def get_task(task_id: int, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") return task @router.patch("/tasks/{task_id}", response_model=schemas.TaskResponse) def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") check_project_role(db, current_user.id, task.project_id, min_role="dev") update_data = task_update.model_dump(exclude_unset=True) if "status" in update_data: new_status = update_data["status"] if new_status == "progressing" and not task.started_on: task.started_on = datetime.utcnow() if new_status == "closed" and not task.finished_on: task.finished_on = datetime.utcnow() for field, value in update_data.items(): setattr(task, field, value) db.commit() db.refresh(task) return task @router.delete("/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_task(task_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") check_project_role(db, current_user.id, task.project_id, min_role="mgr") log_activity(db, "task.deleted", "task", task.id, current_user.id, {"title": task.title}) db.delete(task) db.commit() return None # ---- Transition ---- @router.post("/tasks/{task_id}/transition", response_model=schemas.TaskResponse) def transition_task(task_id: int, new_status: str, bg: BackgroundTasks, db: Session = Depends(get_db)): valid_statuses = [s.value for s in TaskStatus] if new_status not in valid_statuses: raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}") task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") old_status = task.status.value if hasattr(task.status, 'value') else task.status if new_status == "progressing" and not task.started_on: task.started_on = datetime.utcnow() if new_status == "closed" and not task.finished_on: task.finished_on = datetime.utcnow() task.status = new_status db.commit() db.refresh(task) event = "task.closed" if new_status == "closed" else "task.updated" bg.add_task(fire_webhooks_sync, event, {"task_id": task.id, "title": task.title, "old_status": old_status, "new_status": new_status}, task.project_id, db) return task # ---- Assignment ---- @router.post("/tasks/{task_id}/assign") def assign_task(task_id: int, assignee_id: int, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") user = db.query(models.User).filter(models.User.id == assignee_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") task.assignee_id = assignee_id db.commit() db.refresh(task) _notify_user(db, assignee_id, "task.assigned", f"Task #{task.id} assigned to you", f"'{task.title}' has been assigned to you.", "task", task.id) return {"task_id": task.id, "assignee_id": assignee_id, "title": task.title} # ---- Tags ---- @router.post("/tasks/{task_id}/tags") def add_tag(task_id: int, tag: str, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") current = set(task.tags.split(",")) if task.tags else set() current.add(tag.strip()) current.discard("") task.tags = ",".join(sorted(current)) db.commit() return {"task_id": task_id, "tags": list(current)} @router.delete("/tasks/{task_id}/tags") def remove_tag(task_id: int, tag: str, db: Session = Depends(get_db)): task = db.query(Task).filter(Task.id == task_id).first() if not task: raise HTTPException(status_code=404, detail="Task not found") current = set(task.tags.split(",")) if task.tags else set() current.discard(tag.strip()) current.discard("") task.tags = ",".join(sorted(current)) if current else None db.commit() return {"task_id": task_id, "tags": list(current)} @router.get("/tags") def list_all_tags(project_id: int = None, db: Session = Depends(get_db)): query = db.query(Task.tags).filter(Task.tags != None) if project_id: query = query.filter(Task.project_id == project_id) all_tags = set() for (tags,) in query.all(): for t in tags.split(","): t = t.strip() if t: all_tags.add(t) return {"tags": sorted(all_tags)} # ---- Batch ---- class BatchTransition(BaseModel): task_ids: List[int] new_status: str class BatchAssign(BaseModel): task_ids: List[int] assignee_id: int @router.post("/tasks/batch/transition") def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)): valid_statuses = [s.value for s in TaskStatus] if data.new_status not in valid_statuses: raise HTTPException(status_code=400, detail="Invalid status") updated = [] for task_id in data.task_ids: task = db.query(Task).filter(Task.id == task_id).first() if task: old_status = task.status.value if hasattr(task.status, 'value') else task.status task.status = data.new_status updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status}) db.commit() for u in updated: event = "task.closed" if data.new_status == "closed" else "task.updated" bg.add_task(fire_webhooks_sync, event, u, None, db) return {"updated": len(updated), "tasks": updated} @router.post("/tasks/batch/assign") def batch_assign(data: BatchAssign, db: Session = Depends(get_db)): user = db.query(models.User).filter(models.User.id == data.assignee_id).first() if not user: raise HTTPException(status_code=404, detail="Assignee not found") updated = [] for task_id in data.task_ids: task = db.query(Task).filter(Task.id == task_id).first() if task: task.assignee_id = data.assignee_id updated.append(task_id) db.commit() return {"updated": len(updated), "task_ids": updated, "assignee_id": data.assignee_id} # ---- Search ---- @router.get("/search/tasks") def search_tasks(q: str, project_id: int = None, page: int = 1, page_size: int = 50, db: Session = Depends(get_db)): query = db.query(Task).filter( (Task.title.contains(q)) | (Task.description.contains(q)) ) if project_id: query = query.filter(Task.project_id == project_id) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return { "items": [schemas.TaskResponse.model_validate(i) for i in items], "total": total, "total_tasks": total, "page": page, "page_size": page_size, "total_pages": total_pages, }