Files
HarborForge.Backend/app/api/routers/misc.py
hzhang 51fb8ca073 fix(security): close critical auth/SSRF/RBAC holes
Verified locally end-to-end (before: exploitable, after: blocked).

- config: refuse to start on weak/default/short SECRET_KEY (was
  trivially forgeable JWT -> full admin)
- deps: add reusable require_admin dependency (JWT or API key)
- api-keys: require admin to mint/list/revoke; mask key on list
  (was unauthenticated -> instant admin API key)
- webhooks: whole router now admin-only (was fully unauthenticated
  CRUD + readable logs)
- webhook delivery: validate URL scheme + reject hosts resolving to
  private/loopback/link-local/reserved IPs; disable redirects
  (was a readable SSRF primitive)
- rbac: implement a real project-role hierarchy in check_project_role
  (was a no-op: any member, even guest, passed admin/mgr gates)
- misc: auth on delete_milestone (+ensure_can_edit_milestone),
  worklog create/delete (force caller user_id, owner-only delete),
  /activity and /export/tasks (were unauthenticated data exposure)
- tasks: auth + ensure_can_edit_task on assign_task and batch_assign

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:53:14 +01:00

817 lines
33 KiB
Python

"""Miscellaneous routers: API keys, activity, milestones, notifications, worklogs, export, dashboard."""
import csv
import io
import math
import secrets
from datetime import datetime
from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, ConfigDict
from sqlalchemy import func as sqlfunc
from sqlalchemy.orm import Session

from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey, require_admin
from app.api.rbac import check_project_role, ensure_can_edit_milestone
from app.models import models
from app.models.apikey import APIKey
from app.models.activity import ActivityLog
from app.models.milestone import Milestone as MilestoneModel
from app.models.task import Task, TaskStatus, TaskPriority
from app.models.support import Support, SupportStatus, SupportPriority
from app.models.meeting import Meeting, MeetingStatus, MeetingPriority
from app.models.notification import Notification as NotificationModel
from app.models.worklog import WorkLog
from app.schemas import schemas
# Router that every endpoint in this module registers its routes on.
router = APIRouter()
def _resolve_milestone(db: Session, identifier: str) -> MilestoneModel:
    """Fetch the milestone whose id or ``milestone_code`` equals *identifier*.

    An identifier that parses as an integer is matched against the primary
    key only; any other value is matched against ``milestone_code``.

    Raises:
        HTTPException: 404 when no milestone matches.
    """
    try:
        criterion = MilestoneModel.id == int(identifier)
    except (ValueError, TypeError):
        # Not numeric, so treat it as a milestone code.
        criterion = MilestoneModel.milestone_code == identifier
    found = db.query(MilestoneModel).filter(criterion).first()
    if not found:
        raise HTTPException(status_code=404, detail="Milestone not found")
    return found
# ============ API Keys ============
class APIKeyCreate(BaseModel):
    """Request body for minting a new API key."""
    # Label stored alongside the generated key.
    name: str
    # Id of the user the key is issued for; the create endpoint answers 404
    # when no such user exists.
    user_id: int
class APIKeyResponse(BaseModel):
    """API key representation returned by the API-key endpoints.

    Built from ``APIKey`` ORM rows. The create endpoint returns the full
    secret in ``key``; the list endpoint masks it before responding.
    """
    # Pydantic v2 configuration; replaces the deprecated nested ``Config``
    # class and allows validation straight from ORM attributes.
    model_config = ConfigDict(from_attributes=True)

    id: int
    key: str
    name: str
    user_id: int
    is_active: bool
    created_at: datetime
    last_used_at: datetime | None = None
@router.post("/api-keys", response_model=APIKeyResponse, status_code=status.HTTP_201_CREATED, tags=["API Keys"])
def create_api_key(data: APIKeyCreate, db: Session = Depends(get_db),
                   _: models.User = Depends(require_admin)):
    """Generate and store a new API key for an existing user.

    Access is gated by the ``require_admin`` dependency. The response carries
    the freshly generated secret in full.

    Raises:
        HTTPException: 404 when ``data.user_id`` matches no user.
    """
    owner = db.query(models.User).filter(models.User.id == data.user_id).first()
    if not owner:
        raise HTTPException(status_code=404, detail="User not found")

    # 32 random bytes rendered as 64 hex characters.
    record = APIKey(key=secrets.token_hex(32), name=data.name, user_id=data.user_id)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
def _mask_api_key(secret):
    """Return a display-safe form of *secret*.

    Secrets longer than 8 characters keep their first 6 and last 2 characters
    around a visible ``...`` marker; shorter non-empty secrets are hidden
    entirely so that a short key is never echoed back in full.
    """
    if not secret:
        return secret
    if len(secret) > 8:
        return secret[:6] + "..." + secret[-2:]
    return "..."


@router.get("/api-keys", response_model=List[APIKeyResponse], tags=["API Keys"])
def list_api_keys(user_id: int = None, db: Session = Depends(get_db),
                  _: models.User = Depends(require_admin)):
    """List API keys, optionally restricted to one user, with secrets masked.

    Access is gated by the ``require_admin`` dependency.

    The response is built from plain dicts instead of overwriting ``key`` on
    the ORM rows: mutating session-attached objects marks them dirty, so any
    later flush on the same session would store the mask over the real secret.
    """
    query = db.query(APIKey)
    if user_id:
        query = query.filter(APIKey.user_id == user_id)
    return [
        {
            "id": row.id,
            "key": _mask_api_key(row.key),
            "name": row.name,
            "user_id": row.user_id,
            "is_active": row.is_active,
            "created_at": row.created_at,
            # Optional in the response schema, so tolerate a missing attribute.
            "last_used_at": getattr(row, "last_used_at", None),
        }
        for row in query.all()
    ]
@router.delete("/api-keys/{key_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["API Keys"])
def revoke_api_key(key_id: int, db: Session = Depends(get_db),
                   _: models.User = Depends(require_admin)):
    """Deactivate an API key; the row is kept with ``is_active`` set to False.

    Raises:
        HTTPException: 404 when no key has id ``key_id``.
    """
    target = db.query(APIKey).filter(APIKey.id == key_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="API key not found")
    target.is_active = False
    db.commit()
# ============ Activity Log ============
class ActivityLogResponse(BaseModel):
    """Activity-log entry as returned by ``GET /activity``."""
    # Pydantic v2 configuration; replaces the deprecated nested ``Config``
    # class and allows validation straight from ORM attributes.
    model_config = ConfigDict(from_attributes=True)

    id: int
    action: str
    entity_type: str
    entity_id: int
    # Nullable but, having no default, still required to be present.
    user_id: int | None
    details: str | None
    created_at: datetime
@router.get("/activity", response_model=List[ActivityLogResponse], tags=["Activity"])
def list_activity(entity_type: str = None, entity_id: int = None, user_id: int = None,
                  limit: int = 50, db: Session = Depends(get_db),
                  _: models.User = Depends(get_current_user_or_apikey)):
    """Return the most recent activity-log entries, newest first.

    Each of ``entity_type``, ``entity_id`` and ``user_id`` narrows the result
    when it is truthy; ``limit`` caps the number of rows returned.
    """
    optional_filters = (
        (ActivityLog.entity_type, entity_type),
        (ActivityLog.entity_id, entity_id),
        (ActivityLog.user_id, user_id),
    )
    query = db.query(ActivityLog)
    for column, wanted in optional_filters:
        if wanted:
            query = query.filter(column == wanted)
    newest_first = query.order_by(ActivityLog.created_at.desc())
    return newest_first.limit(limit).all()
# ============ Milestones (top-level, non project-scoped) ============
@router.post("/milestones", response_model=schemas.MilestoneResponse, status_code=status.HTTP_201_CREATED, tags=["Milestones"])
def create_milestone(ms: schemas.MilestoneCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Create a milestone in the project named by ``ms.project_id``.

    The milestone code is ``<project code>:<5-digit hex>``, where the number
    is one more than the highest milestone id already in the project (1 for
    the first milestone). Dependency lists are stored JSON-encoded, or as
    ``None`` when empty.

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    # NOTE(review): unlike update/delete, no project-role check runs here.
    # Confirm whether any authenticated caller should be able to create.
    import json

    project = db.query(models.Project).filter(models.Project.id == ms.project_id).first()
    if project is None:
        # Previously a missing project was accepted and the milestone was
        # written against a non-existent project id.
        raise HTTPException(status_code=404, detail="Project not found")
    project_code = project.project_code or f"P{ms.project_id}"

    latest = (
        db.query(MilestoneModel)
        .filter(MilestoneModel.project_id == ms.project_id)
        .order_by(MilestoneModel.id.desc())
        .first()
    )
    next_num = (latest.id + 1) if latest else 1
    milestone_code = f"{project_code}:{next_num:05x}"

    data = ms.model_dump()
    for dep_field in ("depend_on_milestones", "depend_on_tasks"):
        data[dep_field] = json.dumps(data[dep_field]) if data.get(dep_field) else None

    db_ms = MilestoneModel(**data)
    db_ms.created_by_id = current_user.id
    db_ms.milestone_code = milestone_code
    db.add(db_ms)
    db.commit()
    db.refresh(db_ms)
    return db_ms
@router.get("/milestones", response_model=List[schemas.MilestoneResponse], tags=["Milestones"])
def list_milestones(project_id: str = None, project_code: str = None, status_filter: str = None, db: Session = Depends(get_db)):
    """List milestones, optionally narrowed by project and status.

    ``project_code`` takes precedence over ``project_id``. The chosen value
    is tried as a numeric project id first and then as a project code.
    Milestones with a due date come first in ascending order, followed by
    those without one.

    Raises:
        HTTPException: 404 when a project selector is given but matches nothing.
    """
    query = db.query(MilestoneModel)

    selector = project_code or project_id
    if selector:
        project = None
        try:
            numeric_id = int(selector)
        except (ValueError, TypeError):
            numeric_id = None
        if numeric_id is not None:
            project = db.query(models.Project).filter(models.Project.id == numeric_id).first()
        if not project:
            # Fall back to matching the selector against project codes.
            project = db.query(models.Project).filter(models.Project.project_code == selector).first()
        if not project:
            raise HTTPException(status_code=404, detail="Project not found")
        query = query.filter(MilestoneModel.project_id == project.id)

    if status_filter:
        query = query.filter(MilestoneModel.status == status_filter)

    ordering = (MilestoneModel.due_date.is_(None), MilestoneModel.due_date.asc())
    return query.order_by(*ordering).all()
def _find_milestone_by_id_or_code(db, identifier) -> MilestoneModel | None:
    """Return the milestone matching *identifier*, or ``None``.

    A numeric identifier is tried as a primary key first. If that finds
    nothing, or the identifier is not numeric, its string form is matched
    against ``milestone_code``.
    """
    try:
        numeric_id = int(identifier)
    except (ValueError, TypeError):
        numeric_id = None

    if numeric_id is not None:
        by_id = db.query(MilestoneModel).filter(MilestoneModel.id == numeric_id).first()
        if by_id:
            return by_id

    code = str(identifier)
    return db.query(MilestoneModel).filter(MilestoneModel.milestone_code == code).first()
@router.get("/milestones/{milestone_id}", response_model=schemas.MilestoneResponse, tags=["Milestones"])
def get_milestone(milestone_id: str, db: Session = Depends(get_db)):
    """Return one milestone addressed by numeric id or milestone code (404 if absent)."""
    milestone = _resolve_milestone(db, milestone_id)
    return milestone
@router.patch("/milestones/{milestone_id}", response_model=schemas.MilestoneResponse, tags=["Milestones"])
def update_milestone(milestone_id: str, ms_update: schemas.MilestoneUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Apply the explicitly supplied fields of ``ms_update`` to a milestone.

    The milestone is resolved by numeric id or code (404 if absent), and
    ``ensure_can_edit_milestone`` is called before anything is changed.
    """
    target = _resolve_milestone(db, milestone_id)
    ensure_can_edit_milestone(db, current_user.id, target)

    # NOTE(review): create_milestone JSON-encodes the dependency lists before
    # storing them; values are assigned here as-is. Confirm whether
    # MilestoneUpdate can carry those fields.
    changes = ms_update.model_dump(exclude_unset=True)
    for name in changes:
        setattr(target, name, changes[name])

    db.commit()
    db.refresh(target)
    return target
@router.delete("/milestones/{milestone_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["Milestones"])
def delete_milestone(milestone_id: str, db: Session = Depends(get_db),
                     current_user: models.User = Depends(get_current_user_or_apikey)):
    """Delete a milestone addressed by numeric id or milestone code.

    Responds 404 when it does not exist; ``ensure_can_edit_milestone`` is
    called before the row is removed.
    """
    doomed = _resolve_milestone(db, milestone_id)
    ensure_can_edit_milestone(db, current_user.id, doomed)
    db.delete(doomed)
    db.commit()
@router.get("/milestones/{milestone_id}/progress", tags=["Milestones"])
def milestone_progress(milestone_id: str, db: Session = Depends(get_db)):
    """Report task-completion and elapsed-time progress for a milestone.

    ``progress_pct`` is the share of the milestone's tasks whose status is
    ``TaskStatus.CLOSED``. ``time_progress_pct`` is how much of the window
    between ``created_at`` and ``planned_release_date`` has elapsed, clamped
    to 0-100; it is ``None`` when either date is missing.

    Raises:
        HTTPException: 404 when the milestone does not exist.
    """
    ms = _resolve_milestone(db, milestone_id)
    tasks = db.query(Task).filter(Task.milestone_id == ms.id).all()
    total = len(tasks)
    done = sum(1 for t in tasks if t.status == TaskStatus.CLOSED)

    time_progress = None
    if ms.planned_release_date and ms.created_at:
        total_duration = (ms.planned_release_date - ms.created_at).total_seconds()
        if total_duration > 0:
            # NOTE(review): datetime.now() is naive; assumes the stored
            # datetimes are naive too. Confirm.
            elapsed = (datetime.now() - ms.created_at).total_seconds()
            time_progress = min(100, max(0, elapsed / total_duration * 100))
        else:
            # A release planned at or before creation has no window left;
            # report it as fully elapsed instead of dividing by zero.
            time_progress = 100

    return {
        "milestone_id": ms.id,
        "title": ms.title,
        "total": total,
        "total_tasks": total,
        "completed": done,
        "progress_pct": round(done / total * 100, 1) if total else 0,
        # ``is not None`` so that a genuine 0% is reported as 0, not None.
        "time_progress_pct": round(time_progress, 1) if time_progress is not None else None,
        "planned_release_date": ms.planned_release_date,
    }
# ============ Notifications ============
class NotificationResponse(BaseModel):
    """Notification as returned by ``GET /notifications``."""
    id: int
    user_id: int
    type: str
    title: str
    # The serializer in this module falls back to the title when the stored
    # message is empty.
    message: str | None = None
    entity_type: str | None = None
    entity_id: int | None = None
    # The serializer sets this to entity_id when entity_type is "task",
    # otherwise to None.
    task_id: int | None = None
    is_read: bool
    created_at: datetime
def _serialize_notification(notification: NotificationModel):
    """Convert a notification row into the dict returned by the API.

    ``message`` falls back to the title when empty, and ``task_id`` mirrors
    ``entity_id`` only for notifications whose ``entity_type`` is ``"task"``.
    """
    payload = {
        field: getattr(notification, field)
        for field in ("id", "user_id", "type", "title")
    }
    payload["message"] = notification.message or notification.title
    payload["entity_type"] = notification.entity_type
    payload["entity_id"] = notification.entity_id
    if notification.entity_type == "task":
        payload["task_id"] = notification.entity_id
    else:
        payload["task_id"] = None
    payload["is_read"] = notification.is_read
    payload["created_at"] = notification.created_at
    return payload
@router.get("/notifications", response_model=List[NotificationResponse], tags=["Notifications"])
def list_notifications(unread_only: bool = False, limit: int = 50, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Return the caller's own notifications, newest first.

    ``unread_only`` restricts the result to unread entries and ``limit`` caps
    the number returned.
    """
    conditions = [NotificationModel.user_id == current_user.id]
    if unread_only:
        # SQL expression, so ``== False`` rather than ``is False``.
        conditions.append(NotificationModel.is_read == False)
    rows = (
        db.query(NotificationModel)
        .filter(*conditions)
        .order_by(NotificationModel.created_at.desc())
        .limit(limit)
        .all()
    )
    return list(map(_serialize_notification, rows))
@router.get("/notifications/count", tags=["Notifications"])
def notification_count(db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Return how many unread notifications the caller has.

    The same number is exposed under both ``count`` and ``unread``.
    """
    unread = (
        db.query(NotificationModel)
        .filter(NotificationModel.user_id == current_user.id)
        .filter(NotificationModel.is_read == False)
        .count()
    )
    return {"user_id": current_user.id, "count": unread, "unread": unread}
@router.post("/notifications/{notification_id}/read", tags=["Notifications"])
def mark_read(notification_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Mark a single notification as read.

    Raises:
        HTTPException: 404 when the notification does not exist; 403 when it
            belongs to another user and the caller is not an admin.
    """
    notification = (
        db.query(NotificationModel)
        .filter(NotificationModel.id == notification_id)
        .first()
    )
    if not notification:
        raise HTTPException(status_code=404, detail="Notification not found")

    allowed = notification.user_id == current_user.id or current_user.is_admin
    if not allowed:
        raise HTTPException(status_code=403, detail="Forbidden")

    notification.is_read = True
    db.commit()
    return {"status": "read"}
@router.post("/notifications/read-all", tags=["Notifications"])
def mark_all_read(db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Mark every unread notification of the caller as read in one bulk update."""
    own_unread = db.query(NotificationModel).filter(
        NotificationModel.user_id == current_user.id,
        NotificationModel.is_read == False,
    )
    own_unread.update({"is_read": True})
    db.commit()
    return {"status": "all_read", "user_id": current_user.id}
# ============ Work Logs ============
class WorkLogCreate(BaseModel):
    """Request body for logging time against a task."""
    task_id: int
    # Optional: the create endpoint attributes the entry to the caller for
    # non-admins and whenever no user id is given, so clients need not send
    # it. Existing clients that send it keep working.
    user_id: int | None = None
    # Must be positive; checked by the create endpoint.
    hours: float
    description: str | None = None
    logged_date: datetime
class WorkLogResponse(BaseModel):
    """Work-log entry as returned by the time-tracking endpoints."""
    # Pydantic v2 configuration; replaces the deprecated nested ``Config``
    # class and allows validation straight from ORM attributes.
    model_config = ConfigDict(from_attributes=True)

    id: int
    task_id: int
    user_id: int
    hours: float
    description: str | None = None
    logged_date: datetime
    created_at: datetime
@router.post("/worklogs", response_model=WorkLogResponse, status_code=status.HTTP_201_CREATED, tags=["Time Tracking"])
def create_worklog(wl: WorkLogCreate, db: Session = Depends(get_db),
                   current_user: models.User = Depends(get_current_user_or_apikey)):
    """Record time spent on a task.

    The entry is attributed to the caller. Only an admin may attribute it to
    another user, by supplying a ``user_id``.

    Raises:
        HTTPException: 404 when the task does not exist; 400 when ``hours``
            is not a positive, finite number.
    """
    task = db.query(Task).filter(Task.id == wl.task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    # NaN and Infinity are accepted as floats and slip past the ``<= 0``
    # comparison, which would corrupt the summed totals.
    if not math.isfinite(wl.hours):
        raise HTTPException(status_code=400, detail="Hours must be a finite number")
    if wl.hours <= 0:
        raise HTTPException(status_code=400, detail="Hours must be positive")

    data = wl.model_dump()
    # Worklogs are always attributed to the caller (non-admins cannot log time for others).
    if not current_user.is_admin or not data.get("user_id"):
        data["user_id"] = current_user.id

    db_wl = WorkLog(**data)
    db.add(db_wl)
    db.commit()
    db.refresh(db_wl)
    return db_wl
@router.get("/tasks/{task_id}/worklogs", response_model=List[WorkLogResponse], tags=["Time Tracking"])
def list_task_worklogs(task_id: str, db: Session = Depends(get_db)):
    """List worklogs for a task. task_id can be numeric id or task_code."""
    try:
        numeric_id = int(task_id)
    except (ValueError, TypeError):
        # Not a number: look the task up by its code to obtain the id.
        by_code = db.query(Task).filter(Task.task_code == task_id).first()
        if not by_code:
            raise HTTPException(status_code=404, detail="Task not found")
        numeric_id = by_code.id

    logs = db.query(WorkLog).filter(WorkLog.task_id == numeric_id)
    return logs.order_by(WorkLog.logged_date.desc()).all()
@router.get("/tasks/{task_id}/worklogs/summary", tags=["Time Tracking"])
def task_worklog_summary(task_id: str, db: Session = Depends(get_db)):
    """Worklog summary for a task. task_id can be numeric id or task_code."""
    try:
        numeric_id = int(task_id)
    except (ValueError, TypeError):
        by_code = db.query(Task).filter(Task.task_code == task_id).first()
        if not by_code:
            raise HTTPException(status_code=404, detail="Task not found")
        numeric_id = by_code.id

    # The resolved id must belong to an existing task.
    if not db.query(Task).filter(Task.id == numeric_id).first():
        raise HTTPException(status_code=404, detail="Task not found")

    for_task = WorkLog.task_id == numeric_id
    hours_sum = db.query(sqlfunc.sum(WorkLog.hours)).filter(for_task).scalar() or 0
    entries = db.query(WorkLog).filter(for_task).count()
    return {"task_id": numeric_id, "total_hours": round(hours_sum, 2), "log_count": entries}
@router.delete("/worklogs/{worklog_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["Time Tracking"])
def delete_worklog(worklog_id: int, db: Session = Depends(get_db),
                   current_user: models.User = Depends(get_current_user_or_apikey)):
    """Delete a worklog entry.

    Raises:
        HTTPException: 404 when the entry does not exist; 403 when it belongs
            to another user and the caller is not an admin.
    """
    entry = db.query(WorkLog).filter(WorkLog.id == worklog_id).first()
    if not entry:
        raise HTTPException(status_code=404, detail="Work log not found")

    may_delete = current_user.is_admin or entry.user_id == current_user.id
    if not may_delete:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete another user's worklog")

    db.delete(entry)
    db.commit()
# ============ Export ============
# Leading characters that make spreadsheet applications evaluate a cell as a formula.
_CSV_FORMULA_TRIGGERS = ("=", "+", "-", "@", "\t", "\r")


def _csv_text(value):
    """Neutralise spreadsheet formula triggers in a free-text CSV cell.

    Strings starting with one of the trigger characters get a leading
    apostrophe so spreadsheet software shows them as text. Every other value
    is returned unchanged.
    """
    if isinstance(value, str) and value.startswith(_CSV_FORMULA_TRIGGERS):
        return "'" + value
    return value


@router.get("/export/tasks", tags=["Export"])
def export_tasks_csv(project_id: int = None, db: Session = Depends(get_db),
                     _: models.User = Depends(get_current_user_or_apikey)):
    """Export tasks, optionally limited to one project, as ``tasks.csv``.

    User-supplied text columns pass through ``_csv_text`` because an exported
    title such as ``=HYPERLINK(...)`` would otherwise run as a formula when
    the file is opened in a spreadsheet.
    """
    query = db.query(Task)
    if project_id:
        query = query.filter(Task.project_id == project_id)

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["id", "title", "type", "subtype", "status", "priority", "project_id",
                     "milestone_id", "reporter_id", "assignee_id", "task_code",
                     "tags", "created_at", "updated_at"])
    for t in query.all():
        writer.writerow([
            t.id,
            _csv_text(t.title),
            _csv_text(t.task_type),
            _csv_text(t.task_subtype or ""),
            t.status.value if hasattr(t.status, 'value') else t.status,
            t.priority.value if hasattr(t.priority, 'value') else t.priority,
            t.project_id,
            t.milestone_id,
            t.reporter_id,
            t.assignee_id,
            _csv_text(t.task_code),
            _csv_text(t.tags),
            t.created_at,
            t.updated_at,
        ])

    # The whole document is built in memory and sent as a single chunk.
    return StreamingResponse(iter([output.getvalue()]), media_type="text/csv",
                             headers={"Content-Disposition": "attachment; filename=tasks.csv"})
# ============ Dashboard ============
@router.get("/dashboard/stats", tags=["Dashboard"])
def dashboard_stats(project_id: int = None, db: Session = Depends(get_db)):
    """Return task counts by status, type and priority plus the 10 newest tasks.

    All figures cover every task, or only those of ``project_id`` when given.
    ``total`` and ``total_tasks`` carry the same number.
    """
    scope = db.query(Task)
    if project_id:
        scope = scope.filter(Task.project_id == project_id)

    def tally(condition):
        # Count the tasks in scope that satisfy one extra condition.
        return scope.filter(condition).count()

    total = scope.count()

    by_status = {}
    for member in TaskStatus:
        by_status[member.value] = tally(Task.status == member)

    known_types = ["task", "story", "test", "resolution", "issue", "maintenance", "research", "review"]
    by_type = {}
    for type_name in known_types:
        by_type[type_name] = tally(Task.task_type == type_name)

    by_priority = {}
    for member in TaskPriority:
        by_priority[member.value] = tally(Task.priority == member)

    newest = scope.order_by(Task.created_at.desc()).limit(10).all()
    recent_tasks = [schemas.TaskResponse.model_validate(row) for row in newest]

    return {
        "total": total,
        "total_tasks": total,
        "by_status": by_status,
        "by_type": by_type,
        "by_priority": by_priority,
        "recent_tasks": recent_tasks,
    }
# ============ Milestone-scoped Tasks ============
@router.get("/tasks/{project_code}/{milestone_id}", tags=["Tasks"])
def list_milestone_tasks(project_code: str, milestone_id: str, db: Session = Depends(get_db)):
    """List the tasks of one milestone, addressed by project code and milestone code.

    Raises:
        HTTPException: 404 when the project, or the milestone within it, is
            not found.
    """
    project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # Despite its name, ``milestone_id`` is compared to milestone_code.
    milestone = (
        db.query(MilestoneModel)
        .filter(MilestoneModel.milestone_code == milestone_id)
        .filter(MilestoneModel.project_id == project.id)
        .first()
    )
    if not milestone:
        raise HTTPException(status_code=404, detail="Milestone not found")

    def plain(value):
        # Enum members are reduced to their value; other values pass through.
        return value.value if hasattr(value, "value") else value

    rows = (
        db.query(Task)
        .filter(Task.project_id == project.id, Task.milestone_id == milestone.id)
        .all()
    )

    result = []
    for t in rows:
        working_time = str(t.estimated_working_time) if t.estimated_working_time else None
        result.append({
            "id": t.id,
            "title": t.title,
            "description": t.description,
            "status": plain(t.status),
            "priority": plain(t.priority),
            "task_code": t.task_code,
            "task_type": t.task_type,
            "task_subtype": t.task_subtype,
            "estimated_effort": t.estimated_effort,
            "estimated_working_time": working_time,
            "started_on": t.started_on,
            "finished_on": t.finished_on,
            "depend_on": t.depend_on,
            "related_tasks": t.related_tasks,
            "assignee_id": t.assignee_id,
            "created_at": t.created_at,
        })
    return result
@router.post("/tasks/{project_code}/{milestone_id}", status_code=status.HTTP_201_CREATED, tags=["Tasks"])
def create_milestone_task(project_code: str, milestone_id: str, task_data: dict, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Create a task inside a milestone addressed by project code and milestone code.

    The task starts as OPEN with MEDIUM priority and the caller as reporter
    and creator. Its code is ``<milestone code>:T<5-digit hex>``, numbered
    one above the highest task id already in the milestone. An
    ``estimated_working_time`` that is not a valid ``HH:MM`` string is
    ignored.

    Raises:
        HTTPException: 404 when the project or milestone is not found; 400
            when the milestone's status is ``undergoing``.
    """
    project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
    if not ms:
        raise HTTPException(status_code=404, detail="Milestone not found")

    if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
        raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")

    milestone_code = ms.milestone_code or f"m{ms.id}"
    max_task = db.query(Task).filter(Task.milestone_id == ms.id).order_by(Task.id.desc()).first()
    next_num = (max_task.id + 1) if max_task else 1
    task_code = f"{milestone_code}:T{next_num:05x}"

    est_time = None
    raw_time = task_data.get("estimated_working_time")
    if raw_time:
        try:
            est_time = datetime.strptime(raw_time, "%H:%M").time()
        except (TypeError, ValueError):
            # Best effort: a non-string or malformed time is dropped. Only
            # parse failures are caught, not every exception as before.
            est_time = None

    task = Task(
        title=task_data.get("title"),
        description=task_data.get("description"),
        status=TaskStatus.OPEN,
        priority=TaskPriority.MEDIUM,
        task_type=task_data.get("task_type", "issue"),  # P7.1: default changed from 'task' to 'issue'
        task_subtype=task_data.get("task_subtype"),
        project_id=project.id,
        milestone_id=ms.id,
        reporter_id=current_user.id,
        task_code=task_code,
        estimated_effort=task_data.get("estimated_effort"),
        estimated_working_time=est_time,
        created_by_id=current_user.id,
    )
    db.add(task)
    db.commit()
    db.refresh(task)
    return {
        "title": task.title,
        "description": task.description,
        "task_code": task.task_code,
        "code": task.task_code,
        "status": task.status.value,
        "priority": task.priority.value,
        "created_at": task.created_at,
    }
# ============ Supports ============
def _find_support_by_code(db: Session, support_code: str) -> Support | None:
    """Return the support ticket whose ``support_code`` matches, or ``None``."""
    wanted = str(support_code)
    matches = db.query(Support).filter(Support.support_code == wanted)
    return matches.first()
def _serialize_support(db: Session, support: Support) -> dict:
    """Build the API representation of a support ticket.

    Looks up the ticket's project, milestone and (when set) assignee so the
    result can carry their codes and username. ``code`` and ``support_code``
    hold the same value.
    """
    def plain(value):
        # Enum members are reduced to their value; other values pass through.
        return value.value if hasattr(value, "value") else value

    project = db.query(models.Project).filter(models.Project.id == support.project_id).first()
    milestone = db.query(MilestoneModel).filter(MilestoneModel.id == support.milestone_id).first()
    assignee = (
        db.query(models.User).filter(models.User.id == support.assignee_id).first()
        if support.assignee_id
        else None
    )

    project_code = project.project_code if project else None
    milestone_code = milestone.milestone_code if milestone else None
    taken_by = assignee.username if assignee else None

    return {
        "code": support.support_code,
        "support_code": support.support_code,
        "title": support.title,
        "description": support.description,
        "status": plain(support.status),
        "priority": plain(support.priority),
        "project_code": project_code,
        "milestone_code": milestone_code,
        "reporter_id": support.reporter_id,
        "assignee_id": support.assignee_id,
        "taken_by": taken_by,
        "created_at": support.created_at,
        "updated_at": support.updated_at,
    }
@router.get("/supports", tags=["Supports"])
def list_all_supports(
    status: str | None = None,
    taken_by: str | None = None,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """List support tickets across all projects. Optional status/taken_by filters.

    ``taken_by`` accepts ``"me"`` (tickets assigned to the caller),
    ``"null"`` (unassigned tickets) or a username. An unknown username yields
    an empty list. Results are ordered newest first.

    Raises:
        HTTPException: 400 when ``status`` is not a valid support status.
    """
    query = db.query(Support)
    if status:
        try:
            wanted_status = SupportStatus(status)
        except ValueError:
            # The ``status`` parameter shadows fastapi's ``status`` module in
            # this function, hence the literal status code.
            raise HTTPException(status_code=400, detail=f"Invalid status: {status}") from None
        query = query.filter(Support.status == wanted_status)

    if taken_by == "me":
        query = query.filter(Support.assignee_id == current_user.id)
    elif taken_by == "null":
        query = query.filter(Support.assignee_id.is_(None))
    elif taken_by:
        assignee = db.query(models.User).filter(models.User.username == taken_by).first()
        if not assignee:
            return []
        query = query.filter(Support.assignee_id == assignee.id)

    supports = query.order_by(Support.created_at.desc()).all()
    return [_serialize_support(db, s) for s in supports]
@router.get("/supports/{project_code}/{milestone_id}", tags=["Supports"])
def list_supports(project_code: str, milestone_id: str, db: Session = Depends(get_db)):
    """List the support tickets of one milestone, addressed by project code and milestone code.

    Raises:
        HTTPException: 404 when the project, or the milestone within it, is
            not found.
    """
    project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # Despite its name, ``milestone_id`` is compared to milestone_code.
    milestone = (
        db.query(MilestoneModel)
        .filter(MilestoneModel.milestone_code == milestone_id)
        .filter(MilestoneModel.project_id == project.id)
        .first()
    )
    if not milestone:
        raise HTTPException(status_code=404, detail="Milestone not found")

    tickets = (
        db.query(Support)
        .filter(Support.project_id == project.id, Support.milestone_id == milestone.id)
        .all()
    )
    serialized = []
    for ticket in tickets:
        serialized.append(_serialize_support(db, ticket))
    return serialized
def _coerce_support_enum(enum_cls, raw, field_name):
    """Convert *raw* into a member of *enum_cls*.

    Raises:
        HTTPException: 400 naming *field_name* when *raw* is not a valid
            value, instead of letting the ValueError surface as a 500.
    """
    try:
        return enum_cls(raw)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Invalid {field_name}: {raw!r}") from None


# NOTE: the two action routes below are registered BEFORE
# ``POST /supports/{project_code}/{milestone_id}`` on purpose. Routes are
# matched in registration order, and that two-parameter pattern also matches
# ``/supports/<code>/take`` and ``/supports/<code>/transition``. Registered
# after it, these actions were never reached.
@router.post("/supports/{support_code}/take", tags=["Supports"])
def take_support(support_code: str, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Assign a support ticket to the caller.

    Re-taking a ticket the caller already holds succeeds.

    Raises:
        HTTPException: 404 when the ticket does not exist; 409 when it is
            already assigned to someone else. ``check_project_role`` is
            called with ``min_role="dev"`` beforehand.
    """
    support = _find_support_by_code(db, support_code)
    if not support:
        raise HTTPException(status_code=404, detail="Support not found")
    check_project_role(db, current_user.id, support.project_id, min_role="dev")

    if support.assignee_id and support.assignee_id != current_user.id:
        assignee = db.query(models.User).filter(models.User.id == support.assignee_id).first()
        assignee_name = assignee.username if assignee else str(support.assignee_id)
        raise HTTPException(status_code=409, detail=f"Support is already taken by {assignee_name}")

    support.assignee_id = current_user.id
    db.commit()
    db.refresh(support)
    return _serialize_support(db, support)


@router.post("/supports/{support_code}/transition", tags=["Supports"])
def transition_support(support_code: str, support_data: dict, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Move a support ticket to the status given in ``support_data["status"]``.

    Raises:
        HTTPException: 404 when the ticket does not exist; 400 when the
            status is missing or not a valid support status.
            ``check_project_role`` is called with ``min_role="dev"``.
    """
    support = _find_support_by_code(db, support_code)
    if not support:
        raise HTTPException(status_code=404, detail="Support not found")
    check_project_role(db, current_user.id, support.project_id, min_role="dev")

    status_value = support_data.get("status")
    if not status_value:
        raise HTTPException(status_code=400, detail="status is required")

    support.status = _coerce_support_enum(SupportStatus, status_value, "status")
    db.commit()
    db.refresh(support)
    return _serialize_support(db, support)


@router.post("/supports/{project_code}/{milestone_id}", status_code=status.HTTP_201_CREATED, tags=["Supports"])
def create_support(project_code: str, milestone_id: str, support_data: dict, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Create a support ticket inside a milestone addressed by project code and milestone code.

    The ticket starts as OPEN with MEDIUM priority and the caller as
    reporter. Its code is ``<milestone code>:S<5-digit hex>``, numbered one
    above the highest support id already in the milestone.

    Raises:
        HTTPException: 404 when the project or milestone is not found; 400
            when the milestone's status is ``undergoing``.
    """
    project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
    if not ms:
        raise HTTPException(status_code=404, detail="Milestone not found")

    if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
        raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")

    milestone_code = ms.milestone_code or f"m{ms.id}"
    max_support = db.query(Support).filter(Support.milestone_id == ms.id).order_by(Support.id.desc()).first()
    next_num = (max_support.id + 1) if max_support else 1
    support_code = f"{milestone_code}:S{next_num:05x}"

    support = Support(
        title=support_data.get("title"),
        description=support_data.get("description"),
        status=SupportStatus.OPEN,
        priority=SupportPriority.MEDIUM,
        project_id=project.id,
        milestone_id=ms.id,
        reporter_id=current_user.id,
        support_code=support_code,
    )
    db.add(support)
    db.commit()
    db.refresh(support)
    return _serialize_support(db, support)


@router.get("/supports/{support_code}", tags=["Supports"])
def get_support(support_code: str, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Return one support ticket by its code.

    Raises:
        HTTPException: 404 when the ticket does not exist.
            ``check_project_role`` is called with ``min_role="viewer"``.
    """
    support = _find_support_by_code(db, support_code)
    if not support:
        raise HTTPException(status_code=404, detail="Support not found")
    check_project_role(db, current_user.id, support.project_id, min_role="viewer")
    return _serialize_support(db, support)


@router.patch("/supports/{support_code}", tags=["Supports"])
def update_support(support_code: str, support_data: dict, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Update the title, description, status and/or priority of a support ticket.

    Other keys in ``support_data`` are ignored. Every value is validated
    before any is applied to the ticket.

    Raises:
        HTTPException: 404 when the ticket does not exist; 400 when no
            supported field is supplied or a status/priority value is
            invalid. ``check_project_role`` is called with ``min_role="dev"``.
    """
    support = _find_support_by_code(db, support_code)
    if not support:
        raise HTTPException(status_code=404, detail="Support not found")
    check_project_role(db, current_user.id, support.project_id, min_role="dev")

    allowed_fields = {"title", "description", "status", "priority"}
    changes = {}
    for field, value in support_data.items():
        if field not in allowed_fields:
            continue
        if value is not None:
            if field == "status":
                value = _coerce_support_enum(SupportStatus, value, "status")
            elif field == "priority":
                value = _coerce_support_enum(SupportPriority, value, "priority")
        changes[field] = value

    if not changes:
        raise HTTPException(status_code=400, detail="No supported fields to update")

    for field, value in changes.items():
        setattr(support, field, value)
    db.commit()
    db.refresh(support)
    return _serialize_support(db, support)


@router.delete("/supports/{support_code}", status_code=status.HTTP_204_NO_CONTENT, tags=["Supports"])
def delete_support(support_code: str, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Delete a support ticket by its code.

    Raises:
        HTTPException: 404 when the ticket does not exist.
            ``check_project_role`` is called with ``min_role="dev"``.
    """
    support = _find_support_by_code(db, support_code)
    if not support:
        raise HTTPException(status_code=404, detail="Support not found")
    check_project_role(db, current_user.id, support.project_id, min_role="dev")
    db.delete(support)
    db.commit()
    return None
# ============ Meetings ============
@router.get("/meetings/{project_code}/{milestone_id}", tags=["Meetings"])
def list_meetings(project_code: str, milestone_id: str, db: Session = Depends(get_db)):
    """List the meetings of one milestone, addressed by project code and milestone code.

    ``code`` and ``meeting_code`` carry the same value in each entry.

    Raises:
        HTTPException: 404 when the project, or the milestone within it, is
            not found.
    """
    project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # Despite its name, ``milestone_id`` is compared to milestone_code.
    milestone = (
        db.query(MilestoneModel)
        .filter(MilestoneModel.milestone_code == milestone_id)
        .filter(MilestoneModel.project_id == project.id)
        .first()
    )
    if not milestone:
        raise HTTPException(status_code=404, detail="Milestone not found")

    rows = (
        db.query(Meeting)
        .filter(Meeting.project_id == project.id, Meeting.milestone_id == milestone.id)
        .all()
    )

    listing = []
    for m in rows:
        listing.append({
            "title": m.title,
            "description": m.description,
            "meeting_code": m.meeting_code,
            "code": m.meeting_code,
            "status": m.status.value,
            "priority": m.priority.value,
            "scheduled_at": m.scheduled_at,
            "duration_minutes": m.duration_minutes,
            "created_at": m.created_at,
        })
    return listing
@router.post("/meetings/{project_code}/{milestone_id}", status_code=status.HTTP_201_CREATED, tags=["Meetings"])
def create_meeting(project_code: str, milestone_id: str, meeting_data: dict, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Create a meeting inside a milestone addressed by project code and milestone code.

    The meeting starts as SCHEDULED with MEDIUM priority and the caller as
    reporter. Its code is ``<milestone code>:M<5-digit hex>``, numbered one
    above the highest meeting id already in the milestone. ``scheduled_at``
    is parsed as ISO 8601 (a trailing ``Z`` is read as UTC); a value that
    cannot be parsed is ignored.

    Raises:
        HTTPException: 404 when the project or milestone is not found; 400
            when the milestone's status is ``undergoing``.
    """
    project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
    if not ms:
        raise HTTPException(status_code=404, detail="Milestone not found")

    if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
        raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")

    milestone_code = ms.milestone_code or f"m{ms.id}"
    max_meeting = db.query(Meeting).filter(Meeting.milestone_id == ms.id).order_by(Meeting.id.desc()).first()
    next_num = (max_meeting.id + 1) if max_meeting else 1
    meeting_code = f"{milestone_code}:M{next_num:05x}"

    scheduled_at = None
    raw_schedule = meeting_data.get("scheduled_at")
    if raw_schedule:
        try:
            scheduled_at = datetime.fromisoformat(raw_schedule.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            # Best effort: a non-string or malformed timestamp is dropped.
            # Only parse failures are caught, not every exception as before.
            scheduled_at = None

    meeting = Meeting(
        title=meeting_data.get("title"),
        description=meeting_data.get("description"),
        status=MeetingStatus.SCHEDULED,
        priority=MeetingPriority.MEDIUM,
        project_id=project.id,
        milestone_id=ms.id,
        reporter_id=current_user.id,
        meeting_code=meeting_code,
        scheduled_at=scheduled_at,
        duration_minutes=meeting_data.get("duration_minutes"),
    )
    db.add(meeting)
    db.commit()
    db.refresh(meeting)
    return {
        "meeting_code": meeting.meeting_code,
        "code": meeting.meeting_code,
        "title": meeting.title,
        "description": meeting.description,
        "status": meeting.status.value,
        "priority": meeting.priority.value,
        "scheduled_at": meeting.scheduled_at,
        "duration_minutes": meeting.duration_minutes,
        "created_at": meeting.created_at,
    }