feat(users): switch account management to single-role model

- add users.role_id for one global role per account
- seed protected account-manager role with account.create permission
- default new accounts to guest role
- block admin role assignment through user management
- allow account-manager permission to create accounts
This commit is contained in:
zhi
2026-03-21 08:44:19 +00:00
parent 7d42d567d1
commit 271d5140e6
6 changed files with 105 additions and 16 deletions

View File

@@ -170,13 +170,14 @@ def delete_role(role_id: int, db: Session = Depends(get_db), current_user: model
if not db_role:
raise HTTPException(status_code=404, detail="Role not found")
# Prevent deleting the admin or guest role
if db_role.name in ("admin", "guest"):
# Prevent deleting protected default roles
if db_role.name in ("admin", "guest", "account-manager"):
raise HTTPException(status_code=403, detail=f"Cannot delete the '{db_role.name}' role")
member_count = db.query(models.ProjectMember).filter(models.ProjectMember.role_id == role_id).count()
if member_count > 0:
raise HTTPException(status_code=400, detail="Role is in use by members")
account_count = db.query(models.User).filter(models.User.role_id == role_id).count()
if member_count > 0 or account_count > 0:
raise HTTPException(status_code=400, detail="Role is in use and cannot be deleted")
# Delete role permissions first
db.query(RolePermission).filter(RolePermission.role_id == role_id).delete()
@@ -196,9 +197,9 @@ def assign_permissions(role_id: int, perm_assign: PermissionAssign, db: Session
if not role:
raise HTTPException(status_code=404, detail="Role not found")
# Prevent modifying permissions of the admin role
if role.name == "admin":
raise HTTPException(status_code=403, detail="Cannot modify permissions of the admin role")
# Prevent modifying permissions of protected system roles
if role.name in ("admin", "account-manager"):
raise HTTPException(status_code=403, detail=f"Cannot modify permissions of the {role.name} role")
db.query(RolePermission).filter(RolePermission.role_id == role_id).delete()

View File

@@ -10,6 +10,7 @@ from sqlalchemy.orm import Session
from app.api.deps import get_current_user, get_password_hash
from app.core.config import get_db
from app.models import models
from app.models.role_permission import Permission, Role, RolePermission
from app.models.worklog import WorkLog
from app.schemas import schemas
@@ -22,25 +23,68 @@ def require_admin(current_user: models.User = Depends(get_current_user)):
return current_user
def _has_global_permission(db: Session, user: models.User, permission_name: str) -> bool:
if user.is_admin:
return True
if not user.role_id:
return False
perm = db.query(Permission).filter(Permission.name == permission_name).first()
if not perm:
return False
return db.query(RolePermission).filter(
RolePermission.role_id == user.role_id,
RolePermission.permission_id == perm.id,
).first() is not None
def require_account_creator(
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user),
):
if current_user.is_admin or _has_global_permission(db, current_user, "account.create"):
return current_user
raise HTTPException(status_code=403, detail="Account creation permission required")
def _resolve_user_role(db: Session, role_id: int | None) -> Role:
if role_id is None:
role = db.query(Role).filter(Role.name == "guest").first()
if not role:
raise HTTPException(status_code=500, detail="Default guest role is missing")
return role
role = db.query(Role).filter(Role.id == role_id).first()
if not role:
raise HTTPException(status_code=400, detail="Role not found")
if role.name == "admin":
raise HTTPException(status_code=400, detail="Admin role cannot be assigned via user management")
if role.is_global is False:
raise HTTPException(status_code=400, detail="Only global roles can be assigned to accounts")
return role
@router.post("", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
user: schemas.UserCreate,
db: Session = Depends(get_db),
_: models.User = Depends(require_admin),
_: models.User = Depends(require_account_creator),
):
existing = db.query(models.User).filter(
(models.User.username == user.username) | (models.User.email == user.email)
).first()
if existing:
raise HTTPException(status_code=400, detail="Username or email already exists")
assigned_role = _resolve_user_role(db, user.role_id)
hashed_password = get_password_hash(user.password) if user.password else None
db_user = models.User(
username=user.username,
email=user.email,
full_name=user.full_name,
hashed_password=hashed_password,
is_admin=user.is_admin,
is_admin=False,
is_active=True,
role_id=assigned_role.id,
)
db.add(db_user)
db.commit()
@@ -93,10 +137,11 @@ def update_user(
if payload.password is not None and payload.password.strip():
user.hashed_password = get_password_hash(payload.password)
if payload.is_admin is not None:
if current_user.id == user.id and payload.is_admin is False:
raise HTTPException(status_code=400, detail="You cannot revoke your own admin access")
user.is_admin = payload.is_admin
if payload.role_id is not None:
if user.is_admin:
raise HTTPException(status_code=400, detail="Admin accounts cannot be reassigned via user management")
assigned_role = _resolve_user_role(db, payload.role_id)
user.role_id = assigned_role.id
if payload.is_active is not None:
if current_user.id == user.id and payload.is_active is False:

View File

@@ -123,6 +123,7 @@ DEFAULT_PERMISSIONS = [
("propose.reopen", "Reopen a rejected propose", "propose"),
# Role/Permission management
("role.manage", "Manage roles and permissions", "admin"),
("account.create", "Create HarborForge accounts", "account"),
# User management
("user.manage", "Manage users", "admin"),
# Monitor
@@ -175,9 +176,14 @@ _DEV_PERMISSIONS = {
"monitor.read",
}
_ACCOUNT_MANAGER_PERMISSIONS = {
"account.create",
}
# Role definitions: (name, description, permission_set)
_DEFAULT_ROLES = [
("admin", "Administrator - full access to all features", None), # None ⇒ all perms
("account-manager", "Account manager - can only create accounts", _ACCOUNT_MANAGER_PERMISSIONS),
("mgr", "Manager - project & milestone management", _MGR_PERMISSIONS),
("dev", "Developer - task execution & daily work", _DEV_PERMISSIONS),
("guest", "Guest - read-only access", None), # special: *.read only

View File

@@ -215,6 +215,11 @@ def _migrate_schema():
"DEFAULT 'open'"
))
# --- users.role_id for single global account role ---
if _has_table(db, "users") and not _has_column(db, "users", "role_id"):
db.execute(text("ALTER TABLE users ADD COLUMN role_id INTEGER NULL"))
_ensure_fk(db, "users", "role_id", "roles", "id", "fk_users_role_id")
# --- monitored_servers.api_key for heartbeat v2 ---
if _has_table(db, "monitored_servers") and not _has_column(db, "monitored_servers", "api_key"):
db.execute(text("ALTER TABLE monitored_servers ADD COLUMN api_key VARCHAR(64) NULL"))
@@ -237,6 +242,29 @@ def _migrate_schema():
finally:
db.close()
def _sync_default_user_roles(db):
from app.models import models
from app.models.role_permission import Role
admin_role = db.query(Role).filter(Role.name == "admin").first()
guest_role = db.query(Role).filter(Role.name == "guest").first()
if admin_role:
db.query(models.User).filter(models.User.is_admin == True).update(
{models.User.role_id: admin_role.id},
synchronize_session=False,
)
if guest_role:
db.query(models.User).filter(
models.User.role_id == None,
models.User.is_admin == False,
).update(
{models.User.role_id: guest_role.id},
synchronize_session=False,
)
db.commit()
# Run database migration on startup
@app.on_event("startup")
def startup():
@@ -250,6 +278,7 @@ def startup():
db = SessionLocal()
try:
run_init(db)
_sync_default_user_roles(db)
finally:
db.close()

View File

@@ -74,12 +74,18 @@ class User(Base):
full_name = Column(String(100), nullable=True)
is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
role_id = Column(Integer, ForeignKey("roles.id"), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
role = relationship("Role", foreign_keys=[role_id])
owned_projects = relationship("Project", back_populates="owner")
comments = relationship("Comment", back_populates="author")
project_memberships = relationship("ProjectMember", back_populates="user")
@property
def role_name(self):
return self.role.name if self.role else None
class ProjectMember(Base):
__tablename__ = "project_members"

View File

@@ -161,14 +161,14 @@ class UserBase(BaseModel):
class UserCreate(UserBase):
password: Optional[str] = None
is_admin: bool = False
role_id: Optional[int] = None
class UserUpdate(BaseModel):
full_name: Optional[str] = None
email: Optional[str] = None
password: Optional[str] = None
is_admin: Optional[bool] = None
role_id: Optional[int] = None
is_active: Optional[bool] = None
@@ -176,6 +176,8 @@ class UserResponse(UserBase):
id: int
is_active: bool
is_admin: bool
role_id: Optional[int] = None
role_name: Optional[str] = None
created_at: datetime
class Config: