"""Shared auth dependencies."""
import hashlib
import logging
from datetime import datetime, timedelta

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from pydantic import BaseModel

from app.core.config import get_db, settings
from app.models import models
from app.models.apikey import APIKey

logger = logging.getLogger("harborforge.deps")

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# auto_error=False: a missing header yields None instead of an automatic 401,
# so the dependencies below can fall back to another credential type.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token", auto_error=False)
apikey_header = APIKeyHeader(name="X-API-Key", auto_error=False)

# bcrypt only considers the first 72 *bytes* of a secret.
_BCRYPT_MAX_BYTES = 72


class Token(BaseModel):
    """Response schema carrying an access token and its type."""

    access_token: str
    token_type: str


def _bcrypt_secret(password: str) -> bytes:
    """Return *password* as UTF-8 bytes, truncated to bcrypt's 72-byte limit.

    Hashing and verification both go through this helper so they always feed
    bcrypt the same bytes. Truncating the encoded bytes (not the characters)
    keeps multi-byte passwords within the limit, and yields the same leading
    72 bytes that a character-based ``password[:72]`` slice would have
    produced on a truncating backend.
    """
    return password.encode("utf-8")[:_BCRYPT_MAX_BYTES]


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check *plain_password* against a stored bcrypt hash.

    Returns False when *hashed_password* is empty or None (for example an
    account with no password set); otherwise returns the result of
    ``pwd_context.verify``.
    """
    if not hashed_password:
        return False
    # Apply the same truncation as get_password_hash(); without it a password
    # longer than 72 bytes is hashed truncated but verified untruncated.
    return pwd_context.verify(_bcrypt_secret(plain_password), hashed_password)


def get_password_hash(password: str) -> str:
    """Hash *password* with the module's bcrypt ``CryptContext``.

    Only the first 72 UTF-8 bytes of the password are hashed.
    """
    return pwd_context.hash(_bcrypt_secret(password))


def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    """Encode *data* plus an ``exp`` claim as a signed JWT.

    Args:
        data: Claims to embed. The dict is shallow-copied, not mutated.
        expires_delta: Token lifetime. A falsy value (None or a zero
            ``timedelta``) selects the 30-minute default.

    Returns:
        The token signed with ``settings.SECRET_KEY`` / ``settings.ALGORITHM``.
    """
    to_encode = data.copy()
    # NOTE: `or` (not `is None`) is kept on purpose so a zero delta still
    # falls back to the default, exactly as before.
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=30))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


async def get_current_user(
    token: str | None = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
):
    """Resolve a locally issued JWT bearer token to its ``models.User``.

    Raises:
        HTTPException: 401 when the token is missing, fails to decode, has no
            ``sub`` claim, or names a user id that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    if not token:
        raise credentials_exception

    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        raise credentials_exception

    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        raise credentials_exception
    return user


def hash_api_key(raw: str) -> str:
    """SHA-256 of a raw API key.

    Keys are high-entropy random tokens, so a fast hash (not bcrypt) is
    appropriate and allows O(1) lookup by hash.
    """
    return hashlib.sha256(raw.encode()).hexdigest()


def _lookup_api_key(db: Session, key: str) -> models.User | None:
    """Resolve an API key string to a User; mark last_used_at on hit.

    Returns None when *key* is empty or no active ``APIKey`` row matches its
    hash. On a match the row's ``last_used_at`` is updated and committed
    before the owning user is loaded.
    """
    if not key:
        return None

    key_obj = (
        db.query(APIKey)
        .filter(APIKey.key_hash == hash_api_key(key), APIKey.is_active == True)  # noqa: E712
        .first()
    )
    if not key_obj:
        return None

    key_obj.last_used_at = datetime.utcnow()
    db.commit()
    return db.query(models.User).filter(models.User.id == key_obj.user_id).first()


async def get_current_user_or_apikey(
    token: str | None = Depends(oauth2_scheme),
    api_key: str | None = Depends(apikey_header),
    db: Session = Depends(get_db),
):
    """Authenticate via JWT token OR API key.

    Accepted credentials:

    * ``Authorization: Bearer TOKEN`` where TOKEN is a JWT.
    * ``X-API-Key: KEY``.
    * ``Authorization: Bearer KEY``, as a convenience for CLI clients that
      only know Bearer; this is the fallback when JWT decode fails.

    Bearer tokens are tried in order: local HS256 JWT → external Tessera
    (OIDC) RS256 access token → API key. The Tessera path is purely additive
    and never affects local-JWT/API-key callers.

    Raises:
        HTTPException: 401 "Could not validate credentials" when a bearer
            token was supplied but no method accepted it; 401
            "Not authenticated" when no usable credential was supplied.
    """
    # Native X-API-Key header. An unknown key is not fatal here: the request
    # may still carry a valid bearer credential.
    if api_key:
        user = _lookup_api_key(db, api_key)
        if user:
            return user

    # Bearer header — local JWT first, then Tessera, then API key.
    if token:
        try:
            return await get_current_user(token=token, db=db)
        except HTTPException:
            pass

        # External Tessera (OIDC) RS256 access token. The import stays inside
        # the try so an import failure is handled like any other verifier
        # error instead of breaking this dependency.
        try:
            from app.api.tessera import authenticate_tessera

            return authenticate_tessera(db, token)
        except HTTPException:
            pass
        except Exception:
            # JWKS fetch / unexpected verifier error → don't 500
            logger.warning("Tessera token verification error", exc_info=True)

        # Bearer-carried API key (CLI convenience).
        user = _lookup_api_key(db, token)
        if user:
            return user

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # No usable credential at all. Include the challenge header so this 401
    # matches the other 401 responses raised in this module.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
        headers={"WWW-Authenticate": "Bearer"},
    )


def require_admin(
    current_user: models.User = Depends(get_current_user_or_apikey),
) -> models.User:
    """Dependency: caller must be a global admin (JWT or API key).

    Raises:
        HTTPException: 403 when the authenticated user has no truthy
            ``is_admin`` attribute.
    """
    if not getattr(current_user, "is_admin", False):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin privileges required",
        )
    return current_user