Files
HarborForge.Backend/app/api/rbac.py

163 lines
5.9 KiB
Python

"""Role-based access control helpers - using configurable permissions."""
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.models import models
from app.models.role_permission import Role, Permission, RolePermission
from app.models.milestone import Milestone
from app.models.task import Task
def get_user_role(db: Session, user_id: int, project_id: int) -> Role | None:
    """Resolve the role that applies to a user inside a project.

    A project membership that carries a ``role_id`` takes precedence. Only
    when no such membership exists is the user's ``is_admin`` flag consulted,
    in which case the global role named "admin" is looked up.

    Args:
        db: Active database session.
        user_id: Id of the user whose role is being resolved.
        project_id: Id of the project the role applies to.

    Returns:
        The matching ``Role`` row, or ``None`` when no role applies or the
        referenced role row does not exist.
    """
    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.user_id == user_id,
            models.ProjectMember.project_id == project_id,
        )
        .first()
    )
    if membership and membership.role_id:
        # The membership's role wins, even for users flagged as global admin.
        return db.query(Role).filter(Role.id == membership.role_id).first()

    # No usable membership: fall back to the global admin role if applicable.
    account = db.query(models.User).filter(models.User.id == user_id).first()
    if not (account and account.is_admin):
        return None
    return (
        db.query(Role)
        # "== True" is intentional: it builds a SQL expression, not a Python bool.
        .filter(Role.is_global == True, Role.name == "admin")  # noqa: E712
        .first()
    )
def has_permission(db: Session, user_id: int, project_id: int, permission: str) -> bool:
    """Report whether the user's role in a project grants a named permission.

    Args:
        db: Active database session.
        user_id: Id of the user being checked.
        project_id: Id of the project the check is scoped to.
        permission: Name of the permission, matched against ``Permission.name``.

    Returns:
        True only when the user resolves to a role, the permission exists,
        and a ``RolePermission`` row links the two; False otherwise.
    """
    effective_role = get_user_role(db, user_id, project_id)
    if not effective_role:
        return False

    # An unknown permission name is treated as "not granted".
    wanted = db.query(Permission).filter(Permission.name == permission).first()
    if not wanted:
        return False

    grant_query = db.query(RolePermission).filter(
        RolePermission.role_id == effective_role.id,
        RolePermission.permission_id == wanted.id,
    )
    return grant_query.first() is not None
def check_permission(db: Session, user_id: int, project_id: int, permission: str):
    """Enforce a permission, raising HTTP 403 when the user lacks it.

    Args:
        db: Active database session.
        user_id: Id of the user being checked.
        project_id: Id of the project the check is scoped to.
        permission: Name of the required permission.

    Raises:
        HTTPException: 403 with a detail message naming the required
            permission and the user's current role ("none" if no role).
    """
    if has_permission(db, user_id, project_id, permission):
        return

    # Resolve the role again purely to build a helpful error message.
    current_role = get_user_role(db, user_id, project_id)
    if current_role:
        role_name = current_role.name
    else:
        role_name = "none"
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"Permission '{permission}' required. Your role: {role_name}",
    )
def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "member"):
    """Require that the user is a global admin or a project member with a role.

    NOTE: ``min_role`` is accepted for backward compatibility but is NOT
    enforced. Most current routes pass non-hierarchical names such as
    "dev" or "mgr", so any valid membership passes this broad check. Strict
    edit rules are handled by the explicit ``can_edit_*`` helpers.

    Args:
        db: Active database session.
        user_id: Id of the user being checked.
        project_id: Id of the project the check is scoped to.
        min_role: Legacy role name requested by the caller; currently ignored.

    Returns:
        True when the check passes.

    Raises:
        HTTPException: 403 when the user is not a member of the project (or
            the membership has no role), or when the membership's role row
            cannot be found.
    """
    # Global admins bypass membership checks entirely.
    if is_global_admin(db, user_id):
        return True

    member = db.query(models.ProjectMember).filter(
        models.ProjectMember.user_id == user_id,
        models.ProjectMember.project_id == project_id,
    ).first()
    if not member or not member.role_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You are not a member of this project",
        )

    role = db.query(Role).filter(Role.id == member.role_id).first()
    if not role:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Role not found",
        )

    # Legacy compatibility: any valid membership passes, whatever min_role
    # was requested. The former per-name branch returned True on both paths,
    # so it has been collapsed into this single return.
    return True
def get_project_role_name(db: Session, user_id: int, project_id: int) -> str | None:
    """Return the name of the user's role in a project.

    Global admins are always reported as "admin", regardless of any project
    membership they may hold.

    Args:
        db: Active database session.
        user_id: Id of the user being looked up.
        project_id: Id of the project the role applies to.

    Returns:
        The role name, or ``None`` when the user has no membership, the
        membership has no role, or the role row does not exist.
    """
    if is_global_admin(db, user_id):
        return "admin"

    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.user_id == user_id,
            models.ProjectMember.project_id == project_id,
        )
        .first()
    )
    if not (membership and membership.role_id):
        return None

    assigned = db.query(Role).filter(Role.id == membership.role_id).first()
    if not assigned:
        return None
    return assigned.name
def is_global_admin(db: Session, user_id: int) -> bool:
    """Report whether the user exists and has the ``is_admin`` flag set.

    Args:
        db: Active database session.
        user_id: Id of the user being checked.

    Returns:
        True when a matching user row exists and its ``is_admin`` value is
        truthy; False otherwise.
    """
    account = db.query(models.User).filter(models.User.id == user_id).first()
    if not account:
        return False
    return bool(account.is_admin)
def has_project_admin_role(db: Session, user_id: int, project_id: int) -> bool:
    """Report whether the user's role name in the project is "admin".

    Because the role name is resolved via ``get_project_role_name``, global
    admins also satisfy this check.
    """
    role_name = get_project_role_name(db, user_id, project_id)
    return role_name == "admin"
def can_edit_project(db: Session, user_id: int, project: models.Project) -> bool:
    """Decide whether a user may edit a project.

    Editing is allowed for global admins, the project's owner, and users
    whose role name in the project is "admin".

    Args:
        db: Active database session.
        user_id: Id of the user requesting the edit.
        project: The project being edited.

    Returns:
        True when any of the rules above matches; False otherwise.
    """
    # Rules are evaluated in the same order as a short-circuit "or" chain.
    if is_global_admin(db, user_id):
        return True
    if project.owner_id == user_id:
        return True
    return has_project_admin_role(db, user_id, project.id)
def ensure_can_edit_project(db: Session, user_id: int, project: models.Project):
    """Raise HTTP 403 unless the user is allowed to edit the project.

    Raises:
        HTTPException: 403 when ``can_edit_project`` returns False.
    """
    if can_edit_project(db, user_id, project):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Project edit permission denied",
    )
def can_edit_milestone(db: Session, user_id: int, milestone: Milestone) -> bool:
    """Decide whether a user may edit a milestone.

    Editing is allowed for global admins, the owner of the milestone's
    project, the milestone's creator, and users whose role name in the
    project is "admin".

    Args:
        db: Active database session.
        user_id: Id of the user requesting the edit.
        milestone: The milestone being edited.

    Returns:
        True when any rule matches; False otherwise, including when the
        milestone's project cannot be found.
    """
    parent_project = (
        db.query(models.Project)
        .filter(models.Project.id == milestone.project_id)
        .first()
    )
    if not parent_project:
        return False

    # Rules are evaluated in the same order as a short-circuit "or" chain.
    if is_global_admin(db, user_id):
        return True
    if parent_project.owner_id == user_id:
        return True
    if milestone.created_by_id == user_id:
        return True
    return has_project_admin_role(db, user_id, milestone.project_id)
def ensure_can_edit_milestone(db: Session, user_id: int, milestone: Milestone):
    """Raise HTTP 403 unless the user is allowed to edit the milestone.

    Raises:
        HTTPException: 403 when ``can_edit_milestone`` returns False.
    """
    if can_edit_milestone(db, user_id, milestone):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Milestone edit permission denied",
    )
def can_edit_task(db: Session, user_id: int, task: Task) -> bool:
    """Decide whether a user may edit a task.

    Editing is allowed for global admins, the owner of the task's project,
    the task's creator, the creator of the task's milestone, and users whose
    role name in the project is "admin".

    Args:
        db: Active database session.
        user_id: Id of the user requesting the edit.
        task: The task being edited.

    Returns:
        True when any rule matches; False otherwise, including when the
        task's project cannot be found.
    """
    project = db.query(models.Project).filter(models.Project.id == task.project_id).first()
    if not project:
        return False

    if is_global_admin(db, user_id):
        return True
    if project.owner_id == user_id or task.created_by_id == user_id:
        return True

    # Fetch the milestone only once the cheaper rules have failed, and skip
    # the query entirely for tasks without a milestone. The outcome is the
    # same as an eager lookup, but no query is wasted.
    if task.milestone_id is not None:
        milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
        if milestone is not None and milestone.created_by_id == user_id:
            return True

    return has_project_admin_role(db, user_id, task.project_id)
def ensure_can_edit_task(db: Session, user_id: int, task: Task):
    """Raise HTTP 403 unless the user is allowed to edit the task.

    Raises:
        HTTPException: 403 when ``can_edit_task`` returns False.
    """
    if can_edit_task(db, user_id, task):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Task edit permission denied",
    )