"""Issues router.""" import math from typing import List from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks from sqlalchemy.orm import Session from pydantic import BaseModel from app.core.config import get_db from app.models import models from app.schemas import schemas from app.services.webhook import fire_webhooks_sync from app.models.notification import Notification as NotificationModel router = APIRouter(tags=["Issues"]) def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None): n = NotificationModel(user_id=user_id, type=ntype, title=title, message=message, entity_type=entity_type, entity_id=entity_id) db.add(n) db.commit() return n # ---- CRUD ---- @router.post("/issues", response_model=schemas.IssueResponse, status_code=status.HTTP_201_CREATED) def create_issue(issue: schemas.IssueCreate, bg: BackgroundTasks, db: Session = Depends(get_db)): db_issue = models.Issue(**issue.model_dump()) db.add(db_issue) db.commit() db.refresh(db_issue) event = "resolution.created" if db_issue.issue_type == "resolution" else "issue.created" bg.add_task(fire_webhooks_sync, event, {"issue_id": db_issue.id, "title": db_issue.title, "type": db_issue.issue_type, "status": db_issue.status}, db_issue.project_id, db) return db_issue @router.get("/issues") def list_issues( project_id: int = None, issue_status: str = None, issue_type: str = None, assignee_id: int = None, tag: str = None, sort_by: str = "created_at", sort_order: str = "desc", page: int = 1, page_size: int = 50, db: Session = Depends(get_db) ): """List issues with filtering, sorting, and pagination metadata.""" query = db.query(models.Issue) if project_id: query = query.filter(models.Issue.project_id == project_id) if issue_status: query = query.filter(models.Issue.status == issue_status) if issue_type: query = query.filter(models.Issue.issue_type == issue_type) if assignee_id: query = query.filter(models.Issue.assignee_id == assignee_id) if tag: query = query.filter(models.Issue.tags.contains(tag)) sort_fields = { "created_at": models.Issue.created_at, "updated_at": models.Issue.updated_at, "priority": models.Issue.priority, "title": models.Issue.title, "due_date": models.Issue.due_date, "status": models.Issue.status, } sort_col = sort_fields.get(sort_by, models.Issue.created_at) query = query.order_by(sort_col.asc() if sort_order == "asc" else sort_col.desc()) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return {"items": [schemas.IssueResponse.model_validate(i) for i in items], "total": total, "page": page, "page_size": page_size, "total_pages": total_pages} @router.get("/issues/overdue", response_model=List[schemas.IssueResponse]) def list_overdue_issues(project_id: int = None, db: Session = Depends(get_db)): query = db.query(models.Issue).filter( models.Issue.due_date != None, models.Issue.due_date < datetime.utcnow(), models.Issue.status.notin_(["resolved", "closed"]) ) if project_id: query = query.filter(models.Issue.project_id == project_id) return query.order_by(models.Issue.due_date.asc()).all() @router.get("/issues/{issue_id}", response_model=schemas.IssueResponse) def get_issue(issue_id: int, db: Session = Depends(get_db)): issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if not issue: raise HTTPException(status_code=404, detail="Issue not found") return issue @router.patch("/issues/{issue_id}", response_model=schemas.IssueResponse) def update_issue(issue_id: int, issue_update: schemas.IssueUpdate, db: Session = Depends(get_db)): issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if not issue: raise HTTPException(status_code=404, detail="Issue not found") for field, value in issue_update.model_dump(exclude_unset=True).items(): setattr(issue, field, value) db.commit() db.refresh(issue) return issue @router.delete("/issues/{issue_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_issue(issue_id: int, db: Session = Depends(get_db)): issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if not issue: raise HTTPException(status_code=404, detail="Issue not found") db.delete(issue) db.commit() return None # ---- Transition ---- @router.post("/issues/{issue_id}/transition", response_model=schemas.IssueResponse) def transition_issue(issue_id: int, new_status: str, bg: BackgroundTasks, db: Session = Depends(get_db)): valid_statuses = ["open", "in_progress", "resolved", "closed", "blocked"] if new_status not in valid_statuses: raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}") issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if not issue: raise HTTPException(status_code=404, detail="Issue not found") old_status = issue.status issue.status = new_status db.commit() db.refresh(issue) event = "issue.closed" if new_status == "closed" else "issue.updated" bg.add_task(fire_webhooks_sync, event, {"issue_id": issue.id, "title": issue.title, "old_status": old_status, "new_status": new_status}, issue.project_id, db) return issue # ---- Assignment ---- @router.post("/issues/{issue_id}/assign") def assign_issue(issue_id: int, assignee_id: int, db: Session = Depends(get_db)): issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if not issue: raise HTTPException(status_code=404, detail="Issue not found") user = db.query(models.User).filter(models.User.id == assignee_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") issue.assignee_id = assignee_id db.commit() db.refresh(issue) _notify_user(db, assignee_id, "issue.assigned", f"Issue #{issue.id} assigned to you", f"'{issue.title}' has been assigned to you.", "issue", issue.id) return {"issue_id": issue.id, "assignee_id": assignee_id, "title": issue.title} # ---- Relations ---- class IssueRelation(BaseModel): parent_id: int child_id: int @router.post("/issues/link") def link_issues(rel: IssueRelation, db: Session = Depends(get_db)): parent = db.query(models.Issue).filter(models.Issue.id == rel.parent_id).first() child = db.query(models.Issue).filter(models.Issue.id == rel.child_id).first() if not parent or not child: raise HTTPException(status_code=404, detail="Issue not found") if rel.parent_id == rel.child_id: raise HTTPException(status_code=400, detail="Cannot link issue to itself") child.depends_on_id = rel.parent_id db.commit() return {"parent_id": rel.parent_id, "child_id": rel.child_id, "status": "linked"} @router.delete("/issues/link") def unlink_issues(child_id: int, db: Session = Depends(get_db)): child = db.query(models.Issue).filter(models.Issue.id == child_id).first() if not child: raise HTTPException(status_code=404, detail="Issue not found") child.depends_on_id = None db.commit() return {"child_id": child_id, "status": "unlinked"} @router.get("/issues/{issue_id}/children", response_model=List[schemas.IssueResponse]) def get_children(issue_id: int, db: Session = Depends(get_db)): return db.query(models.Issue).filter(models.Issue.depends_on_id == issue_id).all() # ---- Tags ---- @router.post("/issues/{issue_id}/tags") def add_tag(issue_id: int, tag: str, db: Session = Depends(get_db)): issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if not issue: raise HTTPException(status_code=404, detail="Issue not found") current = set(issue.tags.split(",")) if issue.tags else set() current.add(tag.strip()) current.discard("") issue.tags = ",".join(sorted(current)) db.commit() return {"issue_id": issue_id, "tags": list(current)} @router.delete("/issues/{issue_id}/tags") def remove_tag(issue_id: int, tag: str, db: Session = Depends(get_db)): issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if not issue: raise HTTPException(status_code=404, detail="Issue not found") current = set(issue.tags.split(",")) if issue.tags else set() current.discard(tag.strip()) current.discard("") issue.tags = ",".join(sorted(current)) if current else None db.commit() return {"issue_id": issue_id, "tags": list(current)} @router.get("/tags") def list_all_tags(project_id: int = None, db: Session = Depends(get_db)): query = db.query(models.Issue.tags).filter(models.Issue.tags != None) if project_id: query = query.filter(models.Issue.project_id == project_id) all_tags = set() for (tags,) in query.all(): for t in tags.split(","): t = t.strip() if t: all_tags.add(t) return {"tags": sorted(all_tags)} # ---- Batch ---- class BatchTransition(BaseModel): issue_ids: List[int] new_status: str class BatchAssign(BaseModel): issue_ids: List[int] assignee_id: int @router.post("/issues/batch/transition") def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)): valid_statuses = ["open", "in_progress", "resolved", "closed", "blocked"] if data.new_status not in valid_statuses: raise HTTPException(status_code=400, detail="Invalid status") updated = [] for issue_id in data.issue_ids: issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if issue: old_status = issue.status issue.status = data.new_status updated.append({"id": issue.id, "title": issue.title, "old": old_status, "new": data.new_status}) db.commit() for u in updated: event = "issue.closed" if data.new_status == "closed" else "issue.updated" bg.add_task(fire_webhooks_sync, event, u, None, db) return {"updated": len(updated), "issues": updated} @router.post("/issues/batch/assign") def batch_assign(data: BatchAssign, db: Session = Depends(get_db)): user = db.query(models.User).filter(models.User.id == data.assignee_id).first() if not user: raise HTTPException(status_code=404, detail="Assignee not found") updated = [] for issue_id in data.issue_ids: issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first() if issue: issue.assignee_id = data.assignee_id updated.append(issue_id) db.commit() return {"updated": len(updated), "issue_ids": updated, "assignee_id": data.assignee_id} # ---- Search ---- @router.get("/search/issues") def search_issues(q: str, project_id: int = None, page: int = 1, page_size: int = 50, db: Session = Depends(get_db)): query = db.query(models.Issue).filter( (models.Issue.title.contains(q)) | (models.Issue.description.contains(q)) ) if project_id: query = query.filter(models.Issue.project_id == project_id) total = query.count() page = max(1, page) page_size = min(max(1, page_size), 200) total_pages = math.ceil(total / page_size) if total else 1 items = query.offset((page - 1) * page_size).limit(page_size).all() return {"items": [schemas.IssueResponse.model_validate(i) for i in items], "total": total, "page": page, "page_size": page_size, "total_pages": total_pages}