diff --git a/app/api/routers/auth.py b/app/api/routers/auth.py index bb3702c..efd6196 100644 --- a/app/api/routers/auth.py +++ b/app/api/routers/auth.py @@ -1,11 +1,15 @@ """Auth router.""" from datetime import timedelta +from typing import List + from fastapi import APIRouter, Depends, HTTPException from fastapi.security import OAuth2PasswordRequestForm +from pydantic import BaseModel from sqlalchemy.orm import Session from app.core.config import get_db, settings from app.models import models +from app.models.role_permission import Permission, Role, RolePermission from app.schemas import schemas from app.api.deps import Token, verify_password, create_access_token, get_current_user @@ -30,3 +34,44 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = @router.get("/me", response_model=schemas.UserResponse) async def get_me(current_user: models.User = Depends(get_current_user)): return current_user + + +class PermissionIntrospectionResponse(BaseModel): + username: str + role_name: str | None + is_admin: bool + permissions: List[str] + + +@router.get("/me/permissions", response_model=PermissionIntrospectionResponse) +async def get_my_permissions( + current_user: models.User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """Return the current user's effective permissions for CLI help introspection.""" + perms: List[str] = [] + role_name: str | None = None + + if current_user.is_admin: + # Admin gets all permissions + all_perms = db.query(Permission).order_by(Permission.name).all() + perms = [p.name for p in all_perms] + role_name = "admin" + elif current_user.role_id: + role = db.query(Role).filter(Role.id == current_user.role_id).first() + if role: + role_name = role.name + perm_ids = db.query(RolePermission.permission_id).filter( + RolePermission.role_id == role.id + ).all() + if perm_ids: + pid_list = [p[0] for p in perm_ids] + matched = db.query(Permission).filter(Permission.id.in_(pid_list)).order_by(Permission.name).all() + perms = [p.name for p in matched] + + return PermissionIntrospectionResponse( + username=current_user.username, + role_name=role_name, + is_admin=current_user.is_admin, + permissions=perms, + ) diff --git a/app/api/routers/users.py b/app/api/routers/users.py index 42845e5..fb764f3 100644 --- a/app/api/routers/users.py +++ b/app/api/routers/users.py @@ -102,31 +102,40 @@ def list_users( return db.query(models.User).order_by(models.User.created_at.desc()).offset(skip).limit(limit).all() -@router.get("/{user_id}", response_model=schemas.UserResponse) +def _find_user_by_id_or_username(db: Session, identifier: str) -> models.User | None: + """Resolve a user by numeric id or username string.""" + try: + uid = int(identifier) + return db.query(models.User).filter(models.User.id == uid).first() + except ValueError: + return db.query(models.User).filter(models.User.username == identifier).first() + + +@router.get("/{identifier}", response_model=schemas.UserResponse) def get_user( - user_id: int, + identifier: str, db: Session = Depends(get_db), _: models.User = Depends(require_admin), ): - user = db.query(models.User).filter(models.User.id == user_id).first() + user = _find_user_by_id_or_username(db, identifier) if not user: raise HTTPException(status_code=404, detail="User not found") return user -@router.patch("/{user_id}", response_model=schemas.UserResponse) +@router.patch("/{identifier}", response_model=schemas.UserResponse) def update_user( - user_id: int, + identifier: str, payload: schemas.UserUpdate, db: Session = Depends(get_db), current_user: models.User = Depends(require_admin), ): - user = db.query(models.User).filter(models.User.id == user_id).first() + user = _find_user_by_id_or_username(db, identifier) if not user: raise HTTPException(status_code=404, detail="User not found") if payload.email is not None and payload.email != user.email: - existing = db.query(models.User).filter(models.User.email == payload.email, models.User.id != user_id).first() + existing = db.query(models.User).filter(models.User.email == payload.email, models.User.id != user.id).first() if existing: raise HTTPException(status_code=400, detail="Email already exists") user.email = payload.email @@ -153,13 +162,13 @@ def update_user( return user -@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) +@router.delete("/{identifier}", status_code=status.HTTP_204_NO_CONTENT) def delete_user( - user_id: int, + identifier: str, db: Session = Depends(get_db), current_user: models.User = Depends(require_admin), ): - user = db.query(models.User).filter(models.User.id == user_id).first() + user = _find_user_by_id_or_username(db, identifier) if not user: raise HTTPException(status_code=404, detail="User not found") if current_user.id == user.id: @@ -186,13 +195,16 @@ class WorkLogResponse(BaseModel): from_attributes = True -@router.get("/{user_id}/worklogs", response_model=List[WorkLogResponse]) +@router.get("/{identifier}/worklogs", response_model=List[WorkLogResponse]) def list_user_worklogs( - user_id: int, + identifier: str, limit: int = 50, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user), ): - if current_user.id != user_id and not current_user.is_admin: + user = _find_user_by_id_or_username(db, identifier) + if not user: + raise HTTPException(status_code=404, detail="User not found") + if current_user.id != user.id and not current_user.is_admin: raise HTTPException(status_code=403, detail="Forbidden") - return db.query(WorkLog).filter(WorkLog.user_id == user_id).order_by(WorkLog.logged_date.desc()).limit(limit).all() + return db.query(WorkLog).filter(WorkLog.user_id == user.id).order_by(WorkLog.logged_date.desc()).limit(limit).all()