diff --git a/app/api/rbac.py b/app/api/rbac.py index 501acef..3087c02 100644 --- a/app/api/rbac.py +++ b/app/api/rbac.py @@ -1,46 +1,72 @@ -"""Role-based access control helpers.""" -from functools import wraps +"""Role-based access control helpers - using configurable permissions.""" from fastapi import HTTPException, status from sqlalchemy.orm import Session -from app.models.models import ProjectMember, User +from app.models import models +from app.models.role_permission import Role, Permission, RolePermission +from app.models import models -# Role hierarchy: admin > mgr > dev > ops > viewer -ROLE_LEVELS = { - "admin": 50, - "mgr": 40, - "dev": 30, - "ops": 20, - "viewer": 10, -} - - -def get_member_role(db: Session, user_id: int, project_id: int) -> str | None: - """Get user's role in a project. Returns None if not a member.""" +def get_user_role(db: Session, user_id: int, project_id: int) -> Role | None: + """Get user's role in a project.""" member = db.query(ProjectMember).filter( ProjectMember.user_id == user_id, ProjectMember.project_id == project_id, ).first() - if member: - return member.role - # Check if user is global admin - user = db.query(User).filter(User.id == user_id).first() + + if member and member.role_id: + return db.query(Role).filter(Role.id == member.role_id).first() + + # Check global admin + user = db.query(models.User).filter(models.User.id == user_id).first() if user and user.is_admin: - return "admin" + # Return global admin role + return db.query(Role).filter(Role.is_global == True, Role.name == "superadmin").first() + return None +def has_permission(db: Session, user_id: int, project_id: int, permission: str) -> bool: + """Check if user has a specific permission in a project.""" + role = get_user_role(db, user_id, project_id) + + if not role: + return False + + # Check if role has the permission + perm = db.query(Permission).filter(Permission.name == permission).first() + if not perm: + return False + + role_perm = db.query(RolePermission).filter( + RolePermission.role_id == role.id, + RolePermission.permission_id == perm.id + ).first() + + return role_perm is not None + + +def check_permission(db: Session, user_id: int, project_id: int, permission: str): + """Raise 403 if user doesn't have the permission.""" + if not has_permission(db, user_id, project_id, permission): + role = get_user_role(db, user_id, project_id) + role_name = role.name if role else "none" + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Permission '{permission}' required. Your role: {role_name}" + ) + + +# Keep old function for backward compatibility (deprecated) def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "viewer"): - """Raise 403 if user doesn't have the minimum required role in the project.""" - role = get_member_role(db, user_id, project_id) - if role is None: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Not a member of this project" - ) - if ROLE_LEVELS.get(role, 0) < ROLE_LEVELS.get(min_role, 0): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail=f"Requires role '{min_role}' or higher, you have '{role}'" - ) - return role + """Legacy function - maps old role names to new permission system.""" + # Map old roles to permissions + role_to_perm = { + "admin": "project.edit", + "mgr": "milestone.create", + "dev": "issue.create", + "ops": "issue.view", + "viewer": "project.view", + } + + perm = role_to_perm.get(min_role, "project.view") + check_permission(db, user_id, project_id, perm) diff --git a/app/api/routers/roles.py b/app/api/routers/roles.py new file mode 100644 index 0000000..130afd6 --- /dev/null +++ b/app/api/routers/roles.py @@ -0,0 +1,214 @@ +"""Roles and Permissions API router.""" +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session +from typing import List +from pydantic import BaseModel + +from app.core.config import get_db +from app.api.deps import get_current_user_or_apikey +from app.models import models +from app.models.role_permission import Role, Permission, RolePermission + +router = APIRouter(prefix="/roles", tags=["Roles"]) + + +# Schemas +class PermissionResponse(BaseModel): + id: int + name: str + description: str | None + category: str + + class Config: + from_attributes = True + + +class RoleResponse(BaseModel): + id: int + name: str + description: str | None + is_global: bool + permission_ids: List[int] = [] + + class Config: + from_attributes = True + + +class RoleDetailResponse(BaseModel): + id: int + name: str + description: str | None + is_global: bool + permissions: List[PermissionResponse] = [] + + class Config: + from_attributes = True + + +class RoleCreate(BaseModel): + name: str + description: str | None = None + is_global: bool = False + + +class RoleUpdate(BaseModel): + name: str | None = None + description: str | None = None + + +class PermissionAssign(BaseModel): + permission_ids: List[int] + + +@router.get("/permissions", response_model=List[PermissionResponse]) +def list_permissions(db: Session = Depends(get_db)): + """List all permissions.""" + return db.query(Permission).all() + + +@router.get("", response_model=List[RoleResponse]) +def list_roles(db: Session = Depends(get_db)): + """List all roles.""" + roles = db.query(Role).all() + result = [] + for role in roles: + perm_ids = [rp.permission_id for rp in role.permissions] + result.append(RoleResponse( + id=role.id, + name=role.name, + description=role.description, + is_global=role.is_global, + permission_ids=perm_ids + )) + return result + + +@router.get("/{role_id}", response_model=RoleDetailResponse) +def get_role(role_id: int, db: Session = Depends(get_db)): + """Get a role with its permissions.""" + role = db.query(Role).filter(Role.id == role_id).first() + if not role: + raise HTTPException(status_code=404, detail="Role not found") + + perms = [] + for rp in role.permissions: + perms.append(PermissionResponse( + id=rp.permission.id, + name=rp.permission.name, + description=rp.permission.description, + category=rp.permission.category + )) + + return RoleDetailResponse( + id=role.id, + name=role.name, + description=role.description, + is_global=role.is_global, + permissions=perms + ) + + +@router.post("", response_model=RoleResponse, status_code=status.HTTP_201_CREATED) +def create_role(role: RoleCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): + """Create a new role. Requires is_admin.""" + if not getattr(current_user, 'is_admin', False): + raise HTTPException(status_code=403, detail="Only admins can create roles") + + existing = db.query(Role).filter(Role.name == role.name).first() + if existing: + raise HTTPException(status_code=400, detail="Role already exists") + + db_role = Role(**role.model_dump()) + db.add(db_role) + db.commit() + db.refresh(db_role) + return RoleResponse( + id=db_role.id, + name=db_role.name, + description=db_role.description, + is_global=db_role.is_global, + permission_ids=[] + ) + + +@router.patch("/{role_id}", response_model=RoleResponse) +def update_role(role_id: int, role: RoleUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): + """Update a role.""" + if not getattr(current_user, 'is_admin', False): + raise HTTPException(status_code=403, detail="Only admins can edit roles") + + db_role = db.query(Role).filter(Role.id == role_id).first() + if not db_role: + raise HTTPException(status_code=404, detail="Role not found") + + for key, value in role.model_dump(exclude_unset=True).items(): + setattr(db_role, key, value) + db.commit() + db.refresh(db_role) + + perm_ids = [rp.permission_id for rp in db_role.permissions] + return RoleResponse( + id=db_role.id, + name=db_role.name, + description=db_role.description, + is_global=db_role.is_global, + permission_ids=perm_ids + ) + + +@router.delete("/{role_id}", status_code=status.HTTP_204_NO_CONTENT) +def delete_role(role_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): + """Delete a role.""" + if not getattr(current_user, 'is_admin', False): + raise HTTPException(status_code=403, detail="Only admins can delete roles") + + db_role = db.query(Role).filter(Role.id == role_id).first() + if not db_role: + raise HTTPException(status_code=404, detail="Role not found") + + member_count = db.query(models.ProjectMember).filter(models.ProjectMember.role_id == role_id).count() + if member_count > 0: + raise HTTPException(status_code=400, detail="Role is in use by members") + + db.delete(db_role) + db.commit() + return None + + +@router.post("/{role_id}/permissions", response_model=RoleDetailResponse) +def assign_permissions(role_id: int, perm_assign: PermissionAssign, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)): + """Assign permissions to a role.""" + if not getattr(current_user, 'is_admin', False): + raise HTTPException(status_code=403, detail="Only admins can edit role permissions") + + role = db.query(Role).filter(Role.id == role_id).first() + if not role: + raise HTTPException(status_code=404, detail="Role not found") + + db.query(RolePermission).filter(RolePermission.role_id == role_id).delete() + + for perm_id in perm_assign.permission_ids: + perm = db.query(Permission).filter(Permission.id == perm_id).first() + if perm: + rp = RolePermission(role_id=role_id, permission_id=perm_id) + db.add(rp) + + db.commit() + db.refresh(role) + + perms = [] + for rp in role.permissions: + perms.append(PermissionResponse( + id=rp.permission.id, + name=rp.permission.name, + description=rp.permission.description, + category=rp.permission.category + )) + + return RoleDetailResponse( + id=role.id, + name=role.name, + description=role.description, + is_global=role.is_global, + permissions=perms + ) diff --git a/app/main.py b/app/main.py index 3fa5488..d31feee 100644 --- a/app/main.py +++ b/app/main.py @@ -36,6 +36,7 @@ from app.api.routers.webhooks import router as webhooks_router from app.api.routers.misc import router as misc_router from app.api.routers.monitor import router as monitor_router from app.api.routers.milestones import router as milestones_router +from app.api.routers.roles import router as roles_router app.include_router(auth_router) app.include_router(issues_router) @@ -46,6 +47,7 @@ app.include_router(webhooks_router) app.include_router(misc_router) app.include_router(monitor_router) app.include_router(milestones_router) +app.include_router(roles_router) # Auto schema migration for lightweight deployments @@ -87,7 +89,7 @@ def _migrate_schema(): @app.on_event("startup") def startup(): from app.core.config import Base, engine, SessionLocal - from app.models import models, webhook, apikey, activity, milestone, notification, worklog, monitor + from app.models import models, webhook, apikey, activity, milestone, notification, worklog, monitor, role_permission Base.metadata.create_all(bind=engine) _migrate_schema() diff --git a/app/models/models.py b/app/models/models.py index 93a1e7c..545d499 100644 --- a/app/models/models.py +++ b/app/models/models.py @@ -2,6 +2,7 @@ from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Enum from sqlalchemy.orm import relationship from sqlalchemy.sql import func from app.core.config import Base +from app.models.role_permission import Role import enum @@ -131,7 +132,8 @@ class ProjectMember(Base): id = Column(Integer, primary_key=True, index=True) project_id = Column(Integer, ForeignKey("projects.id"), nullable=False) user_id = Column(Integer, ForeignKey("users.id"), nullable=False) - role = Column(String(20), default="dev") # admin, dev, mgr, ops + role_id = Column(Integer, ForeignKey("roles.id"), nullable=False) + role = relationship("Role") project = relationship("Project", back_populates="members") user = relationship("User", back_populates="project_memberships") diff --git a/app/models/role_permission.py b/app/models/role_permission.py new file mode 100644 index 0000000..d92d39a --- /dev/null +++ b/app/models/role_permission.py @@ -0,0 +1,44 @@ +"""Role and Permission models.""" +from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean +from sqlalchemy.orm import relationship +from sqlalchemy.sql import func +from app.core.config import Base + + +class Role(Base): + """Role definition - configurable roles.""" + __tablename__ = "roles" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String(50), unique=True, nullable=False) + description = Column(String(255), nullable=True) + is_global = Column(Boolean, default=False) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + updated_at = Column(DateTime(timezone=True), onupdate=func.now()) + + permissions = relationship("RolePermission", back_populates="role") + + +class Permission(Base): + """Permission definitions - granular permissions.""" + __tablename__ = "permissions" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String(100), unique=True, nullable=False) + description = Column(String(255), nullable=True) + category = Column(String(50), nullable=False) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + roles = relationship("RolePermission", back_populates="permission") + + +class RolePermission(Base): + """Maps roles to permissions.""" + __tablename__ = "role_permissions" + + id = Column(Integer, primary_key=True, index=True) + role_id = Column(Integer, ForeignKey("roles.id"), nullable=False) + permission_id = Column(Integer, ForeignKey("permissions.id"), nullable=False) + + role = relationship("Role", back_populates="permissions") + permission = relationship("Permission", back_populates="roles")