diff --git a/app/api/routers/users.py b/app/api/routers/users.py index ec0b39b..525922f 100644 --- a/app/api/routers/users.py +++ b/app/api/routers/users.py @@ -1,18 +1,33 @@ """Users router.""" +from datetime import datetime from typing import List + from fastapi import APIRouter, Depends, HTTPException, status +from pydantic import BaseModel +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session +from app.api.deps import get_current_user, get_password_hash from app.core.config import get_db from app.models import models +from app.models.worklog import WorkLog from app.schemas import schemas -from app.api.deps import get_password_hash router = APIRouter(prefix="/users", tags=["Users"]) +def require_admin(current_user: models.User = Depends(get_current_user)): + if not current_user.is_admin: + raise HTTPException(status_code=403, detail="Admin required") + return current_user + + @router.post("", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED) -def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): +def create_user( + user: schemas.UserCreate, + db: Session = Depends(get_db), + _: models.User = Depends(require_admin), +): existing = db.query(models.User).filter( (models.User.username == user.username) | (models.User.email == user.email) ).first() @@ -20,8 +35,12 @@ def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): raise HTTPException(status_code=400, detail="Username or email already exists") hashed_password = get_password_hash(user.password) if user.password else None db_user = models.User( - username=user.username, email=user.email, full_name=user.full_name, - hashed_password=hashed_password, is_admin=user.is_admin + username=user.username, + email=user.email, + full_name=user.full_name, + hashed_password=hashed_password, + is_admin=user.is_admin, + is_active=True, ) db.add(db_user) db.commit() @@ -30,12 +49,21 @@ def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): @router.get("", response_model=List[schemas.UserResponse]) -def list_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): - return db.query(models.User).offset(skip).limit(limit).all() +def list_users( + skip: int = 0, + limit: int = 100, + db: Session = Depends(get_db), + _: models.User = Depends(require_admin), +): + return db.query(models.User).order_by(models.User.created_at.desc()).offset(skip).limit(limit).all() @router.get("/{user_id}", response_model=schemas.UserResponse) -def get_user(user_id: int, db: Session = Depends(get_db)): +def get_user( + user_id: int, + db: Session = Depends(get_db), + _: models.User = Depends(require_admin), +): user = db.query(models.User).filter(models.User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") @@ -43,24 +71,61 @@ def get_user(user_id: int, db: Session = Depends(get_db)): @router.patch("/{user_id}", response_model=schemas.UserResponse) -def update_user(user_id: int, db: Session = Depends(get_db), full_name: str = None, email: str = None): +def update_user( + user_id: int, + payload: schemas.UserUpdate, + db: Session = Depends(get_db), + current_user: models.User = Depends(require_admin), +): user = db.query(models.User).filter(models.User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") - if full_name is not None: - user.full_name = full_name - if email is not None: - user.email = email + + if payload.email is not None and payload.email != user.email: + existing = db.query(models.User).filter(models.User.email == payload.email, models.User.id != user_id).first() + if existing: + raise HTTPException(status_code=400, detail="Email already exists") + user.email = payload.email + + if payload.full_name is not None: + user.full_name = payload.full_name + + if payload.password is not None and payload.password.strip(): + user.hashed_password = get_password_hash(payload.password) + + if payload.is_admin is not None: + if current_user.id == user.id and payload.is_admin is False: + raise HTTPException(status_code=400, detail="You cannot revoke your own admin access") + user.is_admin = payload.is_admin + + if payload.is_active is not None: + if current_user.id == user.id and payload.is_active is False: + raise HTTPException(status_code=400, detail="You cannot deactivate your own account") + user.is_active = payload.is_active + db.commit() db.refresh(user) return user -# ---- User worklogs ---- - -from app.models.worklog import WorkLog -from pydantic import BaseModel -from datetime import datetime +@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) +def delete_user( + user_id: int, + db: Session = Depends(get_db), + current_user: models.User = Depends(require_admin), +): + user = db.query(models.User).filter(models.User.id == user_id).first() + if not user: + raise HTTPException(status_code=404, detail="User not found") + if current_user.id == user.id: + raise HTTPException(status_code=400, detail="You cannot delete your own account") + try: + db.delete(user) + db.commit() + except IntegrityError: + db.rollback() + raise HTTPException(status_code=400, detail="User has related records. Deactivate the account instead.") + return None class WorkLogResponse(BaseModel): @@ -71,10 +136,18 @@ class WorkLogResponse(BaseModel): description: str | None = None logged_date: datetime created_at: datetime + class Config: from_attributes = True @router.get("/{user_id}/worklogs", response_model=List[WorkLogResponse]) -def list_user_worklogs(user_id: int, limit: int = 50, db: Session = Depends(get_db)): +def list_user_worklogs( + user_id: int, + limit: int = 50, + db: Session = Depends(get_db), + current_user: models.User = Depends(get_current_user), +): + if current_user.id != user_id and not current_user.is_admin: + raise HTTPException(status_code=403, detail="Forbidden") return db.query(WorkLog).filter(WorkLog.user_id == user_id).order_by(WorkLog.logged_date.desc()).limit(limit).all() diff --git a/app/schemas/schemas.py b/app/schemas/schemas.py index 15ed9ad..28bac76 100644 --- a/app/schemas/schemas.py +++ b/app/schemas/schemas.py @@ -164,6 +164,14 @@ class UserCreate(UserBase): is_admin: bool = False +class UserUpdate(BaseModel): + full_name: Optional[str] = None + email: Optional[str] = None + password: Optional[str] = None + is_admin: Optional[bool] = None + is_active: Optional[bool] = None + + class UserResponse(UserBase): id: int is_active: bool