Three coupled fixes so non-admin agents (e.g. nav, role=mgr) can actually create projects through hf-cli with their API key: 1. POST /projects no longer hardcodes is_admin. It checks the global `project.create` perm via role_permissions (admin still wins via is_admin short-circuit). Permission-denied 403 message names the exact perm. 2. /auth/me/permissions now uses get_current_user_or_apikey (was get_current_user JWT-only). This is what hf-cli hits to populate its local permission cache that drives the "not permitted" gate; previously every API-key-authed agent saw all commands as gated. 3. get_current_user_or_apikey now also accepts an API key delivered via Authorization: Bearer (in addition to X-API-Key). hf-cli only knows Bearer; trying to JWT-decode an API key string would fail — so on decode failure, fall through to the API key lookup. Keeps X-API-Key behavior unchanged. 4. init_bootstrap: add `project.create` to DEFAULT_PERMISSIONS and to _MGR_PERMISSIONS so admin (auto-all) + mgr both get it on seed. Bug came to light when manager-agent reported `hf project list`/`create` returned `not permitted`. Root cause: hf-cli calls /auth/me/permissions with the API key via Bearer header → 401 → state.Known=false → every command in the surface is gated false locally. Even after the local gate, POST /projects would still 403 due to the hardcoded admin check. All four steps above are required end-to-end. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
113 lines
4.2 KiB
Python
113 lines
4.2 KiB
Python
"""Auth router."""
|
|
from datetime import timedelta
|
|
from typing import List
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import get_db, settings
|
|
from app.models import models
|
|
from app.models.role_permission import Permission, Role, RolePermission
|
|
from app.schemas import schemas
|
|
from app.api.deps import Token, verify_password, create_access_token, get_current_user, get_current_user_or_apikey
|
|
|
|
router = APIRouter(prefix="/auth", tags=["Auth"])
|
|
|
|
|
|
@router.post("/token", response_model=Token)
|
|
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
|
|
if settings.HARBORFORGE_OIDC_ONLY:
|
|
raise HTTPException(status_code=403, detail="Password login is disabled (OIDC only)")
|
|
user = db.query(models.User).filter(models.User.username == form_data.username).first()
|
|
if not user or not verify_password(form_data.password, user.hashed_password or ""):
|
|
raise HTTPException(status_code=401, detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Bearer"})
|
|
if not user.is_active:
|
|
raise HTTPException(status_code=400, detail="Inactive user")
|
|
# Built-in acc-mgr account cannot log in interactively
|
|
if user.username == "acc-mgr":
|
|
raise HTTPException(status_code=403, detail="This account cannot log in")
|
|
access_token = create_access_token(
|
|
data={"sub": str(user.id)},
|
|
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
)
|
|
return {"access_token": access_token, "token_type": "bearer"}
|
|
|
|
|
|
@router.get("/me", response_model=schemas.UserResponse)
|
|
async def get_me(current_user: models.User = Depends(get_current_user)):
|
|
return current_user
|
|
|
|
|
|
class ApiKeyPermissionResponse(BaseModel):
|
|
can_reset_self: bool
|
|
can_reset_any: bool
|
|
|
|
|
|
@router.get("/me/apikey-permissions", response_model=ApiKeyPermissionResponse)
|
|
async def get_apikey_permissions(
|
|
current_user: models.User = Depends(get_current_user),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Return the current user's API key reset capabilities."""
|
|
def _has_perm(perm_name: str) -> bool:
|
|
if current_user.is_admin:
|
|
return True
|
|
if not current_user.role_id:
|
|
return False
|
|
perm = db.query(Permission).filter(Permission.name == perm_name).first()
|
|
if not perm:
|
|
return False
|
|
return db.query(RolePermission).filter(
|
|
RolePermission.role_id == current_user.role_id,
|
|
RolePermission.permission_id == perm.id,
|
|
).first() is not None
|
|
|
|
return ApiKeyPermissionResponse(
|
|
can_reset_self=_has_perm("user.reset-self-apikey"),
|
|
can_reset_any=_has_perm("user.reset-apikey"),
|
|
)
|
|
|
|
|
|
class PermissionIntrospectionResponse(BaseModel):
|
|
username: str
|
|
role_name: str | None
|
|
is_admin: bool
|
|
permissions: List[str]
|
|
|
|
|
|
@router.get("/me/permissions", response_model=PermissionIntrospectionResponse)
|
|
async def get_my_permissions(
|
|
current_user: models.User = Depends(get_current_user_or_apikey),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
"""Return the current user's effective permissions for CLI help introspection."""
|
|
perms: List[str] = []
|
|
role_name: str | None = None
|
|
|
|
if current_user.is_admin:
|
|
# Admin gets all permissions
|
|
all_perms = db.query(Permission).order_by(Permission.name).all()
|
|
perms = [p.name for p in all_perms]
|
|
role_name = "admin"
|
|
elif current_user.role_id:
|
|
role = db.query(Role).filter(Role.id == current_user.role_id).first()
|
|
if role:
|
|
role_name = role.name
|
|
perm_ids = db.query(RolePermission.permission_id).filter(
|
|
RolePermission.role_id == role.id
|
|
).all()
|
|
if perm_ids:
|
|
pid_list = [p[0] for p in perm_ids]
|
|
matched = db.query(Permission).filter(Permission.id.in_(pid_list)).order_by(Permission.name).all()
|
|
perms = [p.name for p in matched]
|
|
|
|
return PermissionIntrospectionResponse(
|
|
username=current_user.username,
|
|
role_name=role_name,
|
|
is_admin=current_user.is_admin,
|
|
permissions=perms,
|
|
)
|