feat: add code-first API support for projects, milestones, proposes, tasks

- Projects: get/update/delete/members endpoints now accept project_code
- Milestones: all project-scoped and top-level endpoints accept milestone_code
- Proposes: all endpoints accept project_code and propose_code
- Tasks: code-first support for all CRUD + transition + take + search
- Schemas: add code/type/due_date/project_code/milestone_code/taken_by fields
- All endpoints use id-or-code lookup helpers for backward compatibility
- Milestone serializer now includes milestone_code and code fields
- Task serializer enriches responses with project_code, milestone_code, taken_by

Addresses TODO §2.1: code-first API support across CLI-targeted resources
This commit is contained in:
zhi
2026-03-21 18:12:04 +00:00
parent 32e79a41d8
commit 43af5b29f6
6 changed files with 442 additions and 124 deletions

View File

@@ -2,7 +2,7 @@
import math
from typing import List, Optional
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks, Query
from sqlalchemy.orm import Session
from pydantic import BaseModel
@@ -88,27 +88,100 @@ def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, enti
return n
def _resolve_project_id(db: Session, project_id: int | None, project_code: str | None) -> int | None:
if project_id:
return project_id
if not project_code:
return None
project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return project.id
def _resolve_milestone(db: Session, milestone_id: int | None, milestone_code: str | None, project_id: int | None) -> Milestone | None:
if milestone_id:
query = db.query(Milestone).filter(Milestone.id == milestone_id)
if project_id:
query = query.filter(Milestone.project_id == project_id)
milestone = query.first()
elif milestone_code:
query = db.query(Milestone).filter(Milestone.milestone_code == milestone_code)
if project_id:
query = query.filter(Milestone.project_id == project_id)
milestone = query.first()
else:
return None
if not milestone:
raise HTTPException(status_code=404, detail="Milestone not found")
return milestone
def _find_task_by_id_or_code(db: Session, identifier: str) -> Task | None:
try:
task_id = int(identifier)
task = db.query(Task).filter(Task.id == task_id).first()
if task:
return task
except ValueError:
pass
return db.query(Task).filter(Task.task_code == identifier).first()
def _serialize_task(db: Session, task: Task) -> dict:
payload = schemas.TaskResponse.model_validate(task).model_dump(mode="json")
project = db.query(models.Project).filter(models.Project.id == task.project_id).first()
milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
assignee = None
if task.assignee_id:
assignee = db.query(models.User).filter(models.User.id == task.assignee_id).first()
payload.update({
"code": task.task_code,
"type": task.task_type,
"project_code": project.project_code if project else None,
"milestone_code": milestone.milestone_code if milestone else None,
"taken_by": assignee.username if assignee else None,
"due_date": None,
})
return payload
# ---- CRUD ----
@router.post("/tasks", response_model=schemas.TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(task_in: schemas.TaskCreate, bg: BackgroundTasks, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
_validate_task_type_subtype(task_in.task_type, task_in.task_subtype)
requested_task_type = task_in.type or task_in.task_type
_validate_task_type_subtype(requested_task_type, task_in.task_subtype)
data = task_in.model_dump(exclude_unset=True)
if data.get("type") and not data.get("task_type"):
data["task_type"] = data.pop("type")
else:
data.pop("type", None)
data["project_id"] = _resolve_project_id(db, data.get("project_id"), data.pop("project_code", None))
milestone = _resolve_milestone(db, data.get("milestone_id"), data.pop("milestone_code", None), data.get("project_id"))
if milestone:
data["milestone_id"] = milestone.id
data["project_id"] = milestone.project_id
data["reporter_id"] = data.get("reporter_id") or current_user.id
data["created_by_id"] = current_user.id
if not data.get("project_id"):
raise HTTPException(status_code=400, detail="project_id is required")
raise HTTPException(status_code=400, detail="project_id or project_code is required")
if not data.get("milestone_id"):
raise HTTPException(status_code=400, detail="milestone_id is required")
raise HTTPException(status_code=400, detail="milestone_id or milestone_code is required")
check_project_role(db, current_user.id, data["project_id"], min_role="dev")
milestone = db.query(Milestone).filter(
Milestone.id == data["milestone_id"],
Milestone.project_id == data["project_id"],
).first()
if not milestone:
milestone = db.query(Milestone).filter(
Milestone.id == data["milestone_id"],
Milestone.project_id == data["project_id"],
).first()
if not milestone:
raise HTTPException(status_code=404, detail="Milestone not found")
@@ -139,7 +212,7 @@ def create_task(task_in: schemas.TaskCreate, bg: BackgroundTasks, db: Session =
db,
)
log_activity(db, "task.created", "task", db_task.id, current_user.id, {"title": db_task.title})
return db_task
return _serialize_task(db, db_task)
@router.get("/tasks")
@@ -148,27 +221,51 @@ def list_tasks(
assignee_id: int = None, tag: str = None,
sort_by: str = "created_at", sort_order: str = "desc",
page: int = 1, page_size: int = 50,
project: str = None, milestone: str = None, status_value: str = Query(None, alias="status"), taken_by: str = None,
order_by: str = None,
db: Session = Depends(get_db)
):
query = db.query(Task)
if project_id:
query = query.filter(Task.project_id == project_id)
if task_status:
query = query.filter(Task.status == task_status)
resolved_project_id = _resolve_project_id(db, project_id, project)
if resolved_project_id:
query = query.filter(Task.project_id == resolved_project_id)
if milestone:
milestone_obj = _resolve_milestone(db, None, milestone, resolved_project_id)
query = query.filter(Task.milestone_id == milestone_obj.id)
effective_status = status_value or task_status
if effective_status:
query = query.filter(Task.status == effective_status)
if task_type:
query = query.filter(Task.task_type == task_type)
if task_subtype:
query = query.filter(Task.task_subtype == task_subtype)
if assignee_id:
query = query.filter(Task.assignee_id == assignee_id)
effective_assignee_id = assignee_id
if taken_by == "null":
query = query.filter(Task.assignee_id.is_(None))
elif taken_by:
user = db.query(models.User).filter(models.User.username == taken_by).first()
if not user:
return {"items": [], "total": 0, "total_tasks": 0, "page": 1, "page_size": page_size, "total_pages": 1}
effective_assignee_id = user.id
if effective_assignee_id:
query = query.filter(Task.assignee_id == effective_assignee_id)
if tag:
query = query.filter(Task.tags.contains(tag))
effective_sort_by = order_by or sort_by
sort_fields = {
"created_at": Task.created_at, "updated_at": Task.updated_at,
"priority": Task.priority, "title": Task.title,
"created": Task.created_at,
"created_at": Task.created_at,
"updated_at": Task.updated_at,
"priority": Task.priority,
"name": Task.title,
"title": Task.title,
}
sort_col = sort_fields.get(sort_by, Task.created_at)
sort_col = sort_fields.get(effective_sort_by, Task.created_at)
query = query.order_by(sort_col.asc() if sort_order == "asc" else sort_col.desc())
total = query.count()
@@ -177,7 +274,7 @@ def list_tasks(
total_pages = math.ceil(total / page_size) if total else 1
items = query.offset((page - 1) * page_size).limit(page_size).all()
return {
"items": [schemas.TaskResponse.model_validate(i) for i in items],
"items": [_serialize_task(db, i) for i in items],
"total": total,
"total_tasks": total,
"page": page,
@@ -186,23 +283,56 @@ def list_tasks(
}
@router.get("/tasks/search", response_model=List[schemas.TaskResponse])
def search_tasks_alias(
q: str,
project: str = None,
status: str = None,
db: Session = Depends(get_db),
):
query = db.query(Task).filter(
(Task.title.contains(q)) | (Task.description.contains(q))
)
resolved_project_id = _resolve_project_id(db, None, project)
if resolved_project_id:
query = query.filter(Task.project_id == resolved_project_id)
if status:
query = query.filter(Task.status == status)
items = query.order_by(Task.created_at.desc()).limit(100).all()
return [_serialize_task(db, i) for i in items]
@router.get("/tasks/{task_id}", response_model=schemas.TaskResponse)
def get_task(task_id: int, db: Session = Depends(get_db)):
task = db.query(Task).filter(Task.id == task_id).first()
def get_task(task_id: str, db: Session = Depends(get_db)):
task = _find_task_by_id_or_code(db, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
return _serialize_task(db, task)
@router.patch("/tasks/{task_id}", response_model=schemas.TaskResponse)
def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
task = db.query(Task).filter(Task.id == task_id).first()
def update_task(task_id: str, task_update: schemas.TaskUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
task = _find_task_by_id_or_code(db, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
# P5.7: status-based edit restrictions
current_status = task.status.value if hasattr(task.status, 'value') else task.status
update_data = task_update.model_dump(exclude_unset=True)
if update_data.get("type") and not update_data.get("task_type"):
update_data["task_type"] = update_data.pop("type")
else:
update_data.pop("type", None)
if "taken_by" in update_data:
taken_by = update_data.pop("taken_by")
if taken_by in (None, "null", ""):
update_data["assignee_id"] = None
else:
assignee = db.query(models.User).filter(models.User.username == taken_by).first()
if not assignee:
raise HTTPException(status_code=404, detail="Assignee user not found")
update_data["assignee_id"] = assignee.id
# Fields that are always allowed regardless of status (non-body edits)
_always_allowed = {"status"}
@@ -268,12 +398,12 @@ def update_task(task_id: int, task_update: schemas.TaskUpdate, db: Session = Dep
from app.api.routers.milestone_actions import try_auto_complete_milestone
try_auto_complete_milestone(db, task, user_id=current_user.id)
return task
return _serialize_task(db, task)
@router.delete("/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(task_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
task = db.query(Task).filter(Task.id == task_id).first()
def delete_task(task_id: str, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
task = _find_task_by_id_or_code(db, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
check_project_role(db, current_user.id, task.project_id, min_role="mgr")
@@ -286,22 +416,24 @@ def delete_task(task_id: int, db: Session = Depends(get_db), current_user: model
# ---- Transition ----
class TransitionBody(BaseModel):
status: Optional[str] = None
comment: Optional[str] = None
@router.post("/tasks/{task_id}/transition", response_model=schemas.TaskResponse)
def transition_task(
task_id: int,
new_status: str,
task_id: str,
bg: BackgroundTasks,
new_status: str | None = None,
body: TransitionBody = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
new_status = new_status or (body.status if body else None)
valid_statuses = [s.value for s in TaskStatus]
if new_status not in valid_statuses:
raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}")
task = db.query(Task).filter(Task.id == task_id).first()
task = _find_task_by_id_or_code(db, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
old_status = task.status.value if hasattr(task.status, 'value') else task.status
@@ -385,7 +517,40 @@ def transition_task(
bg.add_task(fire_webhooks_sync, event,
{"task_id": task.id, "title": task.title, "old_status": old_status, "new_status": new_status},
task.project_id, db)
return task
return _serialize_task(db, task)
@router.post("/tasks/{task_id}/take", response_model=schemas.TaskResponse)
def take_task(
task_id: str,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
task = _find_task_by_id_or_code(db, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
check_project_role(db, current_user.id, task.project_id, min_role="dev")
if task.assignee_id and task.assignee_id != current_user.id:
assignee = db.query(models.User).filter(models.User.id == task.assignee_id).first()
assignee_name = assignee.username if assignee else str(task.assignee_id)
raise HTTPException(status_code=409, detail=f"Task is already taken by {assignee_name}")
task.assignee_id = current_user.id
db.commit()
db.refresh(task)
_notify_user(
db,
current_user.id,
"task.assigned",
f"Task {task.task_code or task.id} assigned to you",
f"'{task.title}' has been assigned to you.",
"task",
task.id,
)
return _serialize_task(db, task)
# ---- Assignment ----
@@ -616,7 +781,7 @@ def search_tasks(q: str, project_id: int = None, page: int = 1, page_size: int =
total_pages = math.ceil(total / page_size) if total else 1
items = query.offset((page - 1) * page_size).limit(page_size).all()
return {
"items": [schemas.TaskResponse.model_validate(i) for i in items],
"items": [_serialize_task(db, i) for i in items],
"total": total,
"total_tasks": total,
"page": page,