"""Users router.""" from datetime import datetime from typing import List from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from app.api.deps import get_current_user, get_password_hash from app.core.config import get_db from app.models import models from app.models.worklog import WorkLog from app.schemas import schemas router = APIRouter(prefix="/users", tags=["Users"]) def require_admin(current_user: models.User = Depends(get_current_user)): if not current_user.is_admin: raise HTTPException(status_code=403, detail="Admin required") return current_user @router.post("", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED) def create_user( user: schemas.UserCreate, db: Session = Depends(get_db), _: models.User = Depends(require_admin), ): existing = db.query(models.User).filter( (models.User.username == user.username) | (models.User.email == user.email) ).first() if existing: raise HTTPException(status_code=400, detail="Username or email already exists") hashed_password = get_password_hash(user.password) if user.password else None db_user = models.User( username=user.username, email=user.email, full_name=user.full_name, hashed_password=hashed_password, is_admin=user.is_admin, is_active=True, ) db.add(db_user) db.commit() db.refresh(db_user) return db_user @router.get("", response_model=List[schemas.UserResponse]) def list_users( skip: int = 0, limit: int = 100, db: Session = Depends(get_db), _: models.User = Depends(require_admin), ): return db.query(models.User).order_by(models.User.created_at.desc()).offset(skip).limit(limit).all() @router.get("/{user_id}", response_model=schemas.UserResponse) def get_user( user_id: int, db: Session = Depends(get_db), _: models.User = Depends(require_admin), ): user = db.query(models.User).filter(models.User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") return user @router.patch("/{user_id}", response_model=schemas.UserResponse) def update_user( user_id: int, payload: schemas.UserUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(require_admin), ): user = db.query(models.User).filter(models.User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") if payload.email is not None and payload.email != user.email: existing = db.query(models.User).filter(models.User.email == payload.email, models.User.id != user_id).first() if existing: raise HTTPException(status_code=400, detail="Email already exists") user.email = payload.email if payload.full_name is not None: user.full_name = payload.full_name if payload.password is not None and payload.password.strip(): user.hashed_password = get_password_hash(payload.password) if payload.is_admin is not None: if current_user.id == user.id and payload.is_admin is False: raise HTTPException(status_code=400, detail="You cannot revoke your own admin access") user.is_admin = payload.is_admin if payload.is_active is not None: if current_user.id == user.id and payload.is_active is False: raise HTTPException(status_code=400, detail="You cannot deactivate your own account") user.is_active = payload.is_active db.commit() db.refresh(user) return user @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_user( user_id: int, db: Session = Depends(get_db), current_user: models.User = Depends(require_admin), ): user = db.query(models.User).filter(models.User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") if current_user.id == user.id: raise HTTPException(status_code=400, detail="You cannot delete your own account") try: db.delete(user) db.commit() except IntegrityError: db.rollback() raise HTTPException(status_code=400, detail="User has related records. Deactivate the account instead.") return None class WorkLogResponse(BaseModel): id: int task_id: int user_id: int hours: float description: str | None = None logged_date: datetime created_at: datetime class Config: from_attributes = True @router.get("/{user_id}/worklogs", response_model=List[WorkLogResponse]) def list_user_worklogs( user_id: int, limit: int = 50, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user), ): if current_user.id != user_id and not current_user.is_admin: raise HTTPException(status_code=403, detail="Forbidden") return db.query(WorkLog).filter(WorkLog.user_id == user_id).order_by(WorkLog.logged_date.desc()).limit(limit).all()