"""Role-based access control helpers - using configurable permissions.""" from fastapi import HTTPException, status from sqlalchemy.orm import Session from app.models import models from app.models.role_permission import Role, Permission, RolePermission from app.models.milestone import Milestone from app.models.task import Task def get_user_role(db: Session, user_id: int, project_id: int) -> Role | None: """Get user's role in a project.""" member = db.query(models.ProjectMember).filter( models.ProjectMember.user_id == user_id, models.ProjectMember.project_id == project_id, ).first() if member and member.role_id: return db.query(Role).filter(Role.id == member.role_id).first() # Check global admin user = db.query(models.User).filter(models.User.id == user_id).first() if user and user.is_admin: # Return global admin role return db.query(Role).filter(Role.is_global == True, Role.name == "superadmin").first() return None def has_permission(db: Session, user_id: int, project_id: int, permission: str) -> bool: """Check if user has a specific permission in a project.""" role = get_user_role(db, user_id, project_id) if not role: return False # Check if role has the permission perm = db.query(Permission).filter(Permission.name == permission).first() if not perm: return False role_perm = db.query(RolePermission).filter( RolePermission.role_id == role.id, RolePermission.permission_id == perm.id ).first() return role_perm is not None def check_permission(db: Session, user_id: int, project_id: int, permission: str): """Raise 403 if user doesn't have the permission.""" if not has_permission(db, user_id, project_id, permission): role = get_user_role(db, user_id, project_id) role_name = role.name if role else "none" raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Permission '{permission}' required. Your role: {role_name}" ) def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "member"): """Check if user has at least the specified role in a project.""" user = db.query(models.User).filter(models.User.id == user_id).first() if user and user.is_admin: return True member = db.query(models.ProjectMember).filter( models.ProjectMember.user_id == user_id, models.ProjectMember.project_id == project_id, ).first() if not member or not member.role_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You are not a member of this project" ) role = db.query(Role).filter(Role.id == member.role_id).first() if not role: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Role not found" ) # Legacy compatibility: most current routes use non-hierarchical names like dev/mgr. # For now, any valid membership passes those broad checks; strict edit rules are handled # by the explicit can_edit_* helpers below. if min_role in {"dev", "mgr", "viewer", "member", "guest", "admin"}: return True return True def get_project_role_name(db: Session, user_id: int, project_id: int) -> str | None: if is_global_admin(db, user_id): return "admin" member = db.query(models.ProjectMember).filter( models.ProjectMember.user_id == user_id, models.ProjectMember.project_id == project_id, ).first() if not member or not member.role_id: return None role = db.query(Role).filter(Role.id == member.role_id).first() return role.name if role else None def is_global_admin(db: Session, user_id: int) -> bool: user = db.query(models.User).filter(models.User.id == user_id).first() return bool(user and user.is_admin) def has_project_admin_role(db: Session, user_id: int, project_id: int) -> bool: return get_project_role_name(db, user_id, project_id) == "admin" def can_edit_project(db: Session, user_id: int, project: models.Project) -> bool: return ( is_global_admin(db, user_id) or project.owner_id == user_id or has_project_admin_role(db, user_id, project.id) ) def ensure_can_edit_project(db: Session, user_id: int, project: models.Project): if not can_edit_project(db, user_id, project): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Project edit permission denied") def can_edit_milestone(db: Session, user_id: int, milestone: Milestone) -> bool: project = db.query(models.Project).filter(models.Project.id == milestone.project_id).first() if not project: return False return ( is_global_admin(db, user_id) or project.owner_id == user_id or milestone.created_by_id == user_id or has_project_admin_role(db, user_id, milestone.project_id) ) def ensure_can_edit_milestone(db: Session, user_id: int, milestone: Milestone): if not can_edit_milestone(db, user_id, milestone): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Milestone edit permission denied") def can_edit_task(db: Session, user_id: int, task: Task) -> bool: project = db.query(models.Project).filter(models.Project.id == task.project_id).first() milestone = db.query(Milestone).filter(Milestone.id == task.milestone_id).first() if not project: return False return ( is_global_admin(db, user_id) or project.owner_id == user_id or task.created_by_id == user_id or (milestone is not None and milestone.created_by_id == user_id) or has_project_admin_role(db, user_id, task.project_id) ) def ensure_can_edit_task(db: Session, user_id: int, task: Task): if not can_edit_task(db, user_id, task): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Task edit permission denied")