Fix milestones 422 + acc-mgr user + reset-apikey endpoint
- Fix: /milestones?project_id= now accepts project_code (str) not just int
- Add: built-in acc-mgr user created on wizard init (account-manager role, no login, undeletable)
- Add: POST /users/{id}/reset-apikey with permission-based access control
- Add: GET /auth/me/apikey-permissions for frontend capability check
- Add: user.reset-self-apikey and user.reset-apikey permissions
- Protect admin and acc-mgr accounts from deletion
- Block acc-mgr from login (/auth/token returns 403)
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
"""Auth router."""
|
||||
from datetime import timedelta
|
||||
from typing import List
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from pydantic import BaseModel
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
@@ -20,6 +22,9 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session =
|
||||
headers={"WWW-Authenticate": "Bearer"})
|
||||
if not user.is_active:
|
||||
raise HTTPException(status_code=400, detail="Inactive user")
|
||||
# Built-in acc-mgr account cannot log in interactively
|
||||
if user.username == "acc-mgr":
|
||||
raise HTTPException(status_code=403, detail="This account cannot log in")
|
||||
access_token = create_access_token(
|
||||
data={"sub": str(user.id)},
|
||||
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
@@ -30,3 +35,145 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session =
|
||||
@router.get("/me", response_model=schemas.UserResponse)
|
||||
async def get_me(current_user: models.User = Depends(get_current_user)):
|
||||
return current_user
|
||||
|
||||
|
||||
class ApiKeyPermissionResponse(BaseModel):
|
||||
can_reset_self: bool
|
||||
can_reset_any: bool
|
||||
|
||||
|
||||
@router.get("/me/apikey-permissions", response_model=ApiKeyPermissionResponse)
|
||||
async def get_apikey_permissions(
|
||||
current_user: models.User = Depends(get_current_user),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Return the current user's API key reset capabilities."""
|
||||
def _has_perm(perm_name: str) -> bool:
|
||||
if current_user.is_admin:
|
||||
return True
|
||||
if not current_user.role_id:
|
||||
return False
|
||||
perm = db.query(Permission).filter(Permission.name == perm_name).first()
|
||||
if not perm:
|
||||
return False
|
||||
return db.query(RolePermission).filter(
|
||||
RolePermission.role_id == current_user.role_id,
|
||||
RolePermission.permission_id == perm.id,
|
||||
).first() is not None
|
||||
|
||||
return ApiKeyPermissionResponse(
|
||||
can_reset_self=_has_perm("user.reset-self-apikey"),
|
||||
can_reset_any=_has_perm("user.reset-apikey"),
|
||||
)
|
||||
|
||||
|
||||
class PermissionIntrospectionResponse(BaseModel):
|
||||
username: str
|
||||
role_name: str | None
|
||||
is_admin: bool
|
||||
permissions: List[str]
|
||||
|
||||
|
||||
@router.get("/me/permissions", response_model=PermissionIntrospectionResponse)
|
||||
async def get_my_permissions(
|
||||
current_user: models.User = Depends(get_current_user),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Return the current user's effective permissions for CLI help introspection."""
|
||||
perms: List[str] = []
|
||||
role_name: str | None = None
|
||||
|
||||
if current_user.is_admin:
|
||||
# Admin gets all permissions
|
||||
all_perms = db.query(Permission).order_by(Permission.name).all()
|
||||
perms = [p.name for p in all_perms]
|
||||
role_name = "admin"
|
||||
elif current_user.role_id:
|
||||
role = db.query(Role).filter(Role.id == current_user.role_id).first()
|
||||
if role:
|
||||
role_name = role.name
|
||||
perm_ids = db.query(RolePermission.permission_id).filter(
|
||||
RolePermission.role_id == role.id
|
||||
).all()
|
||||
if perm_ids:
|
||||
pid_list = [p[0] for p in perm_ids]
|
||||
matched = db.query(Permission).filter(Permission.id.in_(pid_list)).order_by(Permission.name).all()
|
||||
perms = [p.name for p in matched]
|
||||
|
||||
return PermissionIntrospectionResponse(
|
||||
username=current_user.username,
|
||||
role_name=role_name,
|
||||
is_admin=current_user.is_admin,
|
||||
permissions=perms,
|
||||
)
|
||||
|
||||
|
||||
class ApiKeyPermissionResponse(BaseModel):
|
||||
can_reset_self: bool
|
||||
can_reset_any: bool
|
||||
|
||||
|
||||
@router.get("/me/apikey-permissions", response_model=ApiKeyPermissionResponse)
|
||||
async def get_apikey_permissions(
|
||||
current_user: models.User = Depends(get_current_user),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Return the current user's API key reset capabilities."""
|
||||
def _has_perm(perm_name: str) -> bool:
|
||||
if current_user.is_admin:
|
||||
return True
|
||||
if not current_user.role_id:
|
||||
return False
|
||||
perm = db.query(Permission).filter(Permission.name == perm_name).first()
|
||||
if not perm:
|
||||
return False
|
||||
return db.query(RolePermission).filter(
|
||||
RolePermission.role_id == current_user.role_id,
|
||||
RolePermission.permission_id == perm.id,
|
||||
).first() is not None
|
||||
|
||||
return ApiKeyPermissionResponse(
|
||||
can_reset_self=_has_perm("user.reset-self-apikey"),
|
||||
can_reset_any=_has_perm("user.reset-apikey"),
|
||||
)
|
||||
|
||||
|
||||
class PermissionIntrospectionResponse(BaseModel):
|
||||
username: str
|
||||
role_name: str | None
|
||||
is_admin: bool
|
||||
permissions: List[str]
|
||||
|
||||
|
||||
@router.get("/me/permissions", response_model=PermissionIntrospectionResponse)
|
||||
async def get_my_permissions(
|
||||
current_user: models.User = Depends(get_current_user),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Return the current user's effective permissions for CLI help introspection."""
|
||||
perms: List[str] = []
|
||||
role_name: str | None = None
|
||||
|
||||
if current_user.is_admin:
|
||||
# Admin gets all permissions
|
||||
all_perms = db.query(Permission).order_by(Permission.name).all()
|
||||
perms = [p.name for p in all_perms]
|
||||
role_name = "admin"
|
||||
elif current_user.role_id:
|
||||
role = db.query(Role).filter(Role.id == current_user.role_id).first()
|
||||
if role:
|
||||
role_name = role.name
|
||||
perm_ids = db.query(RolePermission.permission_id).filter(
|
||||
RolePermission.role_id == role.id
|
||||
).all()
|
||||
if perm_ids:
|
||||
pid_list = [p[0] for p in perm_ids]
|
||||
matched = db.query(Permission).filter(Permission.id.in_(pid_list)).order_by(Permission.name).all()
|
||||
perms = [p.name for p in matched]
|
||||
|
||||
return PermissionIntrospectionResponse(
|
||||
username=current_user.username,
|
||||
role_name=role_name,
|
||||
is_admin=current_user.is_admin,
|
||||
permissions=perms,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user