feat: add JWT auth (login/me), fix bcrypt version, add .gitignore

This commit is contained in:
Zhi
2026-02-21 12:11:06 +00:00
parent fd980c0344
commit 81e1a2fc58
11 changed files with 77 additions and 10 deletions

Binary file not shown.

View File

@@ -2,6 +2,11 @@ from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing import List
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from app.core.config import get_db, settings
from app.models import models
@@ -22,6 +27,51 @@ app.add_middleware(
allow_headers=["*"],
)
# Auth
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
user_id: int = None
def verify_password(plain_password: str, hashed_password: str) -> bool:
if not hashed_password:
return False
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
password = password[:72]
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=30))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(models.User).filter(models.User.id == user_id).first()
if user is None:
raise credentials_exception
return user
# Health check
@app.get("/health")
@@ -29,6 +79,23 @@ def health_check():
return {"status": "healthy"}
# ============ Auth API ============
@app.post("/auth/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = db.query(models.User).filter(models.User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password or ""):
raise HTTPException(status_code=401, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"})
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
access_token = create_access_token(data={"sub": str(user.id)}, expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/auth/me", response_model=schemas.UserResponse)
async def get_me(current_user: models.User = Depends(get_current_user)):
return current_user
# ============ Issues API ============
@app.post("/issues", response_model=schemas.IssueResponse, status_code=status.HTTP_201_CREATED)
@@ -43,7 +110,7 @@ def create_issue(issue: schemas.IssueCreate, db: Session = Depends(get_db)):
@app.get("/issues", response_model=List[schemas.IssueResponse])
def list_issues(
project_id: int = None,
status: str = None,
issue_status: str = None,
issue_type: str = None,
skip: int = 0,
limit: int = 100,
@@ -53,8 +120,8 @@ def list_issues(
if project_id:
query = query.filter(models.Issue.project_id == project_id)
if status:
query = query.filter(models.Issue.status == status)
if issue_status:
query = query.filter(models.Issue.status == issue_status)
if issue_type:
query = query.filter(models.Issue.issue_type == issue_type)
@@ -142,19 +209,13 @@ def get_project(project_id: int, db: Session = Depends(get_db)):
@app.post("/users", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
# Check if username or email exists
existing = db.query(models.User).filter(
(models.User.username == user.username) | (models.User.email == user.email)
).first()
if existing:
raise HTTPException(status_code=400, detail="Username or email already exists")
# Hash password if provided
hashed_password = None
if user.password:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
hashed_password = pwd_context.hash(user.password)
hashed_password = get_password_hash(user.password) if user.password else None
db_user = models.User(
username=user.username,