In OIDC-only mode, before any admin is linked, an IdP user whose token carries the configured admin role (default "admin"; OIDC_ADMIN_ROLE / oidc_settings.admin_role) auto-connects to the unbound hf admin on first OIDC sign-in, then the window self-closes once any admin is bound. Roles are scanned across userinfo + the (unverified) access token: realm_access.roles, resource_access.*.roles, roles/role/groups. Adds admin_role to settings model/env/effective/API and to the wizard bootstrap config. Replaces the manual admin-subject approach. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
351 lines
14 KiB
Python
351 lines
14 KiB
Python
"""OIDC (OpenID Connect) login + admin-configurable provider settings.
|
|
|
|
The OIDC provider can be configured at runtime from the admin UI
|
|
(persisted in the oidc_settings table). A stored row's non-empty fields
|
|
override the OIDC_* env vars; env values act as bootstrap defaults.
|
|
|
|
Sign-in policy: an OIDC identity must already be bound to an hf user
|
|
(see PUT /users/{id}/oidc-binding). Unbound identities are rejected.
|
|
"""
|
|
import logging
|
|
from datetime import timedelta
|
|
from urllib.parse import urlencode
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from fastapi.responses import RedirectResponse
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import get_db, settings
|
|
from app.models import models
|
|
from app.models.oidc_settings import OidcSettings
|
|
from app.api.deps import create_access_token, get_current_user, get_current_user_or_apikey
|
|
|
|
router = APIRouter(prefix="/auth", tags=["Auth"])
|
|
logger = logging.getLogger("harborforge.oidc")
|
|
|
|
|
|
# ---- effective config (DB row overrides env) ------------------------------
|
|
|
|
class EffectiveOidc:
|
|
def __init__(self, enabled, issuer, client_id, client_secret,
|
|
redirect_uri, scopes, post_login_redirect, admin_role):
|
|
self.enabled = enabled
|
|
self.issuer = issuer
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
self.redirect_uri = redirect_uri
|
|
self.scopes = scopes or "openid email profile"
|
|
self.post_login_redirect = post_login_redirect
|
|
self.admin_role = (admin_role or "admin").strip() or "admin"
|
|
|
|
@property
|
|
def configured(self) -> bool:
|
|
return bool(self.enabled and self.issuer and self.client_id)
|
|
|
|
def fingerprint(self) -> str:
|
|
return "|".join([
|
|
str(self.enabled), self.issuer or "", self.client_id or "",
|
|
self.client_secret or "", self.redirect_uri or "", self.scopes or "",
|
|
])
|
|
|
|
|
|
def get_effective_oidc(db: Session) -> EffectiveOidc:
|
|
row = db.query(OidcSettings).filter(OidcSettings.id == 1).first()
|
|
|
|
def pick(db_val, env_val):
|
|
return db_val if (db_val is not None and db_val != "") else env_val
|
|
|
|
if row is None:
|
|
return EffectiveOidc(
|
|
settings.OIDC_ENABLED, settings.OIDC_ISSUER, settings.OIDC_CLIENT_ID,
|
|
settings.OIDC_CLIENT_SECRET, settings.OIDC_REDIRECT_URI,
|
|
settings.OIDC_SCOPES, settings.OIDC_POST_LOGIN_REDIRECT,
|
|
settings.OIDC_ADMIN_ROLE,
|
|
)
|
|
return EffectiveOidc(
|
|
bool(row.enabled),
|
|
pick(row.issuer, settings.OIDC_ISSUER),
|
|
pick(row.client_id, settings.OIDC_CLIENT_ID),
|
|
pick(row.client_secret, settings.OIDC_CLIENT_SECRET),
|
|
pick(row.redirect_uri, settings.OIDC_REDIRECT_URI),
|
|
pick(row.scopes, settings.OIDC_SCOPES),
|
|
pick(row.post_login_redirect, settings.OIDC_POST_LOGIN_REDIRECT),
|
|
pick(getattr(row, "admin_role", None), settings.OIDC_ADMIN_ROLE),
|
|
)
|
|
|
|
|
|
# Authlib client cache, rebuilt when the effective config changes.
|
|
_oauth = None
|
|
_oauth_fp = None
|
|
|
|
|
|
def _client(cfg: EffectiveOidc):
|
|
global _oauth, _oauth_fp
|
|
if not cfg.configured:
|
|
raise HTTPException(status_code=503, detail="OIDC is not configured")
|
|
fp = cfg.fingerprint()
|
|
if _oauth is None or _oauth_fp != fp:
|
|
from authlib.integrations.starlette_client import OAuth
|
|
oauth = OAuth()
|
|
oauth.register(
|
|
name="oidc",
|
|
server_metadata_url=cfg.issuer.rstrip("/") + "/.well-known/openid-configuration",
|
|
client_id=cfg.client_id,
|
|
client_secret=cfg.client_secret,
|
|
client_kwargs={"scope": cfg.scopes},
|
|
)
|
|
_oauth, _oauth_fp = oauth, fp
|
|
return _oauth.oidc
|
|
|
|
|
|
def _invalidate_client():
|
|
global _oauth, _oauth_fp
|
|
_oauth = None
|
|
_oauth_fp = None
|
|
|
|
|
|
def _collect_roles(claims: dict, token: dict) -> set[str]:
|
|
"""Roles from common OIDC claim shapes, across the ID-token/userinfo
|
|
claims and the (unverified) access token — Keycloak puts realm/client
|
|
roles in the access token by default."""
|
|
pools = [claims if isinstance(claims, dict) else {}]
|
|
at = token.get("access_token")
|
|
if at:
|
|
try:
|
|
from jose import jwt as _jwt
|
|
pools.append(_jwt.get_unverified_claims(at))
|
|
except Exception:
|
|
pass
|
|
roles: set[str] = set()
|
|
for p in pools:
|
|
if not isinstance(p, dict):
|
|
continue
|
|
ra = p.get("realm_access")
|
|
if isinstance(ra, dict):
|
|
roles.update(ra.get("roles") or [])
|
|
res = p.get("resource_access")
|
|
if isinstance(res, dict):
|
|
for v in res.values():
|
|
if isinstance(v, dict):
|
|
roles.update(v.get("roles") or [])
|
|
for key in ("roles", "role", "groups"):
|
|
val = p.get(key)
|
|
if isinstance(val, str):
|
|
roles.add(val)
|
|
elif isinstance(val, (list, tuple)):
|
|
roles.update(str(x) for x in val)
|
|
return {str(r).strip().lstrip("/").lower() for r in roles if r}
|
|
|
|
|
|
def _frontend(cfg: EffectiveOidc, qs: dict | None = None, fragment: str | None = None) -> str:
|
|
base = cfg.post_login_redirect or "/"
|
|
url = base
|
|
if qs:
|
|
url += ("&" if "?" in base else "?") + urlencode(qs)
|
|
if fragment:
|
|
url += "#" + fragment
|
|
return url
|
|
|
|
|
|
# ---- public auth config ---------------------------------------------------
|
|
|
|
@router.get("/config")
|
|
def auth_config(db: Session = Depends(get_db)):
|
|
cfg = get_effective_oidc(db)
|
|
return {
|
|
"oidc_enabled": cfg.configured,
|
|
"oidc_only": bool(settings.HARBORFORGE_OIDC_ONLY),
|
|
"password_login": not bool(settings.HARBORFORGE_OIDC_ONLY),
|
|
"oidc_login_url": "/auth/oidc/login",
|
|
}
|
|
|
|
|
|
# ---- sign-in / link flows -------------------------------------------------
|
|
|
|
@router.get("/oidc/login")
|
|
async def oidc_login(request: Request, db: Session = Depends(get_db)):
|
|
cfg = get_effective_oidc(db)
|
|
oidc = _client(cfg)
|
|
request.session.pop("hf_oidc_uid", None)
|
|
request.session["hf_oidc_mode"] = "login"
|
|
return await oidc.authorize_redirect(request, cfg.redirect_uri)
|
|
|
|
|
|
@router.get("/oidc/link")
|
|
async def oidc_link(request: Request, db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user)):
|
|
if settings.HARBORFORGE_OIDC_ONLY:
|
|
raise HTTPException(status_code=403, detail="Self-service linking is disabled in OIDC-only mode")
|
|
cfg = get_effective_oidc(db)
|
|
oidc = _client(cfg)
|
|
request.session["hf_oidc_mode"] = "link"
|
|
request.session["hf_oidc_uid"] = current_user.id
|
|
return await oidc.authorize_redirect(request, cfg.redirect_uri)
|
|
|
|
|
|
@router.get("/oidc/callback")
|
|
async def oidc_callback(request: Request, db: Session = Depends(get_db)):
|
|
cfg = get_effective_oidc(db)
|
|
oidc = _client(cfg)
|
|
mode = request.session.pop("hf_oidc_mode", "login")
|
|
link_uid = request.session.pop("hf_oidc_uid", None)
|
|
try:
|
|
token = await oidc.authorize_access_token(request)
|
|
except Exception:
|
|
return RedirectResponse(_frontend(cfg, {"oidc_error": "exchange_failed"}))
|
|
|
|
claims = token.get("userinfo") or {}
|
|
if not claims:
|
|
try:
|
|
claims = await oidc.userinfo(token=token)
|
|
except Exception:
|
|
claims = {}
|
|
subject = claims.get("sub")
|
|
issuer = claims.get("iss") or cfg.issuer
|
|
if not subject:
|
|
return RedirectResponse(_frontend(cfg, {"oidc_error": "no_subject"}))
|
|
|
|
if mode == "link":
|
|
if settings.HARBORFORGE_OIDC_ONLY or link_uid is None:
|
|
return RedirectResponse(_frontend(cfg, {"oidc_error": "link_not_allowed"}))
|
|
user = db.query(models.User).filter(models.User.id == link_uid).first()
|
|
if not user:
|
|
return RedirectResponse(_frontend(cfg, {"oidc_error": "user_gone"}))
|
|
clash = db.query(models.User).filter(
|
|
models.User.oidc_issuer == issuer,
|
|
models.User.oidc_subject == subject,
|
|
models.User.id != user.id,
|
|
).first()
|
|
if clash:
|
|
return RedirectResponse(_frontend(cfg, {"oidc_error": "already_bound"}))
|
|
user.oidc_issuer = issuer
|
|
user.oidc_subject = subject
|
|
db.commit()
|
|
return RedirectResponse(_frontend(cfg, {"oidc_linked": "1"}))
|
|
|
|
user = db.query(models.User).filter(
|
|
models.User.oidc_issuer == issuer,
|
|
models.User.oidc_subject == subject,
|
|
).first()
|
|
|
|
# OIDC-only bootstrap: before any admin is linked, an IdP user whose
|
|
# token carries the configured admin role auto-connects to the unbound
|
|
# hf admin. Self-closes once any admin is bound.
|
|
if user is None and settings.HARBORFORGE_OIDC_ONLY:
|
|
any_admin_bound = db.query(models.User).filter(
|
|
models.User.is_admin == True, # noqa: E712
|
|
models.User.oidc_subject.isnot(None),
|
|
).first()
|
|
if not any_admin_bound and cfg.admin_role.lower() in _collect_roles(claims, token):
|
|
taken = db.query(models.User).filter(
|
|
models.User.oidc_issuer == issuer,
|
|
models.User.oidc_subject == subject,
|
|
).first()
|
|
if taken is None:
|
|
boot = db.query(models.User).filter(
|
|
models.User.is_admin == True, # noqa: E712
|
|
models.User.is_active == True, # noqa: E712
|
|
models.User.oidc_subject.is_(None),
|
|
).order_by(models.User.id).first()
|
|
if boot is not None:
|
|
boot.oidc_issuer = issuer
|
|
boot.oidc_subject = subject
|
|
db.commit()
|
|
logger.info("OIDC bootstrap: auto-connected admin '%s' via admin role", boot.username)
|
|
user = boot
|
|
|
|
if not user or not user.is_active or user.username in ("acc-mgr", "deleted-user"):
|
|
return RedirectResponse(_frontend(cfg, {"oidc_error": "not_linked"}))
|
|
|
|
access_token = create_access_token(
|
|
data={"sub": str(user.id)},
|
|
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
|
|
)
|
|
return RedirectResponse(_frontend(cfg, fragment=urlencode({"token": access_token})))
|
|
|
|
|
|
# ---- admin: OIDC provider settings ----------------------------------------
|
|
|
|
def _require_admin_any(current_user: models.User = Depends(get_current_user_or_apikey)) -> models.User:
|
|
"""Admin via JWT OR API key. The API-key path is the recovery channel
|
|
when OIDC-only mode is on and OIDC is not yet/incorrectly configured."""
|
|
if not getattr(current_user, "is_admin", False):
|
|
raise HTTPException(status_code=403, detail="Admin privileges required")
|
|
return current_user
|
|
|
|
|
|
class OidcSettingsIn(BaseModel):
|
|
enabled: bool | None = None
|
|
issuer: str | None = None
|
|
client_id: str | None = None
|
|
client_secret: str | None = None # blank/omitted = keep existing
|
|
redirect_uri: str | None = None
|
|
scopes: str | None = None
|
|
post_login_redirect: str | None = None
|
|
admin_role: str | None = None
|
|
|
|
|
|
class OidcSettingsOut(BaseModel):
|
|
enabled: bool
|
|
issuer: str | None
|
|
client_id: str | None
|
|
has_client_secret: bool
|
|
redirect_uri: str | None
|
|
scopes: str | None
|
|
post_login_redirect: str | None
|
|
admin_role: str
|
|
oidc_only: bool # read-only (deploy env)
|
|
effective_enabled: bool # provider actually usable
|
|
source: str # "db" or "env"
|
|
|
|
|
|
@router.get("/oidc/settings", response_model=OidcSettingsOut)
|
|
def get_oidc_settings(db: Session = Depends(get_db), _: models.User = Depends(_require_admin_any)):
|
|
row = db.query(OidcSettings).filter(OidcSettings.id == 1).first()
|
|
cfg = get_effective_oidc(db)
|
|
return OidcSettingsOut(
|
|
enabled=bool(row.enabled) if row else bool(settings.OIDC_ENABLED),
|
|
issuer=(row.issuer if row else None) or settings.OIDC_ISSUER or None,
|
|
client_id=(row.client_id if row else None) or settings.OIDC_CLIENT_ID or None,
|
|
has_client_secret=bool((row.client_secret if row else None) or settings.OIDC_CLIENT_SECRET),
|
|
redirect_uri=(row.redirect_uri if row else None) or settings.OIDC_REDIRECT_URI or None,
|
|
scopes=(row.scopes if row else None) or settings.OIDC_SCOPES or None,
|
|
post_login_redirect=(row.post_login_redirect if row else None) or settings.OIDC_POST_LOGIN_REDIRECT or None,
|
|
admin_role=cfg.admin_role,
|
|
oidc_only=bool(settings.HARBORFORGE_OIDC_ONLY),
|
|
effective_enabled=cfg.configured,
|
|
source="db" if row else "env",
|
|
)
|
|
|
|
|
|
@router.put("/oidc/settings", response_model=OidcSettingsOut)
|
|
def update_oidc_settings(payload: OidcSettingsIn, db: Session = Depends(get_db),
|
|
_: models.User = Depends(_require_admin_any)):
|
|
row = db.query(OidcSettings).filter(OidcSettings.id == 1).first()
|
|
if row is None:
|
|
row = OidcSettings(id=1, enabled=False)
|
|
db.add(row)
|
|
|
|
if payload.enabled is not None:
|
|
row.enabled = payload.enabled
|
|
if payload.issuer is not None:
|
|
row.issuer = payload.issuer.strip() or None
|
|
if payload.client_id is not None:
|
|
row.client_id = payload.client_id.strip() or None
|
|
# client_secret: only overwrite when a non-empty value is supplied
|
|
if payload.client_secret:
|
|
row.client_secret = payload.client_secret
|
|
if payload.redirect_uri is not None:
|
|
row.redirect_uri = payload.redirect_uri.strip() or None
|
|
if payload.scopes is not None:
|
|
row.scopes = payload.scopes.strip() or None
|
|
if payload.post_login_redirect is not None:
|
|
row.post_login_redirect = payload.post_login_redirect.strip() or None
|
|
if payload.admin_role is not None:
|
|
row.admin_role = payload.admin_role.strip() or None
|
|
|
|
db.commit()
|
|
_invalidate_client()
|
|
return get_oidc_settings(db=db, _=_)
|