"""hf-cli admin … — bootstrap and manage the deployment's admin user.""" import argparse import json import sys from sqlalchemy.exc import IntegrityError from app.api.deps import get_password_hash from app.core.config import SessionLocal, settings from app.models import models from app.models.role_permission import Role def _open_db(): return SessionLocal() def _emit(payload: dict) -> None: sys.stdout.write(json.dumps(payload, indent=2) + "\n") # --------------------------------------------------------------------------- # create-user # --------------------------------------------------------------------------- def _cmd_create_user(argv: list[str]) -> int: p = argparse.ArgumentParser(prog="hf-cli admin create-user") p.add_argument("--email", required=True) p.add_argument("--username", default=None, help="Defaults to email's local-part if omitted.") p.add_argument("--full-name", default="Admin") p.add_argument("--password", default=None, help="Required when HARBORFORGE_OIDC_ONLY=false. Ignored " "when OIDC_ONLY=true (use --oidc-issuer/--oidc-subject).") p.add_argument("--oidc-issuer", default=None, help="Bind the new admin to this OIDC issuer at creation. " "Required in OIDC_ONLY mode for the bootstrap admin.") p.add_argument("--oidc-subject", default=None, help="OIDC subject claim (sub) to bind the new admin to.") args = p.parse_args(argv) username = args.username or args.email.split("@", 1)[0] oidc_only = bool(settings.HARBORFORGE_OIDC_ONLY) if oidc_only: if not (args.oidc_issuer and args.oidc_subject): sys.stderr.write( "HARBORFORGE_OIDC_ONLY=true: must pass --oidc-issuer and " "--oidc-subject so the new admin can sign in.\n" ) return 2 hashed_password = None else: if not args.password: sys.stderr.write("--password is required when OIDC_ONLY is false.\n") return 2 hashed_password = get_password_hash(args.password) if (args.oidc_issuer and not args.oidc_subject) or (args.oidc_subject and not args.oidc_issuer): sys.stderr.write("--oidc-issuer and --oidc-subject must be passed together.\n") return 2 db = _open_db() try: existing = db.query(models.User).filter(models.User.username == username).first() if existing: sys.stderr.write(f"user '{username}' already exists (id={existing.id})\n") return 3 admin_role = db.query(Role).filter(Role.name == "admin").first() if not admin_role: sys.stderr.write( "admin role not found — backend startup seed should create it. " "Restart the container then retry.\n" ) return 4 user = models.User( username=username, email=args.email, full_name=args.full_name, hashed_password=hashed_password, is_admin=True, is_active=True, role_id=admin_role.id, oidc_issuer=(args.oidc_issuer or None), oidc_subject=(args.oidc_subject or None), ) db.add(user) try: db.commit() except IntegrityError as e: db.rollback() sys.stderr.write(f"DB integrity error: {e.orig}\n") return 5 db.refresh(user) _emit({ "ok": True, "created": True, "user": { "id": user.id, "username": user.username, "email": user.email, "full_name": user.full_name, "is_admin": user.is_admin, "role_id": user.role_id, "oidc_issuer": user.oidc_issuer, "oidc_subject": user.oidc_subject, "has_password": user.hashed_password is not None, }, }) return 0 finally: db.close() # --------------------------------------------------------------------------- # list # --------------------------------------------------------------------------- def _cmd_list(_argv: list[str]) -> int: db = _open_db() try: admins = ( db.query(models.User) .filter(models.User.is_admin == True) # noqa: E712 .order_by(models.User.id.asc()) .all() ) _emit({ "ok": True, "count": len(admins), "admins": [ { "id": u.id, "username": u.username, "email": u.email, "is_active": u.is_active, "oidc_bound": bool(u.oidc_issuer and u.oidc_subject), "has_password": u.hashed_password is not None, } for u in admins ], }) return 0 finally: db.close() # --------------------------------------------------------------------------- # set-role # --------------------------------------------------------------------------- def _cmd_set_role(argv: list[str]) -> int: p = argparse.ArgumentParser(prog="hf-cli admin set-role") p.add_argument("--username", required=True) p.add_argument("--role", required=True) args = p.parse_args(argv) db = _open_db() try: user = db.query(models.User).filter(models.User.username == args.username).first() if not user: sys.stderr.write(f"user '{args.username}' not found\n") return 3 role = db.query(Role).filter(Role.name == args.role).first() if not role: sys.stderr.write(f"role '{args.role}' not found\n") return 4 user.role_id = role.id user.is_admin = (args.role == "admin") db.commit() _emit({ "ok": True, "user": {"id": user.id, "username": user.username, "role": role.name, "is_admin": user.is_admin}, }) return 0 finally: db.close() # --------------------------------------------------------------------------- # reset-password # --------------------------------------------------------------------------- def _cmd_reset_password(argv: list[str]) -> int: p = argparse.ArgumentParser(prog="hf-cli admin reset-password") p.add_argument("--username", required=True) p.add_argument("--password", required=True) args = p.parse_args(argv) if settings.HARBORFORGE_OIDC_ONLY: sys.stderr.write("HARBORFORGE_OIDC_ONLY=true: password login is disabled.\n") return 2 db = _open_db() try: user = db.query(models.User).filter(models.User.username == args.username).first() if not user: sys.stderr.write(f"user '{args.username}' not found\n") return 3 user.hashed_password = get_password_hash(args.password) db.commit() _emit({"ok": True, "user": {"id": user.id, "username": user.username, "password_reset": True}}) return 0 finally: db.close() # --------------------------------------------------------------------------- # bind-oidc — attach an OIDC identity to an existing admin # --------------------------------------------------------------------------- def _cmd_bind_oidc(argv: list[str]) -> int: p = argparse.ArgumentParser(prog="hf-cli admin bind-oidc") p.add_argument("--username", required=True) p.add_argument("--oidc-issuer", required=True) p.add_argument("--oidc-subject", required=True) args = p.parse_args(argv) db = _open_db() try: user = db.query(models.User).filter(models.User.username == args.username).first() if not user: sys.stderr.write(f"user '{args.username}' not found\n") return 3 clash = db.query(models.User).filter( models.User.oidc_issuer == args.oidc_issuer, models.User.oidc_subject == args.oidc_subject, models.User.id != user.id, ).first() if clash: sys.stderr.write(f"OIDC subject already bound to '{clash.username}' (id={clash.id})\n") return 4 user.oidc_issuer = args.oidc_issuer user.oidc_subject = args.oidc_subject db.commit() _emit({ "ok": True, "user": { "id": user.id, "username": user.username, "oidc_issuer": user.oidc_issuer, "oidc_subject": user.oidc_subject, }, }) return 0 finally: db.close() # --------------------------------------------------------------------------- # dispatcher # --------------------------------------------------------------------------- ACTIONS = { "create-user": _cmd_create_user, "list": _cmd_list, "set-role": _cmd_set_role, "reset-password": _cmd_reset_password, "bind-oidc": _cmd_bind_oidc, } def dispatch(argv: list[str]) -> int: if not argv: sys.stderr.write("admin: missing action; one of: " + ", ".join(ACTIONS) + "\n") return 1 action, rest = argv[0], argv[1:] fn = ACTIONS.get(action) if not fn: sys.stderr.write(f"admin: unknown action '{action}'; valid: {', '.join(ACTIONS)}\n") return 1 return fn(rest)