Files
HarborForge.Backend/app/api/routers/tasks.py

790 lines
32 KiB
Python

"""Tasks router — replaces the old issues router. All CRUD operates on the `tasks` table."""
import math
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.config import get_db
from app.models import models
from app.models.task import Task, TaskStatus, TaskPriority
from app.models.milestone import Milestone
from app.schemas import schemas
from app.services.webhook import fire_webhooks_sync
from app.models.notification import Notification as NotificationModel
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_task
from app.services.activity import log_activity
from app.services.dependency_check import check_task_deps
router = APIRouter(tags=["Tasks"])
def _resolve_task(db: Session, identifier: str) -> Task:
"""Resolve a task by numeric id or task_code string.
Raises 404 if not found."""
try:
task_id = int(identifier)
task = db.query(Task).filter(Task.id == task_id).first()
except (ValueError, TypeError):
task = db.query(Task).filter(Task.task_code == identifier).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
# ---- State-machine: valid transitions (P5.1-P5.6) ----
VALID_TRANSITIONS: dict[str, set[str]] = {
"pending": {"open", "closed"},
"open": {"undergoing", "closed"},
"undergoing": {"completed", "closed"},
"completed": {"open"}, # reopen
"closed": {"open"}, # reopen
}
def _check_transition(old_status: str, new_status: str) -> None:
"""Raise 400 if the transition is not allowed by the state machine."""
allowed = VALID_TRANSITIONS.get(old_status, set())
if new_status not in allowed:
raise HTTPException(
status_code=400,
detail=f"Cannot transition from '{old_status}' to '{new_status}'. "
f"Allowed targets from '{old_status}': {sorted(allowed) if allowed else 'none'}",
)
# ---- Type / Subtype validation ----
TASK_SUBTYPE_MAP = {
'issue': {'infrastructure', 'performance', 'regression', 'security', 'user_experience', 'defect'},
'maintenance': {'deploy', 'release'},
'review': {'code_review', 'decision_review', 'function_review'},
'story': {'feature', 'improvement', 'refactor'},
'test': {'regression', 'security', 'smoke', 'stress'},
'research': set(),
# P7.1: 'task' type removed — defect subtype migrated to issue/defect
'resolution': set(),
}
ALLOWED_TASK_TYPES = set(TASK_SUBTYPE_MAP.keys())
"""P9.6 — type+subtype combos that may NOT be created via general create endpoints.
feature story → must come from propose accept
release maintenance → must come from controlled milestone/release flow
"""
RESTRICTED_TYPE_SUBTYPES = {
("story", "feature"),
("maintenance", "release"),
}
def _validate_task_type_subtype(task_type: str | None, task_subtype: str | None, *, allow_restricted: bool = False):
if task_type is None:
return
if task_type not in ALLOWED_TASK_TYPES:
raise HTTPException(status_code=400, detail=f'Invalid task_type: {task_type}')
allowed = TASK_SUBTYPE_MAP.get(task_type, set())
if task_subtype and task_subtype not in allowed:
raise HTTPException(status_code=400, detail=f'Invalid task_subtype for {task_type}: {task_subtype}')
# P9.6: block restricted combos unless explicitly allowed (e.g. propose accept, internal create)
if not allow_restricted and (task_type, task_subtype) in RESTRICTED_TYPE_SUBTYPES:
raise HTTPException(
status_code=400,
detail=f"Cannot create {task_type}/{task_subtype} task via general create. "
f"Use the appropriate workflow (propose accept / milestone release setup)."
)
def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None):
n = NotificationModel(user_id=user_id, type=ntype, title=title, message=message,
entity_type=entity_type, entity_id=entity_id)
db.add(n)
db.commit()
return n
def _resolve_project_id(db: Session, project_id: int | None, project_code: str | None) -> int | None:
if project_id:
return project_id
if not project_code:
return None
project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return project.id
def _resolve_milestone(db: Session, milestone_id: int | None, milestone_code: str | None, project_id: int | None) -> Milestone | None:
if milestone_id:
query = db.query(Milestone).filter(Milestone.id == milestone_id)
if project_id:
query = query.filter(Milestone.project_id == project_id)
milestone = query.first()
elif milestone_code:
query = db.query(Milestone).filter(Milestone.milestone_code == milestone_code)
if project_id:
query = query.filter(Milestone.project_id == project_id)
milestone = query.first()
else:
return None
if not milestone:
raise HTTPException(status_code=404, detail="Milestone not found")
return milestone
def _find_task_by_id_or_code(db: Session, identifier: str) -> Task | None:
try:
task_id = int(identifier)
task = db.query(Task).filter(Task.id == task_id).first()
if task:
return task
except ValueError:
pass
return db.query(Task).filter(Task.task_code == identifier).first()
def _serialize_task(db: Session, task: Task) -> dict:
payload = schemas.TaskResponse.model_validate(task).model_dump(mode="json")
project = db.query(models.Project).filter(models.Project.id == task.project_id).first()
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
assignee = None
if task.assignee_id:
assignee = db.query(models.User).filter(models.User.id == task.assignee_id).first()
payload.update({
"code": task.task_code,
"type": task.task_type,
"project_code": project.project_code if project else None,
"milestone_code": milestone.milestone_code if milestone else None,
"taken_by": assignee.username if assignee else None,
"due_date": None,
})
return payload
# ---- CRUD ----
@router.post("/tasks", response_model=schemas.TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(task_in: schemas.TaskCreate, bg: BackgroundTasks, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
requested_task_type = task_in.type or task_in.task_type
_validate_task_type_subtype(requested_task_type, task_in.task_subtype)
data = task_in.model_dump(exclude_unset=True)
if data.get("type") and not data.get("task_type"):
data["task_type"] = data.pop("type")
else:
data.pop("type", None)
data["project_id"] = _resolve_project_id(db, data.get("project_id"), data.pop("project_code", None))
milestone = _resolve_milestone(db, data.get("milestone_id"), data.pop("milestone_code", None), data.get("project_id"))
if milestone:
data["milestone_id"] = milestone.id
data["project_id"] = milestone.project_id
data["reporter_id"] = data.get("reporter_id") or current_user.id
data["created_by_id"] = current_user.id
if not data.get("project_id"):
raise HTTPException(status_code=400, detail="project_id or project_code is required")
if not data.get("milestone_id"):
raise HTTPException(status_code=400, detail="milestone_id or milestone_code is required")
check_project_role(db, current_user.id, data["project_id"], min_role="dev")
if not milestone:
milestone = db.query(Milestone).filter(
Milestone.id == data["milestone_id"],
Milestone.project_id == data["project_id"],
).first()
if not milestone:
raise HTTPException(status_code=404, detail="Milestone not found")
est_time = None
if data.get("estimated_working_time"):
try:
est_time = datetime.strptime(data["estimated_working_time"], "%H:%M").time()
except Exception:
pass
data["estimated_working_time"] = est_time
milestone_code = milestone.milestone_code or f"m{milestone.id}"
max_task = db.query(Task).filter(Task.milestone_id == milestone.id).order_by(Task.id.desc()).first()
next_num = (max_task.id + 1) if max_task else 1
data["task_code"] = f"{milestone_code}:T{next_num:05x}"
db_task = Task(**data)
db.add(db_task)
db.commit()
db.refresh(db_task)
event = "resolution.created" if db_task.task_type == "resolution" else "task.created"
bg.add_task(
fire_webhooks_sync,
event,
{"task_id": db_task.id, "title": db_task.title, "type": db_task.task_type, "status": db_task.status.value},
db_task.project_id,
db,
)
log_activity(db, "task.created", "task", db_task.id, current_user.id, {"title": db_task.title})
return _serialize_task(db, db_task)
@router.get("/tasks")
def list_tasks(
project_id: int = None, task_status: str = None, task_type: str = None, task_subtype: str = None,
assignee_id: int = None, tag: str = None,
sort_by: str = "created_at", sort_order: str = "desc",
page: int = 1, page_size: int = 50,
project: str = None, milestone: str = None, status_value: str = Query(None, alias="status"), taken_by: str = None,
order_by: str = None,
db: Session = Depends(get_db)
):
query = db.query(Task)
resolved_project_id = _resolve_project_id(db, project_id, project)
if resolved_project_id:
query = query.filter(Task.project_id == resolved_project_id)
if milestone:
milestone_obj = _resolve_milestone(db, None, milestone, resolved_project_id)
query = query.filter(Task.milestone_id == milestone_obj.id)
effective_status = status_value or task_status
if effective_status:
query = query.filter(Task.status == effective_status)
if task_type:
query = query.filter(Task.task_type == task_type)
if task_subtype:
query = query.filter(Task.task_subtype == task_subtype)
effective_assignee_id = assignee_id
if taken_by == "null":
query = query.filter(Task.assignee_id.is_(None))
elif taken_by:
user = db.query(models.User).filter(models.User.username == taken_by).first()
if not user:
return {"items": [], "total": 0, "total_tasks": 0, "page": 1, "page_size": page_size, "total_pages": 1}
effective_assignee_id = user.id
if effective_assignee_id:
query = query.filter(Task.assignee_id == effective_assignee_id)
if tag:
query = query.filter(Task.tags.contains(tag))
effective_sort_by = order_by or sort_by
sort_fields = {
"created": Task.created_at,
"created_at": Task.created_at,
"updated_at": Task.updated_at,
"priority": Task.priority,
"name": Task.title,
"title": Task.title,
}
sort_col = sort_fields.get(effective_sort_by, Task.created_at)
query = query.order_by(sort_col.asc() if sort_order == "asc" else sort_col.desc())
total = query.count()
page = max(1, page)
page_size = min(max(1, page_size), 200)
total_pages = math.ceil(total / page_size) if total else 1
items = query.offset((page - 1) * page_size).limit(page_size).all()
return {
"items": [_serialize_task(db, i) for i in items],
"total": total,
"total_tasks": total,
"page": page,
"page_size": page_size,
"total_pages": total_pages,
}
@router.get("/tasks/search", response_model=List[schemas.TaskResponse])
def search_tasks_alias(
q: str,
project: str = None,
status: str = None,
db: Session = Depends(get_db),
):
query = db.query(Task).filter(
(Task.title.contains(q)) | (Task.description.contains(q))
)
resolved_project_id = _resolve_project_id(db, None, project)
if resolved_project_id:
query = query.filter(Task.project_id == resolved_project_id)
if status:
query = query.filter(Task.status == status)
items = query.order_by(Task.created_at.desc()).limit(100).all()
return [_serialize_task(db, i) for i in items]
@router.get("/tasks/{task_id}", response_model=schemas.TaskResponse)
def get_task(task_id: str, db: Session = Depends(get_db)):
task = _resolve_task(db, task_id)
return _serialize_task(db, task)
@router.patch("/tasks/{task_id}", response_model=schemas.TaskResponse)
def update_task(task_id: str, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
task = _resolve_task(db, task_id)
# P5.7: status-based edit restrictions
current_status = task.status.value if hasattr(task.status, 'value') else task.status
update_data = task_update.model_dump(exclude_unset=True)
if update_data.get("type") and not update_data.get("task_type"):
update_data["task_type"] = update_data.pop("type")
else:
update_data.pop("type", None)
if "taken_by" in update_data:
taken_by = update_data.pop("taken_by")
if taken_by in (None, "null", ""):
update_data["assignee_id"] = None
else:
assignee = db.query(models.User).filter(models.User.username == taken_by).first()
if not assignee:
raise HTTPException(status_code=404, detail="Assignee user not found")
update_data["assignee_id"] = assignee.id
# Fields that are always allowed regardless of status (non-body edits)
_always_allowed = {"status"}
body_fields = {k for k in update_data.keys() if k not in _always_allowed}
if body_fields:
# P3.6 supplement: feature story tasks locked after milestone freeze
task_type = task.task_type.value if hasattr(task.task_type, 'value') else (task.task_type or "")
task_subtype = task.task_subtype or ""
if task_type == "story" and task_subtype == "feature" and task.milestone_id:
from app.models.milestone import Milestone
ms = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if ms:
ms_status = ms.status.value if hasattr(ms.status, 'value') else ms.status
if ms_status in ("freeze", "undergoing", "completed", "closed"):
raise HTTPException(
status_code=400,
detail=f"Feature story task cannot be edited: milestone is '{ms_status}'. "
f"Blocked fields: {sorted(body_fields)}",
)
# undergoing/completed/closed: body edits forbidden
if current_status in ("undergoing", "completed", "closed"):
raise HTTPException(
status_code=400,
detail=f"Cannot edit task body fields in '{current_status}' status. "
f"Blocked fields: {sorted(body_fields)}",
)
# open + assignee set: only assignee or admin can edit body
if current_status == "open" and task.assignee_id is not None:
from app.api.rbac import is_global_admin, has_project_admin_role
is_admin = (
is_global_admin(db, current_user.id)
or has_project_admin_role(db, current_user.id, task.project_id)
)
if current_user.id != task.assignee_id and not is_admin:
raise HTTPException(
status_code=403,
detail="Only the current assignee or an admin can edit this task",
)
# Legacy general permission check (covers project membership etc.)
ensure_can_edit_task(db, current_user.id, task)
if "status" in update_data:
new_status = update_data["status"]
old_status = task.status.value if hasattr(task.status, 'value') else task.status
# P5.1: enforce state-machine even through PATCH
_check_transition(old_status, new_status)
if new_status == "open" and old_status in ("completed", "closed"):
task.finished_on = None
if new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
for field, value in update_data.items():
setattr(task, field, value)
db.commit()
db.refresh(task)
# P3.5: auto-complete milestone when release task reaches completed via update
if "status" in update_data and update_data["status"] == "completed":
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, task, user_id=current_user.id)
return _serialize_task(db, task)
@router.delete("/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(task_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
task = _resolve_task(db, task_id)
check_project_role(db, current_user.id, task.project_id, min_role="mgr")
log_activity(db, "task.deleted", "task", task.id, current_user.id, {"title": task.title})
db.delete(task)
db.commit()
return None
# ---- Transition ----
class TransitionBody(BaseModel):
status: Optional[str] = None
comment: Optional[str] = None
@router.post("/tasks/{task_id}/transition", response_model=schemas.TaskResponse)
def transition_task(
task_id: str,
bg: BackgroundTasks,
new_status: str | None = None,
body: TransitionBody = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
new_status = new_status or (body.status if body else None)
valid_statuses = [s.value for s in TaskStatus]
if new_status not in valid_statuses:
raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}")
task = _resolve_task(db, task_id)
old_status = task.status.value if hasattr(task.status, 'value') else task.status
# P5.1: enforce state-machine
_check_transition(old_status, new_status)
# P5.2: pending -> open requires milestone to be undergoing + task deps satisfied
if old_status == "pending" and new_status == "open":
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if milestone:
ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
if ms_status != "undergoing":
raise HTTPException(
status_code=400,
detail=f"Cannot open task: milestone is '{ms_status}', must be 'undergoing'",
)
# P4.3: check task-level depend_on
dep_result = check_task_deps(db, task.depend_on)
if not dep_result.ok:
raise HTTPException(
status_code=400,
detail=f"Cannot open task: {dep_result.reason}",
)
# P5.3: open -> undergoing requires assignee AND operator must be the assignee
if old_status == "open" and new_status == "undergoing":
if not task.assignee_id:
raise HTTPException(status_code=400, detail="Cannot start task: assignee must be set first")
if current_user.id != task.assignee_id:
raise HTTPException(status_code=403, detail="Only the assigned user can start this task")
# P5.4: undergoing -> completed requires a completion comment
if old_status == "undergoing" and new_status == "completed":
comment_text = body.comment if body else None
if not comment_text or not comment_text.strip():
raise HTTPException(status_code=400, detail="A completion comment is required when finishing a task")
# P5.4: also only the assignee can complete
if task.assignee_id and current_user.id != task.assignee_id:
raise HTTPException(status_code=403, detail="Only the assigned user can complete this task")
# P5.5: closing a task requires 'task.close' permission
if new_status == "closed":
check_permission(db, current_user.id, task.project_id, "task.close")
# P5.6: reopen from completed/closed -> open
if new_status == "open" and old_status in ("completed", "closed"):
perm_name = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
check_permission(db, current_user.id, task.project_id, perm_name)
# Clear finished_on on reopen so lifecycle timestamps are accurate
task.finished_on = None
if new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
task.status = new_status
db.commit()
db.refresh(task)
# P5.4: auto-create completion comment
if old_status == "undergoing" and new_status == "completed" and body and body.comment:
db_comment = models.Comment(
content=body.comment.strip(),
task_id=task.id,
author_id=current_user.id,
)
db.add(db_comment)
db.commit()
# Log the transition activity
log_activity(db, f"task.transition.{new_status}", "task", task.id, current_user.id,
{"old_status": old_status, "new_status": new_status})
# P3.5: auto-complete milestone when its sole release task is completed
if new_status == "completed":
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, task, user_id=current_user.id)
event = "task.closed" if new_status == "closed" else "task.updated"
bg.add_task(fire_webhooks_sync, event,
{"task_id": task.id, "title": task.title, "old_status": old_status, "new_status": new_status},
task.project_id, db)
return _serialize_task(db, task)
@router.post("/tasks/{task_id}/take", response_model=schemas.TaskResponse)
def take_task(
task_id: str,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
task = _find_task_by_id_or_code(db, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
check_project_role(db, current_user.id, task.project_id, min_role="dev")
if task.assignee_id and task.assignee_id != current_user.id:
assignee = db.query(models.User).filter(models.User.id == task.assignee_id).first()
assignee_name = assignee.username if assignee else str(task.assignee_id)
raise HTTPException(status_code=409, detail=f"Task is already taken by {assignee_name}")
task.assignee_id = current_user.id
db.commit()
db.refresh(task)
_notify_user(
db,
current_user.id,
"task.assigned",
f"Task {task.task_code or task.id} assigned to you",
f"'{task.title}' has been assigned to you.",
"task",
task.id,
)
return _serialize_task(db, task)
# ---- Assignment ----
@router.post("/tasks/{task_id}/assign")
def assign_task(task_id: str, assignee_id: int, db: Session = Depends(get_db)):
task = _resolve_task(db, task_id)
user = db.query(models.User).filter(models.User.id == assignee_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
task.assignee_id = assignee_id
db.commit()
db.refresh(task)
_notify_user(db, assignee_id, "task.assigned",
f"Task #{task.id} assigned to you",
f"'{task.title}' has been assigned to you.", "task", task.id)
return {"task_id": task.id, "assignee_id": assignee_id, "title": task.title}
# ---- Tags ----
@router.post("/tasks/{task_id}/tags")
def add_tag(task_id: str, tag: str, db: Session = Depends(get_db)):
task = _resolve_task(db, task_id)
current = set(task.tags.split(",")) if task.tags else set()
current.add(tag.strip())
current.discard("")
task.tags = ",".join(sorted(current))
db.commit()
return {"task_id": task_id, "tags": list(current)}
@router.delete("/tasks/{task_id}/tags")
def remove_tag(task_id: str, tag: str, db: Session = Depends(get_db)):
task = _resolve_task(db, task_id)
current = set(task.tags.split(",")) if task.tags else set()
current.discard(tag.strip())
current.discard("")
task.tags = ",".join(sorted(current)) if current else None
db.commit()
return {"task_id": task_id, "tags": list(current)}
@router.get("/tags")
def list_all_tags(project_id: int = None, db: Session = Depends(get_db)):
query = db.query(Task.tags).filter(Task.tags != None)
if project_id:
query = query.filter(Task.project_id == project_id)
all_tags = set()
for (tags,) in query.all():
for t in tags.split(","):
t = t.strip()
if t:
all_tags.add(t)
return {"tags": sorted(all_tags)}
# ---- Batch ----
class BatchAssign(BaseModel):
task_ids: List[int]
assignee_id: int
class BatchTransitionBody(BaseModel):
task_ids: List[int]
new_status: str
comment: Optional[str] = None
@router.post("/tasks/batch/transition")
def batch_transition(
data: BatchTransitionBody,
bg: BackgroundTasks,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
valid_statuses = [s.value for s in TaskStatus]
if data.new_status not in valid_statuses:
raise HTTPException(status_code=400, detail="Invalid status")
updated = []
skipped = []
for task_id in data.task_ids:
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
skipped.append({"id": task_id, "title": None, "old": None,
"reason": "Task not found"})
continue
old_status = task.status.value if hasattr(task.status, 'value') else task.status
# P5.1: state-machine check
allowed = VALID_TRANSITIONS.get(old_status, set())
if data.new_status not in allowed:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Cannot transition from '{old_status}' to '{data.new_status}'"})
continue
# P5.2: pending → open requires milestone undergoing + task deps
if old_status == "pending" and data.new_status == "open":
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
if milestone:
ms_status = milestone.status.value if hasattr(milestone.status, 'value') else milestone.status
if ms_status != "undergoing":
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Milestone is '{ms_status}', must be 'undergoing'"})
continue
dep_result = check_task_deps(db, task.depend_on)
if not dep_result.ok:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": dep_result.reason})
continue
# P5.3: open → undergoing requires assignee == current_user
if old_status == "open" and data.new_status == "undergoing":
if not task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Assignee must be set before starting"})
continue
if current_user.id != task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Only the assigned user can start this task"})
continue
# P5.4: undergoing → completed requires comment + assignee check
if old_status == "undergoing" and data.new_status == "completed":
comment_text = data.comment
if not comment_text or not comment_text.strip():
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "A completion comment is required"})
continue
if task.assignee_id and current_user.id != task.assignee_id:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Only the assigned user can complete this task"})
continue
# P5.5: close requires permission
if data.new_status == "closed":
try:
check_permission(db, current_user.id, task.project_id, "task.close")
except HTTPException:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": "Missing 'task.close' permission"})
continue
# P5.6: reopen requires permission
if data.new_status == "open" and old_status in ("completed", "closed"):
perm = "task.reopen_completed" if old_status == "completed" else "task.reopen_closed"
try:
check_permission(db, current_user.id, task.project_id, perm)
except HTTPException:
skipped.append({"id": task.id, "title": task.title, "old": old_status,
"reason": f"Missing '{perm}' permission"})
continue
task.finished_on = None
if data.new_status == "undergoing" and not task.started_on:
task.started_on = datetime.utcnow()
if data.new_status in ("closed", "completed") and not task.finished_on:
task.finished_on = datetime.utcnow()
task.status = data.new_status
updated.append({"id": task.id, "title": task.title, "old": old_status, "new": data.new_status})
# Activity log per task
log_activity(db, f"task.transition.{data.new_status}", "task", task.id, current_user.id,
{"old_status": old_status, "new_status": data.new_status})
# P5.4: auto-create completion comment
if old_status == "undergoing" and data.new_status == "completed" and data.comment:
db_comment = models.Comment(
content=data.comment.strip(),
task_id=task.id,
author_id=current_user.id,
)
db.add(db_comment)
db.commit()
# P3.5: auto-complete milestone for any completed task
for u in updated:
if u["new"] == "completed":
t = db.query(Task).filter(Task.id == u["id"]).first()
if t:
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, t, user_id=current_user.id)
for u in updated:
event = "task.closed" if data.new_status == "closed" else "task.updated"
bg.add_task(fire_webhooks_sync, event, u, None, db)
result = {"updated": len(updated), "tasks": updated}
if skipped:
result["skipped"] = skipped
return result
@router.post("/tasks/batch/assign")
def batch_assign(data: BatchAssign, db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.id == data.assignee_id).first()
if not user:
raise HTTPException(status_code=404, detail="Assignee not found")
updated = []
for task_id in data.task_ids:
task = db.query(Task).filter(Task.id == task_id).first()
if task:
task.assignee_id = data.assignee_id
updated.append(task_id)
db.commit()
return {"updated": len(updated), "task_ids": updated, "assignee_id": data.assignee_id}
# ---- Search ----
@router.get("/search/tasks")
def search_tasks(q: str, project_id: int = None, page: int = 1, page_size: int = 50,
db: Session = Depends(get_db)):
query = db.query(Task).filter(
(Task.title.contains(q)) | (Task.description.contains(q))
)
if project_id:
query = query.filter(Task.project_id == project_id)
total = query.count()
page = max(1, page)
page_size = min(max(1, page_size), 200)
total_pages = math.ceil(total / page_size) if total else 1
items = query.offset((page - 1) * page_size).limit(page_size).all()
return {
"items": [_serialize_task(db, i) for i in items],
"total": total,
"total_tasks": total,
"page": page,
"page_size": page_size,
"total_pages": total_pages,
}