331 lines
12 KiB
Python
331 lines
12 KiB
Python
"""Projects router with RBAC."""
|
|
import json
|
|
import re
|
|
from typing import List
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import get_db
|
|
from app.models import models
|
|
from app.schemas import schemas
|
|
from app.api.deps import get_current_user_or_apikey
|
|
from app.api.rbac import check_project_role
|
|
|
|
# Router that every endpoint in this module registers on; all paths are prefixed with /projects.
router = APIRouter(prefix="/projects", tags=["Projects"])
|
|
|
|
|
|
def _validate_project_links(db, codes: list[str] | None, self_code: str | None = None) -> list[str] | None:
|
|
if not codes:
|
|
return None
|
|
# dedupe preserve order
|
|
seen = set()
|
|
ordered = []
|
|
for c in codes:
|
|
if c and c not in seen:
|
|
ordered.append(c)
|
|
seen.add(c)
|
|
if self_code and self_code in seen:
|
|
raise HTTPException(status_code=400, detail='Project cannot link to itself')
|
|
existing = {p.project_code for p in db.query(models.Project).filter(models.Project.project_code.in_(ordered)).all()}
|
|
missing = [c for c in ordered if c not in existing]
|
|
if missing:
|
|
raise HTTPException(status_code=400, detail=f'Unknown project codes: {", ".join(missing)}')
|
|
return ordered
|
|
|
|
WORD_SEGMENT_RE = re.compile(r"[A-Za-z]+")
|
|
CAMEL_RE = re.compile(r"[A-Z]+(?=[A-Z][a-z])|[A-Z]?[a-z]+|[A-Z]+")
|
|
|
|
|
|
def _split_words(name: str):
|
|
segments = WORD_SEGMENT_RE.findall(name or '')
|
|
words = []
|
|
for seg in segments:
|
|
parts = CAMEL_RE.findall(seg)
|
|
for part in parts:
|
|
if part.isupper() and len(part) > 1:
|
|
words.extend(list(part))
|
|
else:
|
|
words.append(part)
|
|
return words
|
|
|
|
|
|
def _code_exists(db, code: str) -> bool:
    """Return True when some project row already has ``code`` as its project_code."""
    match = (
        db.query(models.Project)
        .filter(models.Project.project_code == code)
        .first()
    )
    return match is not None
|
|
|
|
|
|
def _next_counter(db, prefix: str, width: int) -> str:
|
|
if width <= 0:
|
|
return ''
|
|
counter = db.query(models.ProjectCodeCounter).filter(models.ProjectCodeCounter.prefix == prefix).first()
|
|
if not counter:
|
|
counter = models.ProjectCodeCounter(prefix=prefix, next_value=0)
|
|
db.add(counter)
|
|
db.flush()
|
|
value = counter.next_value
|
|
counter.next_value += 1
|
|
db.flush()
|
|
return format(value, 'x').upper().zfill(width)
|
|
|
|
|
|
def _generate_with_counter(db, prefix: str, width: int) -> str:
    """Build ``prefix`` + counter suffix codes until one is not taken.

    Any casing of "UN" is normalised to 'UN' so those spellings share a counter
    row. Each attempt consumes a counter value via _next_counter.

    Args:
        db: SQLAlchemy session.
        prefix: Leading letters of the code.
        width: Number of hex digits requested for the suffix.

    Returns:
        The first upper-cased candidate for which _code_exists is False.
    """
    base = 'UN' if prefix.upper() == 'UN' else prefix

    candidate = (base + _next_counter(db, base, width)).upper()
    while _code_exists(db, candidate):
        # Taken already: pull the next counter value and try again.
        candidate = (base + _next_counter(db, base, width)).upper()
    return candidate
|
|
|
|
|
|
def _generate_project_code(db, name: str) -> str:
    """Derive a six-character, upper-case project code from a project name.

    Rules, applied in order to the words returned by _split_words(name):

    * no words: 'UN' plus a 4-digit counter suffix;
    * one word: its first six letters when it has at least six, otherwise the
      word plus a counter suffix padding it to six characters;
    * more than six words: the initials of the first six words;
    * fewer than six letters in total: all letters plus a counter suffix;
    * exactly six letters in total: all letters;
    * otherwise: every word's initial, plus ``6 - word_count`` further letters
      taken (in order) from a single word. Words are tried from last to first
      and, within a word, letter combinations in lexicographic position order;
      the first code not already in use wins.

    Args:
        db: SQLAlchemy session used for existence checks and counters.
        name: Project name.

    Returns:
        The generated project code.

    Raises:
        HTTPException: 400 'Project code collision' when a fixed candidate is
            taken, or when the last rule finds no free candidate. The last rule
            also ends here, without trying anything, when no single word has
            enough spare letters.
    """
    # Function-scope import so this block does not depend on file-level changes.
    from itertools import combinations

    def _require_free(candidate: str) -> str:
        # Fixed (non-counter) codes cannot be varied, so a taken code is an error.
        if _code_exists(db, candidate):
            raise HTTPException(status_code=400, detail='Project code collision')
        return candidate

    words = _split_words(name)
    if not words:
        return _generate_with_counter(db, 'UN', 4)

    if len(words) == 1:
        letters = ''.join(c for c in words[0] if c.isalpha()).upper()
        if not letters:
            return _generate_with_counter(db, 'UN', 4)
        if len(letters) >= 6:
            return _require_free(letters[:6])
        return _generate_with_counter(db, letters, 6 - len(letters))

    if len(words) > 6:
        return _require_free(''.join(w[0] for w in words[:6]).upper())

    joined = ''.join(words).upper()
    if len(joined) < 6:
        return _generate_with_counter(db, joined, 6 - len(joined))
    if len(joined) == 6:
        return _require_free(joined)

    # More than six letters spread over two to six words: keep every initial and
    # borrow the remaining letters from one word at a time.
    initials = [w[0] for w in words]
    needed = 6 - len(words)
    for idx in range(len(words) - 1, -1, -1):
        spare = words[idx][1:]
        # combinations() yields nothing when the word has fewer than `needed`
        # spare letters, and exactly one empty tuple when needed == 0.
        for combo in combinations(spare, needed):
            pieces = initials[:idx + 1] + list(combo) + initials[idx + 1:]
            code = ''.join(pieces)[:6].upper()
            if not _code_exists(db, code):
                return code

    raise HTTPException(status_code=400, detail='Project code collision')
|
|
|
|
@router.post("", response_model=schemas.ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(
    project: schemas.ProjectCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Create a project and register its owner as a member.

    Only users with ``is_admin`` set may call this. The project code is
    generated from the project name, ``owner_name`` defaults to the owner's
    username, and the optional ``sub_projects`` / ``related_projects`` lists
    are validated and stored as JSON text (or ``None`` when empty).

    The project row and the owner's membership row are written in a single
    transaction, so a failure while adding the membership does not leave a
    project behind.

    Raises:
        HTTPException: 403 when the caller is not an admin; 400 when
            ``owner_id`` matches no user, when a linked code is invalid, or
            when code generation reports a collision.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Only admins can create projects")

    owner = db.query(models.User).filter(models.User.id == project.owner_id).first()
    if not owner:
        raise HTTPException(status_code=400, detail="Invalid owner_id: user not found")

    payload = project.model_dump()
    payload["owner_name"] = payload.get("owner_name") or owner.username
    payload["project_code"] = _generate_project_code(db, project.name)

    # Both link fields share the same validate-then-serialize handling.
    for link_field in ("sub_projects", "related_projects"):
        link_codes = payload.get(link_field)
        if link_codes:
            payload[link_field] = json.dumps(
                _validate_project_links(db, link_codes, payload["project_code"])
            )
        else:
            payload[link_field] = None

    db_project = models.Project(**payload)
    db.add(db_project)
    # flush() sends the INSERT so db_project.id is populated, without committing.
    db.flush()

    # The membership is created for the project owner (owner_id), not the caller.
    # NOTE(review): add_project_member builds ProjectMember with role_id=...,
    # while this passes role="admin" - confirm the model accepts `role`.
    db_member = models.ProjectMember(project_id=db_project.id, user_id=project.owner_id, role="admin")
    db.add(db_member)

    db.commit()
    db.refresh(db_project)
    return db_project
|
|
|
|
|
|
@router.get("", response_model=List[schemas.ProjectResponse])
def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of projects.

    Args:
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).
        db: SQLAlchemy session.

    Returns:
        List of Project rows ordered by primary key.
    """
    # OFFSET/LIMIT without ORDER BY lets the database return rows in any order,
    # so consecutive pages could overlap or skip rows; order by id to pin it.
    return (
        db.query(models.Project)
        .order_by(models.Project.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
|
|
@router.get("/{project_id}", response_model=schemas.ProjectResponse)
def get_project(project_id: int, db: Session = Depends(get_db)):
    """Return the project whose id is ``project_id``.

    Raises:
        HTTPException: 404 when no such project exists.
    """
    found = (
        db.query(models.Project)
        .filter(models.Project.id == project_id)
        .first()
    )
    if found:
        return found
    raise HTTPException(status_code=404, detail="Project not found")
|
|
|
|
|
|
@router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(
    project_id: int,
    project_update: schemas.ProjectUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Apply a partial update to a project.

    The caller's role is checked through check_project_role with
    ``min_role="mgr"``. Only fields explicitly present in the request are
    applied, and ``name`` is always ignored. ``sub_projects`` and
    ``related_projects`` are validated the same way as on creation (no
    self-link, every code must exist, duplicates removed) and stored as JSON
    text, or ``None`` when the list is empty.

    Raises:
        HTTPException: 404 when the project does not exist; 400 when a linked
            project code is invalid; plus whatever check_project_role raises.
    """
    check_project_role(db, current_user.id, project_id, min_role="mgr")

    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    update_data = project_update.model_dump(exclude_unset=True)
    # The name is never changed through this endpoint.
    update_data.pop("name", None)

    for link_field in ("sub_projects", "related_projects"):
        if link_field not in update_data:
            continue
        # Without this check a project could be linked to itself or to codes
        # that do not exist; create_project already validates these lists.
        validated = _validate_project_links(db, update_data[link_field], project.project_code)
        update_data[link_field] = json.dumps(validated) if validated else None

    for field, value in update_data.items():
        setattr(project, field, value)

    db.commit()
    db.refresh(project)
    return project
|
|
|
|
|
|
@router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_project(
    project_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Delete a project after check_project_role passes with ``min_role="admin"``.

    Raises:
        HTTPException: 404 when the project does not exist; plus whatever
            check_project_role raises.
    """
    check_project_role(db, current_user.id, project_id, min_role="admin")

    target = (
        db.query(models.Project)
        .filter(models.Project.id == project_id)
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="Project not found")

    db.delete(target)
    db.commit()
    return None
|
|
|
|
|
|
# ---- Members ----
|
|
|
|
@router.post("/{project_id}/members", response_model=schemas.ProjectMemberResponse, status_code=status.HTTP_201_CREATED)
def add_project_member(
    project_id: int,
    member: schemas.ProjectMemberCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Add a user to a project with the requested role.

    The caller's role is checked through check_project_role with
    ``min_role="mgr"``. The role name in the request is resolved to a Role
    row; when no role with that name exists the membership is stored with
    ``role_id=None`` and reported as "developer".

    Returns:
        Dict with ``id``, ``user_id``, ``project_id`` and ``role`` (role name).

    Raises:
        HTTPException: 404 when the project or user does not exist; 400 when
            the user is already a member; plus whatever check_project_role
            raises.
    """
    check_project_role(db, current_user.id, project_id, min_role="mgr")

    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    user = db.query(models.User).filter(models.User.id == member.user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    existing = db.query(models.ProjectMember).filter(
        models.ProjectMember.project_id == project_id,
        models.ProjectMember.user_id == member.user_id,
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail="User already a member")

    # Convert role name to role_id.
    # NOTE(review): `Role` is not imported anywhere in the visible part of this
    # file - confirm it is in scope, otherwise this line raises NameError.
    role = db.query(Role).filter(Role.name == member.role).first()
    role_id = role.id if role else None
    # Capture the name now; the row just looked up is the one the member points
    # at, so there is no need to query Role again after the commit.
    role_name = role.name if role else "developer"

    db_member = models.ProjectMember(project_id=project_id, user_id=member.user_id, role_id=role_id)
    db.add(db_member)
    db.commit()
    db.refresh(db_member)

    return {
        "id": db_member.id,
        "user_id": db_member.user_id,
        "project_id": db_member.project_id,
        "role": role_name,
    }
|
|
|
|
|
|
@router.get("/{project_id}/members", response_model=List[schemas.ProjectMemberResponse])
def list_project_members(project_id: int, db: Session = Depends(get_db)):
    """List the members of a project together with their role names.

    Members without a ``role_id``, or whose ``role_id`` matches no Role row,
    are reported with the role "developer".

    Returns:
        List of dicts with ``id``, ``user_id``, ``project_id`` and ``role``.
    """
    members = (
        db.query(models.ProjectMember)
        .filter(models.ProjectMember.project_id == project_id)
        .all()
    )

    # Resolve all role names with one query instead of one query per member.
    # NOTE(review): `Role` is not imported anywhere in the visible part of this
    # file - confirm it is in scope, otherwise this raises NameError.
    role_ids = sorted({m.role_id for m in members if m.role_id})
    role_names = {}
    if role_ids:
        role_names = {
            role.id: role.name
            for role in db.query(Role).filter(Role.id.in_(role_ids)).all()
        }

    return [
        {
            "id": m.id,
            "user_id": m.user_id,
            "project_id": m.project_id,
            "role": role_names.get(m.role_id, "developer"),
        }
        for m in members
    ]
|
|
|
|
|
|
@router.delete("/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def remove_project_member(
    project_id: int,
    user_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Remove a user's membership from a project.

    check_project_role is called with ``min_role="admin"`` before anything
    is looked up.

    Raises:
        HTTPException: 404 when the user is not a member of the project; plus
            whatever check_project_role raises.
    """
    check_project_role(db, current_user.id, project_id, min_role="admin")

    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.project_id == project_id,
            models.ProjectMember.user_id == user_id,
        )
        .first()
    )
    if not membership:
        raise HTTPException(status_code=404, detail="Member not found")

    db.delete(membership)
    db.commit()
    return None
|
|
|
|
|
|
# ---- Worklog summary ----
|
|
|
|
from app.models.worklog import WorkLog
|
|
from sqlalchemy import func as sqlfunc
|
|
|
|
|
|
@router.get("/{project_id}/worklogs/summary")
def project_worklog_summary(project_id: int, db: Session = Depends(get_db)):
    """Summarise logged hours for a project, per user and in total.

    Work logs are joined to issues and restricted to issues whose
    ``project_id`` matches, then grouped by user.

    Returns:
        Dict with ``project_id``, ``total_hours`` (rounded to 2 places) and
        ``by_user``: one entry per user with ``user_id``, ``username``,
        ``hours`` (rounded to 2 places) and ``logs`` (number of work logs).
    """
    rows = (
        db.query(
            models.User.id,
            models.User.username,
            sqlfunc.sum(WorkLog.hours).label("total_hours"),
            sqlfunc.count(WorkLog.id).label("log_count"),
        )
        .join(WorkLog, WorkLog.user_id == models.User.id)
        .join(models.Issue, WorkLog.issue_id == models.Issue.id)
        .filter(models.Issue.project_id == project_id)
        .group_by(models.User.id, models.User.username)
        .all()
    )

    total = 0
    by_user = []
    for row in rows:
        # SUM() yields NULL when every hours value in the group is NULL; treat
        # that as zero instead of letting the addition and round() raise.
        hours = row.total_hours or 0
        total += hours
        by_user.append({
            "user_id": row.id,
            "username": row.username,
            "hours": round(hours, 2),
            "logs": row.log_count,
        })

    return {"project_id": project_id, "total_hours": round(total, 2), "by_user": by_user}
|