Files
HarborForge.Backend/app/api/routers/projects.py

304 lines
12 KiB
Python

"""Projects router with RBAC."""
import json
import re
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.config import get_db
from app.models import models
from app.schemas import schemas
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role
router = APIRouter(prefix="/projects", tags=["Projects"])
def _validate_project_links(db, codes: list[str] | None, self_code: str | None = None) -> list[str] | None:
if not codes:
return None
# dedupe preserve order
seen = set()
ordered = []
for c in codes:
if c and c not in seen:
ordered.append(c)
seen.add(c)
if self_code and self_code in seen:
raise HTTPException(status_code=400, detail='Project cannot link to itself')
existing = {p.project_code for p in db.query(models.Project).filter(models.Project.project_code.in_(ordered)).all()}
missing = [c for c in ordered if c not in existing]
if missing:
raise HTTPException(status_code=400, detail=f'Unknown project codes: {", ".join(missing)}')
return ordered
WORD_SEGMENT_RE = re.compile(r"[A-Za-z]+")
CAMEL_RE = re.compile(r"[A-Z]+(?=[A-Z][a-z])|[A-Z]?[a-z]+|[A-Z]+")
def _split_words(name: str):
segments = WORD_SEGMENT_RE.findall(name or '')
words = []
for seg in segments:
parts = CAMEL_RE.findall(seg)
for part in parts:
if part.isupper() and len(part) > 1:
words.extend(list(part))
else:
words.append(part)
return words
def _code_exists(db, code: str) -> bool:
    """Return True when a project row with ``project_code == code`` exists.

    Args:
        db: Database session to query.
        code: Project code to look up (compared exactly as given).
    """
    match = (
        db.query(models.Project)
        .filter(models.Project.project_code == code)
        .first()
    )
    return match is not None
def _next_counter(db, prefix: str, width: int) -> str:
if width <= 0:
return ''
counter = db.query(models.ProjectCodeCounter).filter(models.ProjectCodeCounter.prefix == prefix).first()
if not counter:
counter = models.ProjectCodeCounter(prefix=prefix, next_value=0)
db.add(counter)
db.flush()
value = counter.next_value
counter.next_value += 1
db.flush()
return format(value, 'x').upper().zfill(width)
def _generate_with_counter(db, prefix: str, width: int) -> str:
    """Build an unused project code of the form ``prefix + counter``.

    Counter values are drawn from ``_next_counter`` until the resulting code
    is not already taken by an existing project.

    Args:
        db: Database session.
        prefix: Leading letters of the code. Any casing of ``'UN'`` is
            normalized to ``'UN'`` so those names share a single counter.
        width: Number of characters reserved for the counter suffix.

    Returns:
        The first uppercase code that no existing project uses.

    Raises:
        HTTPException: 400 when ``width`` is not positive and the bare
            prefix is already taken (there is no suffix to vary).
    """
    if prefix.upper() == 'UN':
        prefix = 'UN'

    if width <= 0:
        # _next_counter returns '' for a non-positive width, so the only
        # candidate is the prefix itself; retrying would loop forever.
        code = prefix.upper()
        if _code_exists(db, code):
            raise HTTPException(status_code=400, detail='Project code collision')
        return code

    while True:
        suffix = _next_counter(db, prefix, width)
        code = (prefix + suffix).upper()
        if not _code_exists(db, code):
            return code
def _generate_project_code(db, name: str) -> str:
    """Derive a six-character project code from a project name.

    The name is split into words with ``_split_words`` and then:

    * no words: ``'UN'`` plus a 4-character counter;
    * one word with six or more letters: its first six letters;
    * one shorter word: the word plus a counter padded to six characters;
    * more than six words: the initials of the first six words;
    * fewer than six letters in total: all letters plus a counter;
    * exactly six letters in total: all letters;
    * otherwise: the initial of every word plus extra letters taken from a
      single word (the last word is tried first), using the first
      combination that is not taken. If no single word has enough spare
      letters, the extras are drawn across several words instead.

    Args:
        db: Database session used for collision checks and counters.
        name: Human-readable project name.

    Returns:
        An uppercase project code that no existing project uses.

    Raises:
        HTTPException: 400 when every candidate code is already taken.
    """
    # Function-scope import so this block does not depend on file-level edits.
    from itertools import combinations

    def claim(candidate: str) -> str:
        """Return ``candidate`` if it is free, otherwise raise the collision error."""
        if _code_exists(db, candidate):
            raise HTTPException(status_code=400, detail='Project code collision')
        return candidate

    words = _split_words(name)
    if not words:
        return _generate_with_counter(db, 'UN', 4)

    if len(words) == 1:
        letters = ''.join(c for c in words[0] if c.isalpha()).upper()
        if not letters:
            return _generate_with_counter(db, 'UN', 4)
        if len(letters) >= 6:
            return claim(letters[:6])
        return _generate_with_counter(db, letters, 6 - len(letters))

    if len(words) > 6:
        return claim(''.join(w[0] for w in words[:6]).upper())

    total_letters = sum(len(w) for w in words)
    if total_letters < 6:
        prefix = ''.join(words).upper()
        return _generate_with_counter(db, prefix, 6 - len(prefix))
    if total_letters == 6:
        return claim(''.join(words).upper())

    # More than six letters spread over two to six words: keep every initial
    # and fill the remaining slots with non-initial letters.
    needed = 6 - len(words)
    tried_single_word = False
    for idx in range(len(words) - 1, -1, -1):
        extra_letters = words[idx][1:]
        if needed > len(extra_letters):
            continue
        tried_single_word = True
        # combinations() keeps the letters in their original order and
        # enumerates them lexicographically by position.
        for combo in combinations(extra_letters, needed):
            code = ''.join(
                w[0] + (''.join(combo) if wi == idx else '')
                for wi, w in enumerate(words)
            )[:6].upper()
            if not _code_exists(db, code):
                return code

    if not tried_single_word:
        # No single word had enough spare letters (e.g. "Ab Cd Ef Gh"), so
        # nothing has been tried yet. Draw the extras across words instead.
        # Restricting the fallback to this case keeps the pool small, since
        # every word then has fewer than `needed` spare letters.
        pool = [(wi, ch) for wi, w in enumerate(words) for ch in w[1:]]
        for combo in combinations(pool, needed):
            code = ''.join(
                w[0] + ''.join(ch for cwi, ch in combo if cwi == wi)
                for wi, w in enumerate(words)
            ).upper()
            if not _code_exists(db, code):
                return code

    raise HTTPException(status_code=400, detail='Project code collision')
@router.post("", response_model=schemas.ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Create a project and register its owner as an admin member.

    Only admin users may call this. The project code is generated from the
    project name, ``owner_name`` defaults to the owner's username, and the
    ``sub_projects`` / ``related_projects`` lists are validated and stored
    as JSON strings (or ``None`` when empty).

    Raises:
        HTTPException: 403 when the caller is not an admin; 400 when
            ``owner_id`` does not match a user, when no free project code
            can be generated, or when a linked project code is invalid.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Only admins can create projects")

    # The owner must exist; their username is the fallback for owner_name.
    owner = db.query(models.User).filter(models.User.id == project.owner_id).first()
    if not owner:
        raise HTTPException(status_code=400, detail="Invalid owner_id: user not found")

    payload = project.model_dump()
    payload["owner_name"] = payload.get("owner_name") or owner.username
    payload["project_code"] = _generate_project_code(db, project.name)

    # Link lists are persisted as JSON text; empty or missing lists become NULL.
    for field in ("sub_projects", "related_projects"):
        codes = payload.get(field)
        if codes:
            payload[field] = json.dumps(_validate_project_links(db, codes, payload["project_code"]))
        else:
            payload[field] = None

    db_project = models.Project(**payload)
    db.add(db_project)
    # Flush (not commit) so the new id is available while the project and its
    # owner membership still share one transaction: either both rows are
    # stored or neither is.
    db.flush()
    db.add(models.ProjectMember(project_id=db_project.id, user_id=project.owner_id, role="admin"))
    db.commit()
    db.refresh(db_project)
    return db_project
@router.get("", response_model=List[schemas.ProjectResponse])
def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of projects.

    Args:
        skip: Number of rows to skip from the start of the result set.
        limit: Maximum number of rows to return.
        db: Database session (injected).
    """
    page = db.query(models.Project).offset(skip).limit(limit)
    return page.all()
@router.get("/{project_id}", response_model=schemas.ProjectResponse)
def get_project(project_id: int, db: Session = Depends(get_db)):
    """Fetch a single project by its numeric id.

    Raises:
        HTTPException: 404 when no project has that id.
    """
    found = (
        db.query(models.Project)
        .filter(models.Project.id == project_id)
        .first()
    )
    if found:
        return found
    raise HTTPException(status_code=404, detail="Project not found")
@router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(
    project_id: int,
    project_update: schemas.ProjectUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Apply a partial update to a project.

    Only fields explicitly present in the request are changed. ``name`` is
    always ignored. ``sub_projects`` / ``related_projects`` are validated
    the same way as on creation and stored as JSON strings (``None`` when
    empty).

    Raises:
        HTTPException: 404 when the project does not exist; 400 when a
            linked project code is unknown or refers to the project itself.
            ``check_project_role`` is called with ``min_role="mgr"`` first
            and presumably rejects callers without that role — confirm in
            app.api.rbac.
    """
    check_project_role(db, current_user.id, project_id, min_role="mgr")
    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    update_data = project_update.model_dump(exclude_unset=True)
    # A submitted name is discarded rather than applied.
    update_data.pop("name", None)

    # Validate link lists exactly as create_project does, so that unknown
    # codes, duplicates and self-links cannot be introduced through PATCH.
    for field in ("sub_projects", "related_projects"):
        if field not in update_data:
            continue
        validated = _validate_project_links(db, update_data[field], project.project_code)
        update_data[field] = json.dumps(validated) if validated else None

    for field, value in update_data.items():
        setattr(project, field, value)
    db.commit()
    db.refresh(project)
    return project
@router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_project(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Delete a project by id.

    ``check_project_role`` is invoked with ``min_role="admin"`` before the
    lookup.

    Raises:
        HTTPException: 404 when no project has that id.
    """
    check_project_role(db, current_user.id, project_id, min_role="admin")

    target = (
        db.query(models.Project)
        .filter(models.Project.id == project_id)
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="Project not found")

    db.delete(target)
    db.commit()
    return None
# ---- Members ----
@router.post("/{project_id}/members", response_model=schemas.ProjectMemberResponse, status_code=status.HTTP_201_CREATED)
def add_project_member(
    project_id: int,
    member: schemas.ProjectMemberCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Add a user to a project with the requested role.

    ``check_project_role`` is invoked with ``min_role="mgr"`` before any
    lookup.

    Raises:
        HTTPException: 404 when the project or the user does not exist;
            400 when the user is already a member of the project.
    """
    check_project_role(db, current_user.id, project_id, min_role="mgr")

    project_row = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project_row:
        raise HTTPException(status_code=404, detail="Project not found")

    user_row = db.query(models.User).filter(models.User.id == member.user_id).first()
    if not user_row:
        raise HTTPException(status_code=404, detail="User not found")

    membership_query = db.query(models.ProjectMember).filter(
        models.ProjectMember.project_id == project_id,
        models.ProjectMember.user_id == member.user_id,
    )
    if membership_query.first():
        raise HTTPException(status_code=400, detail="User already a member")

    new_member = models.ProjectMember(
        project_id=project_id,
        user_id=member.user_id,
        role=member.role,
    )
    db.add(new_member)
    db.commit()
    db.refresh(new_member)
    return new_member
@router.get("/{project_id}/members", response_model=List[schemas.ProjectMemberResponse])
def list_project_members(project_id: int, db: Session = Depends(get_db)):
    """Return every membership row of the given project.

    An unknown ``project_id`` simply produces an empty list.
    """
    members = db.query(models.ProjectMember).filter(
        models.ProjectMember.project_id == project_id
    )
    return members.all()
@router.delete("/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def remove_project_member(
    project_id: int,
    user_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Remove a user's membership from a project.

    ``check_project_role`` is invoked with ``min_role="admin"`` before the
    lookup.

    Raises:
        HTTPException: 404 when the user is not a member of the project.
    """
    check_project_role(db, current_user.id, project_id, min_role="admin")

    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.project_id == project_id,
            models.ProjectMember.user_id == user_id,
        )
        .first()
    )
    if not membership:
        raise HTTPException(status_code=404, detail="Member not found")

    db.delete(membership)
    db.commit()
    return None
# ---- Worklog summary ----
from app.models.worklog import WorkLog
from sqlalchemy import func as sqlfunc
@router.get("/{project_id}/worklogs/summary")
def project_worklog_summary(project_id: int, db: Session = Depends(get_db)):
    """Summarize logged work hours for a project, grouped by user.

    Work logs are joined to users and to issues, restricted to issues that
    belong to ``project_id``.

    Returns:
        A dict with ``project_id``, the overall ``total_hours`` and a
        ``by_user`` list of ``{"user_id", "username", "hours", "logs"}``
        entries. Hour figures are rounded to two decimals. A project without
        work logs yields zero hours and an empty list.
    """
    rows = (
        db.query(
            models.User.id,
            models.User.username,
            sqlfunc.sum(WorkLog.hours).label("total_hours"),
            sqlfunc.count(WorkLog.id).label("log_count"),
        )
        .join(WorkLog, WorkLog.user_id == models.User.id)
        .join(models.Issue, WorkLog.issue_id == models.Issue.id)
        .filter(models.Issue.project_id == project_id)
        .group_by(models.User.id, models.User.username)
        .all()
    )

    total = 0
    by_user = []
    for row in rows:
        # SUM() is NULL when every hours value in the group is NULL; treat
        # that as zero instead of failing in sum()/round().
        hours = row.total_hours or 0
        total += hours
        by_user.append({
            "user_id": row.id,
            "username": row.username,
            "hours": round(hours, 2),
            "logs": row.log_count,
        })
    return {"project_id": project_id, "total_hours": round(total, 2), "by_user": by_user}