- Add Role model with 17 default permissions - Add init_wizard to create admin/guest roles on first startup - Protect admin role from modification/deletion via API - Fix MilestoneCreate schema (project_id optional) - Fix delete role to clean up role_permissions first - Add check_project_role RBAC function
99 lines
3.4 KiB
Python
99 lines
3.4 KiB
Python
"""Role-based access control helpers - using configurable permissions."""
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from app.models import models
|
|
from app.models.role_permission import Role, Permission, RolePermission
|
|
from app.models import models
|
|
|
|
|
|
def get_user_role(db: Session, user_id: int, project_id: int) -> Role | None:
|
|
"""Get user's role in a project."""
|
|
member = db.query(models.ProjectMember).filter(
|
|
models.ProjectMember.user_id == user_id,
|
|
models.ProjectMember.project_id == project_id,
|
|
).first()
|
|
|
|
if member and member.role_id:
|
|
return db.query(Role).filter(Role.id == member.role_id).first()
|
|
|
|
# Check global admin
|
|
user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if user and user.is_admin:
|
|
# Return global admin role
|
|
return db.query(Role).filter(Role.is_global == True, Role.name == "superadmin").first()
|
|
|
|
return None
|
|
|
|
|
|
def has_permission(db: Session, user_id: int, project_id: int, permission: str) -> bool:
|
|
"""Check if user has a specific permission in a project."""
|
|
role = get_user_role(db, user_id, project_id)
|
|
|
|
if not role:
|
|
return False
|
|
|
|
# Check if role has the permission
|
|
perm = db.query(Permission).filter(Permission.name == permission).first()
|
|
if not perm:
|
|
return False
|
|
|
|
role_perm = db.query(RolePermission).filter(
|
|
RolePermission.role_id == role.id,
|
|
RolePermission.permission_id == perm.id
|
|
).first()
|
|
|
|
return role_perm is not None
|
|
|
|
|
|
def check_permission(db: Session, user_id: int, project_id: int, permission: str):
|
|
"""Raise 403 if user doesn't have the permission."""
|
|
if not has_permission(db, user_id, project_id, permission):
|
|
role = get_user_role(db, user_id, project_id)
|
|
role_name = role.name if role else "none"
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Permission '{permission}' required. Your role: {role_name}"
|
|
)
|
|
|
|
|
|
def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "member"):
|
|
"""Check if user has at least the specified role in a project."""
|
|
# Check if user is global admin
|
|
user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if user and user.is_admin:
|
|
return True
|
|
|
|
# Get user's role in project
|
|
member = db.query(models.ProjectMember).filter(
|
|
models.ProjectMember.user_id == user_id,
|
|
models.ProjectMember.project_id == project_id,
|
|
).first()
|
|
|
|
if not member or not member.role_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"You are not a member of this project"
|
|
)
|
|
|
|
role = db.query(Role).filter(Role.id == member.role_id).first()
|
|
if not role:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Role not found"
|
|
)
|
|
|
|
# Role hierarchy: admin > member > guest
|
|
role_hierarchy = {"admin": 3, "member": 2, "guest": 1}
|
|
user_role_level = role_hierarchy.get(role.name, 0)
|
|
required_level = role_hierarchy.get(min_role, 0)
|
|
|
|
if user_role_level < required_level:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Role '{min_role}' or higher required. Your role: {role.name}"
|
|
)
|
|
|
|
return True
|
|
|
|
|