Verified locally end-to-end (before: exploitable, after: blocked).
- config: refuse to start on weak/default/short SECRET_KEY (was trivially forgeable JWT -> full admin)
- deps: add reusable require_admin dependency (JWT or API key)
- api-keys: require admin to mint/list/revoke; mask key on list (was unauthenticated -> instant admin API key)
- webhooks: whole router now admin-only (was fully unauthenticated CRUD + readable logs)
- webhook delivery: validate URL scheme + reject hosts resolving to private/loopback/link-local/reserved IPs; disable redirects (was a readable SSRF primitive)
- rbac: implement a real project-role hierarchy in check_project_role (was a no-op: any member, even guest, passed admin/mgr gates)
- misc: auth on delete_milestone (+ensure_can_edit_milestone), worklog create/delete (force caller user_id, owner-only delete), /activity and /export/tasks (were unauthenticated data exposure)
- tasks: auth + ensure_can_edit_task on assign_task and batch_assign

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
179 lines
6.4 KiB
Python
179 lines
6.4 KiB
Python
"""Role-based access control helpers - using configurable permissions."""
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from app.models import models
|
|
from app.models.role_permission import Role, Permission, RolePermission
|
|
from app.models.milestone import Milestone
|
|
from app.models.task import Task
|
|
|
|
|
|
def get_user_role(db: Session, user_id: int, project_id: int) -> Role | None:
    """Resolve the Role that applies to a user within a project.

    A project membership that carries a ``role_id`` is consulted first: the
    Role with that id is returned (``None`` if no such Role row exists).
    Without such a membership, a user flagged ``is_admin`` receives the
    global Role named "admin". Anyone else gets ``None``.

    NOTE(review): because the membership is checked first, a global admin
    who is also a project member resolves to the project role, not the
    admin role -- confirm this precedence is intended.
    """
    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.user_id == user_id,
            models.ProjectMember.project_id == project_id,
        )
        .first()
    )
    if membership and membership.role_id:
        project_role_query = db.query(Role).filter(Role.id == membership.role_id)
        return project_role_query.first()

    # No usable project membership: fall back to the global-admin flag.
    account = db.query(models.User).filter(models.User.id == user_id).first()
    if not (account and account.is_admin):
        return None

    # `== True` is deliberate: it builds a SQL expression, not a Python bool.
    return (
        db.query(Role)
        .filter(Role.is_global == True, Role.name == "admin")  # noqa: E712
        .first()
    )
|
|
|
|
|
|
def has_permission(
    db: Session, user_id: int, project_id: int, permission: str
) -> bool:
    """Tell whether the user's role in the project grants ``permission``.

    Returns ``False`` when no role resolves for the user, when no Permission
    row carries the given name, or when no RolePermission row links the role
    to that permission. Returns ``True`` only when such a link exists.
    """
    effective_role = get_user_role(db, user_id, project_id)
    if not effective_role:
        return False

    # Look the permission up by name; unknown names are never granted.
    wanted = db.query(Permission).filter(Permission.name == permission).first()
    if not wanted:
        return False

    grant = (
        db.query(RolePermission)
        .filter(
            RolePermission.role_id == effective_role.id,
            RolePermission.permission_id == wanted.id,
        )
        .first()
    )
    return grant is not None
|
|
|
|
|
|
def check_permission(db: Session, user_id: int, project_id: int, permission: str):
    """Enforce a permission, raising HTTP 403 when the user lacks it.

    Returns ``None`` when the permission is held.

    Raises:
        HTTPException: 403 whose detail names the required permission and
            the caller's current role name ("none" if no role resolves).
    """
    if has_permission(db, user_id, project_id, permission):
        return

    # Resolve the role again purely to report it in the error message.
    current_role = get_user_role(db, user_id, project_id)
    if current_role:
        role_name = current_role.name
    else:
        role_name = "none"

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"Permission '{permission}' required. Your role: {role_name}",
    )
|
|
|
|
|
|
def check_project_role(
    db: Session, user_id: int, project_id: int, min_role: str = "member"
):
    """Require that the user holds at least ``min_role`` in the project.

    Users flagged ``is_admin`` pass immediately, without a membership lookup.
    Otherwise the name of the user's project role is ranked against
    ``min_role``. Both names are compared case-insensitively, and a falsy
    ``min_role`` is treated as "member".

    Returns:
        ``True`` when the check passes.

    Raises:
        HTTPException: 403 when the user has no membership (or the membership
            has no role id), when the Role row is missing, when either role
            name is absent from the ranking table, or when the user's rank is
            below the required rank.
    """
    # Role hierarchy: a larger number means more privilege.
    rank_by_name = {
        "guest": 0,
        "viewer": 1,
        "member": 2,
        "dev": 3,
        "mgr": 4,
        "admin": 5,
    }

    account = db.query(models.User).filter(models.User.id == user_id).first()
    if account and account.is_admin:
        return True

    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.user_id == user_id,
            models.ProjectMember.project_id == project_id,
        )
        .first()
    )
    if not membership or not membership.role_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You are not a member of this project",
        )

    role = db.query(Role).filter(Role.id == membership.role_id).first()
    if not role:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Role not found",
        )

    have = rank_by_name.get((role.name or "").lower())
    need = rank_by_name.get((min_role or "member").lower())

    # A name missing from the table on either side is denied by default; the
    # short-circuit keeps the `<` comparison from ever seeing None.
    if have is None or need is None or have < need:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Insufficient project role (have '{role.name}', need '{min_role}')",
        )
    return True
|
|
|
|
|
|
def get_project_role_name(db: Session, user_id: int, project_id: int) -> str | None:
    """Return the name of the user's role in the project, if any.

    Global admins always yield "admin". For other users the name of the Role
    referenced by their project membership is returned. ``None`` is returned
    when there is no membership, the membership has no role id, or the
    referenced Role row does not exist.
    """
    if is_global_admin(db, user_id):
        return "admin"

    membership = (
        db.query(models.ProjectMember)
        .filter(
            models.ProjectMember.user_id == user_id,
            models.ProjectMember.project_id == project_id,
        )
        .first()
    )
    role_id = membership.role_id if membership else None
    if not role_id:
        return None

    role = db.query(Role).filter(Role.id == role_id).first()
    if not role:
        return None
    return role.name
|
|
|
|
|
|
def is_global_admin(db: Session, user_id: int) -> bool:
    """Return True when a User with this id exists and is flagged ``is_admin``."""
    account = db.query(models.User).filter(models.User.id == user_id).first()
    if not account:
        return False
    return bool(account.is_admin)
|
|
|
|
|
|
def has_project_admin_role(db: Session, user_id: int, project_id: int) -> bool:
    """Return True when the user's project role name is exactly "admin".

    The role name comes from ``get_project_role_name``, which reports
    "admin" for global admins as well.

    NOTE(review): this comparison is case-sensitive, whereas
    ``check_project_role`` lowercases role names -- confirm that stored role
    names are always lowercase.
    """
    role_name = get_project_role_name(db, user_id, project_id)
    return role_name == "admin"
|
|
|
|
|
|
def can_edit_project(db: Session, user_id: int, project: models.Project) -> bool:
    """Decide whether the user may edit the given project.

    Editing is allowed for a global admin, for the project's owner, and for
    a user whose role in the project is "admin". Checks run in that order
    and stop at the first one that succeeds.
    """
    if is_global_admin(db, user_id):
        return True
    if project.owner_id == user_id:
        return True
    return has_project_admin_role(db, user_id, project.id)
|
|
|
|
|
|
def ensure_can_edit_project(db: Session, user_id: int, project: models.Project):
    """Raise HTTP 403 unless ``can_edit_project`` allows the user to edit.

    Returns ``None`` when editing is permitted.
    """
    allowed = can_edit_project(db, user_id, project)
    if allowed:
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Project edit permission denied",
    )
|
|
|
|
|
|
def can_edit_milestone(db: Session, user_id: int, milestone: Milestone) -> bool:
    """Decide whether the user may edit the given milestone.

    The milestone's project is loaded first; if it cannot be found the answer
    is ``False`` for everyone. Otherwise editing is allowed for a global
    admin, the project's owner, the milestone's creator, or a user whose
    role in the project is "admin" (checked in that order).
    """
    parent = (
        db.query(models.Project)
        .filter(models.Project.id == milestone.project_id)
        .first()
    )
    if not parent:
        return False

    if is_global_admin(db, user_id):
        return True
    if parent.owner_id == user_id or milestone.created_by_id == user_id:
        return True
    return has_project_admin_role(db, user_id, milestone.project_id)
|
|
|
|
|
|
def ensure_can_edit_milestone(db: Session, user_id: int, milestone: Milestone):
    """Raise HTTP 403 unless ``can_edit_milestone`` allows the user to edit.

    Returns ``None`` when editing is permitted.
    """
    allowed = can_edit_milestone(db, user_id, milestone)
    if allowed:
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Milestone edit permission denied",
    )
|
|
|
|
|
|
def can_edit_task(db: Session, user_id: int, task: Task) -> bool:
    """Decide whether the user may edit the given task.

    The task's project is loaded first; if it cannot be found the answer is
    ``False`` for everyone. Otherwise editing is allowed, in this order, for:
    a global admin, the project's owner, the task's creator, the creator of
    the task's milestone, or a user whose role in the project is "admin".

    The milestone is only queried when the cheaper checks have not already
    granted access and the task actually references a milestone, which
    avoids a database round trip in the common cases.
    """
    project = (
        db.query(models.Project)
        .filter(models.Project.id == task.project_id)
        .first()
    )
    if not project:
        return False

    if is_global_admin(db, user_id):
        return True
    if project.owner_id == user_id or task.created_by_id == user_id:
        return True

    # Deferred lookup: a task without a milestone id cannot match a milestone
    # creator, so there is nothing to query in that case.
    if task.milestone_id is not None:
        milestone = (
            db.query(Milestone).filter(Milestone.id == task.milestone_id).first()
        )
        if milestone is not None and milestone.created_by_id == user_id:
            return True

    return has_project_admin_role(db, user_id, task.project_id)
|
|
|
|
|
|
def ensure_can_edit_task(db: Session, user_id: int, task: Task):
    """Raise HTTP 403 unless ``can_edit_task`` allows the user to edit.

    Returns ``None`` when editing is permitted.
    """
    allowed = can_edit_task(db, user_id, task)
    if allowed:
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Task edit permission denied",
    )
|
|
|