Files
HarborForge.Backend/app/api/routers/roles.py

215 lines
6.6 KiB
Python

"""Roles and Permissions API router."""
from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, ConfigDict
from sqlalchemy.orm import Session

from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey
from app.models import models
from app.models.role_permission import Role, Permission, RolePermission
# Router collecting the role/permission endpoints of this module; every route
# declared on it is served under the "/roles" prefix and tagged "Roles".
router = APIRouter(prefix="/roles", tags=["Roles"])
# Schemas
class PermissionResponse(BaseModel):
    """Serialized form of a single permission as returned by this router.

    ``description`` has no default, so it must always be supplied, but it may
    be ``None``.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which is deprecated in v2 and emits a deprecation warning on import.
    # ``from_attributes`` allows validation from attribute-bearing objects
    # (not only dicts).
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    description: str | None
    category: str
class RoleResponse(BaseModel):
    """Summary view of a role: its own fields plus the ids of its permissions.

    ``permission_ids`` defaults to an empty list when the caller does not
    provide one.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which is deprecated in v2 and emits a deprecation warning on import.
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    description: str | None
    is_global: bool
    permission_ids: List[int] = []
class RoleDetailResponse(BaseModel):
    """Detailed view of a role with its permissions expanded.

    Unlike ``RoleResponse`` (which only carries permission ids), each entry of
    ``permissions`` is a full ``PermissionResponse``.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which is deprecated in v2 and emits a deprecation warning on import.
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    description: str | None
    is_global: bool
    permissions: List[PermissionResponse] = []
class RoleCreate(BaseModel):
    """Request body accepted by the create-role endpoint.

    Only ``name`` is required; ``description`` defaults to ``None`` and
    ``is_global`` to ``False``.
    """

    name: str
    description: str | None = None
    is_global: bool = False
class RoleUpdate(BaseModel):
    """Request body accepted by the update-role (PATCH) endpoint.

    Both fields are optional so a client can send only what it wants to
    change. ``is_global`` is not part of this schema, so it cannot be changed
    through this body.
    """

    name: str | None = None
    description: str | None = None
class PermissionAssign(BaseModel):
    """Request body carrying the permission ids to assign to a role."""

    permission_ids: list[int]
@router.get("/permissions", response_model=List[PermissionResponse])
def list_permissions(db: Session = Depends(get_db)):
    """Return every stored permission.

    The ORM rows are returned as-is; the route's ``response_model`` shapes
    them into ``PermissionResponse`` items.
    """
    all_permissions = db.query(Permission).all()
    return all_permissions
@router.get("", response_model=List[RoleResponse])
def list_roles(db: Session = Depends(get_db)):
    """Return all roles, each with the ids of the permissions linked to it."""
    return [
        RoleResponse(
            id=db_role.id,
            name=db_role.name,
            description=db_role.description,
            is_global=db_role.is_global,
            # Each item of ``permissions`` is a link object exposing ``permission_id``.
            permission_ids=[link.permission_id for link in db_role.permissions],
        )
        for db_role in db.query(Role).all()
    ]
@router.get("/{role_id}", response_model=RoleDetailResponse)
def get_role(role_id: int, db: Session = Depends(get_db)):
    """Return one role together with its fully expanded permissions.

    Raises:
        HTTPException: 404 when no role has the given id.
    """
    found = db.query(Role).filter(Role.id == role_id).first()
    if not found:
        raise HTTPException(status_code=404, detail="Role not found")

    # Follow each role->permission link to the permission it points at.
    expanded = [
        PermissionResponse(
            id=link.permission.id,
            name=link.permission.name,
            description=link.permission.description,
            category=link.permission.category,
        )
        for link in found.permissions
    ]

    return RoleDetailResponse(
        id=found.id,
        name=found.name,
        description=found.description,
        is_global=found.is_global,
        permissions=expanded,
    )
@router.post("", response_model=RoleResponse, status_code=status.HTTP_201_CREATED)
def create_role(
    role: RoleCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Create a role; the caller must have a truthy ``is_admin`` attribute.

    Raises:
        HTTPException: 403 when the caller is not an admin, 400 when a role
            with the same name already exists.
    """
    caller_is_admin = getattr(current_user, 'is_admin', False)
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Only admins can create roles")

    same_name = db.query(Role).filter(Role.name == role.name).first()
    if same_name:
        raise HTTPException(status_code=400, detail="Role already exists")

    payload = role.model_dump()
    new_role = Role(**payload)
    db.add(new_role)
    db.commit()
    # Reload so generated columns (such as the id) are populated.
    db.refresh(new_role)

    # A freshly created role has no permissions assigned yet.
    return RoleResponse(
        id=new_role.id,
        name=new_role.name,
        description=new_role.description,
        is_global=new_role.is_global,
        permission_ids=[],
    )
@router.patch("/{role_id}", response_model=RoleResponse)
def update_role(
    role_id: int,
    role: RoleUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Partially update a role; the caller must have a truthy ``is_admin``.

    Only fields explicitly present in the request body are applied.

    Raises:
        HTTPException: 403 when the caller is not an admin, 404 when the role
            does not exist, 400 when ``name`` is explicitly null or already
            used by another role.
    """
    if not getattr(current_user, 'is_admin', False):
        raise HTTPException(status_code=403, detail="Only admins can edit roles")

    db_role = db.query(Role).filter(Role.id == role_id).first()
    if not db_role:
        raise HTTPException(status_code=404, detail="Role not found")

    changes = role.model_dump(exclude_unset=True)

    if "name" in changes:
        new_name = changes["name"]
        # An explicit ``"name": null`` survives ``exclude_unset``. Writing it
        # would commit a nameless role and then fail while building the
        # RoleResponse (whose ``name`` is a required str), so reject it
        # before touching the row.
        if new_name is None:
            raise HTTPException(status_code=400, detail="Role name cannot be null")
        # Keep names unique, mirroring the check done when creating a role.
        if new_name != db_role.name:
            clash = (
                db.query(Role)
                .filter(Role.name == new_name, Role.id != role_id)
                .first()
            )
            if clash:
                raise HTTPException(status_code=400, detail="Role already exists")

    for key, value in changes.items():
        setattr(db_role, key, value)
    db.commit()
    db.refresh(db_role)

    perm_ids = [rp.permission_id for rp in db_role.permissions]
    return RoleResponse(
        id=db_role.id,
        name=db_role.name,
        description=db_role.description,
        is_global=db_role.is_global,
        permission_ids=perm_ids,
    )
@router.delete("/{role_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_role(
    role_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Delete a role that no project member references.

    Raises:
        HTTPException: 403 when the caller is not an admin, 404 when the role
            does not exist, 400 when project members still use the role.
    """
    caller_is_admin = getattr(current_user, 'is_admin', False)
    if not caller_is_admin:
        raise HTTPException(status_code=403, detail="Only admins can delete roles")

    target = db.query(Role).filter(Role.id == role_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Role not found")

    # Refuse to delete while any project member still references this role.
    members_using_role = (
        db.query(models.ProjectMember)
        .filter(models.ProjectMember.role_id == role_id)
        .count()
    )
    if members_using_role > 0:
        raise HTTPException(status_code=400, detail="Role is in use by members")

    db.delete(target)
    db.commit()
    # Falls through returning None: a 204 response carries no body.
@router.post("/{role_id}/permissions", response_model=RoleDetailResponse)
def assign_permissions(
    role_id: int,
    perm_assign: PermissionAssign,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user_or_apikey),
):
    """Replace the permission set of a role with the requested ids.

    All existing role/permission links for the role are deleted first, then
    one link is created per requested id that matches an existing permission.
    Ids that match no permission are skipped without error; ids repeated in
    the request are applied once.

    Raises:
        HTTPException: 403 when the caller is not an admin, 404 when the role
            does not exist.
    """
    if not getattr(current_user, 'is_admin', False):
        raise HTTPException(status_code=403, detail="Only admins can edit role permissions")

    role = db.query(Role).filter(Role.id == role_id).first()
    if not role:
        raise HTTPException(status_code=404, detail="Role not found")

    # De-duplicate while keeping request order, so a repeated id cannot
    # produce two links for the same (role, permission) pair.
    requested_ids = list(dict.fromkeys(perm_assign.permission_ids))

    # Resolve which requested ids exist with one query instead of one
    # query per id. Skip the query entirely for an empty request.
    if requested_ids:
        known_ids = {
            row[0]
            for row in db.query(Permission.id)
            .filter(Permission.id.in_(requested_ids))
            .all()
        }
    else:
        known_ids = set()

    # Replace semantics: drop every current link before adding the new ones.
    db.query(RolePermission).filter(RolePermission.role_id == role_id).delete()
    for perm_id in requested_ids:
        if perm_id in known_ids:
            db.add(RolePermission(role_id=role_id, permission_id=perm_id))
    db.commit()
    db.refresh(role)

    perms = [
        PermissionResponse(
            id=rp.permission.id,
            name=rp.permission.name,
            description=rp.permission.description,
            category=rp.permission.category,
        )
        for rp in role.permissions
    ]
    return RoleDetailResponse(
        id=role.id,
        name=role.name,
        description=role.description,
        is_global=role.is_global,
        permissions=perms,
    )