From 271d5140e63a7c961b6a964b8aed91e6d30b40b4 Mon Sep 17 00:00:00 2001 From: zhi Date: Sat, 21 Mar 2026 08:44:19 +0000 Subject: [PATCH] feat(users): switch account management to single-role model - add users.role_id for one global role per account - seed protected account-manager role with account.create permission - default new accounts to guest role - block admin role assignment through user management - allow account-manager permission to create accounts --- app/api/routers/roles.py | 15 ++++++----- app/api/routers/users.py | 57 +++++++++++++++++++++++++++++++++++----- app/init_wizard.py | 6 +++++ app/main.py | 29 ++++++++++++++++++++ app/models/models.py | 8 +++++- app/schemas/schemas.py | 6 +++-- 6 files changed, 105 insertions(+), 16 deletions(-) diff --git a/app/api/routers/roles.py b/app/api/routers/roles.py index 8a6f419..2956d00 100644 --- a/app/api/routers/roles.py +++ b/app/api/routers/roles.py @@ -170,13 +170,14 @@ def delete_role(role_id: int, db: Session = Depends(get_db), current_user: model if not db_role: raise HTTPException(status_code=404, detail="Role not found") - # Prevent deleting the admin or guest role - if db_role.name in ("admin", "guest"): + # Prevent deleting protected default roles + if db_role.name in ("admin", "guest", "account-manager"): raise HTTPException(status_code=403, detail=f"Cannot delete the '{db_role.name}' role") member_count = db.query(models.ProjectMember).filter(models.ProjectMember.role_id == role_id).count() - if member_count > 0: - raise HTTPException(status_code=400, detail="Role is in use by members") + account_count = db.query(models.User).filter(models.User.role_id == role_id).count() + if member_count > 0 or account_count > 0: + raise HTTPException(status_code=400, detail="Role is in use and cannot be deleted") # Delete role permissions first db.query(RolePermission).filter(RolePermission.role_id == role_id).delete() @@ -196,9 +197,9 @@ def assign_permissions(role_id: int, perm_assign: PermissionAssign, db: Session if not role: raise HTTPException(status_code=404, detail="Role not found") - # Prevent modifying permissions of the admin role - if role.name == "admin": - raise HTTPException(status_code=403, detail="Cannot modify permissions of the admin role") + # Prevent modifying permissions of protected system roles + if role.name in ("admin", "account-manager"): + raise HTTPException(status_code=403, detail=f"Cannot modify permissions of the {role.name} role") db.query(RolePermission).filter(RolePermission.role_id == role_id).delete() diff --git a/app/api/routers/users.py b/app/api/routers/users.py index 525922f..42845e5 100644 --- a/app/api/routers/users.py +++ b/app/api/routers/users.py @@ -10,6 +10,7 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_user, get_password_hash from app.core.config import get_db from app.models import models +from app.models.role_permission import Permission, Role, RolePermission from app.models.worklog import WorkLog from app.schemas import schemas @@ -22,25 +23,68 @@ def require_admin(current_user: models.User = Depends(get_current_user)): return current_user +def _has_global_permission(db: Session, user: models.User, permission_name: str) -> bool: + if user.is_admin: + return True + if not user.role_id: + return False + perm = db.query(Permission).filter(Permission.name == permission_name).first() + if not perm: + return False + return db.query(RolePermission).filter( + RolePermission.role_id == user.role_id, + RolePermission.permission_id == perm.id, + ).first() is not None + + +def require_account_creator( + db: Session = Depends(get_db), + current_user: models.User = Depends(get_current_user), +): + if current_user.is_admin or _has_global_permission(db, current_user, "account.create"): + return current_user + raise HTTPException(status_code=403, detail="Account creation permission required") + + +def _resolve_user_role(db: Session, role_id: int | None) -> Role: + if role_id is None: + role = db.query(Role).filter(Role.name == "guest").first() + if not role: + raise HTTPException(status_code=500, detail="Default guest role is missing") + return role + + role = db.query(Role).filter(Role.id == role_id).first() + if not role: + raise HTTPException(status_code=400, detail="Role not found") + if role.name == "admin": + raise HTTPException(status_code=400, detail="Admin role cannot be assigned via user management") + if role.is_global is False: + raise HTTPException(status_code=400, detail="Only global roles can be assigned to accounts") + return role + + @router.post("", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED) def create_user( user: schemas.UserCreate, db: Session = Depends(get_db), - _: models.User = Depends(require_admin), + _: models.User = Depends(require_account_creator), ): existing = db.query(models.User).filter( (models.User.username == user.username) | (models.User.email == user.email) ).first() if existing: raise HTTPException(status_code=400, detail="Username or email already exists") + + assigned_role = _resolve_user_role(db, user.role_id) hashed_password = get_password_hash(user.password) if user.password else None db_user = models.User( username=user.username, email=user.email, full_name=user.full_name, hashed_password=hashed_password, - is_admin=user.is_admin, + is_admin=False, is_active=True, + role_id=assigned_role.id, ) db.add(db_user) db.commit() @@ -93,10 +137,11 @@ def update_user( if payload.password is not None and payload.password.strip(): user.hashed_password = get_password_hash(payload.password) - if payload.is_admin is not None: - if current_user.id == user.id and payload.is_admin is False: - raise HTTPException(status_code=400, detail="You cannot revoke your own admin access") - user.is_admin = payload.is_admin + if payload.role_id is not None: + if user.is_admin: + raise HTTPException(status_code=400, detail="Admin accounts cannot be reassigned via user management") + assigned_role = _resolve_user_role(db, payload.role_id) + user.role_id = assigned_role.id if payload.is_active is not None: if current_user.id == user.id and payload.is_active is False: diff --git a/app/init_wizard.py b/app/init_wizard.py index 5f9a66d..0f33ca6 100644 --- a/app/init_wizard.py +++ b/app/init_wizard.py @@ -123,6 +123,7 @@ DEFAULT_PERMISSIONS = [ ("propose.reopen", "Reopen a rejected propose", "propose"), # Role/Permission management ("role.manage", "Manage roles and permissions", "admin"), + ("account.create", "Create HarborForge accounts", "account"), # User management ("user.manage", "Manage users", "admin"), # Monitor @@ -175,9 +176,14 @@ _DEV_PERMISSIONS = { "monitor.read", } +_ACCOUNT_MANAGER_PERMISSIONS = { + "account.create", +} + # Role definitions: (name, description, permission_set) _DEFAULT_ROLES = [ ("admin", "Administrator - full access to all features", None), # None ⇒ all perms + ("account-manager", "Account manager - can only create accounts", _ACCOUNT_MANAGER_PERMISSIONS), ("mgr", "Manager - project & milestone management", _MGR_PERMISSIONS), ("dev", "Developer - task execution & daily work", _DEV_PERMISSIONS), ("guest", "Guest - read-only access", None), # special: *.read only diff --git a/app/main.py b/app/main.py index 63a267a..524f46c 100644 --- a/app/main.py +++ b/app/main.py @@ -215,6 +215,11 @@ def _migrate_schema(): "DEFAULT 'open'" )) + # --- users.role_id for single global account role --- + if _has_table(db, "users") and not _has_column(db, "users", "role_id"): + db.execute(text("ALTER TABLE users ADD COLUMN role_id INTEGER NULL")) + _ensure_fk(db, "users", "role_id", "roles", "id", "fk_users_role_id") + # --- monitored_servers.api_key for heartbeat v2 --- if _has_table(db, "monitored_servers") and not _has_column(db, "monitored_servers", "api_key"): db.execute(text("ALTER TABLE monitored_servers ADD COLUMN api_key VARCHAR(64) NULL")) @@ -237,6 +242,29 @@ def _migrate_schema(): finally: db.close() +def _sync_default_user_roles(db): + from app.models import models + from app.models.role_permission import Role + + admin_role = db.query(Role).filter(Role.name == "admin").first() + guest_role = db.query(Role).filter(Role.name == "guest").first() + + if admin_role: + db.query(models.User).filter(models.User.is_admin == True).update( + {models.User.role_id: admin_role.id}, + synchronize_session=False, + ) + if guest_role: + db.query(models.User).filter( + models.User.role_id == None, + models.User.is_admin == False, + ).update( + {models.User.role_id: guest_role.id}, + synchronize_session=False, + ) + db.commit() + + # Run database migration on startup @app.on_event("startup") def startup(): @@ -250,6 +278,7 @@ def startup(): db = SessionLocal() try: run_init(db) + _sync_default_user_roles(db) finally: db.close() diff --git a/app/models/models.py b/app/models/models.py index 35f8941..24c0b12 100644 --- a/app/models/models.py +++ b/app/models/models.py @@ -74,12 +74,18 @@ class User(Base): full_name = Column(String(100), nullable=True) is_active = Column(Boolean, default=True) is_admin = Column(Boolean, default=False) + role_id = Column(Integer, ForeignKey("roles.id"), nullable=True) created_at = Column(DateTime(timezone=True), server_default=func.now()) - + + role = relationship("Role", foreign_keys=[role_id]) owned_projects = relationship("Project", back_populates="owner") comments = relationship("Comment", back_populates="author") project_memberships = relationship("ProjectMember", back_populates="user") + @property + def role_name(self): + return self.role.name if self.role else None + class ProjectMember(Base): __tablename__ = "project_members" diff --git a/app/schemas/schemas.py b/app/schemas/schemas.py index 28bac76..08d4388 100644 --- a/app/schemas/schemas.py +++ b/app/schemas/schemas.py @@ -161,14 +161,14 @@ class UserBase(BaseModel): class UserCreate(UserBase): password: Optional[str] = None - is_admin: bool = False + role_id: Optional[int] = None class UserUpdate(BaseModel): full_name: Optional[str] = None email: Optional[str] = None password: Optional[str] = None - is_admin: Optional[bool] = None + role_id: Optional[int] = None is_active: Optional[bool] = None @@ -176,6 +176,8 @@ class UserResponse(UserBase): id: int is_active: bool is_admin: bool + role_id: Optional[int] = None + role_name: Optional[str] = None created_at: datetime class Config: