"""OIDC (OpenID Connect) login + admin-configurable provider settings. Provider config (issuer / client_id / client_secret / redirect_uri / scopes / post_login_redirect / admin_role / enabled) lives entirely in the `oidc_settings` DB table (single row, id=1) and is set via either the admin UI or `docker exec hf-backend hf-cli config oidc ...`. HARBORFORGE_OIDC_ONLY is the only OIDC-related env var (deploy-time policy: when true, password login is disabled). Sign-in policy: an OIDC identity must already be bound to an hf user (see PUT /users/{id}/oidc-binding). Unbound identities are rejected. """ import logging from datetime import timedelta from urllib.parse import urlencode from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import RedirectResponse from pydantic import BaseModel from sqlalchemy.orm import Session from app.core.config import get_db, settings from app.models import models from app.models.oidc_settings import OidcSettings from app.api.deps import create_access_token, get_current_user, get_current_user_or_apikey router = APIRouter(prefix="/auth", tags=["Auth"]) logger = logging.getLogger("harborforge.oidc") # ---- effective config (DB row overrides env) ------------------------------ class EffectiveOidc: def __init__(self, enabled, issuer, client_id, client_secret, redirect_uri, scopes, post_login_redirect, admin_role): self.enabled = enabled self.issuer = issuer self.client_id = client_id self.client_secret = client_secret self.redirect_uri = redirect_uri self.scopes = scopes or "openid email profile" self.post_login_redirect = post_login_redirect self.admin_role = (admin_role or "admin").strip() or "admin" @property def configured(self) -> bool: return bool(self.enabled and self.issuer and self.client_id) def fingerprint(self) -> str: return "|".join([ str(self.enabled), self.issuer or "", self.client_id or "", self.client_secret or "", self.redirect_uri or "", self.scopes or "", ]) def get_effective_oidc(db: Session) -> EffectiveOidc: """DB row is the only source of truth — no env fallback. If the row is absent OIDC is treated as unconfigured (login attempts will 503).""" row = db.query(OidcSettings).filter(OidcSettings.id == 1).first() if row is None: return EffectiveOidc(False, "", "", "", "", "", "", "admin") return EffectiveOidc( bool(row.enabled), row.issuer or "", row.client_id or "", row.client_secret or "", row.redirect_uri or "", row.scopes or "", row.post_login_redirect or "", getattr(row, "admin_role", None) or "admin", ) # Authlib client cache, rebuilt when the effective config changes. _oauth = None _oauth_fp = None def _client(cfg: EffectiveOidc): global _oauth, _oauth_fp if not cfg.configured: raise HTTPException(status_code=503, detail="OIDC is not configured") fp = cfg.fingerprint() if _oauth is None or _oauth_fp != fp: from authlib.integrations.starlette_client import OAuth oauth = OAuth() oauth.register( name="oidc", server_metadata_url=cfg.issuer.rstrip("/") + "/.well-known/openid-configuration", client_id=cfg.client_id, client_secret=cfg.client_secret, client_kwargs={"scope": cfg.scopes}, ) _oauth, _oauth_fp = oauth, fp return _oauth.oidc def _invalidate_client(): global _oauth, _oauth_fp _oauth = None _oauth_fp = None def _collect_roles(claims: dict, token: dict) -> set[str]: """Roles from common OIDC claim shapes, across the ID-token/userinfo claims and the (unverified) access token — Keycloak puts realm/client roles in the access token by default.""" pools = [claims if isinstance(claims, dict) else {}] at = token.get("access_token") if at: try: from jose import jwt as _jwt pools.append(_jwt.get_unverified_claims(at)) except Exception: pass roles: set[str] = set() for p in pools: if not isinstance(p, dict): continue ra = p.get("realm_access") if isinstance(ra, dict): roles.update(ra.get("roles") or []) res = p.get("resource_access") if isinstance(res, dict): for v in res.values(): if isinstance(v, dict): roles.update(v.get("roles") or []) for key in ("roles", "role", "groups"): val = p.get(key) if isinstance(val, str): roles.add(val) elif isinstance(val, (list, tuple)): roles.update(str(x) for x in val) return {str(r).strip().lstrip("/").lower() for r in roles if r} def _frontend(cfg: EffectiveOidc, qs: dict | None = None, fragment: str | None = None) -> str: base = cfg.post_login_redirect or "/" url = base if qs: url += ("&" if "?" in base else "?") + urlencode(qs) if fragment: url += "#" + fragment return url # ---- public auth config --------------------------------------------------- @router.get("/config") def auth_config(db: Session = Depends(get_db)): cfg = get_effective_oidc(db) return { "oidc_enabled": cfg.configured, "oidc_only": bool(settings.HARBORFORGE_OIDC_ONLY), "password_login": not bool(settings.HARBORFORGE_OIDC_ONLY), "oidc_login_url": "/auth/oidc/login", } # ---- sign-in / link flows ------------------------------------------------- @router.get("/oidc/login") async def oidc_login(request: Request, db: Session = Depends(get_db)): cfg = get_effective_oidc(db) oidc = _client(cfg) request.session.pop("hf_oidc_uid", None) request.session["hf_oidc_mode"] = "login" return await oidc.authorize_redirect(request, cfg.redirect_uri) @router.get("/oidc/link") async def oidc_link(request: Request, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user)): if settings.HARBORFORGE_OIDC_ONLY: raise HTTPException(status_code=403, detail="Self-service linking is disabled in OIDC-only mode") cfg = get_effective_oidc(db) oidc = _client(cfg) request.session["hf_oidc_mode"] = "link" request.session["hf_oidc_uid"] = current_user.id return await oidc.authorize_redirect(request, cfg.redirect_uri) @router.get("/oidc/callback") async def oidc_callback(request: Request, db: Session = Depends(get_db)): cfg = get_effective_oidc(db) oidc = _client(cfg) mode = request.session.pop("hf_oidc_mode", "login") link_uid = request.session.pop("hf_oidc_uid", None) try: token = await oidc.authorize_access_token(request) except Exception: return RedirectResponse(_frontend(cfg, {"oidc_error": "exchange_failed"})) claims = token.get("userinfo") or {} if not claims: try: claims = await oidc.userinfo(token=token) except Exception: claims = {} subject = claims.get("sub") issuer = claims.get("iss") or cfg.issuer if not subject: return RedirectResponse(_frontend(cfg, {"oidc_error": "no_subject"})) if mode == "link": if settings.HARBORFORGE_OIDC_ONLY or link_uid is None: return RedirectResponse(_frontend(cfg, {"oidc_error": "link_not_allowed"})) user = db.query(models.User).filter(models.User.id == link_uid).first() if not user: return RedirectResponse(_frontend(cfg, {"oidc_error": "user_gone"})) clash = db.query(models.User).filter( models.User.oidc_issuer == issuer, models.User.oidc_subject == subject, models.User.id != user.id, ).first() if clash: return RedirectResponse(_frontend(cfg, {"oidc_error": "already_bound"})) user.oidc_issuer = issuer user.oidc_subject = subject db.commit() return RedirectResponse(_frontend(cfg, {"oidc_linked": "1"})) user = db.query(models.User).filter( models.User.oidc_issuer == issuer, models.User.oidc_subject == subject, ).first() # OIDC-only bootstrap: before any admin is linked, an IdP user whose # token carries the configured admin role auto-connects to the unbound # hf admin. Self-closes once any admin is bound. if user is None and settings.HARBORFORGE_OIDC_ONLY: any_admin_bound = db.query(models.User).filter( models.User.is_admin == True, # noqa: E712 models.User.oidc_subject.isnot(None), ).first() if not any_admin_bound and cfg.admin_role.lower() in _collect_roles(claims, token): taken = db.query(models.User).filter( models.User.oidc_issuer == issuer, models.User.oidc_subject == subject, ).first() if taken is None: boot = db.query(models.User).filter( models.User.is_admin == True, # noqa: E712 models.User.is_active == True, # noqa: E712 models.User.oidc_subject.is_(None), ).order_by(models.User.id).first() if boot is not None: boot.oidc_issuer = issuer boot.oidc_subject = subject db.commit() logger.info("OIDC bootstrap: auto-connected admin '%s' via admin role", boot.username) user = boot if not user or not user.is_active or user.username in ("acc-mgr", "deleted-user"): return RedirectResponse(_frontend(cfg, {"oidc_error": "not_linked"})) access_token = create_access_token( data={"sub": str(user.id)}, expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), ) return RedirectResponse(_frontend(cfg, fragment=urlencode({"token": access_token}))) # ---- admin: OIDC provider settings ---------------------------------------- def _require_admin_any(current_user: models.User = Depends(get_current_user_or_apikey)) -> models.User: """Admin via JWT OR API key. The API-key path is the recovery channel when OIDC-only mode is on and OIDC is not yet/incorrectly configured.""" if not getattr(current_user, "is_admin", False): raise HTTPException(status_code=403, detail="Admin privileges required") return current_user class OidcSettingsIn(BaseModel): enabled: bool | None = None issuer: str | None = None client_id: str | None = None client_secret: str | None = None # blank/omitted = keep existing redirect_uri: str | None = None scopes: str | None = None post_login_redirect: str | None = None admin_role: str | None = None class OidcSettingsOut(BaseModel): enabled: bool issuer: str | None client_id: str | None has_client_secret: bool redirect_uri: str | None scopes: str | None post_login_redirect: str | None admin_role: str oidc_only: bool # read-only (deploy env) effective_enabled: bool # provider actually usable source: str # "db" or "env" @router.get("/oidc/settings", response_model=OidcSettingsOut) def get_oidc_settings(db: Session = Depends(get_db), _: models.User = Depends(_require_admin_any)): row = db.query(OidcSettings).filter(OidcSettings.id == 1).first() cfg = get_effective_oidc(db) return OidcSettingsOut( enabled=bool(row.enabled) if row else False, issuer=(row.issuer if row else None) or None, client_id=(row.client_id if row else None) or None, has_client_secret=bool(row.client_secret if row else None), redirect_uri=(row.redirect_uri if row else None) or None, scopes=(row.scopes if row else None) or None, post_login_redirect=(row.post_login_redirect if row else None) or None, admin_role=cfg.admin_role, oidc_only=bool(settings.HARBORFORGE_OIDC_ONLY), effective_enabled=cfg.configured, source="db", ) @router.put("/oidc/settings", response_model=OidcSettingsOut) def update_oidc_settings(payload: OidcSettingsIn, db: Session = Depends(get_db), _: models.User = Depends(_require_admin_any)): row = db.query(OidcSettings).filter(OidcSettings.id == 1).first() if row is None: row = OidcSettings(id=1, enabled=False) db.add(row) if payload.enabled is not None: row.enabled = payload.enabled if payload.issuer is not None: row.issuer = payload.issuer.strip() or None if payload.client_id is not None: row.client_id = payload.client_id.strip() or None # client_secret: only overwrite when a non-empty value is supplied if payload.client_secret: row.client_secret = payload.client_secret if payload.redirect_uri is not None: row.redirect_uri = payload.redirect_uri.strip() or None if payload.scopes is not None: row.scopes = payload.scopes.strip() or None if payload.post_login_redirect is not None: row.post_login_redirect = payload.post_login_redirect.strip() or None if payload.admin_role is not None: row.admin_role = payload.admin_role.strip() or None db.commit() _invalidate_client() return get_oidc_settings(db=db, _=_)