Files
HarborForge.Backend/app/api/routers/projects.py

402 lines
15 KiB
Python

"""Projects router with RBAC."""
import json
import re
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.config import get_db
from app.models import models
from app.models.role_permission import Role
from app.schemas import schemas
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_project
# Every route declared in this module is mounted under the "/projects" prefix
# and grouped under the "Projects" tag.
router = APIRouter(prefix="/projects", tags=["Projects"])
def _resolve_project(db: Session, identifier: str) -> models.Project:
    """Look up a project from a path identifier.

    An identifier that parses as an integer is matched against
    ``Project.id``; anything else is matched against
    ``Project.project_code``.

    Raises:
        HTTPException: 404 when no project matches.
    """
    try:
        criterion = models.Project.id == int(identifier)
    except (ValueError, TypeError):
        # Not a number, so treat the identifier as a project code.
        criterion = models.Project.project_code == identifier

    found = db.query(models.Project).filter(criterion).first()
    if not found:
        raise HTTPException(status_code=404, detail="Project not found")
    return found
def _validate_project_links(db, codes: list[str] | None, self_code: str | None = None) -> list[str] | None:
if not codes:
return None
# dedupe preserve order
seen = set()
ordered = []
for c in codes:
if c and c not in seen:
ordered.append(c)
seen.add(c)
if self_code and self_code in seen:
raise HTTPException(status_code=400, detail='Project cannot link to itself')
existing = {p.project_code for p in db.query(models.Project).filter(models.Project.project_code.in_(ordered)).all()}
missing = [c for c in ordered if c not in existing]
if missing:
raise HTTPException(status_code=400, detail=f'Unknown project codes: {", ".join(missing)}')
return ordered
WORD_SEGMENT_RE = re.compile(r"[A-Za-z]+")
CAMEL_RE = re.compile(r"[A-Z]+(?=[A-Z][a-z])|[A-Z]?[a-z]+|[A-Z]+")
def _split_words(name: str):
segments = WORD_SEGMENT_RE.findall(name or '')
words = []
for seg in segments:
parts = CAMEL_RE.findall(seg)
for part in parts:
if part.isupper() and len(part) > 1:
words.extend(list(part))
else:
words.append(part)
return words
def _code_exists(db, code: str) -> bool:
    """Return True when some project already has ``code`` as its project_code."""
    match = (
        db.query(models.Project)
        .filter(models.Project.project_code == code)
        .first()
    )
    return match is not None
def _next_counter(db, prefix: str, width: int) -> str:
if width <= 0:
return ''
counter = db.query(models.ProjectCodeCounter).filter(models.ProjectCodeCounter.prefix == prefix).first()
if not counter:
counter = models.ProjectCodeCounter(prefix=prefix, next_value=0)
db.add(counter)
db.flush()
value = counter.next_value
counter.next_value += 1
db.flush()
return format(value, 'x').upper().zfill(width)
def _generate_with_counter(db, prefix: str, width: int) -> str:
    """Build an unused project code from ``prefix`` plus a counter suffix.

    Counter values are drawn for ``prefix`` until the upper-cased
    ``prefix + suffix`` is not used by any project.

    Args:
        db: Database session.
        prefix: Leading letters of the code. Any casing of "UN" is
            normalised to "UN" so it shares a single counter row.
        width: Number of hex characters to append.

    Returns:
        The first unused code.

    Raises:
        HTTPException: 400 when ``width <= 0`` and the bare prefix is
            already taken.
    """
    if prefix.upper() == 'UN':
        prefix = 'UN'

    if width <= 0:
        # _next_counter returns '' for a non-positive width, so the only
        # candidate is the bare prefix. Looping here would never end.
        code = prefix.upper()
        if _code_exists(db, code):
            raise HTTPException(status_code=400, detail='Project code collision')
        return code

    while True:
        suffix = _next_counter(db, prefix, width)
        code = (prefix + suffix).upper()
        if not _code_exists(db, code):
            return code
def _generate_project_code(db, name: str) -> str:
    """Derive a project code (normally 6 characters) from a project name.

    Rules, applied to the words returned by ``_split_words(name)``:

    * no words: "UN" plus a 4-character counter;
    * one word: its first 6 letters, or the whole word plus a counter
      when it is shorter than 6 letters;
    * more than 6 words: the initials of the first 6 words;
    * fewer than 6 letters in total: all letters plus a counter;
    * exactly 6 letters in total: all letters;
    * otherwise: every word's initial, plus ``6 - word_count`` further
      letters taken in order from a single word. Words are tried from
      last to first, and the first unused code wins. When no word has
      that many spare letters, the initials plus a counter are used.

    Args:
        db: Database session used for collision checks and counters.
        name: Project name.

    Returns:
        An upper-case project code that is not used by any project.

    Raises:
        HTTPException: 400 when the derived code (or every candidate
            code) is already taken.
    """
    import itertools  # local import: only this helper needs it

    def claim(code: str) -> str:
        # Fixed-form codes have no alternative; a taken code is an error.
        if _code_exists(db, code):
            raise HTTPException(status_code=400, detail='Project code collision')
        return code

    words = _split_words(name)
    if not words:
        return _generate_with_counter(db, 'UN', 4)

    if len(words) == 1:
        # _split_words only yields non-empty runs of letters.
        letters = words[0].upper()
        if len(letters) >= 6:
            return claim(letters[:6])
        return _generate_with_counter(db, letters, 6 - len(letters))

    if len(words) > 6:
        return claim(''.join(w[0] for w in words[:6]).upper())

    joined = ''.join(words).upper()
    if len(joined) < 6:
        return _generate_with_counter(db, joined, 6 - len(joined))
    if len(joined) == 6:
        return claim(joined)

    # 2..6 words with more than 6 letters: initials plus spare letters.
    initials = [w[0] for w in words]
    needed = 6 - len(words)
    had_candidate = False
    for idx in range(len(words) - 1, -1, -1):
        spare = words[idx][1:]
        if needed > len(spare):
            continue
        had_candidate = True
        # combinations() keeps the letters in their original order and
        # yields position tuples in lexicographic order.
        for combo in itertools.combinations(spare, needed):
            pieces = initials[:idx + 1] + list(combo) + initials[idx + 1:]
            code = ''.join(pieces).upper()
            if not _code_exists(db, code):
                return code

    if not had_candidate:
        # No single word can supply `needed` letters (e.g. "Go To Bed"),
        # so nothing collided. `needed` is >= 1 here, because needed == 0
        # always produces a candidate.
        return _generate_with_counter(db, ''.join(initials).upper(), needed)

    raise HTTPException(status_code=400, detail='Project code collision')
@router.post("", response_model=schemas.ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Create a project and register its owner as a project member.

    Only admins may create projects. ``owner_name`` defaults to the
    owner's username, ``project_code`` is generated from the name, and
    ``sub_projects`` / ``related_projects`` are validated and stored as
    JSON text (``None`` when there are no links). The project and the
    owner's membership row are written in a single transaction.

    Raises:
        HTTPException: 403 when the caller is not an admin; 400 when
            ``owner_id`` is unknown, a linked code is invalid, or no
            project code can be generated.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Only admins can create projects")

    owner = db.query(models.User).filter(models.User.id == project.owner_id).first()
    if not owner:
        raise HTTPException(status_code=400, detail="Invalid owner_id: user not found")

    payload = project.model_dump()
    payload["owner_name"] = payload.get("owner_name") or owner.username
    payload["project_code"] = _generate_project_code(db, project.name)

    # Both link fields get the same treatment: validate, then store as JSON.
    for field in ("sub_projects", "related_projects"):
        codes = payload.get(field)
        validated = _validate_project_links(db, codes, payload["project_code"]) if codes else None
        payload[field] = json.dumps(validated) if validated else None

    db_project = models.Project(**payload)
    db.add(db_project)
    # flush() sends the INSERT so db_project.id is available, without
    # committing; project and membership then succeed or fail together.
    db.flush()

    # The owner joins with the "admin" role (role_id is None when no such role exists).
    admin_role = db.query(Role).filter(Role.name == "admin").first()
    db_member = models.ProjectMember(
        project_id=db_project.id,
        user_id=project.owner_id,
        role_id=admin_role.id if admin_role else None,
    )
    db.add(db_member)

    db.commit()
    db.refresh(db_project)
    return db_project
@router.get("", response_model=List[schemas.ProjectResponse])
def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of projects: skip ``skip`` rows, then return up to ``limit``."""
    page = db.query(models.Project).offset(skip).limit(limit)
    return page.all()
def _find_project_by_id_or_code(db, identifier) -> models.Project | None:
    """Find a project by numeric id, falling back to its project_code.

    The id lookup is tried only when ``identifier`` converts to ``int``.
    If it does not convert, or no project has that id, the identifier's
    string form is matched against ``project_code``.

    Returns:
        The matching project, or ``None`` when nothing matches.
    """
    by_id = None
    try:
        numeric_id = int(identifier)
        by_id = db.query(models.Project).filter(models.Project.id == numeric_id).first()
    except (ValueError, TypeError):
        # Not numeric; only the code lookup below applies.
        by_id = None
    if by_id:
        return by_id

    code = str(identifier)
    return db.query(models.Project).filter(models.Project.project_code == code).first()
@router.get("/{project_id}", response_model=schemas.ProjectResponse)
def get_project(project_id: str, db: Session = Depends(get_db)):
    """Return the project addressed by numeric id or project code (404 if absent)."""
    project = _resolve_project(db, project_id)
    return project
@router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(
    project_id: str,
    project_update: schemas.ProjectUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Apply a partial update to a project.

    Only fields present in the request are applied. ``name`` is always
    ignored. ``sub_projects`` / ``related_projects`` go through the same
    validation as on creation (codes must exist, no self-link) and are
    stored as JSON text, or ``None`` when the list is empty.

    Raises:
        HTTPException: 404 when the project does not exist; 400 when a
            linked code is unknown or refers to the project itself; plus
            whatever ``ensure_can_edit_project`` raises.
    """
    project = _resolve_project(db, project_id)
    ensure_can_edit_project(db, current_user.id, project)

    update_data = project_update.model_dump(exclude_unset=True)
    # The name is never changed through this endpoint.
    # NOTE(review): presumably because project_code is derived from it - confirm.
    update_data.pop("name", None)

    for field in ("sub_projects", "related_projects"):
        if field not in update_data:
            continue
        codes = update_data[field]
        validated = _validate_project_links(db, codes, project.project_code) if codes else None
        update_data[field] = json.dumps(validated) if validated else None

    for field, value in update_data.items():
        setattr(project, field, value)

    db.commit()
    db.refresh(project)
    return project
def _strip_project_link(raw, code):
    """Remove ``code`` from a JSON-encoded list of project codes.

    Returns ``raw`` unchanged when it is empty or does not list ``code``;
    otherwise the re-encoded list without ``code``, or ``None`` when no
    codes remain.
    """
    # Cheap substring pre-check avoids decoding rows that cannot match.
    if not raw or code not in raw:
        return raw
    links = json.loads(raw)
    if code not in links:
        # Substring hit only (e.g. "AB" inside "ABCDEF"): leave the row alone.
        return raw
    remaining = [link for link in links if link != code]
    return json.dumps(remaining) if remaining else None


@router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_project(
    project_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Delete a project together with its dependent rows.

    Removes the project's milestones and each milestone's tasks, the
    project's member rows, and every reference to the project's code in
    other projects' ``sub_projects`` / ``related_projects``. Everything
    is committed in one transaction.

    Raises:
        HTTPException: 404 when the project does not exist; plus whatever
            ``check_project_role`` raises for callers below "admin".
    """
    project = _resolve_project(db, project_id)
    check_project_role(db, current_user.id, project.id, min_role="admin")
    project_code = project.project_code
    project_id_val = project.id

    # Delete milestones and their tasks.
    # NOTE(review): tasks are only reached through their milestone; tasks
    # tied to the project without a milestone, if the model allows that,
    # are not removed here - confirm.
    from app.models.milestone import Milestone
    from app.models.task import Task
    milestones = db.query(Milestone).filter(Milestone.project_id == project_id_val).all()
    for ms in milestones:
        for task in db.query(Task).filter(Task.milestone_id == ms.id).all():
            db.delete(task)
        db.delete(ms)

    # Delete project members.
    members = db.query(models.ProjectMember).filter(models.ProjectMember.project_id == project_id_val).all()
    for m in members:
        db.delete(m)

    # Remove this project from other projects' link lists. A project with
    # no code cannot be referenced, and `None in "<text>"` would raise
    # TypeError, so the sweep is skipped in that case.
    if project_code:
        for p in db.query(models.Project).all():
            p.sub_projects = _strip_project_link(p.sub_projects, project_code)
            p.related_projects = _strip_project_link(p.related_projects, project_code)

    db.delete(project)
    db.commit()
    return None
# ---- Members ----
@router.post("/{project_id}/members", response_model=schemas.ProjectMemberResponse, status_code=status.HTTP_201_CREATED)
def add_project_member(
    project_id: str,
    member: schemas.ProjectMemberCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Add a user to a project with the requested role.

    The role name in the request is resolved to a ``Role`` row. When no
    role has that name, the membership is stored without a role and
    reported as "developer".

    Raises:
        HTTPException: 404 when the project or user does not exist; 400
            when the user is already a member; plus whatever
            ``check_project_role`` raises for callers below "mgr".
    """
    project = _resolve_project(db, project_id)
    check_project_role(db, current_user.id, project.id, min_role="mgr")

    user = db.query(models.User).filter(models.User.id == member.user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    existing = db.query(models.ProjectMember).filter(
        models.ProjectMember.project_id == project.id, models.ProjectMember.user_id == member.user_id
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="User already a member")

    # Convert role name to role_id.
    # NOTE(review): an unknown role name is accepted silently - confirm
    # that this is intended rather than a 400.
    role = db.query(Role).filter(Role.name == member.role).first()
    db_member = models.ProjectMember(
        project_id=project.id,
        user_id=member.user_id,
        role_id=role.id if role else None,
    )
    db.add(db_member)
    db.commit()
    db.refresh(db_member)

    # The role resolved above is the one just stored, so there is no need
    # to query it again by id.
    return {
        "id": db_member.id,
        "user_id": db_member.user_id,
        "project_id": db_member.project_id,
        "role": role.name if role else "developer",
    }
@router.get("/{project_id}/members", response_model=List[schemas.ProjectMemberResponse])
def list_project_members(project_id: str, db: Session = Depends(get_db)):
    """List a project's members with their role name and user details.

    A member without a role, or whose role row is missing, is reported
    as "developer". ``username`` / ``full_name`` are ``None`` when the
    user row is missing.

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    project = _resolve_project(db, project_id)
    members = db.query(models.ProjectMember).filter(models.ProjectMember.project_id == project.id).all()

    # Load all referenced roles and users in two queries instead of two
    # queries per member.
    role_ids = {m.role_id for m in members if m.role_id}
    user_ids = {m.user_id for m in members}
    role_names = {}
    if role_ids:
        role_names = {r.id: r.name for r in db.query(Role).filter(Role.id.in_(role_ids)).all()}
    users = {}
    if user_ids:
        users = {u.id: u for u in db.query(models.User).filter(models.User.id.in_(user_ids)).all()}

    result = []
    for m in members:
        user = users.get(m.user_id)
        result.append({
            "id": m.id,
            "user_id": m.user_id,
            "username": user.username if user else None,
            "full_name": user.full_name if user else None,
            "project_id": m.project_id,
            "role": role_names.get(m.role_id, "developer"),
        })
    return result
@router.delete("/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def remove_project_member(
    project_id: str,
    user_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Remove a user from a project.

    A member holding the "admin" role cannot be removed while they are
    the only member of the project with that role.

    Raises:
        HTTPException: 404 when the project or the membership does not
            exist; 400 when removing the project's last admin; plus
            whatever ``check_permission`` raises when the caller lacks
            "member.remove".
    """
    project = _resolve_project(db, project_id)
    check_permission(db, current_user.id, project.id, "member.remove")

    member = db.query(models.ProjectMember).filter(
        models.ProjectMember.project_id == project.id, models.ProjectMember.user_id == user_id
    ).first()
    # Check existence first: the role check below dereferences `member`.
    if not member:
        raise HTTPException(status_code=404, detail="Member not found")

    # Prevent removing the project's last admin.
    if member.role_id:
        role = db.query(Role).filter(Role.id == member.role_id).first()
        if role and role.name == "admin":
            # Count against the resolved numeric id; the path parameter
            # may be a project code string.
            admin_count = db.query(models.ProjectMember).filter(
                models.ProjectMember.project_id == project.id,
                models.ProjectMember.role_id == member.role_id,
            ).count()
            if admin_count <= 1:
                raise HTTPException(status_code=400, detail="Cannot remove the last owner of the project")

    db.delete(member)
    db.commit()
    return None
# ---- Worklog summary ----
from app.models.worklog import WorkLog
from sqlalchemy import func as sqlfunc
@router.get("/{project_id}/worklogs/summary")
def project_worklog_summary(project_id: str, db: Session = Depends(get_db)):
    """Summarise logged hours for a project, per user and in total.

    Work logs are joined to tasks and filtered on the task's project.

    Returns:
        A dict with ``project_id`` (the resolved numeric id),
        ``total_hours`` rounded to 2 places, and ``by_user``: one entry
        per user with ``user_id``, ``username``, ``hours`` (rounded to 2
        places) and ``logs`` (number of work-log rows).

    Raises:
        HTTPException: 404 when the project does not exist.
    """
    from app.models.task import Task as TaskModel

    project = _resolve_project(db, project_id)
    resolved_project_id = project.id

    rows = (
        db.query(
            models.User.id,
            models.User.username,
            sqlfunc.sum(WorkLog.hours).label("total_hours"),
            sqlfunc.count(WorkLog.id).label("log_count"),
        )
        .join(WorkLog, WorkLog.user_id == models.User.id)
        .join(TaskModel, WorkLog.task_id == TaskModel.id)
        .filter(TaskModel.project_id == resolved_project_id)
        .group_by(models.User.id, models.User.username)
        .all()
    )

    total = 0
    by_user = []
    for row in rows:
        # SUM() is NULL when every hours value in the group is NULL;
        # treat that as zero hours instead of failing in sum()/round().
        hours = row.total_hours or 0
        total += hours
        by_user.append({
            "user_id": row.id,
            "username": row.username,
            "hours": round(hours, 2),
            "logs": row.log_count,
        })

    return {"project_id": resolved_project_id, "total_hours": round(total, 2), "by_user": by_user}