Files
HarborForge.Backend/app/api/routers/issues.py
Zhi f60dc68b22 refactor: split monolithic main.py into FastAPI routers (v0.2.0)
- app/api/deps.py: shared auth dependencies
- app/api/routers/auth.py: login, me
- app/api/routers/issues.py: CRUD, transition, assign, relations, tags, batch, search
- app/api/routers/projects.py: CRUD, members, worklog summary
- app/api/routers/users.py: CRUD, worklogs
- app/api/routers/comments.py: CRUD
- app/api/routers/webhooks.py: CRUD, logs, retry
- app/api/routers/misc.py: API keys, activity, milestones, notifications, worklogs, export, dashboard
- main.py: 1165 lines → 51 lines
- Version bump to 0.2.0
2026-02-23 15:14:46 +00:00

300 lines
12 KiB
Python

"""Issues router."""
import math
from typing import List
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.config import get_db
from app.models import models
from app.schemas import schemas
from app.services.webhook import fire_webhooks_sync
from app.models.notification import Notification as NotificationModel
# Router for the issue endpoints; every route registered on it carries the "Issues" tag.
router = APIRouter(tags=["Issues"])
def _notify_user(db, user_id, ntype, title, message=None, entity_type=None, entity_id=None):
    """Persist a notification for ``user_id`` and return the new row object.

    The notification is added to ``db`` and committed right away, which also
    commits anything else still pending on that session.
    """
    notification = NotificationModel(
        user_id=user_id,
        type=ntype,
        title=title,
        message=message,
        entity_type=entity_type,
        entity_id=entity_id,
    )
    db.add(notification)
    db.commit()
    return notification
# ---- CRUD ----
@router.post("/issues", response_model=schemas.IssueResponse, status_code=status.HTTP_201_CREATED)
def create_issue(issue: schemas.IssueCreate, bg: BackgroundTasks, db: Session = Depends(get_db)):
    """Create an issue and schedule the matching "created" webhook.

    The event is ``resolution.created`` when the stored issue type is
    ``resolution`` and ``issue.created`` otherwise.
    """
    new_issue = models.Issue(**issue.model_dump())
    db.add(new_issue)
    db.commit()
    # Reload so database-generated values (such as the id) are populated.
    db.refresh(new_issue)

    if new_issue.issue_type == "resolution":
        event_name = "resolution.created"
    else:
        event_name = "issue.created"
    payload = {
        "issue_id": new_issue.id,
        "title": new_issue.title,
        "type": new_issue.issue_type,
        "status": new_issue.status,
    }
    # NOTE(review): the request-scoped session is handed to the background
    # task; confirm it is still usable by the time the task runs.
    bg.add_task(fire_webhooks_sync, event_name, payload, new_issue.project_id, db)
    return new_issue
@router.get("/issues")
def list_issues(
    project_id: int = None, issue_status: str = None, issue_type: str = None,
    assignee_id: int = None, tag: str = None,
    sort_by: str = "created_at", sort_order: str = "desc",
    page: int = 1, page_size: int = 50,
    db: Session = Depends(get_db)
):
    """List issues with filtering, sorting, and pagination metadata.

    Filters are only applied for parameters that are supplied (truthy).
    ``sort_by`` falls back to ``created_at`` for unknown names and any
    ``sort_order`` other than ``"asc"`` sorts descending. ``page`` is clamped
    to at least 1 and ``page_size`` to the range 1..200.

    Returns:
        A dict with ``items``, ``total``, ``page``, ``page_size`` and
        ``total_pages`` (``total_pages`` is 1 when nothing matches).
    """
    query = db.query(models.Issue)
    if project_id:
        query = query.filter(models.Issue.project_id == project_id)
    if issue_status:
        query = query.filter(models.Issue.status == issue_status)
    if issue_type:
        query = query.filter(models.Issue.issue_type == issue_type)
    if assignee_id:
        query = query.filter(models.Issue.assignee_id == assignee_id)
    if tag:
        # autoescape keeps "%" and "_" in the user's text literal instead of
        # letting them act as LIKE wildcards.
        # NOTE(review): this is still a substring match on the comma-joined
        # tags column, so "bug" also matches "debug" - confirm that is wanted.
        query = query.filter(models.Issue.tags.contains(tag, autoescape=True))

    sort_fields = {
        "created_at": models.Issue.created_at,
        "updated_at": models.Issue.updated_at,
        "priority": models.Issue.priority,
        "title": models.Issue.title,
        "due_date": models.Issue.due_date,
        "status": models.Issue.status,
    }
    sort_col = sort_fields.get(sort_by, models.Issue.created_at)
    ascending = sort_order == "asc"
    primary = sort_col.asc() if ascending else sort_col.desc()
    # Most sortable columns are not unique; without a unique tiebreaker rows
    # with equal sort values can move between pages from one request to the next.
    tiebreak = models.Issue.id.asc() if ascending else models.Issue.id.desc()
    query = query.order_by(primary, tiebreak)

    total = query.count()
    page = max(1, page)
    page_size = min(max(1, page_size), 200)
    total_pages = math.ceil(total / page_size) if total else 1
    items = query.offset((page - 1) * page_size).limit(page_size).all()
    return {
        "items": [schemas.IssueResponse.model_validate(i) for i in items],
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
    }
@router.get("/issues/overdue", response_model=List[schemas.IssueResponse])
def list_overdue_issues(project_id: int = None, db: Session = Depends(get_db)):
    """Return issues past their due date that are neither resolved nor closed.

    Results are ordered by due date, earliest first, and can optionally be
    limited to one project.
    """
    now = datetime.utcnow()
    overdue = (
        db.query(models.Issue)
        .filter(models.Issue.due_date != None)  # noqa: E711 - renders as SQL "IS NOT NULL"
        .filter(models.Issue.due_date < now)
        .filter(models.Issue.status.notin_(["resolved", "closed"]))
    )
    if project_id:
        overdue = overdue.filter(models.Issue.project_id == project_id)
    return overdue.order_by(models.Issue.due_date.asc()).all()
@router.get("/issues/{issue_id}", response_model=schemas.IssueResponse)
def get_issue(issue_id: int, db: Session = Depends(get_db)):
    """Fetch one issue by id; respond with 404 when no such issue exists."""
    found = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Issue not found")
@router.patch("/issues/{issue_id}", response_model=schemas.IssueResponse)
def update_issue(issue_id: int, issue_update: schemas.IssueUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to an issue and return the refreshed row.

    Only fields explicitly present in the request body are written; a missing
    issue yields a 404.
    """
    target = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Issue not found")

    # exclude_unset leaves out fields the client did not send.
    changes = issue_update.model_dump(exclude_unset=True)
    for name in changes:
        setattr(target, name, changes[name])

    db.commit()
    db.refresh(target)
    return target
@router.delete("/issues/{issue_id:int}", status_code=status.HTTP_204_NO_CONTENT)
def delete_issue(issue_id: int, db: Session = Depends(get_db)):
    """Delete an issue by id.

    The ``:int`` path convertor restricts this route to numeric ids. Routes
    are matched in declaration order, and without the convertor this route
    also captures ``DELETE /issues/link`` (declared later in this module),
    treating ``link`` as ``issue_id`` and rejecting the request with a
    validation error, which makes the unlink endpoint unreachable.

    Raises:
        HTTPException: 404 when the issue does not exist.
    """
    issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
    if not issue:
        raise HTTPException(status_code=404, detail="Issue not found")
    db.delete(issue)
    db.commit()
    return None
# ---- Transition ----
@router.post("/issues/{issue_id:int}/transition", response_model=schemas.IssueResponse)
def transition_issue(issue_id: int, new_status: str, bg: BackgroundTasks, db: Session = Depends(get_db)):
    """Move an issue to ``new_status`` and schedule a webhook for the change.

    The ``:int`` path convertor restricts this route to numeric ids. Routes
    are matched in declaration order, and without the convertor this route
    also captures ``POST /issues/batch/transition`` (declared later in this
    module), treating ``batch`` as ``issue_id`` and rejecting the request
    with a validation error.

    The webhook event is ``issue.closed`` when the new status is ``closed``
    and ``issue.updated`` otherwise.

    Raises:
        HTTPException: 400 for an unknown status, 404 for an unknown issue.
    """
    valid_statuses = ["open", "in_progress", "resolved", "closed", "blocked"]
    if new_status not in valid_statuses:
        raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}")
    issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
    if not issue:
        raise HTTPException(status_code=404, detail="Issue not found")

    # Remember the previous value for the webhook payload before overwriting it.
    old_status = issue.status
    issue.status = new_status
    db.commit()
    db.refresh(issue)

    event = "issue.closed" if new_status == "closed" else "issue.updated"
    payload = {
        "issue_id": issue.id,
        "title": issue.title,
        "old_status": old_status,
        "new_status": new_status,
    }
    bg.add_task(fire_webhooks_sync, event, payload, issue.project_id, db)
    return issue
# ---- Assignment ----
@router.post("/issues/{issue_id:int}/assign")
def assign_issue(issue_id: int, assignee_id: int, db: Session = Depends(get_db)):
    """Assign an issue to a user and create an ``issue.assigned`` notification.

    The ``:int`` path convertor restricts this route to numeric ids. Routes
    are matched in declaration order, and without the convertor this route
    also captures ``POST /issues/batch/assign`` (declared later in this
    module), treating ``batch`` as ``issue_id`` and rejecting the request
    with a validation error.

    Returns:
        A dict with ``issue_id``, ``assignee_id`` and the issue ``title``.

    Raises:
        HTTPException: 404 when either the issue or the user does not exist.
    """
    issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
    if not issue:
        raise HTTPException(status_code=404, detail="Issue not found")
    user = db.query(models.User).filter(models.User.id == assignee_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    issue.assignee_id = assignee_id
    db.commit()
    db.refresh(issue)

    _notify_user(
        db,
        assignee_id,
        "issue.assigned",
        f"Issue #{issue.id} assigned to you",
        f"'{issue.title}' has been assigned to you.",
        "issue",
        issue.id,
    )
    return {"issue_id": issue.id, "assignee_id": assignee_id, "title": issue.title}
# ---- Relations ----
class IssueRelation(BaseModel):
    """Request body naming the two issues of a parent/child link."""

    # Issue the child will depend on.
    parent_id: int
    # Issue whose depends_on_id is set by the link endpoint.
    child_id: int
@router.post("/issues/link")
def link_issues(rel: IssueRelation, db: Session = Depends(get_db)):
    """Make ``child_id`` depend on ``parent_id``.

    Besides rejecting a direct self-link, the parent's own dependency chain
    is walked so that a link which would close a loop (for example A depends
    on B while B already depends on A) is refused.

    Returns:
        A dict echoing both ids with ``status`` set to ``"linked"``.

    Raises:
        HTTPException: 404 when either issue is missing, 400 for a self-link
            or a link that would create a circular dependency.
    """
    parent = db.query(models.Issue).filter(models.Issue.id == rel.parent_id).first()
    child = db.query(models.Issue).filter(models.Issue.id == rel.child_id).first()
    if not parent or not child:
        raise HTTPException(status_code=404, detail="Issue not found")
    if rel.parent_id == rel.child_id:
        raise HTTPException(status_code=400, detail="Cannot link issue to itself")

    # Follow depends_on_id upward from the parent. Reaching the child means
    # the new link would make the child its own ancestor.
    visited = {rel.parent_id}  # guards against loops already present in the data
    ancestor_id = parent.depends_on_id
    while ancestor_id is not None and ancestor_id not in visited:
        if ancestor_id == rel.child_id:
            raise HTTPException(status_code=400, detail="Link would create a circular dependency")
        visited.add(ancestor_id)
        ancestor_id = (
            db.query(models.Issue.depends_on_id)
            .filter(models.Issue.id == ancestor_id)
            .scalar()
        )

    child.depends_on_id = rel.parent_id
    db.commit()
    return {"parent_id": rel.parent_id, "child_id": rel.child_id, "status": "linked"}
@router.delete("/issues/link")
def unlink_issues(child_id: int, db: Session = Depends(get_db)):
    """Clear the dependency of ``child_id``; respond with 404 if it does not exist."""
    target = db.query(models.Issue).filter(models.Issue.id == child_id).first()
    if target:
        target.depends_on_id = None
        db.commit()
        return {"child_id": child_id, "status": "unlinked"}
    raise HTTPException(status_code=404, detail="Issue not found")
@router.get("/issues/{issue_id}/children", response_model=List[schemas.IssueResponse])
def get_children(issue_id: int, db: Session = Depends(get_db)):
    """Return every issue whose ``depends_on_id`` equals ``issue_id``.

    No existence check is made on ``issue_id`` itself; an unknown id simply
    produces an empty list.
    """
    children = db.query(models.Issue).filter(models.Issue.depends_on_id == issue_id)
    return children.all()
# ---- Tags ----
@router.post("/issues/{issue_id}/tags")
def add_tag(issue_id: int, tag: str, db: Session = Depends(get_db)):
    """Add a tag to an issue's comma-separated tag list.

    Tags are whitespace-trimmed, de-duplicated and stored sorted. The
    response lists them in that same sorted order. Because the storage
    format is comma-joined, a ``tag`` value that itself contains commas is
    treated as several tags so the response always agrees with what is stored.
    A blank ``tag`` is a no-op.

    Returns:
        A dict with ``issue_id`` and the resulting sorted ``tags`` list.

    Raises:
        HTTPException: 404 when the issue does not exist.
    """
    issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
    if not issue:
        raise HTTPException(status_code=404, detail="Issue not found")

    existing = issue.tags.split(",") if issue.tags else []
    incoming = tag.split(",")
    merged = {piece.strip() for piece in existing + incoming}
    merged.discard("")
    tags = sorted(merged)

    # Store None rather than "" when nothing is left, matching what the
    # tag-removal endpoint writes for an empty list.
    issue.tags = ",".join(tags) if tags else None
    db.commit()
    return {"issue_id": issue_id, "tags": tags}
@router.delete("/issues/{issue_id}/tags")
def remove_tag(issue_id: int, tag: str, db: Session = Depends(get_db)):
    """Remove a tag from an issue's comma-separated tag list.

    Stored tags are whitespace-trimmed before comparison so that a padded
    entry can still be matched, and the remaining tags are stored and
    returned in sorted order. Removing a tag that is not present is a no-op.
    When no tags remain the column is set to ``None``.

    Returns:
        A dict with ``issue_id`` and the remaining sorted ``tags`` list.

    Raises:
        HTTPException: 404 when the issue does not exist.
    """
    issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
    if not issue:
        raise HTTPException(status_code=404, detail="Issue not found")

    remaining = {piece.strip() for piece in issue.tags.split(",")} if issue.tags else set()
    remaining.discard(tag.strip())
    remaining.discard("")
    tags = sorted(remaining)

    issue.tags = ",".join(tags) if tags else None
    db.commit()
    return {"issue_id": issue_id, "tags": tags}
@router.get("/tags")
def list_all_tags(project_id: int = None, db: Session = Depends(get_db)):
    """Return the sorted set of distinct tags used by issues.

    Only issues with a non-null tags column are considered, optionally
    limited to one project.
    """
    rows = db.query(models.Issue.tags).filter(models.Issue.tags != None)  # noqa: E711 - SQL "IS NOT NULL"
    if project_id:
        rows = rows.filter(models.Issue.project_id == project_id)

    collected = set()
    for row in rows.all():
        # Each row holds a single comma-joined tags string.
        collected.update(piece.strip() for piece in row[0].split(","))
    collected.discard("")
    return {"tags": sorted(collected)}
# ---- Batch ----
class BatchTransition(BaseModel):
    """Request body for moving several issues to one status."""

    # Ids of the issues to update.
    issue_ids: List[int]
    # Target status applied to every listed issue.
    new_status: str
class BatchAssign(BaseModel):
    """Request body for assigning several issues to one user."""

    # Ids of the issues to update.
    issue_ids: List[int]
    # User who becomes the assignee of every listed issue.
    assignee_id: int
@router.post("/issues/batch/transition")
def batch_transition(data: BatchTransition, bg: BackgroundTasks, db: Session = Depends(get_db)):
    """Set the same status on several issues and schedule one webhook per issue.

    Ids that do not match an issue are skipped silently. All status changes
    are committed together. Each webhook is scheduled with the project id of
    the issue it concerns, as the single-issue transition endpoint does,
    instead of ``None``.

    Returns:
        A dict with the number of ``updated`` issues and an ``issues`` list of
        ``{"id", "title", "old", "new"}`` entries.

    Raises:
        HTTPException: 400 when ``new_status`` is not a known status.
    """
    valid_statuses = ["open", "in_progress", "resolved", "closed", "blocked"]
    if data.new_status not in valid_statuses:
        raise HTTPException(status_code=400, detail="Invalid status")

    updated = []
    project_ids = []
    for issue_id in data.issue_ids:
        issue = db.query(models.Issue).filter(models.Issue.id == issue_id).first()
        if not issue:
            continue
        old_status = issue.status
        issue.status = data.new_status
        updated.append({"id": issue.id, "title": issue.title, "old": old_status, "new": data.new_status})
        # Captured before the commit so each webhook can be scoped to its project.
        project_ids.append(issue.project_id)
    db.commit()

    # The event depends only on the target status, so compute it once.
    event = "issue.closed" if data.new_status == "closed" else "issue.updated"
    for payload, project_id in zip(updated, project_ids):
        bg.add_task(fire_webhooks_sync, event, payload, project_id, db)
    return {"updated": len(updated), "issues": updated}
@router.post("/issues/batch/assign")
def batch_assign(data: BatchAssign, db: Session = Depends(get_db)):
    """Assign several issues to one user in a single commit.

    Responds with 404 when the assignee does not exist; issue ids that do not
    match an issue are skipped silently.
    """
    assignee = db.query(models.User).filter(models.User.id == data.assignee_id).first()
    if not assignee:
        raise HTTPException(status_code=404, detail="Assignee not found")

    # NOTE(review): unlike the single-issue assign endpoint, no notification
    # is created here - confirm that is intended.
    assigned_ids = []
    for candidate_id in data.issue_ids:
        row = db.query(models.Issue).filter(models.Issue.id == candidate_id).first()
        if not row:
            continue
        row.assignee_id = data.assignee_id
        assigned_ids.append(candidate_id)
    db.commit()

    return {
        "updated": len(assigned_ids),
        "issue_ids": assigned_ids,
        "assignee_id": data.assignee_id,
    }
# ---- Search ----
@router.get("/search/issues")
def search_issues(q: str, project_id: int = None, page: int = 1, page_size: int = 50,
                  db: Session = Depends(get_db)):
    """Search issues whose title or description contains ``q``.

    ``q`` is matched literally: ``%`` and ``_`` are escaped rather than
    treated as LIKE wildcards. Results are ordered by issue id so that
    offset-based pages are stable between requests. ``page`` is clamped to at
    least 1 and ``page_size`` to the range 1..200.

    Returns:
        A dict with ``items``, ``total``, ``page``, ``page_size`` and
        ``total_pages`` (``total_pages`` is 1 when nothing matches).
    """
    query = db.query(models.Issue).filter(
        models.Issue.title.contains(q, autoescape=True)
        | models.Issue.description.contains(q, autoescape=True)
    )
    if project_id:
        query = query.filter(models.Issue.project_id == project_id)

    total = query.count()
    page = max(1, page)
    page_size = min(max(1, page_size), 200)
    total_pages = math.ceil(total / page_size) if total else 1
    # OFFSET/LIMIT without ORDER BY gives no guaranteed row order, so pages
    # could overlap or skip rows; order by the primary key.
    items = (
        query.order_by(models.Issue.id.asc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    return {
        "items": [schemas.IssueResponse.model_validate(i) for i in items],
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
    }