Files
zhi 88931d822d Fix milestones 422 + acc-mgr user + reset-apikey endpoint
- Fix: /milestones?project_id= now accepts project_code (str) not just int
- Add: built-in acc-mgr user created on wizard init (account-manager role, no login, undeletable)
- Add: POST /users/{id}/reset-apikey with permission-based access control
- Add: GET /auth/me/apikey-permissions for frontend capability check
- Add: user.reset-self-apikey and user.reset-apikey permissions
- Protect admin and acc-mgr accounts from deletion
- Block acc-mgr from login (/auth/token returns 403)
2026-03-22 05:39:03 +00:00

111 lines
4.0 KiB
Python

"""Auth router."""
from datetime import timedelta
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.config import get_db, settings
from app.models import models
from app.models.role_permission import Permission, Role, RolePermission
from app.schemas import schemas
from app.api.deps import Token, verify_password, create_access_token, get_current_user
# Router for the authentication endpoints: every route declared on it is
# served under the "/auth" prefix and tagged "Auth".
router = APIRouter(prefix="/auth", tags=["Auth"])
@router.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Authenticate with username/password and issue a bearer access token.

    Args:
        form_data: OAuth2 password-flow form fields (username and password).
        db: Database session used to look up the user.

    Returns:
        A dict with ``access_token`` and ``token_type`` (always ``"bearer"``).

    Raises:
        HTTPException: 403 when the username is the built-in ``acc-mgr``
            account; 401 when the username is unknown or the password does
            not verify; 400 when the account is inactive.
    """
    user = db.query(models.User).filter(models.User.username == form_data.username).first()

    # Built-in acc-mgr account cannot log in interactively. Reject it before
    # verifying the password so the 403 does not depend on the account having
    # a usable password hash (hashed_password may be empty -- see the `or ""`
    # fallback below).
    if user is not None and user.username == "acc-mgr":
        raise HTTPException(status_code=403, detail="This account cannot log in")

    if not user or not verify_password(form_data.password, user.hashed_password or ""):
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")

    # The token subject is the user's id, serialized as a string.
    access_token = create_access_token(
        data={"sub": str(user.id)},
        expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me", response_model=schemas.UserResponse)
async def get_me(
    current_user: models.User = Depends(get_current_user),
):
    """Return the user resolved by the ``get_current_user`` dependency.

    The response is shaped by ``schemas.UserResponse``.
    """
    return current_user
class ApiKeyPermissionResponse(BaseModel):
    """Response body for ``GET /auth/me/apikey-permissions``."""

    # True when the caller holds "user.reset-self-apikey" (or is an admin).
    can_reset_self: bool
    # True when the caller holds "user.reset-apikey" (or is an admin).
    can_reset_any: bool
@router.get("/me/apikey-permissions", response_model=ApiKeyPermissionResponse)
async def get_apikey_permissions(
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Report which API key reset actions the current user may perform.

    Admins are granted both capabilities. Any other user is granted a
    capability only when their role is linked to the matching permission.
    """

    def _granted(permission_name: str) -> bool:
        """Tell whether the current user holds the named permission."""
        # Admins bypass the role lookup entirely.
        if current_user.is_admin:
            return True

        role_id = current_user.role_id
        if not role_id:
            return False

        permission = (
            db.query(Permission)
            .filter(Permission.name == permission_name)
            .first()
        )
        if not permission:
            return False

        # The permission is held when a role/permission link row exists.
        link = (
            db.query(RolePermission)
            .filter(
                RolePermission.role_id == role_id,
                RolePermission.permission_id == permission.id,
            )
            .first()
        )
        return link is not None

    reset_self = _granted("user.reset-self-apikey")
    reset_any = _granted("user.reset-apikey")
    return ApiKeyPermissionResponse(can_reset_self=reset_self, can_reset_any=reset_any)
class PermissionIntrospectionResponse(BaseModel):
    """Response body for ``GET /auth/me/permissions``."""

    # Username of the authenticated caller.
    username: str
    # "admin" for admins, otherwise the name of the caller's role; None when
    # the caller has no role or the role could not be found.
    role_name: str | None
    # Whether the caller is flagged as an admin.
    is_admin: bool
    # Names of the caller's effective permissions, ordered by name.
    permissions: List[str]
@router.get("/me/permissions", response_model=PermissionIntrospectionResponse)
async def get_my_permissions(
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List the current user's effective permissions (used for CLI help introspection).

    Admins receive every permission and the role name ``"admin"``. Other
    users receive the permissions linked to their role, if they have one.
    Permission names are ordered by name.
    """
    granted: List[str] = []
    resolved_role: str | None = None

    if current_user.is_admin:
        # Admin gets all permissions.
        resolved_role = "admin"
        every_permission = db.query(Permission).order_by(Permission.name).all()
        granted = [permission.name for permission in every_permission]
    elif current_user.role_id:
        role = db.query(Role).filter(Role.id == current_user.role_id).first()
        if role:
            resolved_role = role.name
            link_rows = (
                db.query(RolePermission.permission_id)
                .filter(RolePermission.role_id == role.id)
                .all()
            )
            # Skip the second query when the role has no permissions linked.
            if link_rows:
                permission_ids = [row[0] for row in link_rows]
                role_permissions = (
                    db.query(Permission)
                    .filter(Permission.id.in_(permission_ids))
                    .order_by(Permission.name)
                    .all()
                )
                granted = [permission.name for permission in role_permissions]

    return PermissionIntrospectionResponse(
        username=current_user.username,
        role_name=resolved_role,
        is_admin=current_user.is_admin,
        permissions=granted,
    )