"""OIDC (OpenID Connect) login + public auth-config. Generic OIDC via discovery. The backend performs the Authorization Code flow, then issues its own existing HS256 JWT (same as password login) so the rest of the app is unchanged. Sign-in policy: an OIDC identity must already be bound to an hf user (see PUT /users/{id}/oidc-binding). Unbound identities are rejected — no auto-provisioning. """ from datetime import timedelta from urllib.parse import urlencode from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import RedirectResponse from sqlalchemy.orm import Session from app.core.config import get_db, settings from app.models import models from app.api.deps import create_access_token, get_current_user router = APIRouter(prefix="/auth", tags=["Auth"]) # Authlib registry — only configured when OIDC is enabled. _oauth = None def _get_oidc(): """Lazily build the Authlib OIDC client; 503 if not configured.""" global _oauth if not (settings.OIDC_ENABLED and settings.OIDC_ISSUER and settings.OIDC_CLIENT_ID): raise HTTPException(status_code=503, detail="OIDC is not configured") if _oauth is None: from authlib.integrations.starlette_client import OAuth oauth = OAuth() oauth.register( name="oidc", server_metadata_url=settings.OIDC_ISSUER.rstrip("/") + "/.well-known/openid-configuration", client_id=settings.OIDC_CLIENT_ID, client_secret=settings.OIDC_CLIENT_SECRET, client_kwargs={"scope": settings.OIDC_SCOPES}, ) _oauth = oauth return _oauth.oidc def _frontend(suffix_qs: dict | None = None, fragment: str | None = None) -> str: """Build the post-login frontend redirect (never client-controlled).""" base = settings.OIDC_POST_LOGIN_REDIRECT or "/" url = base if suffix_qs: url += ("&" if "?" in base else "?") + urlencode(suffix_qs) if fragment: url += "#" + fragment return url @router.get("/config") def auth_config(): """Public: lets the frontend decide which login UI to render.""" return { "oidc_enabled": bool(settings.OIDC_ENABLED and settings.OIDC_ISSUER and settings.OIDC_CLIENT_ID), "oidc_only": bool(settings.HARBORFORGE_OIDC_ONLY), "password_login": not bool(settings.HARBORFORGE_OIDC_ONLY), "oidc_login_url": "/auth/oidc/login", } @router.get("/oidc/login") async def oidc_login(request: Request): """Start the OIDC Authorization Code flow for sign-in.""" oidc = _get_oidc() request.session.pop("hf_oidc_mode", None) request.session.pop("hf_oidc_uid", None) request.session["hf_oidc_mode"] = "login" return await oidc.authorize_redirect(request, settings.OIDC_REDIRECT_URI) @router.get("/oidc/link") async def oidc_link(request: Request, current_user: models.User = Depends(get_current_user)): """Self-service: bind the caller's own account to an OIDC identity. Only available when NOT in OIDC-only mode (admins use the binding API in that mode).""" if settings.HARBORFORGE_OIDC_ONLY: raise HTTPException(status_code=403, detail="Self-service linking is disabled in OIDC-only mode") oidc = _get_oidc() request.session["hf_oidc_mode"] = "link" request.session["hf_oidc_uid"] = current_user.id return await oidc.authorize_redirect(request, settings.OIDC_REDIRECT_URI) @router.get("/oidc/callback") async def oidc_callback(request: Request, db: Session = Depends(get_db)): """OIDC redirect target. Resolves identity → hf user (must be bound).""" oidc = _get_oidc() mode = request.session.pop("hf_oidc_mode", "login") link_uid = request.session.pop("hf_oidc_uid", None) try: token = await oidc.authorize_access_token(request) except Exception: return RedirectResponse(_frontend({"oidc_error": "exchange_failed"})) claims = token.get("userinfo") or {} if not claims: try: claims = await oidc.userinfo(token=token) except Exception: claims = {} subject = claims.get("sub") issuer = claims.get("iss") or settings.OIDC_ISSUER if not subject: return RedirectResponse(_frontend({"oidc_error": "no_subject"})) if mode == "link": if settings.HARBORFORGE_OIDC_ONLY or link_uid is None: return RedirectResponse(_frontend({"oidc_error": "link_not_allowed"})) user = db.query(models.User).filter(models.User.id == link_uid).first() if not user: return RedirectResponse(_frontend({"oidc_error": "user_gone"})) clash = db.query(models.User).filter( models.User.oidc_issuer == issuer, models.User.oidc_subject == subject, models.User.id != user.id, ).first() if clash: return RedirectResponse(_frontend({"oidc_error": "already_bound"})) user.oidc_issuer = issuer user.oidc_subject = subject db.commit() return RedirectResponse(_frontend({"oidc_linked": "1"})) # sign-in: identity must already be bound to an active hf user user = db.query(models.User).filter( models.User.oidc_issuer == issuer, models.User.oidc_subject == subject, ).first() if not user or not user.is_active or user.username in ("acc-mgr", "deleted-user"): return RedirectResponse(_frontend({"oidc_error": "not_linked"})) access_token = create_access_token( data={"sub": str(user.id)}, expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), ) return RedirectResponse(_frontend(fragment=urlencode({"token": access_token})))