"""OIDC (OpenID Connect) login + admin-configurable provider settings. The OIDC provider can be configured at runtime from the admin UI (persisted in the oidc_settings table). A stored row's non-empty fields override the OIDC_* env vars; env values act as bootstrap defaults. Sign-in policy: an OIDC identity must already be bound to an hf user (see PUT /users/{id}/oidc-binding). Unbound identities are rejected. """ from datetime import timedelta from urllib.parse import urlencode from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import RedirectResponse from pydantic import BaseModel from sqlalchemy.orm import Session from app.core.config import get_db, settings from app.models import models from app.models.oidc_settings import OidcSettings from app.api.deps import create_access_token, get_current_user, get_current_user_or_apikey router = APIRouter(prefix="/auth", tags=["Auth"]) # ---- effective config (DB row overrides env) ------------------------------ class EffectiveOidc: def __init__(self, enabled, issuer, client_id, client_secret, redirect_uri, scopes, post_login_redirect): self.enabled = enabled self.issuer = issuer self.client_id = client_id self.client_secret = client_secret self.redirect_uri = redirect_uri self.scopes = scopes or "openid email profile" self.post_login_redirect = post_login_redirect @property def configured(self) -> bool: return bool(self.enabled and self.issuer and self.client_id) def fingerprint(self) -> str: return "|".join([ str(self.enabled), self.issuer or "", self.client_id or "", self.client_secret or "", self.redirect_uri or "", self.scopes or "", ]) def get_effective_oidc(db: Session) -> EffectiveOidc: row = db.query(OidcSettings).filter(OidcSettings.id == 1).first() def pick(db_val, env_val): return db_val if (db_val is not None and db_val != "") else env_val if row is None: return EffectiveOidc( settings.OIDC_ENABLED, settings.OIDC_ISSUER, settings.OIDC_CLIENT_ID, settings.OIDC_CLIENT_SECRET, settings.OIDC_REDIRECT_URI, settings.OIDC_SCOPES, settings.OIDC_POST_LOGIN_REDIRECT, ) return EffectiveOidc( bool(row.enabled), pick(row.issuer, settings.OIDC_ISSUER), pick(row.client_id, settings.OIDC_CLIENT_ID), pick(row.client_secret, settings.OIDC_CLIENT_SECRET), pick(row.redirect_uri, settings.OIDC_REDIRECT_URI), pick(row.scopes, settings.OIDC_SCOPES), pick(row.post_login_redirect, settings.OIDC_POST_LOGIN_REDIRECT), ) # Authlib client cache, rebuilt when the effective config changes. _oauth = None _oauth_fp = None def _client(cfg: EffectiveOidc): global _oauth, _oauth_fp if not cfg.configured: raise HTTPException(status_code=503, detail="OIDC is not configured") fp = cfg.fingerprint() if _oauth is None or _oauth_fp != fp: from authlib.integrations.starlette_client import OAuth oauth = OAuth() oauth.register( name="oidc", server_metadata_url=cfg.issuer.rstrip("/") + "/.well-known/openid-configuration", client_id=cfg.client_id, client_secret=cfg.client_secret, client_kwargs={"scope": cfg.scopes}, ) _oauth, _oauth_fp = oauth, fp return _oauth.oidc def _invalidate_client(): global _oauth, _oauth_fp _oauth = None _oauth_fp = None def _frontend(cfg: EffectiveOidc, qs: dict | None = None, fragment: str | None = None) -> str: base = cfg.post_login_redirect or "/" url = base if qs: url += ("&" if "?" in base else "?") + urlencode(qs) if fragment: url += "#" + fragment return url # ---- public auth config --------------------------------------------------- @router.get("/config") def auth_config(db: Session = Depends(get_db)): cfg = get_effective_oidc(db) return { "oidc_enabled": cfg.configured, "oidc_only": bool(settings.HARBORFORGE_OIDC_ONLY), "password_login": not bool(settings.HARBORFORGE_OIDC_ONLY), "oidc_login_url": "/auth/oidc/login", } # ---- sign-in / link flows ------------------------------------------------- @router.get("/oidc/login") async def oidc_login(request: Request, db: Session = Depends(get_db)): cfg = get_effective_oidc(db) oidc = _client(cfg) request.session.pop("hf_oidc_uid", None) request.session["hf_oidc_mode"] = "login" return await oidc.authorize_redirect(request, cfg.redirect_uri) @router.get("/oidc/link") async def oidc_link(request: Request, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user)): if settings.HARBORFORGE_OIDC_ONLY: raise HTTPException(status_code=403, detail="Self-service linking is disabled in OIDC-only mode") cfg = get_effective_oidc(db) oidc = _client(cfg) request.session["hf_oidc_mode"] = "link" request.session["hf_oidc_uid"] = current_user.id return await oidc.authorize_redirect(request, cfg.redirect_uri) @router.get("/oidc/callback") async def oidc_callback(request: Request, db: Session = Depends(get_db)): cfg = get_effective_oidc(db) oidc = _client(cfg) mode = request.session.pop("hf_oidc_mode", "login") link_uid = request.session.pop("hf_oidc_uid", None) try: token = await oidc.authorize_access_token(request) except Exception: return RedirectResponse(_frontend(cfg, {"oidc_error": "exchange_failed"})) claims = token.get("userinfo") or {} if not claims: try: claims = await oidc.userinfo(token=token) except Exception: claims = {} subject = claims.get("sub") issuer = claims.get("iss") or cfg.issuer if not subject: return RedirectResponse(_frontend(cfg, {"oidc_error": "no_subject"})) if mode == "link": if settings.HARBORFORGE_OIDC_ONLY or link_uid is None: return RedirectResponse(_frontend(cfg, {"oidc_error": "link_not_allowed"})) user = db.query(models.User).filter(models.User.id == link_uid).first() if not user: return RedirectResponse(_frontend(cfg, {"oidc_error": "user_gone"})) clash = db.query(models.User).filter( models.User.oidc_issuer == issuer, models.User.oidc_subject == subject, models.User.id != user.id, ).first() if clash: return RedirectResponse(_frontend(cfg, {"oidc_error": "already_bound"})) user.oidc_issuer = issuer user.oidc_subject = subject db.commit() return RedirectResponse(_frontend(cfg, {"oidc_linked": "1"})) user = db.query(models.User).filter( models.User.oidc_issuer == issuer, models.User.oidc_subject == subject, ).first() if not user or not user.is_active or user.username in ("acc-mgr", "deleted-user"): return RedirectResponse(_frontend(cfg, {"oidc_error": "not_linked"})) access_token = create_access_token( data={"sub": str(user.id)}, expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), ) return RedirectResponse(_frontend(cfg, fragment=urlencode({"token": access_token}))) # ---- admin: OIDC provider settings ---------------------------------------- def _require_admin_any(current_user: models.User = Depends(get_current_user_or_apikey)) -> models.User: """Admin via JWT OR API key. The API-key path is the recovery channel when OIDC-only mode is on and OIDC is not yet/incorrectly configured.""" if not getattr(current_user, "is_admin", False): raise HTTPException(status_code=403, detail="Admin privileges required") return current_user class OidcSettingsIn(BaseModel): enabled: bool | None = None issuer: str | None = None client_id: str | None = None client_secret: str | None = None # blank/omitted = keep existing redirect_uri: str | None = None scopes: str | None = None post_login_redirect: str | None = None class OidcSettingsOut(BaseModel): enabled: bool issuer: str | None client_id: str | None has_client_secret: bool redirect_uri: str | None scopes: str | None post_login_redirect: str | None oidc_only: bool # read-only (deploy env) effective_enabled: bool # provider actually usable source: str # "db" or "env" @router.get("/oidc/settings", response_model=OidcSettingsOut) def get_oidc_settings(db: Session = Depends(get_db), _: models.User = Depends(_require_admin_any)): row = db.query(OidcSettings).filter(OidcSettings.id == 1).first() cfg = get_effective_oidc(db) return OidcSettingsOut( enabled=bool(row.enabled) if row else bool(settings.OIDC_ENABLED), issuer=(row.issuer if row else None) or settings.OIDC_ISSUER or None, client_id=(row.client_id if row else None) or settings.OIDC_CLIENT_ID or None, has_client_secret=bool((row.client_secret if row else None) or settings.OIDC_CLIENT_SECRET), redirect_uri=(row.redirect_uri if row else None) or settings.OIDC_REDIRECT_URI or None, scopes=(row.scopes if row else None) or settings.OIDC_SCOPES or None, post_login_redirect=(row.post_login_redirect if row else None) or settings.OIDC_POST_LOGIN_REDIRECT or None, oidc_only=bool(settings.HARBORFORGE_OIDC_ONLY), effective_enabled=cfg.configured, source="db" if row else "env", ) @router.put("/oidc/settings", response_model=OidcSettingsOut) def update_oidc_settings(payload: OidcSettingsIn, db: Session = Depends(get_db), _: models.User = Depends(_require_admin_any)): row = db.query(OidcSettings).filter(OidcSettings.id == 1).first() if row is None: row = OidcSettings(id=1, enabled=False) db.add(row) if payload.enabled is not None: row.enabled = payload.enabled if payload.issuer is not None: row.issuer = payload.issuer.strip() or None if payload.client_id is not None: row.client_id = payload.client_id.strip() or None # client_secret: only overwrite when a non-empty value is supplied if payload.client_secret: row.client_secret = payload.client_secret if payload.redirect_uri is not None: row.redirect_uri = payload.redirect_uri.strip() or None if payload.scopes is not None: row.scopes = payload.scopes.strip() or None if payload.post_login_redirect is not None: row.post_login_redirect = payload.post_login_redirect.strip() or None db.commit() _invalidate_client() return get_oidc_settings(db=db, _=_)