"""Tessera (external OIDC, Keycloak-compatible) access-token verification. Accepts RS256 access tokens issued by the configured Tessera realm as API bearer tokens. This is ADDITIVE to the existing local HS256 JWT and API-key auth — see app/api/deps.get_current_user_or_apikey. Verification: * fetch + cache the realm JWKS ({issuer}/protocol/openid-connect/certs), * select the JWK by the token header `kid`, * verify the RS256 signature, `iss == TESSERA_ISSUER`, `aud` contains TESSERA_AUDIENCE, and require `exp`/`iat`. Verified claims are mapped to / provision an hf models.User, mirroring the OIDC login callback provisioning (app/api/routers/oidc.py). """ import logging import threading import time import requests from fastapi import HTTPException from jose import jwt from jose.exceptions import JWTError from sqlalchemy.orm import Session from app.core.config import settings from app.models import models from app.models.role_permission import Role logger = logging.getLogger("harborforge.tessera") # JWKS cache: refetched when a token's kid is unknown, and at most once per # _JWKS_TTL otherwise (so rotated/revoked keys are picked up within the TTL). _JWKS_TTL = 3600 _jwks_lock = threading.Lock() _jwks: dict | None = None _jwks_fetched_at: float = 0.0 def _jwks_url() -> str: return settings.TESSERA_ISSUER.rstrip("/") + "/protocol/openid-connect/certs" def _fetch_jwks() -> dict: resp = requests.get(_jwks_url(), timeout=5) resp.raise_for_status() return resp.json() def _get_jwks(force: bool = False) -> dict: global _jwks, _jwks_fetched_at with _jwks_lock: now = time.time() if force or _jwks is None or (now - _jwks_fetched_at) > _JWKS_TTL: _jwks = _fetch_jwks() _jwks_fetched_at = now return _jwks def _key_for_kid(kid: str | None) -> dict | None: keys = (_get_jwks() or {}).get("keys") or [] for k in keys: if k.get("kid") == kid: return k # Unknown kid → keys may have rotated; force a refresh once and retry. keys = (_get_jwks(force=True) or {}).get("keys") or [] for k in keys: if k.get("kid") == kid: return k return None def verify_tessera_token(token: str) -> dict: """Verify a Tessera RS256 access token and return its decoded claims. Raises HTTPException(401) on any failure (so callers can fall through to the next auth method without leaking which check failed). """ if not token: raise HTTPException(status_code=401, detail="No token") try: header = jwt.get_unverified_header(token) except JWTError as exc: raise HTTPException(status_code=401, detail="Malformed token") from exc if header.get("alg") != "RS256": raise HTTPException(status_code=401, detail="Unexpected token algorithm") key = _key_for_kid(header.get("kid")) if key is None: raise HTTPException(status_code=401, detail="Unknown signing key") try: claims = jwt.decode( token, key, algorithms=["RS256"], issuer=settings.TESSERA_ISSUER, audience=settings.TESSERA_AUDIENCE, options={"require_exp": True, "require_iat": True}, ) except JWTError as exc: raise HTTPException(status_code=401, detail="Invalid Tessera token") from exc if not claims.get("sub"): raise HTTPException(status_code=401, detail="Token missing subject") return claims def _collect_roles(claims: dict) -> set[str]: """Tessera/Keycloak roles: realm_access.roles + resource_access..roles. Mirrors app/api/routers/oidc._collect_roles (normalised lower-case, no leading slash). """ roles: set[str] = set() ra = claims.get("realm_access") if isinstance(ra, dict): roles.update(ra.get("roles") or []) res = claims.get("resource_access") if isinstance(res, dict): for v in res.values(): if isinstance(v, dict): roles.update(v.get("roles") or []) return {str(r).strip().lstrip("/").lower() for r in roles if r} # Token roles (lower-case) mapped to an hf global role name, highest first. # The first match wins. Token "admin" → hf admin (sets is_admin); others map # onto the existing global role hierarchy. _ROLE_PRIORITY = ["admin", "mgr", "dev", "member", "viewer", "guest"] def _resolve_global_role(db: Session, token_roles: set[str]) -> Role | None: for name in _ROLE_PRIORITY: if name in token_roles: role = db.query(Role).filter( Role.is_global == True, # noqa: E712 Role.name == name, ).first() if role: return role # No recognised role → fall back to guest (least privilege). return db.query(Role).filter( Role.is_global == True, # noqa: E712 Role.name == "guest", ).first() def _unique_username(db: Session, base: str) -> str: base = (base or "tessera-user").strip() or "tessera-user" candidate = base n = 1 while db.query(models.User).filter(models.User.username == candidate).first(): n += 1 candidate = f"{base}-{n}" return candidate def resolve_or_provision_user(db: Session, claims: dict) -> models.User: """Resolve the hf User for a verified Tessera token, auto-provisioning one if no match exists. Mirrors the OIDC callback binding/provisioning.""" issuer = claims.get("iss") or settings.TESSERA_ISSUER subject = claims.get("sub") email = (claims.get("email") or "").strip() or None username = (claims.get("preferred_username") or "").strip() or None token_roles = _collect_roles(claims) is_admin = "admin" in token_roles # 1) by (issuer, subject) user = db.query(models.User).filter( models.User.oidc_issuer == issuer, models.User.oidc_subject == subject, ).first() # 2) by email — bind this Tessera identity onto the existing account. if user is None and email: user = db.query(models.User).filter(models.User.email == email).first() if user is not None: if user.oidc_subject and user.oidc_subject != subject: # Email belongs to a user already bound to a different identity. raise HTTPException(status_code=401, detail="Account already bound to another identity") user.oidc_issuer = issuer user.oidc_subject = subject db.commit() # 3) auto-provision if user is None: role = _resolve_global_role(db, token_roles) if role is None: raise HTTPException(status_code=500, detail="No global role available (DB not seeded)") uname = _unique_username(db, username or (email.split("@")[0] if email else None) or f"tessera-{subject[:8]}") # Email is NOT NULL + unique; synthesise a stable placeholder if absent. eff_email = email or f"{subject}@tessera.local" if db.query(models.User).filter(models.User.email == eff_email).first(): eff_email = f"{subject}@tessera.local" user = models.User( username=uname, email=eff_email, full_name=(claims.get("name") or username or uname), hashed_password=None, oidc_issuer=issuer, oidc_subject=subject, is_active=True, is_admin=is_admin, role_id=role.id, ) db.add(user) db.commit() db.refresh(user) logger.info("Tessera: provisioned user '%s' (admin=%s) for subject %s", user.username, is_admin, subject) return user if not user.is_active or user.username in ("acc-mgr", "deleted-user"): raise HTTPException(status_code=401, detail="User is not permitted to sign in") # Keep admin status in sync with the token's realm/client roles on each # request so role changes in Tessera take effect without re-provisioning. if bool(user.is_admin) != is_admin: user.is_admin = is_admin db.commit() db.refresh(user) return user def authenticate_tessera(db: Session, token: str) -> models.User: """Verify a Tessera bearer token and return the resolved hf User.""" claims = verify_tessera_token(token) return resolve_or_provision_user(db, claims)