Files
HarborForge.Backend/app/api/routers/oidc.py
hzhang 9429e37542 feat(auth): OIDC-only admin-role bootstrap auto-connect
In OIDC-only mode, before any admin is linked, an IdP user whose token
carries the configured admin role (default "admin"; OIDC_ADMIN_ROLE /
oidc_settings.admin_role) auto-connects to the unbound hf admin on
first OIDC sign-in, then the window self-closes once any admin is
bound. Roles are scanned across userinfo + the (unverified) access
token: realm_access.roles, resource_access.*.roles, roles/role/groups.
Adds admin_role to settings model/env/effective/API and to the wizard
bootstrap config. Replaces the manual admin-subject approach.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 21:05:39 +01:00

351 lines
14 KiB
Python

"""OIDC (OpenID Connect) login + admin-configurable provider settings.
The OIDC provider can be configured at runtime from the admin UI
(persisted in the oidc_settings table). A stored row's non-empty fields
override the OIDC_* env vars; env values act as bootstrap defaults.
Sign-in policy: an OIDC identity must already be bound to an hf user
(see PUT /users/{id}/oidc-binding). Unbound identities are rejected.
"""
import logging
from datetime import timedelta
from urllib.parse import urlencode
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import RedirectResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.config import get_db, settings
from app.models import models
from app.models.oidc_settings import OidcSettings
from app.api.deps import create_access_token, get_current_user, get_current_user_or_apikey
router = APIRouter(prefix="/auth", tags=["Auth"])
logger = logging.getLogger("harborforge.oidc")
# ---- effective config (DB row overrides env) ------------------------------
class EffectiveOidc:
def __init__(self, enabled, issuer, client_id, client_secret,
redirect_uri, scopes, post_login_redirect, admin_role):
self.enabled = enabled
self.issuer = issuer
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.scopes = scopes or "openid email profile"
self.post_login_redirect = post_login_redirect
self.admin_role = (admin_role or "admin").strip() or "admin"
@property
def configured(self) -> bool:
return bool(self.enabled and self.issuer and self.client_id)
def fingerprint(self) -> str:
return "|".join([
str(self.enabled), self.issuer or "", self.client_id or "",
self.client_secret or "", self.redirect_uri or "", self.scopes or "",
])
def get_effective_oidc(db: Session) -> EffectiveOidc:
row = db.query(OidcSettings).filter(OidcSettings.id == 1).first()
def pick(db_val, env_val):
return db_val if (db_val is not None and db_val != "") else env_val
if row is None:
return EffectiveOidc(
settings.OIDC_ENABLED, settings.OIDC_ISSUER, settings.OIDC_CLIENT_ID,
settings.OIDC_CLIENT_SECRET, settings.OIDC_REDIRECT_URI,
settings.OIDC_SCOPES, settings.OIDC_POST_LOGIN_REDIRECT,
settings.OIDC_ADMIN_ROLE,
)
return EffectiveOidc(
bool(row.enabled),
pick(row.issuer, settings.OIDC_ISSUER),
pick(row.client_id, settings.OIDC_CLIENT_ID),
pick(row.client_secret, settings.OIDC_CLIENT_SECRET),
pick(row.redirect_uri, settings.OIDC_REDIRECT_URI),
pick(row.scopes, settings.OIDC_SCOPES),
pick(row.post_login_redirect, settings.OIDC_POST_LOGIN_REDIRECT),
pick(getattr(row, "admin_role", None), settings.OIDC_ADMIN_ROLE),
)
# Authlib client cache, rebuilt when the effective config changes.
_oauth = None
_oauth_fp = None
def _client(cfg: EffectiveOidc):
global _oauth, _oauth_fp
if not cfg.configured:
raise HTTPException(status_code=503, detail="OIDC is not configured")
fp = cfg.fingerprint()
if _oauth is None or _oauth_fp != fp:
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()
oauth.register(
name="oidc",
server_metadata_url=cfg.issuer.rstrip("/") + "/.well-known/openid-configuration",
client_id=cfg.client_id,
client_secret=cfg.client_secret,
client_kwargs={"scope": cfg.scopes},
)
_oauth, _oauth_fp = oauth, fp
return _oauth.oidc
def _invalidate_client():
global _oauth, _oauth_fp
_oauth = None
_oauth_fp = None
def _collect_roles(claims: dict, token: dict) -> set[str]:
"""Roles from common OIDC claim shapes, across the ID-token/userinfo
claims and the (unverified) access token — Keycloak puts realm/client
roles in the access token by default."""
pools = [claims if isinstance(claims, dict) else {}]
at = token.get("access_token")
if at:
try:
from jose import jwt as _jwt
pools.append(_jwt.get_unverified_claims(at))
except Exception:
pass
roles: set[str] = set()
for p in pools:
if not isinstance(p, dict):
continue
ra = p.get("realm_access")
if isinstance(ra, dict):
roles.update(ra.get("roles") or [])
res = p.get("resource_access")
if isinstance(res, dict):
for v in res.values():
if isinstance(v, dict):
roles.update(v.get("roles") or [])
for key in ("roles", "role", "groups"):
val = p.get(key)
if isinstance(val, str):
roles.add(val)
elif isinstance(val, (list, tuple)):
roles.update(str(x) for x in val)
return {str(r).strip().lstrip("/").lower() for r in roles if r}
def _frontend(cfg: EffectiveOidc, qs: dict | None = None, fragment: str | None = None) -> str:
base = cfg.post_login_redirect or "/"
url = base
if qs:
url += ("&" if "?" in base else "?") + urlencode(qs)
if fragment:
url += "#" + fragment
return url
# ---- public auth config ---------------------------------------------------
@router.get("/config")
def auth_config(db: Session = Depends(get_db)):
cfg = get_effective_oidc(db)
return {
"oidc_enabled": cfg.configured,
"oidc_only": bool(settings.HARBORFORGE_OIDC_ONLY),
"password_login": not bool(settings.HARBORFORGE_OIDC_ONLY),
"oidc_login_url": "/auth/oidc/login",
}
# ---- sign-in / link flows -------------------------------------------------
@router.get("/oidc/login")
async def oidc_login(request: Request, db: Session = Depends(get_db)):
cfg = get_effective_oidc(db)
oidc = _client(cfg)
request.session.pop("hf_oidc_uid", None)
request.session["hf_oidc_mode"] = "login"
return await oidc.authorize_redirect(request, cfg.redirect_uri)
@router.get("/oidc/link")
async def oidc_link(request: Request, db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)):
if settings.HARBORFORGE_OIDC_ONLY:
raise HTTPException(status_code=403, detail="Self-service linking is disabled in OIDC-only mode")
cfg = get_effective_oidc(db)
oidc = _client(cfg)
request.session["hf_oidc_mode"] = "link"
request.session["hf_oidc_uid"] = current_user.id
return await oidc.authorize_redirect(request, cfg.redirect_uri)
@router.get("/oidc/callback")
async def oidc_callback(request: Request, db: Session = Depends(get_db)):
cfg = get_effective_oidc(db)
oidc = _client(cfg)
mode = request.session.pop("hf_oidc_mode", "login")
link_uid = request.session.pop("hf_oidc_uid", None)
try:
token = await oidc.authorize_access_token(request)
except Exception:
return RedirectResponse(_frontend(cfg, {"oidc_error": "exchange_failed"}))
claims = token.get("userinfo") or {}
if not claims:
try:
claims = await oidc.userinfo(token=token)
except Exception:
claims = {}
subject = claims.get("sub")
issuer = claims.get("iss") or cfg.issuer
if not subject:
return RedirectResponse(_frontend(cfg, {"oidc_error": "no_subject"}))
if mode == "link":
if settings.HARBORFORGE_OIDC_ONLY or link_uid is None:
return RedirectResponse(_frontend(cfg, {"oidc_error": "link_not_allowed"}))
user = db.query(models.User).filter(models.User.id == link_uid).first()
if not user:
return RedirectResponse(_frontend(cfg, {"oidc_error": "user_gone"}))
clash = db.query(models.User).filter(
models.User.oidc_issuer == issuer,
models.User.oidc_subject == subject,
models.User.id != user.id,
).first()
if clash:
return RedirectResponse(_frontend(cfg, {"oidc_error": "already_bound"}))
user.oidc_issuer = issuer
user.oidc_subject = subject
db.commit()
return RedirectResponse(_frontend(cfg, {"oidc_linked": "1"}))
user = db.query(models.User).filter(
models.User.oidc_issuer == issuer,
models.User.oidc_subject == subject,
).first()
# OIDC-only bootstrap: before any admin is linked, an IdP user whose
# token carries the configured admin role auto-connects to the unbound
# hf admin. Self-closes once any admin is bound.
if user is None and settings.HARBORFORGE_OIDC_ONLY:
any_admin_bound = db.query(models.User).filter(
models.User.is_admin == True, # noqa: E712
models.User.oidc_subject.isnot(None),
).first()
if not any_admin_bound and cfg.admin_role.lower() in _collect_roles(claims, token):
taken = db.query(models.User).filter(
models.User.oidc_issuer == issuer,
models.User.oidc_subject == subject,
).first()
if taken is None:
boot = db.query(models.User).filter(
models.User.is_admin == True, # noqa: E712
models.User.is_active == True, # noqa: E712
models.User.oidc_subject.is_(None),
).order_by(models.User.id).first()
if boot is not None:
boot.oidc_issuer = issuer
boot.oidc_subject = subject
db.commit()
logger.info("OIDC bootstrap: auto-connected admin '%s' via admin role", boot.username)
user = boot
if not user or not user.is_active or user.username in ("acc-mgr", "deleted-user"):
return RedirectResponse(_frontend(cfg, {"oidc_error": "not_linked"}))
access_token = create_access_token(
data={"sub": str(user.id)},
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
)
return RedirectResponse(_frontend(cfg, fragment=urlencode({"token": access_token})))
# ---- admin: OIDC provider settings ----------------------------------------
def _require_admin_any(current_user: models.User = Depends(get_current_user_or_apikey)) -> models.User:
"""Admin via JWT OR API key. The API-key path is the recovery channel
when OIDC-only mode is on and OIDC is not yet/incorrectly configured."""
if not getattr(current_user, "is_admin", False):
raise HTTPException(status_code=403, detail="Admin privileges required")
return current_user
class OidcSettingsIn(BaseModel):
enabled: bool | None = None
issuer: str | None = None
client_id: str | None = None
client_secret: str | None = None # blank/omitted = keep existing
redirect_uri: str | None = None
scopes: str | None = None
post_login_redirect: str | None = None
admin_role: str | None = None
class OidcSettingsOut(BaseModel):
enabled: bool
issuer: str | None
client_id: str | None
has_client_secret: bool
redirect_uri: str | None
scopes: str | None
post_login_redirect: str | None
admin_role: str
oidc_only: bool # read-only (deploy env)
effective_enabled: bool # provider actually usable
source: str # "db" or "env"
@router.get("/oidc/settings", response_model=OidcSettingsOut)
def get_oidc_settings(db: Session = Depends(get_db), _: models.User = Depends(_require_admin_any)):
row = db.query(OidcSettings).filter(OidcSettings.id == 1).first()
cfg = get_effective_oidc(db)
return OidcSettingsOut(
enabled=bool(row.enabled) if row else bool(settings.OIDC_ENABLED),
issuer=(row.issuer if row else None) or settings.OIDC_ISSUER or None,
client_id=(row.client_id if row else None) or settings.OIDC_CLIENT_ID or None,
has_client_secret=bool((row.client_secret if row else None) or settings.OIDC_CLIENT_SECRET),
redirect_uri=(row.redirect_uri if row else None) or settings.OIDC_REDIRECT_URI or None,
scopes=(row.scopes if row else None) or settings.OIDC_SCOPES or None,
post_login_redirect=(row.post_login_redirect if row else None) or settings.OIDC_POST_LOGIN_REDIRECT or None,
admin_role=cfg.admin_role,
oidc_only=bool(settings.HARBORFORGE_OIDC_ONLY),
effective_enabled=cfg.configured,
source="db" if row else "env",
)
@router.put("/oidc/settings", response_model=OidcSettingsOut)
def update_oidc_settings(payload: OidcSettingsIn, db: Session = Depends(get_db),
_: models.User = Depends(_require_admin_any)):
row = db.query(OidcSettings).filter(OidcSettings.id == 1).first()
if row is None:
row = OidcSettings(id=1, enabled=False)
db.add(row)
if payload.enabled is not None:
row.enabled = payload.enabled
if payload.issuer is not None:
row.issuer = payload.issuer.strip() or None
if payload.client_id is not None:
row.client_id = payload.client_id.strip() or None
# client_secret: only overwrite when a non-empty value is supplied
if payload.client_secret:
row.client_secret = payload.client_secret
if payload.redirect_uri is not None:
row.redirect_uri = payload.redirect_uri.strip() or None
if payload.scopes is not None:
row.scopes = payload.scopes.strip() or None
if payload.post_login_redirect is not None:
row.post_login_redirect = payload.post_login_redirect.strip() or None
if payload.admin_role is not None:
row.admin_role = payload.admin_role.strip() or None
db.commit()
_invalidate_client()
return get_oidc_settings(db=db, _=_)