"""Tasks router — replaces the old issues router. All CRUD operates on the `tasks` table."""
import math
from typing import List, Optional
from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel

from app.core.config import get_db
from app.models import models
from app.models.task import Task, TaskStatus, TaskPriority
from app.models.milestone import Milestone
from app.models.proposal import Proposal
from app.models.essential import Essential
from app.schemas import schemas
from app.services.webhook import fire_webhooks_sync
from app.models.notification import Notification as NotificationModel
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_task
from app.services.activity import log_activity
from app.services.dependency_check import check_task_deps

router = APIRouter(tags=["Tasks"])

# Upper bound applied to the `page_size` query parameter of paginated endpoints.
MAX_PAGE_SIZE = 200


def _enum_value(value):
    """Return ``value.value`` when *value* has a ``value`` attribute, else *value* itself.

    Status/type columns are read through this helper so the rest of the module
    can compare against plain strings whether the attribute holds an enum
    member or a raw string.
    """
    return value.value if hasattr(value, "value") else value


def _find_task_by_code(db: Session, task_code: str) -> Task | None:
    """Return the task whose ``task_code`` equals *task_code*, or ``None``."""
    return db.query(Task).filter(Task.task_code == task_code).first()


def _resolve_task(db: Session, task_code: str) -> Task:
    """Resolve a task by task_code string. Raises 404 if not found."""
    task = _find_task_by_code(db, task_code)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task


# ---- State-machine: valid transitions (P5.1-P5.6) ----

VALID_TRANSITIONS: dict[str, set[str]] = {
    "pending": {"open", "closed"},
    "open": {"undergoing", "closed"},
    "undergoing": {"completed", "closed"},
    "completed": {"open"},  # reopen
    "closed": {"open"},  # reopen
}


def _check_transition(old_status: str, new_status: str) -> None:
    """Raise 400 if the transition is not allowed by the state machine."""
    allowed = VALID_TRANSITIONS.get(old_status, set())
    if new_status not in allowed:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot transition from '{old_status}' to '{new_status}'. "
                   f"Allowed targets from '{old_status}': {sorted(allowed) if allowed else 'none'}",
        )


# ---- Type / Subtype validation ----

TASK_SUBTYPE_MAP = {
    'issue': {'infrastructure', 'performance', 'regression', 'security', 'user_experience', 'defect'},
    'maintenance': {'deploy', 'release'},
    'review': {'code_review', 'decision_review', 'function_review'},
    'story': {'feature', 'improvement', 'refactor'},
    'test': {'regression', 'security', 'smoke', 'stress'},
    'research': set(),
    # P7.1: 'task' type removed — defect subtype migrated to issue/defect
    'resolution': set(),
}
ALLOWED_TASK_TYPES = set(TASK_SUBTYPE_MAP.keys())

# P9.6 / BE-PR-009 — type+subtype combos that may NOT be created via general endpoints.
# All story/* subtypes are restricted; they must come from Proposal Accept.
# maintenance/release must come from the milestone release flow.
RESTRICTED_TYPE_SUBTYPES = {
    ("story", "feature"),
    ("story", "improvement"),
    ("story", "refactor"),
    ("story", None),  # story with no subtype is also blocked
    ("maintenance", "release"),
}

# Convenience set: task types whose *entire* type is restricted regardless of subtype.
# Used for a fast-path check so we don't need to enumerate every subtype.
FULLY_RESTRICTED_TYPES = {"story"}


def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None, *, allow_restricted: bool = False):
    """Validate a task type / subtype pair.

    Args:
        task_type: Requested task type; ``None`` skips validation entirely.
        task_subtype: Requested subtype; a falsy value skips the subtype
            membership check.
        allow_restricted: When true, the restricted-combination checks are
            skipped (for callers such as Proposal Accept / internal create).

    Raises:
        HTTPException: 400 for an unknown type, a subtype not listed for the
            type in ``TASK_SUBTYPE_MAP``, or a restricted type/combination
            when ``allow_restricted`` is false.
    """
    if task_type is None:
        return
    if task_type not in ALLOWED_TASK_TYPES:
        raise HTTPException(status_code=400, detail=f'Invalid task_type: {task_type}')
    allowed = TASK_SUBTYPE_MAP.get(task_type, set())
    if task_subtype and task_subtype not in allowed:
        raise HTTPException(status_code=400, detail=f'Invalid task_subtype for {task_type}: {task_subtype}')

    # P9.6 / BE-PR-009: block restricted combos unless explicitly allowed
    # (e.g. Proposal Accept, internal create)
    if allow_restricted:
        return
    # Fast-path: entire type is restricted (all story/* combos)
    if task_type in FULLY_RESTRICTED_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot create '{task_type}' tasks via general endpoints. "
                   f"Use the Proposal Accept workflow instead.",
        )
    # Specific type+subtype combos (e.g. maintenance/release)
    if (task_type, task_subtype) in RESTRICTED_TYPE_SUBTYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot create {task_type}/{task_subtype} task via general create. "
                   f"Use the appropriate workflow (Proposal Accept / milestone release setup).",
        )


def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None):
    """Insert a notification row for *user_id*, commit it, and return the new object."""
    n = NotificationModel(
        user_id=user_id,
        type=ntype,
        title=title,
        message=message,
        entity_type=entity_type,
        entity_id=entity_id,
    )
    db.add(n)
    db.commit()
    return n


def _resolve_project_id(db: Session, project_code: str | None) -> int | None:
    """Map a project code to its id.

    Returns ``None`` when *project_code* is falsy; raises 404 when a code is
    given but no project matches it.
    """
    if not project_code:
        return None
    project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    return project.id


def _resolve_milestone(db: Session, milestone_code: str | None, project_id: int | None) -> Milestone | None:
    """Look up a milestone by code, scoped to *project_id* when one is given.

    Returns ``None`` when *milestone_code* is falsy; raises 404 when a code is
    given but no milestone matches.
    """
    if not milestone_code:
        return None
    query = db.query(Milestone).filter(Milestone.milestone_code == milestone_code)
    if project_id:
        query = query.filter(Milestone.project_id == project_id)
    milestone = query.first()
    if not milestone:
        raise HTTPException(status_code=404, detail="Milestone not found")
    return milestone


def _serialize_task(db: Session, task: Task) -> dict:
    """Build the JSON payload for *task*.

    Starts from ``schemas.TaskResponse`` and adds code-based aliases
    (``code``, ``type``, ``project_code``, ``milestone_code``, ``taken_by``,
    ``source_proposal_code``, ``source_essential_code``). ``due_date`` is
    always emitted as ``None``. Related rows are fetched with one query each.
    """
    payload = schemas.TaskResponse.model_validate(task).model_dump(mode="json")

    project = db.query(models.Project).filter(models.Project.id == task.project_id).first()

    milestone = None
    if task.milestone_id:
        milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()

    proposal_code = None
    if task.source_proposal_id:
        proposal = db.query(Proposal).filter(Proposal.id == task.source_proposal_id).first()
        proposal_code = proposal.propose_code if proposal else None

    essential_code = None
    if task.source_essential_id:
        essential = db.query(Essential).filter(Essential.id == task.source_essential_id).first()
        essential_code = essential.essential_code if essential else None

    assignee = None
    if task.assignee_id:
        assignee = db.query(models.User).filter(models.User.id == task.assignee_id).first()

    payload.update({
        "code": task.task_code,
        "type": task.task_type,
        "project_code": project.project_code if project else None,
        "milestone_code": milestone.milestone_code if milestone else None,
        "taken_by": assignee.username if assignee else None,
        "due_date": None,
        "source_proposal_code": proposal_code,
        "source_essential_code": essential_code,
    })
    return payload


def _paginate(db: Session, query, page: int, page_size: int) -> dict:
    """Run *query* for one page and return the standard paginated envelope.

    ``page`` is clamped to >= 1 and ``page_size`` to ``1..MAX_PAGE_SIZE``.
    ``total_pages`` is reported as 1 when there are no rows.
    """
    total = query.count()
    page = max(1, page)
    page_size = min(max(1, page_size), MAX_PAGE_SIZE)
    total_pages = math.ceil(total / page_size) if total else 1
    items = query.offset((page - 1) * page_size).limit(page_size).all()
    return {
        "items": [_serialize_task(db, i) for i in items],
        "total": total,
        "total_tasks": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
    }


def _parse_tags(raw: str | None) -> set[str]:
    """Split a comma-separated tag string into a set of stripped, non-empty tags."""
    if not raw:
        return set()
    return {part.strip() for part in raw.split(",") if part.strip()}


def _skipped_entry(task_code: str, title, old, reason) -> dict:
    """Build one entry of the ``skipped`` list returned by ``batch_transition``."""
    return {"task_code": task_code, "title": title, "old": old, "reason": reason}


# ---- CRUD ----

@router.post("/tasks", response_model=schemas.TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(task_in: schemas.TaskCreate, bg: BackgroundTasks, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Create a task inside a milestone.

    Raises:
        HTTPException: 400 for an invalid/restricted type or a missing
            project/milestone; 404 when the project or milestone cannot be
            found. ``check_project_role`` is called with ``min_role="dev"``.
    """
    data = task_in.model_dump(exclude_unset=True)
    # `type` is accepted as an alias of `task_type`; an explicit `task_type` wins.
    if data.get("type") and not data.get("task_type"):
        data["task_type"] = data.pop("type")
    else:
        data.pop("type", None)

    # Validate the type that will actually be stored. Validating
    # `task_in.type or task_in.task_type` instead would let a request carrying
    # both fields pass the check on `type` while persisting a restricted
    # `task_type`.
    effective_task_type = data.get("task_type") or task_in.task_type
    _validate_task_type_subtype(effective_task_type, task_in.task_subtype)

    data["project_id"] = _resolve_project_id(db, data.pop("project_code", None))
    milestone = _resolve_milestone(db, data.pop("milestone_code", None), data.get("project_id"))
    if milestone:
        data["milestone_id"] = milestone.id
        # The milestone's own project takes precedence over the supplied one.
        data["project_id"] = milestone.project_id

    data["reporter_id"] = data.get("reporter_id") or current_user.id
    data["created_by_id"] = current_user.id

    if not data.get("project_id"):
        raise HTTPException(status_code=400, detail="project_code is required")
    if not data.get("milestone_id"):
        raise HTTPException(status_code=400, detail="milestone_code is required")

    check_project_role(db, current_user.id, data["project_id"], min_role="dev")

    if not milestone:
        raise HTTPException(status_code=404, detail="Milestone not found")

    est_time = None
    if data.get("estimated_working_time"):
        try:
            est_time = datetime.strptime(data["estimated_working_time"], "%H:%M").time()
        except (ValueError, TypeError):
            # Best-effort: a value that is not an "HH:MM" string is stored as None.
            est_time = None
    data["estimated_working_time"] = est_time

    milestone_code = milestone.milestone_code or f"m{milestone.id}"
    # The numeric suffix is derived from the highest task id currently in this milestone.
    max_task = db.query(Task).filter(Task.milestone_id == milestone.id).order_by(Task.id.desc()).first()
    next_num = (max_task.id + 1) if max_task else 1
    data["task_code"] = f"{milestone_code}:T{next_num:05x}"

    db_task = Task(**data)
    db.add(db_task)
    db.commit()
    db.refresh(db_task)

    event = "resolution.created" if db_task.task_type == "resolution" else "task.created"
    # NOTE(review): the request-scoped session is handed to the background task;
    # confirm fire_webhooks_sync tolerates running after the request has finished.
    bg.add_task(
        fire_webhooks_sync,
        event,
        {"task_code": db_task.task_code, "title": db_task.title, "type": db_task.task_type, "status": _enum_value(db_task.status)},
        db_task.project_id,
        db,
    )
    log_activity(db, "task.created", "task", db_task.id, current_user.id, {"title": db_task.title})
    return _serialize_task(db, db_task)


@router.get("/tasks")
def list_tasks(
    task_status: Optional[str] = None,
    task_type: Optional[str] = None,
    task_subtype: Optional[str] = None,
    assignee_id: Optional[int] = None,
    tag: Optional[str] = None,
    sort_by: str = "created_at",
    sort_order: str = "desc",
    page: int = 1,
    page_size: int = 50,
    project_code: Optional[str] = None,
    milestone_code: Optional[str] = None,
    status_value: Optional[str] = Query(None, alias="status"),
    taken_by: Optional[str] = None,
    order_by: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List tasks with optional filters, sorting and pagination.

    ``status`` (query alias) takes precedence over ``task_status``,
    ``order_by`` over ``sort_by``, and ``taken_by`` over ``assignee_id``.
    ``taken_by=null`` selects unassigned tasks. Unknown sort fields fall back
    to ``created_at``.
    """
    query = db.query(Task)

    resolved_project_id = _resolve_project_id(db, project_code)
    if resolved_project_id:
        query = query.filter(Task.project_id == resolved_project_id)
    if milestone_code:
        milestone_obj = _resolve_milestone(db, milestone_code, resolved_project_id)
        query = query.filter(Task.milestone_id == milestone_obj.id)

    effective_status = status_value or task_status
    if effective_status:
        query = query.filter(Task.status == effective_status)
    if task_type:
        query = query.filter(Task.task_type == task_type)
    if task_subtype:
        query = query.filter(Task.task_subtype == task_subtype)

    effective_assignee_id = assignee_id
    if taken_by == "null":
        query = query.filter(Task.assignee_id.is_(None))
    elif taken_by:
        user = db.query(models.User).filter(models.User.username == taken_by).first()
        if not user:
            # Unknown username: nothing can match, so answer with an empty page.
            return {"items": [], "total": 0, "total_tasks": 0, "page": 1, "page_size": page_size, "total_pages": 1}
        effective_assignee_id = user.id
    if effective_assignee_id:
        query = query.filter(Task.assignee_id == effective_assignee_id)

    if tag:
        # Substring match against the comma-separated tags column.
        query = query.filter(Task.tags.contains(tag))

    effective_sort_by = order_by or sort_by
    sort_fields = {
        "created": Task.created_at,
        "created_at": Task.created_at,
        "updated_at": Task.updated_at,
        "priority": Task.priority,
        "name": Task.title,
        "title": Task.title,
    }
    sort_col = sort_fields.get(effective_sort_by, Task.created_at)
    query = query.order_by(sort_col.asc() if sort_order == "asc" else sort_col.desc())

    return _paginate(db, query, page, page_size)


@router.get("/tasks/search", response_model=List[schemas.TaskResponse])
def search_tasks_alias(
    q: str,
    project_code: Optional[str] = None,
    status: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """Return up to 100 tasks whose title or description contains *q*, newest first.

    Note: inside this function the ``status`` parameter shadows the
    ``fastapi.status`` module imported at file level.
    """
    query = db.query(Task).filter(
        (Task.title.contains(q)) | (Task.description.contains(q))
    )
    resolved_project_id = _resolve_project_id(db, project_code)
    if resolved_project_id:
        query = query.filter(Task.project_id == resolved_project_id)
    if status:
        query = query.filter(Task.status == status)
    items = query.order_by(Task.created_at.desc()).limit(100).all()
    return [_serialize_task(db, i) for i in items]


@router.get("/tasks/{task_code}", response_model=schemas.TaskResponse)
def get_task(task_code: str, db: Session = Depends(get_db)):
    """Return a single task by code; 404 if it does not exist."""
    task = _resolve_task(db, task_code)
    return _serialize_task(db, task)


@router.patch("/tasks/{task_code}", response_model=schemas.TaskResponse)
def update_task(task_code: str, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Partially update a task.

    ``status`` changes are checked against the state machine; every other
    field counts as a "body" edit and is subject to the status-based edit
    restrictions below.

    Raises:
        HTTPException: 400 for blocked body edits, invalid type/subtype or an
            invalid transition; 403 when a non-assignee, non-admin user edits
            an assigned open task; 404 for an unknown task or assignee.
    """
    task = _resolve_task(db, task_code)

    # P5.7: status-based edit restrictions
    current_status = _enum_value(task.status)

    update_data = task_update.model_dump(exclude_unset=True)
    # `type` is accepted as an alias of `task_type`; an explicit `task_type` wins.
    if update_data.get("type") and not update_data.get("task_type"):
        update_data["task_type"] = update_data.pop("type")
    else:
        update_data.pop("type", None)

    if "taken_by" in update_data:
        taken_by = update_data.pop("taken_by")
        if taken_by in (None, "null", ""):
            update_data["assignee_id"] = None
        else:
            assignee = db.query(models.User).filter(models.User.username == taken_by).first()
            if not assignee:
                raise HTTPException(status_code=404, detail="Assignee user not found")
            update_data["assignee_id"] = assignee.id

    # Fields that are always allowed regardless of status (non-body edits)
    _always_allowed = {"status"}
    body_fields = {k for k in update_data.keys() if k not in _always_allowed}

    if body_fields:
        # P3.6 supplement: feature story tasks locked after milestone freeze
        task_type = _enum_value(task.task_type) or ""
        task_subtype = task.task_subtype or ""
        if task_type == "story" and task_subtype == "feature" and task.milestone_id:
            ms = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
            if ms:
                ms_status = _enum_value(ms.status)
                if ms_status in ("freeze", "undergoing", "completed", "closed"):
                    raise HTTPException(
                        status_code=400,
                        detail=f"Feature story task cannot be edited: milestone is '{ms_status}'. "
                               f"Blocked fields: {sorted(body_fields)}",
                    )

        # undergoing/completed/closed: body edits forbidden
        if current_status in ("undergoing", "completed", "closed"):
            raise HTTPException(
                status_code=400,
                detail=f"Cannot edit task body fields in '{current_status}' status. "
                       f"Blocked fields: {sorted(body_fields)}",
            )

        # open + assignee set: only assignee or admin can edit body
        if current_status == "open" and task.assignee_id is not None:
            from app.api.rbac import is_global_admin, has_project_admin_role
            is_admin = (
                is_global_admin(db, current_user.id)
                or has_project_admin_role(db, current_user.id, task.project_id)
            )
            if current_user.id != task.assignee_id and not is_admin:
                raise HTTPException(
                    status_code=403,
                    detail="Only the current assignee or an admin can edit this task",
                )

    # BE-PR-009: prevent changing task_type to a restricted type via PATCH
    new_task_type = update_data.get("task_type")
    new_task_subtype = update_data.get("task_subtype", task.task_subtype)
    if new_task_type is not None:
        _validate_task_type_subtype(new_task_type, new_task_subtype)
    elif "task_subtype" in update_data:
        # subtype changed but type unchanged — validate the combo
        current_type = _enum_value(task.task_type) or "issue"
        _validate_task_type_subtype(current_type, new_task_subtype)

    # Legacy general permission check (covers project membership etc.)
    ensure_can_edit_task(db, current_user.id, task)

    new_status = None
    if "status" in update_data:
        # Normalise to a plain string so the state-machine sets match whether
        # the schema yields an enum member or a string.
        new_status = _enum_value(update_data["status"])
        old_status = _enum_value(task.status)
        # P5.1: enforce state-machine even through PATCH
        _check_transition(old_status, new_status)
        if new_status == "open" and old_status in ("completed", "closed"):
            task.finished_on = None
        if new_status == "undergoing" and not task.started_on:
            task.started_on = datetime.utcnow()
        if new_status in ("closed", "completed") and not task.finished_on:
            task.finished_on = datetime.utcnow()

    for field, value in update_data.items():
        setattr(task, field, value)
    db.commit()
    db.refresh(task)

    # P3.5: auto-complete milestone when release task reaches completed via update
    if new_status == "completed":
        from app.api.routers.milestone_actions import try_auto_complete_milestone
        try_auto_complete_milestone(db, task, user_id=current_user.id)

    return _serialize_task(db, task)


@router.delete("/tasks/{task_code}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(task_code: str, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Delete a task. ``check_project_role`` is called with ``min_role="mgr"``."""
    task = _resolve_task(db, task_code)
    check_project_role(db, current_user.id, task.project_id, min_role="mgr")
    # Logged before the delete so the activity entry can reference the task id/title.
    log_activity(db, "task.deleted", "task", task.id, current_user.id, {"title": task.title})
    db.delete(task)
    db.commit()
    return None


# ---- Transition ----

class TransitionBody(BaseModel):
    """Optional JSON body for the transition endpoint."""
    status: Optional[str] = None
    comment: Optional[str] = None


@router.post("/tasks/{task_code}/transition", response_model=schemas.TaskResponse)
def transition_task(
    task_code: str,
    bg: BackgroundTasks,
    new_status: Optional[str] = None,
    body: Optional[TransitionBody] = None,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Move a task to *new_status*, enforcing the P5.x workflow rules.

    The target status comes from the ``new_status`` query parameter, falling
    back to ``body.status``.

    Raises:
        HTTPException: 400 for an invalid status/transition or an unmet
            precondition; 403 when the caller is not the assignee where that
            is required; 404 for an unknown task. ``check_permission`` is
            called for close and reopen transitions.
    """
    new_status = new_status or (body.status if body else None)
    valid_statuses = [s.value for s in TaskStatus]
    if new_status not in valid_statuses:
        raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}")

    task = _resolve_task(db, task_code)
    old_status = _enum_value(task.status)

    # P5.1: enforce state-machine
    _check_transition(old_status, new_status)

    # P5.2: pending -> open requires milestone to be undergoing + task deps satisfied
    if old_status == "pending" and new_status == "open":
        milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
        if milestone:
            ms_status = _enum_value(milestone.status)
            if ms_status != "undergoing":
                raise HTTPException(
                    status_code=400,
                    detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'",
                )
        # P4.3: check task-level depend_on
        dep_result = check_task_deps(db, task.depend_on)
        if not dep_result.ok:
            raise HTTPException(
                status_code=400,
                detail=f"Cannot open task: {dep_result.reason}",
            )

    # P5.3: open -> undergoing requires assignee AND operator must be the assignee
    if old_status == "open" and new_status == "undergoing":
        if not task.assignee_id:
            raise HTTPException(status_code=400, detail="Cannot start task: assignee must be set first")
        if current_user.id != task.assignee_id:
            raise HTTPException(status_code=403, detail="Only the assigned user can start this task")

    # P5.4: undergoing -> completed requires a completion comment
    completion_comment = None
    if old_status == "undergoing" and new_status == "completed":
        comment_text = body.comment if body else None
        if not comment_text or not comment_text.strip():
            raise HTTPException(status_code=400, detail="A completion comment is required when finishing a task")
        # P5.4: also only the assignee can complete
        if task.assignee_id and current_user.id != task.assignee_id:
            raise HTTPException(status_code=403, detail="Only the assigned user can complete this task")
        completion_comment = comment_text.strip()

    # P5.5: closing a task requires 'task.close' permission
    if new_status == "closed":
        check_permission(db, current_user.id, task.project_id, "task.close")

    # P5.6: reopen from completed/closed -> open
    if new_status == "open" and old_status in ("completed", "closed"):
        perm_name = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
        check_permission(db, current_user.id, task.project_id, perm_name)
        # Clear finished_on on reopen so lifecycle timestamps are accurate
        task.finished_on = None

    if new_status == "undergoing" and not task.started_on:
        task.started_on = datetime.utcnow()
    if new_status in ("closed", "completed") and not task.finished_on:
        task.finished_on = datetime.utcnow()

    task.status = new_status

    # P5.4: auto-create completion comment. It is added before the commit so
    # the status change and its comment are persisted together.
    if completion_comment is not None:
        db.add(models.Comment(
            content=completion_comment,
            task_id=task.id,
            author_id=current_user.id,
        ))

    db.commit()
    db.refresh(task)

    # Log the transition activity
    log_activity(db, f"task.transition.{new_status}", "task", task.id, current_user.id,
                 {"old_status": old_status, "new_status": new_status})

    # P3.5: auto-complete milestone when its sole release task is completed
    if new_status == "completed":
        from app.api.routers.milestone_actions import try_auto_complete_milestone
        try_auto_complete_milestone(db, task, user_id=current_user.id)

    event = "task.closed" if new_status == "closed" else "task.updated"
    bg.add_task(fire_webhooks_sync, event,
                {"task_code": task.task_code, "title": task.title, "old_status": old_status, "new_status": new_status},
                task.project_id, db)
    return _serialize_task(db, task)


@router.post("/tasks/{task_code}/take", response_model=schemas.TaskResponse)
def take_task(
    task_code: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Assign the task to the calling user.

    Raises:
        HTTPException: 404 for an unknown task; 409 when the task is already
            assigned to somebody else. ``check_project_role`` is called with
            ``min_role="dev"``.
    """
    task = _resolve_task(db, task_code)
    check_project_role(db, current_user.id, task.project_id, min_role="dev")

    if task.assignee_id and task.assignee_id != current_user.id:
        assignee = db.query(models.User).filter(models.User.id == task.assignee_id).first()
        assignee_name = assignee.username if assignee else str(task.assignee_id)
        raise HTTPException(status_code=409, detail=f"Task is already taken by {assignee_name}")

    task.assignee_id = current_user.id
    db.commit()
    db.refresh(task)

    _notify_user(
        db,
        current_user.id,
        "task.assigned",
        f"Task {task.task_code} assigned to you",
        f"'{task.title}' has been assigned to you.",
        "task",
        task.id,
    )
    return _serialize_task(db, task)


# ---- Assignment ----

# NOTE(review): the assignment, tag and batch-assign endpoints below declare no
# authentication dependency in this file — confirm it is enforced where the
# router is mounted.

@router.post("/tasks/{task_code}/assign")
def assign_task(task_code: str, assignee_id: int, db: Session = Depends(get_db)):
    """Assign a task to the user with id *assignee_id* and notify that user.

    Raises:
        HTTPException: 404 when the task or the user does not exist.
    """
    task = _resolve_task(db, task_code)
    user = db.query(models.User).filter(models.User.id == assignee_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    task.assignee_id = assignee_id
    db.commit()
    db.refresh(task)
    _notify_user(db, assignee_id, "task.assigned",
                 f"Task {task.task_code} assigned to you",
                 f"'{task.title}' has been assigned to you.", "task", task.id)
    return {"task_code": task.task_code, "assignee_id": assignee_id, "title": task.title}


# ---- Tags ----

@router.post("/tasks/{task_code}/tags")
def add_tag(task_code: str, tag: str, db: Session = Depends(get_db)):
    """Add *tag* to the task's comma-separated tag list.

    Tags are stored sorted and comma-joined, so a *tag* value that itself
    contains commas is treated as several tags. The response lists the
    resulting tags in sorted order.
    """
    task = _resolve_task(db, task_code)
    current = _parse_tags(task.tags)
    current.update(_parse_tags(tag))
    ordered = sorted(current)
    # Store None rather than "" when no tags remain, matching remove_tag.
    task.tags = ",".join(ordered) if ordered else None
    db.commit()
    return {"task_code": task.task_code, "tags": ordered}


@router.delete("/tasks/{task_code}/tags")
def remove_tag(task_code: str, tag: str, db: Session = Depends(get_db)):
    """Remove *tag* from the task's tag list; the column becomes ``None`` when empty."""
    task = _resolve_task(db, task_code)
    current = _parse_tags(task.tags)
    current.discard(tag.strip())
    ordered = sorted(current)
    task.tags = ",".join(ordered) if ordered else None
    db.commit()
    return {"task_code": task.task_code, "tags": ordered}


@router.get("/tags")
def list_all_tags(project_id: Optional[int] = None, db: Session = Depends(get_db)):
    """Return the sorted set of distinct tags, optionally limited to one project."""
    query = db.query(Task.tags).filter(Task.tags.isnot(None))
    if project_id:
        query = query.filter(Task.project_id == project_id)
    all_tags = set()
    for (tags,) in query.all():
        all_tags |= _parse_tags(tags)
    return {"tags": sorted(all_tags)}


# ---- Batch ----

class BatchAssign(BaseModel):
    """Request body for ``POST /tasks/batch/assign``."""
    task_codes: List[str]
    assignee_id: int


class BatchTransitionBody(BaseModel):
    """Request body for ``POST /tasks/batch/transition``."""
    task_codes: List[str]
    new_status: str
    comment: Optional[str] = None


@router.post("/tasks/batch/transition")
def batch_transition(
    data: BatchTransitionBody,
    bg: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Apply one status transition to many tasks.

    The same P5.x rules as the single-task transition are evaluated per task;
    a task that fails a rule is reported under ``skipped`` with a reason
    instead of aborting the batch. All accepted changes are committed once
    after the loop.

    Raises:
        HTTPException: 400 when ``new_status`` is not a ``TaskStatus`` value.
    """
    valid_statuses = [s.value for s in TaskStatus]
    if data.new_status not in valid_statuses:
        raise HTTPException(status_code=400, detail="Invalid status")

    new_status = data.new_status
    updated = []
    skipped = []

    for task_code in data.task_codes:
        task = _find_task_by_code(db, task_code)
        if not task:
            skipped.append(_skipped_entry(task_code, None, None, "Task not found"))
            continue

        old_status = _enum_value(task.status)

        # P5.1: state-machine check
        allowed = VALID_TRANSITIONS.get(old_status, set())
        if new_status not in allowed:
            skipped.append(_skipped_entry(task.task_code, task.title, old_status,
                                          f"Cannot transition from '{old_status}' to '{data.new_status}'"))
            continue

        # P5.2: pending → open requires milestone undergoing + task deps
        if old_status == "pending" and new_status == "open":
            milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
            if milestone:
                ms_status = _enum_value(milestone.status)
                if ms_status != "undergoing":
                    skipped.append(_skipped_entry(task.task_code, task.title, old_status,
                                                  f"Milestone is '{ms_status}', must be 'undergoing'"))
                    continue
            dep_result = check_task_deps(db, task.depend_on)
            if not dep_result.ok:
                skipped.append(_skipped_entry(task.task_code, task.title, old_status, dep_result.reason))
                continue

        # P5.3: open → undergoing requires assignee == current_user
        if old_status == "open" and new_status == "undergoing":
            if not task.assignee_id:
                skipped.append(_skipped_entry(task.task_code, task.title, old_status,
                                              "Assignee must be set before starting"))
                continue
            if current_user.id != task.assignee_id:
                skipped.append(_skipped_entry(task.task_code, task.title, old_status,
                                              "Only the assigned user can start this task"))
                continue

        # P5.4: undergoing → completed requires comment + assignee check
        if old_status == "undergoing" and new_status == "completed":
            comment_text = data.comment
            if not comment_text or not comment_text.strip():
                skipped.append(_skipped_entry(task.task_code, task.title, old_status,
                                              "A completion comment is required"))
                continue
            if task.assignee_id and current_user.id != task.assignee_id:
                skipped.append(_skipped_entry(task.task_code, task.title, old_status,
                                              "Only the assigned user can complete this task"))
                continue

        # P5.5: close requires permission
        if new_status == "closed":
            try:
                check_permission(db, current_user.id, task.project_id, "task.close")
            except HTTPException:
                skipped.append(_skipped_entry(task.task_code, task.title, old_status,
                                              "Missing 'task.close' permission"))
                continue

        # P5.6: reopen requires permission
        if new_status == "open" and old_status in ("completed", "closed"):
            perm = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
            try:
                check_permission(db, current_user.id, task.project_id, perm)
            except HTTPException:
                skipped.append(_skipped_entry(task.task_code, task.title, old_status,
                                              f"Missing '{perm}' permission"))
                continue
            # Clear finished_on on reopen, as the single-task transition does.
            task.finished_on = None

        if new_status == "undergoing" and not task.started_on:
            task.started_on = datetime.utcnow()
        if new_status in ("closed", "completed") and not task.finished_on:
            task.finished_on = datetime.utcnow()

        task.status = new_status
        updated.append({"task_code": task.task_code, "title": task.title, "old": old_status, "new": data.new_status})

        # Activity log per task
        log_activity(db, f"task.transition.{data.new_status}", "task", task.id, current_user.id,
                     {"old_status": old_status, "new_status": data.new_status})

        # P5.4: auto-create completion comment
        if old_status == "undergoing" and new_status == "completed" and data.comment:
            db_comment = models.Comment(
                content=data.comment.strip(),
                task_id=task.id,
                author_id=current_user.id,
            )
            db.add(db_comment)

    db.commit()

    # P3.5: auto-complete milestone for any completed task
    if new_status == "completed" and updated:
        from app.api.routers.milestone_actions import try_auto_complete_milestone
        for u in updated:
            t = _find_task_by_code(db, u["task_code"])
            if t:
                try_auto_complete_milestone(db, t, user_id=current_user.id)

    event = "task.closed" if new_status == "closed" else "task.updated"
    for u in updated:
        # NOTE(review): the project id argument is None here, whereas the
        # single-task transition passes task.project_id — confirm this is intended.
        bg.add_task(fire_webhooks_sync, event, u, None, db)

    result = {"updated": len(updated), "tasks": updated}
    if skipped:
        result["skipped"] = skipped
    return result


@router.post("/tasks/batch/assign")
def batch_assign(data: BatchAssign, db: Session = Depends(get_db)):
    """Assign every existing task in ``data.task_codes`` to ``data.assignee_id``.

    Unknown task codes are silently ignored; the response lists the codes that
    were updated.

    Raises:
        HTTPException: 404 when the assignee does not exist.
    """
    user = db.query(models.User).filter(models.User.id == data.assignee_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="Assignee not found")
    updated = []
    for task_code in data.task_codes:
        task = _find_task_by_code(db, task_code)
        if task:
            task.assignee_id = data.assignee_id
            updated.append(task.task_code)
    db.commit()
    return {"updated": len(updated), "task_codes": updated, "assignee_id": data.assignee_id}


# ---- Search ----

@router.get("/search/tasks")
def search_tasks(q: str, project_code: Optional[str] = None, page: int = 1, page_size: int = 50, db: Session = Depends(get_db)):
    """Paginated search over task title and description.

    Results are ordered newest first so that OFFSET/LIMIT pages are stable
    from one request to the next.
    """
    query = db.query(Task).filter(
        (Task.title.contains(q)) | (Task.description.contains(q))
    )
    if project_code:
        project_id = _resolve_project_id(db, project_code)
        if project_id:
            query = query.filter(Task.project_id == project_id)
    query = query.order_by(Task.created_at.desc())
    return _paginate(db, query, page, page_size)