163 lines
5.9 KiB
Python
163 lines
5.9 KiB
Python
"""Role-based access control helpers - using configurable permissions."""
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from app.models import models
|
|
from app.models.role_permission import Role, Permission, RolePermission
|
|
from app.models.milestone import Milestone
|
|
from app.models.task import Task
|
|
|
|
|
|
def get_user_role(db: Session, user_id: int, project_id: int) -> Role | None:
    """Resolve the role a user holds for a given project.

    A project membership row carrying a ``role_id`` takes precedence: the
    role with that id is looked up and returned (``None`` if no such role
    row exists). Otherwise, a user flagged ``is_admin`` receives the global
    role named "admin". In every other case the result is ``None``.
    """
    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.user_id == user_id,
            models.ProjectMember.project_id == project_id,
        )
        .first()
    )
    if membership and membership.role_id:
        role_query = db.query(Role).filter(Role.id == membership.role_id)
        return role_query.first()

    account = db.query(models.User).filter(models.User.id == user_id).first()
    if not (account and account.is_admin):
        return None

    # Global administrators fall back to the global role named "admin".
    global_admin_query = db.query(Role).filter(
        Role.is_global == True,  # noqa: E712 - SQLAlchemy column comparison
        Role.name == "admin",
    )
    return global_admin_query.first()
|
|
|
|
|
|
def has_permission(db: Session, user_id: int, project_id: int, permission: str) -> bool:
    """Tell whether the user's role in the project grants ``permission``.

    Returns ``False`` when the user has no resolvable role, when no
    permission with the given name exists, or when the role is not linked
    to that permission; ``True`` only when a role/permission link is found.
    """
    granted_role = get_user_role(db, user_id, project_id)
    if not granted_role:
        return False

    wanted = db.query(Permission).filter(Permission.name == permission).first()
    if not wanted:
        return False

    # A RolePermission row linking the two is what actually grants access.
    link = (
        db.query(RolePermission)
        .filter(
            RolePermission.role_id == granted_role.id,
            RolePermission.permission_id == wanted.id,
        )
        .first()
    )
    if link is None:
        return False
    return True
|
|
|
|
|
|
def check_permission(db: Session, user_id: int, project_id: int, permission: str):
    """Raise HTTP 403 unless the user holds ``permission`` in the project.

    Returns ``None`` when the permission is granted. On denial, the error
    detail names the required permission and the user's current role name
    ("none" when no role could be resolved).
    """
    if has_permission(db, user_id, project_id, permission):
        return

    # Look the role up again only to make the error message informative.
    current_role = get_user_role(db, user_id, project_id)
    if current_role:
        role_name = current_role.name
    else:
        role_name = "none"

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"Permission '{permission}' required. Your role: {role_name}",
    )
|
|
|
|
|
|
def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "member"):
    """Verify that the user may act within a project; raise 403 otherwise.

    Global admins always pass. Any other user needs a project membership row
    with a ``role_id`` that resolves to an existing role.

    NOTE: ``min_role`` is accepted for backward compatibility with existing
    callers but is NOT enforced; no role hierarchy is applied here. Any
    valid membership passes, whatever value is supplied.

    Args:
        db: Database session used for the lookups.
        user_id: Id of the user being checked.
        project_id: Id of the project being accessed.
        min_role: Legacy role name requested by the caller; currently unused.

    Returns:
        ``True`` when access is granted.

    Raises:
        HTTPException: 403 when the user is not a member of the project
            (or the membership has no role), or when the membership's role
            row cannot be found.
    """
    if is_global_admin(db, user_id):
        return True

    member = db.query(models.ProjectMember).filter(
        models.ProjectMember.user_id == user_id,
        models.ProjectMember.project_id == project_id,
    ).first()
    if not member or not member.role_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You are not a member of this project",
        )

    role = db.query(Role).filter(Role.id == member.role_id).first()
    if not role:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Role not found",
        )

    # Legacy compatibility: most current routes use non-hierarchical names
    # like dev/mgr. For now, any valid membership passes those broad checks;
    # strict edit rules are handled by the explicit can_edit_* helpers.
    return True
|
|
|
|
|
|
def get_project_role_name(db: Session, user_id: int, project_id: int) -> str | None:
    """Return the name of the user's effective role in a project.

    Global admins are always reported as "admin". Otherwise the name of the
    role attached to the user's project membership is returned, or ``None``
    when there is no membership, no ``role_id``, or no matching role row.
    """
    if is_global_admin(db, user_id):
        return "admin"

    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.user_id == user_id,
            models.ProjectMember.project_id == project_id,
        )
        .first()
    )
    if membership and membership.role_id:
        assigned = db.query(Role).filter(Role.id == membership.role_id).first()
        if assigned:
            return assigned.name
    return None
|
|
|
|
|
|
def is_global_admin(db: Session, user_id: int) -> bool:
    """Return ``True`` when the user exists and has a truthy ``is_admin``."""
    account_query = db.query(models.User).filter(models.User.id == user_id)
    account = account_query.first()
    if not account:
        return False
    return bool(account.is_admin)
|
|
|
|
|
|
def has_project_admin_role(db: Session, user_id: int, project_id: int) -> bool:
    """Return ``True`` when the user's effective project role name is "admin"."""
    effective_name = get_project_role_name(db, user_id, project_id)
    return effective_name == "admin"
|
|
|
|
|
|
def can_edit_project(db: Session, user_id: int, project: models.Project) -> bool:
    """Decide whether the user may edit ``project``.

    Editing is allowed for global admins, for the project's owner, and for
    users whose role in the project is "admin". Checks run in that order and
    stop at the first one that succeeds.
    """
    if is_global_admin(db, user_id):
        return True
    if project.owner_id == user_id:
        return True
    return has_project_admin_role(db, user_id, project.id)
|
|
|
|
|
|
def ensure_can_edit_project(db: Session, user_id: int, project: models.Project):
    """Raise HTTP 403 unless ``can_edit_project`` allows the edit."""
    if can_edit_project(db, user_id, project):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Project edit permission denied",
    )
|
|
|
|
|
|
def can_edit_milestone(db: Session, user_id: int, milestone: Milestone) -> bool:
    """Decide whether the user may edit ``milestone``.

    Returns ``False`` outright when the milestone's project cannot be found
    (this applies to global admins as well). Otherwise editing is allowed
    for global admins, the project owner, the milestone's creator, and users
    whose role in the project is "admin".
    """
    parent = (
        db.query(models.Project)
        .filter(models.Project.id == milestone.project_id)
        .first()
    )
    if not parent:
        return False

    if is_global_admin(db, user_id):
        return True
    if parent.owner_id == user_id or milestone.created_by_id == user_id:
        return True
    return has_project_admin_role(db, user_id, milestone.project_id)
|
|
|
|
|
|
def ensure_can_edit_milestone(db: Session, user_id: int, milestone: Milestone):
    """Raise HTTP 403 unless ``can_edit_milestone`` allows the edit."""
    if can_edit_milestone(db, user_id, milestone):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Milestone edit permission denied",
    )
|
|
|
|
|
|
def can_edit_task(db: Session, user_id: int, task: Task) -> bool:
    """Decide whether the user may edit ``task``.

    Returns ``False`` outright when the task's project cannot be found (this
    applies to global admins as well). Otherwise editing is allowed, in
    order, for: global admins, the project owner, the task's creator, the
    creator of the task's milestone, and users whose role in the project is
    "admin".

    Args:
        db: Database session used for the lookups.
        user_id: Id of the user requesting the edit.
        task: Task being edited; its ``project_id``, ``milestone_id`` and
            ``created_by_id`` are read.

    Returns:
        ``True`` when the edit is permitted, ``False`` otherwise.
    """
    project = db.query(models.Project).filter(models.Project.id == task.project_id).first()
    if not project:
        return False

    if is_global_admin(db, user_id):
        return True
    if project.owner_id == user_id or task.created_by_id == user_id:
        return True

    # The milestone is fetched lazily: only when the cheaper checks above
    # have failed and the task actually references a milestone.
    if task.milestone_id is not None:
        milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
        if milestone is not None and milestone.created_by_id == user_id:
            return True

    return has_project_admin_role(db, user_id, task.project_id)
|
|
|
|
|
|
def ensure_can_edit_task(db: Session, user_id: int, task: Task):
    """Raise HTTP 403 unless ``can_edit_task`` allows the edit."""
    if can_edit_task(db, user_id, task):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Task edit permission denied",
    )
|
|
|