Files
HarborForge.Backend/app/api/routers/users.py
zhi e5fd89f972 feat: add username-based user lookup and permission introspection endpoint
- users router: accept username or id in get/update/delete/worklogs via _find_user_by_id_or_username()
- auth router: add GET /auth/me/permissions for CLI help introspection (token → user → role → permissions)
2026-03-21 14:21:54 +00:00

211 lines
7.2 KiB
Python

"""Users router."""
from datetime import datetime
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from app.api.deps import get_current_user, get_password_hash
from app.core.config import get_db
from app.models import models
from app.models.role_permission import Permission, Role, RolePermission
from app.models.worklog import WorkLog
from app.schemas import schemas
# Router collecting the user-management endpoints defined in this module;
# every route registered on it is served under the "/users" prefix.
router = APIRouter(prefix="/users", tags=["Users"])
def require_admin(current_user: models.User = Depends(get_current_user)):
    """Dependency that only lets administrators through.

    Returns the authenticated user when its ``is_admin`` flag is truthy.

    Raises:
        HTTPException: 403 when the authenticated user is not an admin.
    """
    if current_user.is_admin:
        return current_user
    raise HTTPException(status_code=403, detail="Admin required")
def _has_global_permission(db: Session, user: models.User, permission_name: str) -> bool:
    """Tell whether ``user`` holds the permission called ``permission_name``.

    Admins always qualify.  For everyone else the permission is looked up by
    name and must be linked to the user's role through a ``RolePermission``
    row.  A user without a role, or an unknown permission name, yields
    ``False``.
    """
    if user.is_admin:
        return True
    if not user.role_id:
        return False

    permission = db.query(Permission).filter(Permission.name == permission_name).first()
    if not permission:
        return False

    grant = (
        db.query(RolePermission)
        .filter(
            RolePermission.role_id == user.role_id,
            RolePermission.permission_id == permission.id,
        )
        .first()
    )
    return grant is not None
def require_account_creator(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
):
    """Dependency that admits users who are allowed to create accounts.

    A user qualifies by being an admin or by holding the
    ``"account.create"`` permission.

    Raises:
        HTTPException: 403 when neither condition holds.
    """
    may_create = current_user.is_admin or _has_global_permission(
        db, current_user, "account.create"
    )
    if not may_create:
        raise HTTPException(status_code=403, detail="Account creation permission required")
    return current_user
def _resolve_user_role(db: Session, role_id: int | None) -> Role:
    """Return the role that should be attached to a managed account.

    When ``role_id`` is ``None`` the role named ``"guest"`` is used.
    Otherwise the role with that id is returned, provided it exists, is not
    named ``"admin"`` and does not have ``is_global`` set to ``False``.

    Raises:
        HTTPException: 500 when the default guest role is absent; 400 when
            the requested role is unknown, is the admin role, or is
            explicitly non-global.
    """
    if role_id is None:
        guest = db.query(Role).filter(Role.name == "guest").first()
        if guest:
            return guest
        raise HTTPException(status_code=500, detail="Default guest role is missing")

    requested = db.query(Role).filter(Role.id == role_id).first()
    if not requested:
        problem = "Role not found"
    elif requested.name == "admin":
        problem = "Admin role cannot be assigned via user management"
    elif requested.is_global is False:
        # Only an explicit False is rejected; an unset flag passes through.
        problem = "Only global roles can be assigned to accounts"
    else:
        return requested
    raise HTTPException(status_code=400, detail=problem)
@router.post("", response_model=schemas.UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(
    user: schemas.UserCreate,
    db: Session = Depends(get_db),
    _: models.User = Depends(require_account_creator),
):
    """Create a new active, non-admin account.

    The caller must pass ``require_account_creator``.  The role is resolved
    by ``_resolve_user_role`` and the password, when supplied, is stored
    hashed via ``get_password_hash``; without a password the hash is left as
    ``None``.

    Raises:
        HTTPException: 400 when the username or email is already taken,
            whether noticed by the up-front lookup or reported by the
            database at commit time; plus any error raised by
            ``_resolve_user_role``.
    """
    duplicate_detail = "Username or email already exists"
    duplicate_query = db.query(models.User).filter(
        (models.User.username == user.username) | (models.User.email == user.email)
    )
    if duplicate_query.first():
        raise HTTPException(status_code=400, detail=duplicate_detail)

    assigned_role = _resolve_user_role(db, user.role_id)
    hashed_password = get_password_hash(user.password) if user.password else None
    db_user = models.User(
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        hashed_password=hashed_password,
        is_admin=False,
        is_active=True,
        role_id=assigned_role.id,
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError:
        # The lookup above is not atomic with the insert: a concurrent
        # request can create the same username/email in between.  Presumably
        # the table enforces uniqueness on those columns (TODO confirm); if
        # so, report the conflict as the same 400 instead of a 500.
        db.rollback()
        if duplicate_query.first():
            raise HTTPException(status_code=400, detail=duplicate_detail) from None
        # Some other constraint failed; do not mislabel it as a duplicate.
        raise
    db.refresh(db_user)
    return db_user
@router.get("", response_model=List[schemas.UserResponse])
def list_users(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    _: models.User = Depends(require_admin),
):
    # Admin-only listing of accounts, ordered by ``created_at`` descending.
    # ``skip`` is applied as the query offset and ``limit`` as the maximum
    # number of rows returned.
    # (Documented with comments rather than a docstring so the generated
    # OpenAPI description of this route stays exactly as it was.)
    newest_first = db.query(models.User).order_by(models.User.created_at.desc())
    page = newest_first.offset(skip).limit(limit)
    return page.all()
def _find_user_by_id_or_username(db: Session, identifier: str) -> models.User | None:
    """Resolve a user by numeric id or username string.

    An identifier consisting solely of ASCII decimal digits is treated as a
    primary key; anything else is matched against ``username``.

    The digit check is deliberately stricter than a bare ``int()`` call:
    ``int()`` also accepts a sign, surrounding whitespace, underscores
    (``"1_0"`` parses as 10) and non-ASCII digits, so a username of that
    shape would otherwise be silently resolved to an unrelated user id.

    Returns:
        The matching user, or ``None`` when nothing matches.
    """
    if identifier.isascii() and identifier.isdigit():
        try:
            uid = int(identifier)
        except ValueError:
            # int() can still refuse an extremely long digit string; such a
            # value cannot be an id, so fall through to the username lookup.
            pass
        else:
            return db.query(models.User).filter(models.User.id == uid).first()
    return db.query(models.User).filter(models.User.username == identifier).first()
@router.get("/{identifier}", response_model=schemas.UserResponse)
def get_user(
    identifier: str,
    db: Session = Depends(get_db),
    _: models.User = Depends(require_admin),
):
    # Admin-only fetch of a single account.  ``identifier`` is handed to
    # _find_user_by_id_or_username, so it may be a numeric id or a username.
    # Responds with 404 when no account matches.
    # (Documented with comments rather than a docstring so the generated
    # OpenAPI description of this route stays exactly as it was.)
    found = _find_user_by_id_or_username(db, identifier)
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
@router.patch("/{identifier}", response_model=schemas.UserResponse)
def update_user(
    identifier: str,
    payload: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(require_admin),
):
    """Partially update an account (admin only).

    Only payload fields that are not ``None`` are applied: email (must not
    belong to another account), full name, password (ignored when blank or
    whitespace-only), role (not allowed for admin accounts) and the active
    flag (an admin cannot deactivate their own account).

    Raises:
        HTTPException: 404 when the account does not exist; 400 when the
            email is already used by another account, when the target is an
            admin and a role change is requested, when the role is rejected
            by ``_resolve_user_role``, or when the caller tries to
            deactivate themselves.
    """
    user = _find_user_by_id_or_username(db, identifier)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Read the id up front so the post-rollback re-check below does not need
    # to touch the (by then expired) ORM instance.
    user_id = user.id

    def email_in_use(email: str) -> bool:
        """Return True when another account already owns ``email``."""
        other = (
            db.query(models.User)
            .filter(models.User.email == email, models.User.id != user_id)
            .first()
        )
        return other is not None

    if payload.email is not None and payload.email != user.email:
        if email_in_use(payload.email):
            raise HTTPException(status_code=400, detail="Email already exists")
        user.email = payload.email

    if payload.full_name is not None:
        user.full_name = payload.full_name

    if payload.password is not None and payload.password.strip():
        user.hashed_password = get_password_hash(payload.password)

    if payload.role_id is not None:
        if user.is_admin:
            raise HTTPException(status_code=400, detail="Admin accounts cannot be reassigned via user management")
        assigned_role = _resolve_user_role(db, payload.role_id)
        user.role_id = assigned_role.id

    if payload.is_active is not None:
        if current_user.id == user.id and payload.is_active is False:
            raise HTTPException(status_code=400, detail="You cannot deactivate your own account")
        user.is_active = payload.is_active

    try:
        db.commit()
    except IntegrityError:
        # The email lookup above is not atomic with the write: another
        # request may have claimed the address in the meantime.  Report that
        # as the same 400 instead of letting it surface as a 500.
        db.rollback()
        if payload.email is not None and email_in_use(payload.email):
            raise HTTPException(status_code=400, detail="Email already exists") from None
        # A different constraint failed; do not mislabel it.
        raise
    db.refresh(user)
    return user
@router.delete("/{identifier}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(
    identifier: str,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(require_admin),
):
    # Admin-only removal of an account addressed by id or username.
    #   404 - no account matches the identifier
    #   400 - the caller targets their own account, or the database refuses
    #         the delete with an IntegrityError (the session is rolled back
    #         and the caller is told to deactivate the account instead)
    # (Documented with comments rather than a docstring so the generated
    # OpenAPI description of this route stays exactly as it was.)
    target = _find_user_by_id_or_username(db, identifier)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    if current_user.id == target.id:
        raise HTTPException(status_code=400, detail="You cannot delete your own account")

    try:
        db.delete(target)
        db.commit()
    except IntegrityError:
        db.rollback()
        raise HTTPException(status_code=400, detail="User has related records. Deactivate the account instead.")
    else:
        return None
# Response schema for a single work-log entry as returned by the
# "/{identifier}/worklogs" route.  Kept as comments (not a class docstring)
# so the generated schema description is unchanged.
class WorkLogResponse(BaseModel):
    # Identifier of the work-log row itself.
    id: int
    # Identifiers of the task and user the entry refers to.
    task_id: int
    user_id: int
    # Amount of time logged.
    hours: float
    # Optional free-text note; omitted values default to None.
    description: str | None = None
    # When the work was logged for, and when the row was created.
    logged_date: datetime
    created_at: datetime

    class Config:
        # Allow the model to be populated from object attributes (the route
        # returns WorkLog ORM rows rather than dicts).
        from_attributes = True
@router.get("/{identifier}/worklogs", response_model=List[WorkLogResponse])
def list_user_worklogs(
    identifier: str,
    limit: int = 50,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
):
    # Work-log entries of one account, ordered by ``logged_date`` descending
    # and capped at ``limit`` rows.  The account is addressed by id or
    # username.
    #   404 - no account matches the identifier
    #   403 - the caller is neither that account nor an admin
    # (Documented with comments rather than a docstring so the generated
    # OpenAPI description of this route stays exactly as it was.)
    owner = _find_user_by_id_or_username(db, identifier)
    if not owner:
        raise HTTPException(status_code=404, detail="User not found")

    is_self = current_user.id == owner.id
    if not (is_self or current_user.is_admin):
        raise HTTPException(status_code=403, detail="Forbidden")

    entries = db.query(WorkLog).filter(WorkLog.user_id == owner.id)
    latest_first = entries.order_by(WorkLog.logged_date.desc())
    return latest_first.limit(limit).all()