Files
HarborForge.Backend/app/api/routers/auth.py
hzhang d2b83ad58d fix(projects): perm-gate create + apikey-via-Bearer + introspect with apikey
Four coupled fixes so non-admin agents (e.g. nav, role=mgr) can
actually create projects through hf-cli with their API key:

1. POST /projects no longer hardcodes is_admin. It checks the global
   `project.create` perm via role_permissions (admin still wins via
   is_admin short-circuit). Permission-denied 403 message names the
   exact perm.

2. /auth/me/permissions now uses get_current_user_or_apikey (was
   get_current_user JWT-only). This is what hf-cli hits to populate
   its local permission cache that drives the "not permitted" gate;
   previously every API-key-authed agent saw all commands as gated.

3. get_current_user_or_apikey now also accepts an API key delivered
   via Authorization: Bearer (in addition to X-API-Key). hf-cli only
   knows Bearer; trying to JWT-decode an API key string would fail —
   so on decode failure, fall through to the API key lookup. Keeps
   X-API-Key behavior unchanged.

4. init_bootstrap: add `project.create` to DEFAULT_PERMISSIONS and to
   _MGR_PERMISSIONS so admin (auto-all) + mgr both get it on seed.

Bug came to light when manager-agent reported `hf project list`/`create`
returned `not permitted`. Root cause: hf-cli calls /auth/me/permissions
with the API key via Bearer header → 401 → state.Known=false → every
command in the surface is gated false locally. Even after the local
gate, POST /projects would still 403 due to the hardcoded admin check.
All four steps above are required end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 22:09:34 +01:00

113 lines
4.2 KiB
Python

"""Auth router."""
from datetime import timedelta
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.config import get_db, settings
from app.models import models
from app.models.role_permission import Permission, Role, RolePermission
from app.schemas import schemas
from app.api.deps import Token, verify_password, create_access_token, get_current_user, get_current_user_or_apikey
# Router for the authentication endpoints: every route declared on it is
# served under the "/auth" prefix and tagged "Auth".
router = APIRouter(prefix="/auth", tags=["Auth"])
@router.post("/token", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    # Password login: exchanges a username/password form for a bearer token.
    #
    # Rejections, in the order they are checked:
    #   403 - password login is switched off (settings.HARBORFORGE_OIDC_ONLY)
    #   401 - unknown username or wrong password
    #   400 - account exists but is not active
    #   403 - the built-in "acc-mgr" account
    #
    # NOTE: documented with comments rather than a docstring because FastAPI
    # publishes an endpoint's docstring as its OpenAPI description.
    if settings.HARBORFORGE_OIDC_ONLY:
        raise HTTPException(
            status_code=403,
            detail="Password login is disabled (OIDC only)",
        )

    account = (
        db.query(models.User)
        .filter(models.User.username == form_data.username)
        .first()
    )

    # A missing user and a bad password share one response; an absent stored
    # hash is passed to verify_password as an empty string.
    if not (
        account
        and verify_password(form_data.password, account.hashed_password or "")
    ):
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")

    # Built-in acc-mgr account cannot log in interactively
    if account.username == "acc-mgr":
        raise HTTPException(status_code=403, detail="This account cannot log in")

    # The token subject is the user's id rendered as a string.
    claims = {"sub": str(account.id)}
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(data=claims, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
@router.get("/me", response_model=schemas.UserResponse)
async def get_me(current_user: models.User = Depends(get_current_user)):
    # Hands back the user resolved by the get_current_user dependency as-is;
    # response_model=schemas.UserResponse shapes what is serialized.
    return current_user
# Response body of GET /auth/me/apikey-permissions.
class ApiKeyPermissionResponse(BaseModel):
    # True for admins, or when the caller's role holds "user.reset-self-apikey".
    can_reset_self: bool
    # True for admins, or when the caller's role holds "user.reset-apikey".
    can_reset_any: bool
@router.get("/me/apikey-permissions", response_model=ApiKeyPermissionResponse)
async def get_apikey_permissions(
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the current user's API key reset capabilities."""

    def _is_granted(perm_name: str) -> bool:
        # Admins hold every permission without consulting the role tables.
        if current_user.is_admin:
            return True
        # Without a role there is nothing that could grant the permission.
        if not current_user.role_id:
            return False
        permission = (
            db.query(Permission).filter(Permission.name == perm_name).first()
        )
        # A permission name that is not in the table is treated as not granted.
        if not permission:
            return False
        grant = (
            db.query(RolePermission)
            .filter(
                RolePermission.role_id == current_user.role_id,
                RolePermission.permission_id == permission.id,
            )
            .first()
        )
        return grant is not None

    # Response field -> permission name that backs it, checked in this order.
    capability_perms = (
        ("can_reset_self", "user.reset-self-apikey"),
        ("can_reset_any", "user.reset-apikey"),
    )
    return ApiKeyPermissionResponse(
        **{field: _is_granted(perm) for field, perm in capability_perms}
    )
# Response body of GET /auth/me/permissions.
class PermissionIntrospectionResponse(BaseModel):
    # Username of the authenticated caller.
    username: str
    # "admin" for admins, otherwise the name of the caller's role; None when
    # the caller has no role or the role row cannot be found.
    role_name: str | None
    # The caller's is_admin flag, passed through unchanged.
    is_admin: bool
    # Permission names sorted by name; admins receive every permission.
    permissions: List[str]
@router.get("/me/permissions", response_model=PermissionIntrospectionResponse)
async def get_my_permissions(
    current_user: models.User = Depends(get_current_user_or_apikey),
    db: Session = Depends(get_db),
):
    """Return the current user's effective permissions for CLI help introspection."""

    def _names_in_order(permission_query) -> List[str]:
        # Run a Permission query sorted by name and keep only the names.
        return [
            perm.name
            for perm in permission_query.order_by(Permission.name).all()
        ]

    granted: List[str] = []
    role_label: str | None = None

    if current_user.is_admin:
        # Admin gets all permissions and is always reported as "admin".
        role_label = "admin"
        granted = _names_in_order(db.query(Permission))
    elif current_user.role_id:
        role = db.query(Role).filter(Role.id == current_user.role_id).first()
        if role:
            role_label = role.name
            id_rows = (
                db.query(RolePermission.permission_id)
                .filter(RolePermission.role_id == role.id)
                .all()
            )
            # Only look up names when the role actually has grants.
            if id_rows:
                permission_ids = [pid for (pid,) in id_rows]
                granted = _names_in_order(
                    db.query(Permission).filter(Permission.id.in_(permission_ids))
                )

    # A user with no role, or whose role row is missing, falls through with
    # an empty permission list and role_name=None.
    return PermissionIntrospectionResponse(
        username=current_user.username,
        role_name=role_label,
        is_admin=current_user.is_admin,
        permissions=granted,
    )