Files
HarborForge.Backend/app/api/routers/roles.py
river 61e3349ca4 feat: add role/permission system with tests support
- Add Role model with 17 default permissions
- Add init_wizard to create admin/guest roles on first startup
- Protect admin role from modification/deletion via API
- Fix MilestoneCreate schema (project_id optional)
- Fix delete role to clean up role_permissions first
- Add check_project_role RBAC function
2026-03-15 12:25:59 +00:00

230 lines
7.3 KiB
Python

"""Roles and Permissions API router."""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from pydantic import BaseModel
from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey
from app.models import models
from app.models.role_permission import Role, Permission, RolePermission
router = APIRouter(prefix="/roles", tags=["Roles"])
# Schemas
class PermissionResponse(BaseModel):
id: int
name: str
description: str | None
category: str
class Config:
from_attributes = True
class RoleResponse(BaseModel):
id: int
name: str
description: str | None = None
is_global: bool | None = None
permission_ids: List[int] = []
class Config:
from_attributes = True
class RoleDetailResponse(BaseModel):
id: int
name: str
description: str | None = None
is_global: bool | None = None
permissions: List[PermissionResponse] = []
class Config:
from_attributes = True
class RoleCreate(BaseModel):
name: str
description: str | None = None
is_global: bool = False
class RoleUpdate(BaseModel):
name: str | None = None
description: str | None = None
class PermissionAssign(BaseModel):
permission_ids: List[int]
@router.get("/permissions", response_model=List[PermissionResponse])
def list_permissions(db: Session = Depends(get_db)):
"""List all permissions."""
return db.query(Permission).all()
@router.get("", response_model=List[RoleResponse])
def list_roles(db: Session = Depends(get_db)):
"""List all roles."""
roles = db.query(Role).all()
result = []
for role in roles:
perm_ids = [rp.permission_id for rp in role.permissions]
result.append(RoleResponse(
id=role.id,
name=role.name,
description=role.description,
is_global=role.is_global,
permission_ids=perm_ids
))
return result
@router.get("/{role_id}", response_model=RoleDetailResponse)
def get_role(role_id: int, db: Session = Depends(get_db)):
"""Get a role with its permissions."""
role = db.query(Role).filter(Role.id == role_id).first()
if not role:
raise HTTPException(status_code=404, detail="Role not found")
perms = []
for rp in role.permissions:
perms.append(PermissionResponse(
id=rp.permission.id,
name=rp.permission.name,
description=rp.permission.description,
category=rp.permission.category
))
return RoleDetailResponse(
id=role.id,
name=role.name,
description=role.description,
is_global=role.is_global,
permissions=perms
)
@router.post("", response_model=RoleResponse, status_code=status.HTTP_201_CREATED)
def create_role(role: RoleCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
"""Create a new role. Requires is_admin."""
if not getattr(current_user, 'is_admin', False):
raise HTTPException(status_code=403, detail="Only admins can create roles")
existing = db.query(Role).filter(Role.name == role.name).first()
if existing:
raise HTTPException(status_code=400, detail="Role already exists")
db_role = Role(**role.model_dump())
db.add(db_role)
db.commit()
db.refresh(db_role)
return RoleResponse(
id=db_role.id,
name=db_role.name,
description=db_role.description,
is_global=db_role.is_global,
permission_ids=[]
)
@router.patch("/{role_id}", response_model=RoleResponse)
def update_role(role_id: int, role: RoleUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
"""Update a role."""
if not getattr(current_user, 'is_admin', False):
raise HTTPException(status_code=403, detail="Only admins can edit roles")
db_role = db.query(Role).filter(Role.id == role_id).first()
if not db_role:
raise HTTPException(status_code=404, detail="Role not found")
# Prevent modifying the admin role
if db_role.name == "admin":
raise HTTPException(status_code=403, detail="Cannot modify the admin role")
for key, value in role.model_dump(exclude_unset=True).items():
setattr(db_role, key, value)
db.commit()
db.refresh(db_role)
perm_ids = [rp.permission_id for rp in db_role.permissions]
return RoleResponse(
id=db_role.id,
name=db_role.name,
description=db_role.description,
is_global=db_role.is_global,
permission_ids=perm_ids
)
@router.delete("/{role_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_role(role_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
"""Delete a role."""
if not getattr(current_user, 'is_admin', False):
raise HTTPException(status_code=403, detail="Only admins can delete roles")
db_role = db.query(Role).filter(Role.id == role_id).first()
if not db_role:
raise HTTPException(status_code=404, detail="Role not found")
# Prevent deleting the admin or guest role
if db_role.name in ("admin", "guest"):
raise HTTPException(status_code=403, detail=f"Cannot delete the '{db_role.name}' role")
member_count = db.query(models.ProjectMember).filter(models.ProjectMember.role_id == role_id).count()
if member_count > 0:
raise HTTPException(status_code=400, detail="Role is in use by members")
# Delete role permissions first
db.query(RolePermission).filter(RolePermission.role_id == role_id).delete()
db.delete(db_role)
db.commit()
return None
@router.post("/{role_id}/permissions", response_model=RoleDetailResponse)
def assign_permissions(role_id: int, perm_assign: PermissionAssign, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
"""Assign permissions to a role."""
if not getattr(current_user, 'is_admin', False):
raise HTTPException(status_code=403, detail="Only admins can edit role permissions")
role = db.query(Role).filter(Role.id == role_id).first()
if not role:
raise HTTPException(status_code=404, detail="Role not found")
# Prevent modifying permissions of the admin role
if role.name == "admin":
raise HTTPException(status_code=403, detail="Cannot modify permissions of the admin role")
db.query(RolePermission).filter(RolePermission.role_id == role_id).delete()
for perm_id in perm_assign.permission_ids:
perm = db.query(Permission).filter(Permission.id == perm_id).first()
if perm:
rp = RolePermission(role_id=role_id, permission_id=perm_id)
db.add(rp)
db.commit()
db.refresh(role)
perms = []
for rp in role.permissions:
perms.append(PermissionResponse(
id=rp.permission.id,
name=rp.permission.name,
description=rp.permission.description,
category=rp.permission.category
))
return RoleDetailResponse(
id=role.id,
name=role.name,
description=role.description,
is_global=role.is_global,
permissions=perms
)