feat: RBAC module + project endpoints protected (admin/mgr roles)

This commit is contained in:
Zhi
2026-02-24 04:16:32 +00:00
parent 05f2e8706d
commit 6d58ee779c
2 changed files with 80 additions and 5 deletions

46
app/api/rbac.py Normal file
View File

@@ -0,0 +1,46 @@
"""Role-based access control helpers."""
from functools import wraps
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.models.models import ProjectMember, User
# Role hierarchy: admin > mgr > dev > ops > viewer
ROLE_LEVELS = {
"admin": 50,
"mgr": 40,
"dev": 30,
"ops": 20,
"viewer": 10,
}
def get_member_role(db: Session, user_id: int, project_id: int) -> str | None:
"""Get user's role in a project. Returns None if not a member."""
member = db.query(ProjectMember).filter(
ProjectMember.user_id == user_id,
ProjectMember.project_id == project_id,
).first()
if member:
return member.role
# Check if user is global admin
user = db.query(User).filter(User.id == user_id).first()
if user and user.is_admin:
return "admin"
return None
def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "viewer"):
"""Raise 403 if user doesn't have the minimum required role in the project."""
role = get_member_role(db, user_id, project_id)
if role is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not a member of this project"
)
if ROLE_LEVELS.get(role, 0) < ROLE_LEVELS.get(min_role, 0):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires role '{min_role}' or higher, you have '{role}'"
)
return role

View File

@@ -1,4 +1,4 @@
"""Projects router.""" """Projects router with RBAC."""
from typing import List from typing import List
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@@ -6,6 +6,8 @@ from sqlalchemy.orm import Session
from app.core.config import get_db from app.core.config import get_db
from app.models import models from app.models import models
from app.schemas import schemas from app.schemas import schemas
from app.api.deps import get_current_user_or_apikey
from app.api.rbac import check_project_role
router = APIRouter(prefix="/projects", tags=["Projects"]) router = APIRouter(prefix="/projects", tags=["Projects"])
@@ -16,6 +18,10 @@ def create_project(project: schemas.ProjectCreate, db: Session = Depends(get_db)
db.add(db_project) db.add(db_project)
db.commit() db.commit()
db.refresh(db_project) db.refresh(db_project)
# Auto-add creator as admin member
db_member = models.ProjectMember(project_id=db_project.id, user_id=project.owner_id, role="admin")
db.add(db_member)
db.commit()
return db_project return db_project
@@ -33,7 +39,13 @@ def get_project(project_id: int, db: Session = Depends(get_db)):
@router.patch("/{project_id}", response_model=schemas.ProjectResponse) @router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(project_id: int, project_update: schemas.ProjectUpdate, db: Session = Depends(get_db)): def update_project(
project_id: int,
project_update: schemas.ProjectUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
check_project_role(db, current_user.id, project_id, min_role="mgr")
project = db.query(models.Project).filter(models.Project.id == project_id).first() project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project: if not project:
raise HTTPException(status_code=404, detail="Project not found") raise HTTPException(status_code=404, detail="Project not found")
@@ -45,7 +57,12 @@ def update_project(project_id: int, project_update: schemas.ProjectUpdate, db: S
@router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT) @router.delete("/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_project(project_id: int, db: Session = Depends(get_db)): def delete_project(
project_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
check_project_role(db, current_user.id, project_id, min_role="admin")
project = db.query(models.Project).filter(models.Project.id == project_id).first() project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project: if not project:
raise HTTPException(status_code=404, detail="Project not found") raise HTTPException(status_code=404, detail="Project not found")
@@ -57,7 +74,13 @@ def delete_project(project_id: int, db: Session = Depends(get_db)):
# ---- Members ---- # ---- Members ----
@router.post("/{project_id}/members", response_model=schemas.ProjectMemberResponse, status_code=status.HTTP_201_CREATED) @router.post("/{project_id}/members", response_model=schemas.ProjectMemberResponse, status_code=status.HTTP_201_CREATED)
def add_project_member(project_id: int, member: schemas.ProjectMemberCreate, db: Session = Depends(get_db)): def add_project_member(
project_id: int,
member: schemas.ProjectMemberCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
check_project_role(db, current_user.id, project_id, min_role="mgr")
project = db.query(models.Project).filter(models.Project.id == project_id).first() project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project: if not project:
raise HTTPException(status_code=404, detail="Project not found") raise HTTPException(status_code=404, detail="Project not found")
@@ -82,7 +105,13 @@ def list_project_members(project_id: int, db: Session = Depends(get_db)):
@router.delete("/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT) @router.delete("/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def remove_project_member(project_id: int, user_id: int, db: Session = Depends(get_db)): def remove_project_member(
project_id: int,
user_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user_or_apikey),
):
check_project_role(db, current_user.id, project_id, min_role="admin")
member = db.query(models.ProjectMember).filter( member = db.query(models.ProjectMember).filter(
models.ProjectMember.project_id == project_id, models.ProjectMember.user_id == user_id models.ProjectMember.project_id == project_id, models.ProjectMember.user_id == user_id
).first() ).first()