Files
HarborForge.Backend/app/api/routers/projects.py
hzhang d2b83ad58d fix(projects): perm-gate create + apikey-via-Bearer + introspect with apikey
Four coupled fixes so non-admin agents (e.g. nav, role=mgr) can
actually create projects through hf-cli with their API key:

1. POST /projects no longer hardcodes is_admin. It checks the global
   `project.create` perm via role_permissions (admin still wins via
   is_admin short-circuit). Permission-denied 403 message names the
   exact perm.

2. /auth/me/permissions now uses get_current_user_or_apikey (was
   get_current_user JWT-only). This is what hf-cli hits to populate
   its local permission cache that drives the "not permitted" gate;
   previously every API-key-authed agent saw all commands as gated.

3. get_current_user_or_apikey now also accepts an API key delivered
   via Authorization: Bearer (in addition to X-API-Key). hf-cli only
   knows Bearer; trying to JWT-decode an API key string would fail —
   so on decode failure, fall through to the API key lookup. Keeps
   X-API-Key behavior unchanged.

4. init_bootstrap: add `project.create` to DEFAULT_PERMISSIONS and to
   _MGR_PERMISSIONS so admin (auto-all) + mgr both get it on seed.

Bug came to light when manager-agent reported `hf project list`/`create`
returned `not permitted`. Root cause: hf-cli calls /auth/me/permissions
with the API key via Bearer header → 401 → state.Known=false → every
command in the surface is gated false locally. Even after the local
gate, POST /projects would still 403 due to the hardcoded admin check.
All four steps above are required end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 22:09:34 +01:00

420 lines
16 KiB
Python

"""Projects router with RBAC."""
import json
import re
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.config import get_db
from app.models import models
from app.models.role_permission import Role
from app.schemas import schemas
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role, check_permission, ensure_can_edit_project
# Router for every endpoint in this module; all routes are mounted under the
# "/projects" prefix and tagged "Projects".
router = APIRouter(prefix="/projects", tags=["Projects"])
def _resolve_project(db: Session, identifier: str) -> models.Project:
    """Look up a project from a path identifier.

    The identifier is first interpreted as a numeric primary key; when it
    cannot be converted to an int it is matched against ``project_code``.

    Raises:
        HTTPException: 404 when no project matches.
    """
    try:
        match = (
            db.query(models.Project)
            .filter(models.Project.id == int(identifier))
            .first()
        )
    except (ValueError, TypeError):
        # Not a number: treat the identifier as a project code instead.
        match = (
            db.query(models.Project)
            .filter(models.Project.project_code == identifier)
            .first()
        )
    if match:
        return match
    raise HTTPException(status_code=404, detail="Project not found")
def _validate_project_links(db, codes: list[str] | None, self_code: str | None = None) -> list[str] | None:
if not codes:
return None
# dedupe preserve order
seen = set()
ordered = []
for c in codes:
if c and c not in seen:
ordered.append(c)
seen.add(c)
if self_code and self_code in seen:
raise HTTPException(status_code=400, detail='Project cannot link to itself')
existing = {p.project_code for p in db.query(models.Project).filter(models.Project.project_code.in_(ordered)).all()}
missing = [c for c in ordered if c not in existing]
if missing:
raise HTTPException(status_code=400, detail=f'Unknown project codes: {", ".join(missing)}')
return ordered
WORD_SEGMENT_RE = re.compile(r"[A-Za-z]+")
CAMEL_RE = re.compile(r"[A-Z]+(?=[A-Z][a-z])|[A-Z]?[a-z]+|[A-Z]+")
def _split_words(name: str):
segments = WORD_SEGMENT_RE.findall(name or '')
words = []
for seg in segments:
parts = CAMEL_RE.findall(seg)
for part in parts:
if part.isupper() and len(part) > 1:
words.extend(list(part))
else:
words.append(part)
return words
def _code_exists(db, code: str) -> bool:
    """Return True when a project with ``project_code == code`` is already stored."""
    match = (
        db.query(models.Project)
        .filter(models.Project.project_code == code)
        .first()
    )
    return match is not None
def _next_counter(db, prefix: str, width: int) -> str:
if width <= 0:
return ''
counter = db.query(models.ProjectCodeCounter).filter(models.ProjectCodeCounter.prefix == prefix).first()
if not counter:
counter = models.ProjectCodeCounter(prefix=prefix, next_value=0)
db.add(counter)
db.flush()
value = counter.next_value
counter.next_value += 1
db.flush()
return format(value, 'x').upper().zfill(width)
def _generate_with_counter(db, prefix: str, width: int) -> str:
    """Build ``prefix`` + counter suffix codes until one is not yet in use.

    Any casing of the prefix "UN" is normalised to 'UN' before the counter
    lookup. The returned code is uppercased.
    """
    counter_key = 'UN' if prefix.upper() == 'UN' else prefix
    candidate = (counter_key + _next_counter(db, counter_key, width)).upper()
    # Keep drawing counter values while the candidate collides with a stored code.
    while _code_exists(db, candidate):
        candidate = (counter_key + _next_counter(db, counter_key, width)).upper()
    return candidate
def _generate_project_code(db, name: str) -> str:
    """Derive a project code from a project name.

    Rules, applied in order to the words produced by ``_split_words``:

    * no words: 'UN' followed by a 4-character counter;
    * one word with six or more letters: its first six letters;
    * one shorter word: the word followed by a counter padding it to six;
    * more than six words: the initials of the first six words;
    * fewer than six letters in total: all letters followed by a counter;
    * exactly six letters in total: all letters joined;
    * otherwise: every word's initial, plus enough extra letters taken from a
      single word (the last word is tried first) to reach six characters.
      The first combination not already in use wins.

    Raises:
        HTTPException: 400 ('Project code collision') when a fixed code is
            already taken or no unused combination can be built.
    """
    # Function-scope import so the block does not depend on a file-level change.
    from itertools import combinations

    def claim(code: str) -> str:
        # Codes with no counter part cannot be varied, so a clash is an error.
        if _code_exists(db, code):
            raise HTTPException(status_code=400, detail='Project code collision')
        return code

    words = _split_words(name)
    if not words:
        return _generate_with_counter(db, 'UN', 4)

    if len(words) == 1:
        letters = ''.join(c for c in words[0] if c.isalpha()).upper()
        if not letters:
            return _generate_with_counter(db, 'UN', 4)
        if len(letters) >= 6:
            return claim(letters[:6])
        return _generate_with_counter(db, letters, 6 - len(letters))

    total_letters = sum(len(w) for w in words)
    if len(words) > 6:
        return claim(''.join(w[0] for w in words[:6]).upper())
    if total_letters < 6:
        prefix = ''.join(words).upper()
        return _generate_with_counter(db, prefix, 6 - len(prefix))
    if total_letters == 6:
        return claim(''.join(words).upper())

    word_count = len(words)
    needed = 6 - word_count
    initials = [w[0] for w in words]
    for idx in range(word_count - 1, -1, -1):
        extra_letters = words[idx][1:]
        if needed > len(extra_letters):
            # This word alone cannot supply the missing characters.
            continue
        # combinations() picks by position in lexicographic order, matching
        # the order the previous hand-written generator produced.
        for combo in combinations(extra_letters, needed):
            pieces = initials[:idx + 1] + list(combo) + initials[idx + 1:]
            code = ''.join(pieces)[:6].upper()
            if not _code_exists(db, code):
                return code
    raise HTTPException(status_code=400, detail='Project code collision')
@router.post("", response_model=schemas.ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
    """Create a project and register its owner as an 'admin' member.

    Admin users may always create projects. Any other caller needs the global
    ``project.create`` permission granted to their role.

    The project row and the owner's membership row are written in a single
    transaction, so a failure cannot leave a project without its admin member.

    Raises:
        HTTPException: 403 when the caller lacks ``project.create``; 400 when
            ``owner_id`` does not match a user, when the generated code
            collides, or when a linked project code is invalid.
    """
    # Project creation is gated by the `project.create` global permission
    # (admin auto-grants by virtue of is_admin). Any role granted that perm
    # via the Role Editor can create projects.
    if not current_user.is_admin:
        from app.models.role_permission import Permission, RolePermission
        granted = None
        if current_user.role_id:
            granted = (
                db.query(Permission.id)
                .join(RolePermission, RolePermission.permission_id == Permission.id)
                .filter(
                    RolePermission.role_id == current_user.role_id,
                    Permission.name == "project.create",
                )
                .first()
            )
        if not granted:
            raise HTTPException(
                status_code=403,
                detail="Permission denied: project.create required",
            )

    # Auto-fill owner_name from owner_id
    owner = db.query(models.User).filter(models.User.id == project.owner_id).first()
    if not owner:
        raise HTTPException(status_code=400, detail="Invalid owner_id: user not found")

    payload = project.model_dump()
    payload["owner_name"] = payload.get("owner_name") or owner.username
    payload["project_code"] = _generate_project_code(db, project.name)

    # Validate link lists and store them as JSON text (None when empty).
    for link_field in ("sub_projects", "related_projects"):
        codes = payload.get(link_field)
        if codes:
            payload[link_field] = json.dumps(_validate_project_links(db, codes, payload["project_code"]))
        else:
            payload[link_field] = None

    db_project = models.Project(**payload)
    db.add(db_project)
    try:
        # flush (not commit) assigns the primary key while keeping the project
        # in the same transaction as the membership row below.
        db.flush()
        # The project owner (owner_id) becomes a member with the "admin" role.
        admin_role = db.query(Role).filter(Role.name == "admin").first()
        db.add(
            models.ProjectMember(
                project_id=db_project.id,
                user_id=project.owner_id,
                role_id=admin_role.id if admin_role else None,
            )
        )
        db.commit()
    except Exception:
        db.rollback()
        raise
    db.refresh(db_project)
    return db_project
@router.get("", response_model=List[schemas.ProjectResponse])
def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return one page of projects: at most ``limit`` rows after skipping ``skip``."""
    page = db.query(models.Project).offset(skip).limit(limit)
    return page.all()
def _find_project_by_id_or_code(db, identifier) -> models.Project | None:
    """Find a project by numeric id, falling back to its project_code.

    Unlike a strict lookup, a numeric identifier that matches no id is also
    tried as a project code. Returns ``None`` when nothing matches.
    """
    try:
        by_id = (
            db.query(models.Project)
            .filter(models.Project.id == int(identifier))
            .first()
        )
    except (ValueError, TypeError):
        by_id = None
    if by_id:
        return by_id
    return (
        db.query(models.Project)
        .filter(models.Project.project_code == str(identifier))
        .first()
    )
@router.get("/{project_id}", response_model=schemas.ProjectResponse)
def get_project(project_id: str, db: Session = Depends(get_db)):
    """Return a single project addressed by numeric id or project code (404 if absent)."""
    project = _resolve_project(db, project_id)
    return project
@router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(
    project_id: str,
    project_update: schemas.ProjectUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Apply a partial update to a project.

    Only fields explicitly present in the request are changed. ``name`` is
    always ignored. ``sub_projects`` and ``related_projects`` are validated
    with the same rules as project creation (codes must exist, no self-link,
    duplicates removed) and stored as JSON text, or ``None`` when empty.

    Raises:
        HTTPException: 404 when the project does not exist; 400 when a linked
            project code is unknown or refers to the project itself; plus
            whatever ``ensure_can_edit_project`` raises for callers who may
            not edit.
    """
    project = _resolve_project(db, project_id)
    ensure_can_edit_project(db, current_user.id, project)

    update_data = project_update.model_dump(exclude_unset=True)
    # The project name cannot be changed through this endpoint.
    update_data.pop("name", None)

    # NOTE(review): assumes the update schema carries link codes as a list of
    # project-code strings, like the create schema — confirm.
    for link_field in ("sub_projects", "related_projects"):
        if link_field not in update_data:
            continue
        codes = _validate_project_links(db, update_data[link_field], project.project_code)
        update_data[link_field] = json.dumps(codes) if codes else None

    for field, value in update_data.items():
        setattr(project, field, value)
    db.commit()
    db.refresh(project)
    return project
@router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_project(
    project_id: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Delete a project together with its milestones, their tasks and its members.

    The caller must pass ``check_project_role`` with ``min_role="admin"``.
    The project's code is also removed from the ``sub_projects`` and
    ``related_projects`` lists of every project. Everything is committed once
    at the end.
    """
    project = _resolve_project(db, project_id)
    check_project_role(db, current_user.id, project.id, min_role="admin")
    doomed_code = project.project_code
    doomed_id = project.id

    # Milestones go together with the tasks attached to them.
    from app.models.milestone import Milestone
    from app.models.task import Task
    for milestone in db.query(Milestone).filter(Milestone.project_id == doomed_id).all():
        for task in db.query(Task).filter(Task.milestone_id == milestone.id).all():
            db.delete(task)
        db.delete(milestone)

    # Membership rows of the project.
    membership_rows = (
        db.query(models.ProjectMember)
        .filter(models.ProjectMember.project_id == project.id)
        .all()
    )
    for row in membership_rows:
        db.delete(row)

    # Drop references to this project from link lists stored as JSON text.
    for other in db.query(models.Project).all():
        for column in ("sub_projects", "related_projects"):
            raw = getattr(other, column)
            # Cheap substring test first; exact matching happens after decoding.
            if raw and doomed_code in raw:
                kept = [code for code in json.loads(raw) if code != doomed_code]
                setattr(other, column, json.dumps(kept) if kept else None)

    db.delete(project)
    db.commit()
    return None
# ---- Members ----
@router.post("/{project_id}/members", response_model=schemas.ProjectMemberResponse, status_code=status.HTTP_201_CREATED)
def add_project_member(
    project_id: str,
    member: schemas.ProjectMemberCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Add a user to a project with the role named in the request.

    The caller must pass ``check_project_role`` with ``min_role="mgr"``. A role
    name that matches no ``Role`` row is stored as ``role_id=None`` and
    reported as "developer".

    Raises:
        HTTPException: 404 when the project or user does not exist; 400 when
            the user is already a member.
    """
    project = _resolve_project(db, project_id)
    check_project_role(db, current_user.id, project.id, min_role="mgr")

    target_user = db.query(models.User).filter(models.User.id == member.user_id).first()
    if not target_user:
        raise HTTPException(status_code=404, detail="User not found")

    already_member = db.query(models.ProjectMember).filter(
        models.ProjectMember.project_id == project.id,
        models.ProjectMember.user_id == member.user_id,
    ).first()
    if already_member:
        raise HTTPException(status_code=400, detail="User already a member")

    # Translate the requested role name into its id (None when unknown).
    requested_role = db.query(Role).filter(Role.name == member.role).first()
    new_member = models.ProjectMember(
        project_id=project.id,
        user_id=member.user_id,
        role_id=requested_role.id if requested_role else None,
    )
    db.add(new_member)
    db.commit()
    db.refresh(new_member)

    # Report the role as stored; members without a resolvable role show as "developer".
    stored_role = None
    if new_member.role_id:
        stored_role = db.query(Role).filter(Role.id == new_member.role_id).first()
    return {
        "id": new_member.id,
        "user_id": new_member.user_id,
        "project_id": new_member.project_id,
        "role": stored_role.name if stored_role else "developer",
    }
@router.get("/{project_id}/members", response_model=List[schemas.ProjectMemberResponse])
def list_project_members(project_id: str, db: Session = Depends(get_db)):
    """List the members of a project with their user details and role name.

    Members without a resolvable role are reported as "developer"; ``username``
    and ``full_name`` are ``None`` when the user row is missing.
    """
    project = _resolve_project(db, project_id)
    memberships = (
        db.query(models.ProjectMember)
        .filter(models.ProjectMember.project_id == project.id)
        .all()
    )

    def describe(membership):
        # One role lookup and one user lookup per membership row.
        role = None
        if membership.role_id:
            role = db.query(Role).filter(Role.id == membership.role_id).first()
        account = db.query(models.User).filter(models.User.id == membership.user_id).first()
        return {
            "id": membership.id,
            "user_id": membership.user_id,
            "username": account.username if account else None,
            "full_name": account.full_name if account else None,
            "project_id": membership.project_id,
            "role": role.name if role else "developer",
        }

    return [describe(membership) for membership in memberships]
@router.delete("/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def remove_project_member(
    project_id: str,
    user_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Remove a user from a project.

    The caller needs the ``member.remove`` permission on the project. The last
    member holding the "admin" role cannot be removed.

    Raises:
        HTTPException: 404 when the project or the membership does not exist;
            400 when the member is the project's only admin.
    """
    project = _resolve_project(db, project_id)
    check_permission(db, current_user.id, project.id, "member.remove")

    member = db.query(models.ProjectMember).filter(
        models.ProjectMember.project_id == project.id,
        models.ProjectMember.user_id == user_id,
    ).first()
    # Must be checked before any attribute access on ``member``.
    if not member:
        raise HTTPException(status_code=404, detail="Member not found")

    # Prevent removing project owner (admin role)
    if member.role_id:
        role = db.query(Role).filter(Role.id == member.role_id).first()
        if role and role.name == "admin":
            # Count admins using the resolved numeric id: the ``project_id``
            # path value may be a project code rather than a number.
            admin_count = db.query(models.ProjectMember).filter(
                models.ProjectMember.project_id == project.id,
                models.ProjectMember.role_id == member.role_id,
            ).count()
            if admin_count <= 1:
                raise HTTPException(status_code=400, detail="Cannot remove the last owner of the project")

    db.delete(member)
    db.commit()
    return None
# ---- Worklog summary ----
from app.models.worklog import WorkLog
from sqlalchemy import func as sqlfunc
@router.get("/{project_id}/worklogs/summary")
def project_worklog_summary(project_id: str, db: Session = Depends(get_db)):
    """Summarise logged hours for a project, grouped by user.

    Work logs are joined to tasks and filtered by the task's ``project_id``.

    Returns:
        A dict with the resolved numeric ``project_id``, the overall
        ``total_hours`` (rounded to 2 places) and a ``by_user`` list holding
        each user's id, username, rounded hours and number of log entries.
    """
    from app.models.task import Task as TaskModel

    project = _resolve_project(db, project_id)
    resolved_project_id = project.id
    results = (
        db.query(
            models.User.id,
            models.User.username,
            sqlfunc.sum(WorkLog.hours).label("total_hours"),
            sqlfunc.count(WorkLog.id).label("log_count"),
        )
        .join(WorkLog, WorkLog.user_id == models.User.id)
        .join(TaskModel, WorkLog.task_id == TaskModel.id)
        .filter(TaskModel.project_id == resolved_project_id)
        .group_by(models.User.id, models.User.username)
        .all()
    )

    total = 0
    by_user = []
    for r in results:
        # SUM() yields NULL when every summed value is NULL; count that as 0
        # hours instead of letting sum()/round() fail on None.
        hours = r.total_hours if r.total_hours is not None else 0
        total += hours
        by_user.append({"user_id": r.id, "username": r.username, "hours": round(hours, 2), "logs": r.log_count})
    return {"project_id": resolved_project_id, "total_hours": round(total, 2), "by_user": by_user}