fix: enforce calendar role permissions

This commit is contained in:
2026-04-04 14:35:42 +00:00
parent e9529e3cb0
commit 41bebc862b

View File

@@ -62,10 +62,52 @@ from app.services.slot_immutability import (
guard_plan_cancel_no_past_retroaction,
guard_plan_edit_no_past_retroaction,
)
from app.models.role_permission import Permission, RolePermission
router = APIRouter(prefix="/calendar", tags=["Calendar"])
def _has_global_permission(db: Session, user: User, permission_name: str) -> bool:
if user.is_admin:
return True
if not user.role_id:
return False
perm = db.query(Permission).filter(Permission.name == permission_name).first()
if not perm:
return False
return db.query(RolePermission).filter(
RolePermission.role_id == user.role_id,
RolePermission.permission_id == perm.id,
).first() is not None
def _require_calendar_permission(db: Session, user: User, permission_name: str) -> User:
if _has_global_permission(db, user, permission_name):
return user
raise HTTPException(status_code=403, detail=f"Calendar permission '{permission_name}' required")
def require_calendar_read(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
return _require_calendar_permission(db, current_user, "calendar.read")
def require_calendar_write(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
return _require_calendar_permission(db, current_user, "calendar.write")
def require_calendar_manage(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
return _require_calendar_permission(db, current_user, "calendar.manage")
# ---------------------------------------------------------------------------
# TimeSlot creation (BE-CAL-API-001)
# ---------------------------------------------------------------------------
@@ -101,7 +143,7 @@ def _slot_to_response(slot: TimeSlot) -> TimeSlotResponse:
def create_slot(
payload: TimeSlotCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_write),
):
"""Create a one-off calendar slot.
@@ -230,7 +272,7 @@ def _virtual_slot_to_item(vs: dict) -> CalendarSlotItem:
def get_calendar_day(
date: Optional[date_type] = Query(None, description="Target date (defaults to today)"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_read),
):
"""Return all calendar slots for the authenticated user on the given date.
@@ -301,7 +343,7 @@ def edit_real_slot(
slot_id: int,
payload: TimeSlotEdit,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_write),
):
"""Edit an existing real (materialized) slot.
@@ -380,7 +422,7 @@ def edit_virtual_slot(
virtual_id: str,
payload: TimeSlotEdit,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_write),
):
"""Edit a virtual (plan-generated) slot.
@@ -469,7 +511,7 @@ def edit_virtual_slot(
def cancel_real_slot(
slot_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_write),
):
"""Cancel an existing real (materialized) slot.
@@ -516,7 +558,7 @@ def cancel_real_slot(
def cancel_virtual_slot(
virtual_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_write),
):
"""Cancel a virtual (plan-generated) slot.
@@ -596,7 +638,7 @@ def _plan_to_response(plan: SchedulePlan) -> SchedulePlanResponse:
def create_plan(
payload: SchedulePlanCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_write),
):
"""Create a new recurring schedule plan.
@@ -632,7 +674,7 @@ def create_plan(
def list_plans(
include_inactive: bool = Query(False, description="Include cancelled/inactive plans"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_read),
):
"""Return all schedule plans for the authenticated user.
@@ -658,7 +700,7 @@ def list_plans(
def get_plan(
plan_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_read),
):
"""Return a single schedule plan owned by the authenticated user."""
plan = (
@@ -705,7 +747,7 @@ def edit_plan(
plan_id: int,
payload: SchedulePlanEdit,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_write),
):
"""Edit an existing schedule plan.
@@ -792,7 +834,7 @@ def edit_plan(
def cancel_plan(
plan_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_write),
):
"""Cancel (soft-delete) a schedule plan.
@@ -859,7 +901,7 @@ _DATE_LIST_EXCLUDED_STATUSES = {SlotStatus.SKIPPED.value, SlotStatus.ABORTED.val
)
def list_dates(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_read),
):
"""Return a sorted list of future dates that have at least one
materialized (real) slot.
@@ -897,7 +939,7 @@ def list_dates(
)
def get_my_workload_config(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_manage),
):
"""Return the workload thresholds for the authenticated user.
@@ -916,7 +958,7 @@ def get_my_workload_config(
def put_my_workload_config(
payload: MinimumWorkloadConfig,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_manage),
):
"""Full replacement of the workload configuration."""
row = replace_workload_config(db, current_user.id, payload)
@@ -933,7 +975,7 @@ def put_my_workload_config(
def patch_my_workload_config(
payload: MinimumWorkloadUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
current_user: User = Depends(require_calendar_manage),
):
"""Partial update — only the provided periods are overwritten."""
row = upsert_workload_config(db, current_user.id, payload)