"""Shared auth dependencies.""" import hashlib from datetime import datetime, timedelta from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, APIKeyHeader from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.orm import Session from pydantic import BaseModel from app.core.config import get_db, settings from app.models import models from app.models.apikey import APIKey pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token", auto_error=False) apikey_header = APIKeyHeader(name="X-API-Key", auto_error=False) class Token(BaseModel): access_token: str token_type: str def verify_password(plain_password: str, hashed_password: str) -> bool: if not hashed_password: return False return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: return pwd_context.hash(password[:72]) def create_access_token(data: dict, expires_delta: timedelta = None) -> str: to_encode = data.copy() expire = datetime.utcnow() + (expires_delta or timedelta(minutes=30)) to_encode.update({"exp": expire}) return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) if not token: raise credentials_exception try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) user_id = payload.get("sub") if user_id is None: raise credentials_exception except JWTError: raise credentials_exception user = db.query(models.User).filter(models.User.id == user_id).first() if user is None: raise credentials_exception return user def hash_api_key(raw: str) -> str: """SHA-256 of a raw API key. Keys are high-entropy random tokens, so a fast hash (not bcrypt) is appropriate and allows O(1) lookup by hash.""" return hashlib.sha256(raw.encode()).hexdigest() def _lookup_api_key(db: Session, key: str) -> models.User | None: """Resolve an API key string to a User; mark last_used_at on hit.""" if not key: return None key_obj = db.query(APIKey).filter(APIKey.key_hash == hash_api_key(key), APIKey.is_active == True).first() # noqa: E712 if not key_obj: return None key_obj.last_used_at = datetime.utcnow() db.commit() return db.query(models.User).filter(models.User.id == key_obj.user_id).first() async def get_current_user_or_apikey( token: str = Depends(oauth2_scheme), api_key: str = Depends(apikey_header), db: Session = Depends(get_db) ): """Authenticate via JWT token (Authorization: Bearer ) OR API key (X-API-Key: , OR — as a convenience for CLI clients that only know Bearer — Authorization: Bearer ; falls back when JWT decode fails). """ # Native X-API-Key header if api_key: user = _lookup_api_key(db, api_key) if user: return user # Bearer header — try JWT first, then API key on decode failure if token: try: return await get_current_user(token=token, db=db) except HTTPException: user = _lookup_api_key(db, token) if user: return user raise raise HTTPException(status_code=401, detail="Not authenticated") def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)) -> models.User: """Dependency: caller must be a global admin (JWT or API key).""" if not getattr(current_user, "is_admin", False): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required") return current_user