From 61e3349ca4de4a60d1c5981606579b9b0820ce2b Mon Sep 17 00:00:00 2001 From: river Date: Sun, 15 Mar 2026 12:25:59 +0000 Subject: [PATCH] feat: add role/permission system with tests support - Add Role model with 17 default permissions - Add init_wizard to create admin/guest roles on first startup - Protect admin role from modification/deletion via API - Fix MilestoneCreate schema (project_id optional) - Fix delete role to clean up role_permissions first - Add check_project_role RBAC function --- app/api/rbac.py | 52 ++++++++++++---- app/api/routers/milestones.py | 2 + app/api/routers/roles.py | 23 +++++-- app/init_wizard.py | 111 ++++++++++++++++++++++++++++++++++ app/schemas/schemas.py | 2 +- 5 files changed, 172 insertions(+), 18 deletions(-) diff --git a/app/api/rbac.py b/app/api/rbac.py index 49793a2..4fa0c53 100644 --- a/app/api/rbac.py +++ b/app/api/rbac.py @@ -56,17 +56,43 @@ def check_permission(db: Session, user_id: int, project_id: int, permission: str ) -# Keep old function for backward compatibility (deprecated) -def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "viewer"): - """Legacy function - maps old role names to new permission system.""" - # Map old roles to permissions - role_to_perm = { - "admin": "project.edit", - "mgr": "milestone.create", - "dev": "issue.create", - "ops": "issue.view", - "viewer": "project.view", - } +def check_project_role(db: Session, user_id: int, project_id: int, min_role: str = "member"): + """Check if user has at least the specified role in a project.""" + # Check if user is global admin + user = db.query(models.User).filter(models.User.id == user_id).first() + if user and user.is_admin: + return True - perm = role_to_perm.get(min_role, "project.view") - check_permission(db, user_id, project_id, perm) + # Get user's role in project + member = db.query(models.ProjectMember).filter( + models.ProjectMember.user_id == user_id, + models.ProjectMember.project_id == project_id, + ).first() + + if not member or not member.role_id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"You are not a member of this project" + ) + + role = db.query(Role).filter(Role.id == member.role_id).first() + if not role: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Role not found" + ) + + # Role hierarchy: admin > member > guest + role_hierarchy = {"admin": 3, "member": 2, "guest": 1} + user_role_level = role_hierarchy.get(role.name, 0) + required_level = role_hierarchy.get(min_role, 0) + + if user_role_level < required_level: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Role '{min_role}' or higher required. Your role: {role.name}" + ) + + return True + + diff --git a/app/api/routers/milestones.py b/app/api/routers/milestones.py index 0dee706..44ead20 100644 --- a/app/api/routers/milestones.py +++ b/app/api/routers/milestones.py @@ -54,6 +54,8 @@ def create_milestone(project_id: int, milestone: schemas.MilestoneCreate, db: Se milestone_code = f"{project_code}:{next_num:05x}" data = milestone.model_dump() + # Remove project_id from data if present (it's already in the URL path) + data.pop('project_id', None) # Handle JSON fields if data.get("depend_on_milestones"): data["depend_on_milestones"] = json.dumps(data["depend_on_milestones"]) diff --git a/app/api/routers/roles.py b/app/api/routers/roles.py index 130afd6..8a6f419 100644 --- a/app/api/routers/roles.py +++ b/app/api/routers/roles.py @@ -26,8 +26,8 @@ class PermissionResponse(BaseModel): class RoleResponse(BaseModel): id: int name: str - description: str | None - is_global: bool + description: str | None = None + is_global: bool | None = None permission_ids: List[int] = [] class Config: @@ -37,8 +37,8 @@ class RoleResponse(BaseModel): class RoleDetailResponse(BaseModel): id: int name: str - description: str | None - is_global: bool + description: str | None = None + is_global: bool | None = None permissions: List[PermissionResponse] = [] class Config: @@ -141,6 +141,10 @@ def update_role(role_id: int, role: RoleUpdate, db: Session = Depends(get_db), c if not db_role: raise HTTPException(status_code=404, detail="Role not found") + # Prevent modifying the admin role + if db_role.name == "admin": + raise HTTPException(status_code=403, detail="Cannot modify the admin role") + for key, value in role.model_dump(exclude_unset=True).items(): setattr(db_role, key, value) db.commit() @@ -166,10 +170,17 @@ def delete_role(role_id: int, db: Session = Depends(get_db), current_user: model if not db_role: raise HTTPException(status_code=404, detail="Role not found") + # Prevent deleting the admin or guest role + if db_role.name in ("admin", "guest"): + raise HTTPException(status_code=403, detail=f"Cannot delete the '{db_role.name}' role") + member_count = db.query(models.ProjectMember).filter(models.ProjectMember.role_id == role_id).count() if member_count > 0: raise HTTPException(status_code=400, detail="Role is in use by members") + # Delete role permissions first + db.query(RolePermission).filter(RolePermission.role_id == role_id).delete() + db.delete(db_role) db.commit() return None @@ -185,6 +196,10 @@ def assign_permissions(role_id: int, perm_assign: PermissionAssign, db: Session if not role: raise HTTPException(status_code=404, detail="Role not found") + # Prevent modifying permissions of the admin role + if role.name == "admin": + raise HTTPException(status_code=403, detail="Cannot modify permissions of the admin role") + db.query(RolePermission).filter(RolePermission.role_id == role_id).delete() for perm_id in perm_assign.permission_ids: diff --git a/app/init_wizard.py b/app/init_wizard.py index e05e8f3..2e2e5fd 100644 --- a/app/init_wizard.py +++ b/app/init_wizard.py @@ -10,6 +10,7 @@ import logging from sqlalchemy.orm import Session from app.models import models +from app.models.role_permission import Role, Permission, RolePermission from app.api.deps import get_password_hash logger = logging.getLogger("harborforge.init") @@ -92,6 +93,109 @@ def init_default_project(db: Session, project_cfg: dict, owner_id: int, owner_na logger.info("Created default project '%s' (id=%d)", name, project.id) +# Default permissions that will be created if not exist +DEFAULT_PERMISSIONS = [ + # Project permissions + ("project.read", "View project", "project"), + ("project.write", "Edit project", "project"), + ("project.delete", "Delete project", "project"), + ("project.manage_members", "Manage project members", "project"), + # Issue/Milestone permissions + ("issue.create", "Create issues", "issue"), + ("issue.read", "View issues", "issue"), + ("issue.write", "Edit issues", "issue"), + ("issue.delete", "Delete issues", "issue"), + ("milestone.create", "Create milestones", "milestone"), + ("milestone.read", "View milestones", "milestone"), + ("milestone.write", "Edit milestones", "milestone"), + ("milestone.delete", "Delete milestones", "milestone"), + # Role/Permission management + ("role.manage", "Manage roles and permissions", "admin"), + # User management + ("user.manage", "Manage users", "admin"), + # Monitor + ("monitor.read", "View monitor", "monitor"), + ("monitor.manage", "Manage monitor", "monitor"), + # Webhook + ("webhook.manage", "Manage webhooks", "admin"), +] + + +def init_default_permissions(db: Session) -> list[Permission]: + """Create default permissions if they don't exist. Returns all permissions.""" + created = [] + for name, description, category in DEFAULT_PERMISSIONS: + existing = db.query(Permission).filter(Permission.name == name).first() + if not existing: + perm = Permission(name=name, description=description, category=category) + db.add(perm) + created.append(perm) + logger.info("Created permission '%s'", name) + + if created: + db.commit() + + # Return all permissions + return db.query(Permission).all() + + +def init_admin_role(db: Session, admin_user: models.User) -> None: + """Create admin role with all permissions and guest role with minimal permissions.""" + # Check if admin role already exists + admin_role = db.query(Role).filter(Role.name == "admin").first() + if not admin_role: + admin_role = Role( + name="admin", + description="Administrator - full access to all features", + is_global=True + ) + db.add(admin_role) + db.commit() + db.refresh(admin_role) + logger.info("Created admin role (id=%d)", admin_role.id) + + # Check if guest role already exists + guest_role = db.query(Role).filter(Role.name == "guest").first() + if not guest_role: + guest_role = Role( + name="guest", + description="Guest - read-only access", + is_global=True + ) + db.add(guest_role) + db.commit() + db.refresh(guest_role) + logger.info("Created guest role (id=%d)", guest_role.id) + + # Get all permissions + all_perms = db.query(Permission).all() + + # Assign all permissions to admin role + existing_admin_perm_ids = {rp.permission_id for rp in admin_role.permissions} + for perm in all_perms: + if perm.id not in existing_admin_perm_ids: + rp = RolePermission(role_id=admin_role.id, permission_id=perm.id) + db.add(rp) + + if all_perms: + db.commit() + logger.info("Assigned %d permissions to admin role", len(all_perms)) + + # Assign only read permissions to guest role + read_perms = db.query(Permission).filter(Permission.name.like("%.read")).all() + existing_guest_perm_ids = {rp.permission_id for rp in guest_role.permissions} + for perm in read_perms: + if perm.id not in existing_guest_perm_ids: + rp = RolePermission(role_id=guest_role.id, permission_id=perm.id) + db.add(rp) + + if read_perms: + db.commit() + logger.info("Assigned %d read permissions to guest role", len(read_perms)) + + logger.info("Admin and guest roles setup complete") + + def run_init(db: Session) -> None: """Main initialization entry point. Reads config from shared volume.""" config = load_config() @@ -100,11 +204,18 @@ def run_init(db: Session) -> None: logger.info("Running HarborForge initialization from wizard config") + # Initialize default permissions and admin role (always run) + all_perms = init_default_permissions(db) + logger.info("Default permissions initialized: %d total", len(all_perms)) + # Admin user admin_cfg = config.get("admin") admin_user = None if admin_cfg: admin_user = init_admin_user(db, admin_cfg) + # Create admin role and assign to admin user + if admin_user: + init_admin_role(db, admin_user) # Default project project_cfg = config.get("default_project") diff --git a/app/schemas/schemas.py b/app/schemas/schemas.py index be8a6e5..bad66cf 100644 --- a/app/schemas/schemas.py +++ b/app/schemas/schemas.py @@ -211,7 +211,7 @@ class MilestoneBase(BaseModel): class MilestoneCreate(MilestoneBase): - project_id: int + project_id: Optional[int] = None pass