Files
HarborForge.Backend/app/api/routers/roles.py
zhi 271d5140e6 feat(users): switch account management to single-role model
- add users.role_id for one global role per account
- seed protected account-manager role with account.create permission
- default new accounts to guest role
- block admin role assignment through user management
- allow account-manager permission to create accounts
2026-03-21 08:44:19 +00:00

231 lines
7.4 KiB
Python

"""Roles and Permissions API router."""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from pydantic import BaseModel
from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey
from app.models import models
from app.models.role_permission import Role, Permission, RolePermission
router = APIRouter(prefix="/roles", tags=["Roles"])
# Schemas
class PermissionResponse(BaseModel):
id: int
name: str
description: str | None
category: str
class Config:
from_attributes = True
class RoleResponse(BaseModel):
id: int
name: str
description: str | None = None
is_global: bool | None = None
permission_ids: List[int] = []
class Config:
from_attributes = True
class RoleDetailResponse(BaseModel):
id: int
name: str
description: str | None = None
is_global: bool | None = None
permissions: List[PermissionResponse] = []
class Config:
from_attributes = True
class RoleCreate(BaseModel):
name: str
description: str | None = None
is_global: bool = False
class RoleUpdate(BaseModel):
name: str | None = None
description: str | None = None
class PermissionAssign(BaseModel):
permission_ids: List[int]
@router.get("/permissions", response_model=List[PermissionResponse])
def list_permissions(db: Session = Depends(get_db)):
"""List all permissions."""
return db.query(Permission).all()
@router.get("", response_model=List[RoleResponse])
def list_roles(db: Session = Depends(get_db)):
"""List all roles."""
roles = db.query(Role).all()
result = []
for role in roles:
perm_ids = [rp.permission_id for rp in role.permissions]
result.append(RoleResponse(
id=role.id,
name=role.name,
description=role.description,
is_global=role.is_global,
permission_ids=perm_ids
))
return result
@router.get("/{role_id}", response_model=RoleDetailResponse)
def get_role(role_id: int, db: Session = Depends(get_db)):
"""Get a role with its permissions."""
role = db.query(Role).filter(Role.id == role_id).first()
if not role:
raise HTTPException(status_code=404, detail="Role not found")
perms = []
for rp in role.permissions:
perms.append(PermissionResponse(
id=rp.permission.id,
name=rp.permission.name,
description=rp.permission.description,
category=rp.permission.category
))
return RoleDetailResponse(
id=role.id,
name=role.name,
description=role.description,
is_global=role.is_global,
permissions=perms
)
@router.post("", response_model=RoleResponse, status_code=status.HTTP_201_CREATED)
def create_role(role: RoleCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
"""Create a new role. Requires is_admin."""
if not getattr(current_user, 'is_admin', False):
raise HTTPException(status_code=403, detail="Only admins can create roles")
existing = db.query(Role).filter(Role.name == role.name).first()
if existing:
raise HTTPException(status_code=400, detail="Role already exists")
db_role = Role(**role.model_dump())
db.add(db_role)
db.commit()
db.refresh(db_role)
return RoleResponse(
id=db_role.id,
name=db_role.name,
description=db_role.description,
is_global=db_role.is_global,
permission_ids=[]
)
@router.patch("/{role_id}", response_model=RoleResponse)
def update_role(role_id: int, role: RoleUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
"""Update a role."""
if not getattr(current_user, 'is_admin', False):
raise HTTPException(status_code=403, detail="Only admins can edit roles")
db_role = db.query(Role).filter(Role.id == role_id).first()
if not db_role:
raise HTTPException(status_code=404, detail="Role not found")
# Prevent modifying the admin role
if db_role.name == "admin":
raise HTTPException(status_code=403, detail="Cannot modify the admin role")
for key, value in role.model_dump(exclude_unset=True).items():
setattr(db_role, key, value)
db.commit()
db.refresh(db_role)
perm_ids = [rp.permission_id for rp in db_role.permissions]
return RoleResponse(
id=db_role.id,
name=db_role.name,
description=db_role.description,
is_global=db_role.is_global,
permission_ids=perm_ids
)
@router.delete("/{role_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_role(role_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
"""Delete a role."""
if not getattr(current_user, 'is_admin', False):
raise HTTPException(status_code=403, detail="Only admins can delete roles")
db_role = db.query(Role).filter(Role.id == role_id).first()
if not db_role:
raise HTTPException(status_code=404, detail="Role not found")
# Prevent deleting protected default roles
if db_role.name in ("admin", "guest", "account-manager"):
raise HTTPException(status_code=403, detail=f"Cannot delete the '{db_role.name}' role")
member_count = db.query(models.ProjectMember).filter(models.ProjectMember.role_id == role_id).count()
account_count = db.query(models.User).filter(models.User.role_id == role_id).count()
if member_count > 0 or account_count > 0:
raise HTTPException(status_code=400, detail="Role is in use and cannot be deleted")
# Delete role permissions first
db.query(RolePermission).filter(RolePermission.role_id == role_id).delete()
db.delete(db_role)
db.commit()
return None
@router.post("/{role_id}/permissions", response_model=RoleDetailResponse)
def assign_permissions(role_id: int, perm_assign: PermissionAssign, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
"""Assign permissions to a role."""
if not getattr(current_user, 'is_admin', False):
raise HTTPException(status_code=403, detail="Only admins can edit role permissions")
role = db.query(Role).filter(Role.id == role_id).first()
if not role:
raise HTTPException(status_code=404, detail="Role not found")
# Prevent modifying permissions of protected system roles
if role.name in ("admin", "account-manager"):
raise HTTPException(status_code=403, detail=f"Cannot modify permissions of the {role.name} role")
db.query(RolePermission).filter(RolePermission.role_id == role_id).delete()
for perm_id in perm_assign.permission_ids:
perm = db.query(Permission).filter(Permission.id == perm_id).first()
if perm:
rp = RolePermission(role_id=role_id, permission_id=perm_id)
db.add(rp)
db.commit()
db.refresh(role)
perms = []
for rp in role.permissions:
perms.append(PermissionResponse(
id=rp.permission.id,
name=rp.permission.name,
description=rp.permission.description,
category=rp.permission.category
))
return RoleDetailResponse(
id=role.id,
name=role.name,
description=role.description,
is_global=role.is_global,
permissions=perms
)