From 6d58ee779c35539f137523b0e33a4b20fcf84307 Mon Sep 17 00:00:00 2001 From: Zhi Date: Tue, 24 Feb 2026 04:16:32 +0000 Subject: [PATCH] feat: RBAC module + project endpoints protected (admin/mgr roles) --- app/api/rbac.py | 46 +++++++++++++++++++++++++++++++++++++ app/api/routers/projects.py | 39 +++++++++++++++++++++++++++---- 2 files changed, 80 insertions(+), 5 deletions(-) create mode 100644 app/api/rbac.py diff --git a/app/api/rbac.py b/app/api/rbac.py new file mode 100644 index 0000000..501acef --- /dev/null +++ b/app/api/rbac.py @@ -0,0 +1,46 @@ +"""Role-based access control helpers.""" +from functools import wraps +from fastapi import HTTPException, status +from sqlalchemy.orm import Session +from app.models.models import ProjectMember, User + + +# Role hierarchy: admin > mgr > dev > ops > viewer +ROLE_LEVELS = { + "admin": 50, + "mgr": 40, + "dev": 30, + "ops": 20, + "viewer": 10, +} + + +def get_member_role(db: Session, user_id: int, project_id: int) -> str | None: + """Get user's role in a project. Returns None if not a member.""" + member = db.query(ProjectMember).filter( + ProjectMember.user_id == user_id, + ProjectMember.project_id == project_id, + ).first() + if member: + return member.role + # Check if user is global admin + user = db.query(User).filter(User.id == user_id).first() + if user and user.is_admin: + return "admin" + return None + + +def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "viewer"): + """Raise 403 if user doesn't have the minimum required role in the project.""" + role = get_member_role(db, user_id, project_id) + if role is None: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Not a member of this project" + ) + if ROLE_LEVELS.get(role, 0) < ROLE_LEVELS.get(min_role, 0): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Requires role '{min_role}' or higher, you have '{role}'" + ) + return role diff --git a/app/api/routers/projects.py b/app/api/routers/projects.py index 75eab8f..4ca9ab1 100644 --- a/app/api/routers/projects.py +++ b/app/api/routers/projects.py @@ -1,4 +1,4 @@ -"""Projects router.""" +"""Projects router with RBAC.""" from typing import List from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session @@ -6,6 +6,8 @@ from sqlalchemy.orm import Session from app.core.config import get_db from app.models import models from app.schemas import schemas +from app.api.deps import get_current_user_or_apikey +from app.api.rbac import check_project_role router = APIRouter(prefix="/projects", tags=["Projects"]) @@ -16,6 +18,10 @@ def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db) db.add(db_project) db.commit() db.refresh(db_project) + # Auto-add creator as admin member + db_member = models.ProjectMember(project_id=db_project.id, user_id=project.owner_id, role="admin") + db.add(db_member) + db.commit() return db_project @@ -33,7 +39,13 @@ def get_project(project_id: int, db: Session = Depends(get_db)): @router.patch("/{project_id}", response_model=schemas.ProjectResponse) -def update_project(project_id: int, project_update: schemas.ProjectUpdate, db: Session = Depends(get_db)): +def update_project( + project_id: int, + project_update: schemas.ProjectUpdate, + db: Session = Depends(get_db), + current_user: models.User = Depends(get_current_user_or_apikey), +): + check_project_role(db, current_user.id, project_id, min_role="mgr") project = db.query(models.Project).filter(models.Project.id == project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") @@ -45,7 +57,12 @@ def update_project(project_id: int, project_update: schemas.ProjectUpdate, db: S @router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT) -def delete_project(project_id: int, db: Session = Depends(get_db)): +def delete_project( + project_id: int, + db: Session = Depends(get_db), + current_user: models.User = Depends(get_current_user_or_apikey), +): + check_project_role(db, current_user.id, project_id, min_role="admin") project = db.query(models.Project).filter(models.Project.id == project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") @@ -57,7 +74,13 @@ def delete_project(project_id: int, db: Session = Depends(get_db)): # ---- Members ---- @router.post("/{project_id}/members", response_model=schemas.ProjectMemberResponse, status_code=status.HTTP_201_CREATED) -def add_project_member(project_id: int, member: schemas.ProjectMemberCreate, db: Session = Depends(get_db)): +def add_project_member( + project_id: int, + member: schemas.ProjectMemberCreate, + db: Session = Depends(get_db), + current_user: models.User = Depends(get_current_user_or_apikey), +): + check_project_role(db, current_user.id, project_id, min_role="mgr") project = db.query(models.Project).filter(models.Project.id == project_id).first() if not project: raise HTTPException(status_code=404, detail="Project not found") @@ -82,7 +105,13 @@ def list_project_members(project_id: int, db: Session = Depends(get_db)): @router.delete("/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT) -def remove_project_member(project_id: int, user_id: int, db: Session = Depends(get_db)): +def remove_project_member( + project_id: int, + user_id: int, + db: Session = Depends(get_db), + current_user: models.User = Depends(get_current_user_or_apikey), +): + check_project_role(db, current_user.id, project_id, min_role="admin") member = db.query(models.ProjectMember).filter( models.ProjectMember.project_id == project_id, models.ProjectMember.user_id == user_id ).first()