47 lines
1.5 KiB
Python
47 lines
1.5 KiB
Python
"""Role-based access control helpers."""
|
|
from functools import wraps
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from app.models.models import ProjectMember, User
|
|
|
|
|
|
# Role hierarchy: admin > mgr > dev > ops > viewer
|
|
ROLE_LEVELS = {
|
|
"admin": 50,
|
|
"mgr": 40,
|
|
"dev": 30,
|
|
"ops": 20,
|
|
"viewer": 10,
|
|
}
|
|
|
|
|
|
def get_member_role(db: Session, user_id: int, project_id: int) -> str | None:
|
|
"""Get user's role in a project. Returns None if not a member."""
|
|
member = db.query(ProjectMember).filter(
|
|
ProjectMember.user_id == user_id,
|
|
ProjectMember.project_id == project_id,
|
|
).first()
|
|
if member:
|
|
return member.role
|
|
# Check if user is global admin
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if user and user.is_admin:
|
|
return "admin"
|
|
return None
|
|
|
|
|
|
def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "viewer"):
|
|
"""Raise 403 if user doesn't have the minimum required role in the project."""
|
|
role = get_member_role(db, user_id, project_id)
|
|
if role is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Not a member of this project"
|
|
)
|
|
if ROLE_LEVELS.get(role, 0) < ROLE_LEVELS.get(min_role, 0):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Requires role '{min_role}' or higher, you have '{role}'"
|
|
)
|
|
return role
|