Files
HarborForge.Backend/app/api/routers/auth.py
hzhang a8c45bf57c fix(api): /auth/me accepts apikey + expose task id in TaskResponse
Two contract gaps that broke the agent `hf` CLI's comment/worklog flow
(apikey/padded-cell auth):

- GET /auth/me used Depends(get_current_user) (JWT-only) → 401 for apikey
  callers. The CLI resolves the current user via /auth/me to fill
  comment/worklog author_id, so every `hf comment add` / `hf worklog add`
  failed with "Could not validate credentials". Switch to
  get_current_user_or_apikey (already imported), matching the rest of the
  apikey-accepting routes.
- TaskResponse omitted the numeric `id`. CommentCreate/WorkLogCreate require
  a numeric task_id, which the CLI resolves from `GET /tasks/<code>`.id —
  but the field was never serialized, so resolveTaskID got 0 → POST
  /comments|/worklogs → 404 "Task not found". Expose `id: int`.

Verified on sim: hf comment add + hf worklog add now succeed end-to-end.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-10 10:14:48 +01:00

113 lines
4.2 KiB
Python

"""Auth router."""
from datetime import timedelta
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.core.config import get_db, settings
from app.models import models
from app.models.role_permission import Permission, Role, RolePermission
from app.schemas import schemas
from app.api.deps import Token, verify_password, create_access_token, get_current_user, get_current_user_or_apikey
router = APIRouter(prefix="/auth", tags=["Auth"])
@router.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
if settings.HARBORFORGE_OIDC_ONLY:
raise HTTPException(status_code=403, detail="Password login is disabled (OIDC only)")
user = db.query(models.User).filter(models.User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password or ""):
raise HTTPException(status_code=401, detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"})
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
# Built-in acc-mgr account cannot log in interactively
if user.username == "acc-mgr":
raise HTTPException(status_code=403, detail="This account cannot log in")
access_token = create_access_token(
data={"sub": str(user.id)},
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me", response_model=schemas.UserResponse)
async def get_me(current_user: models.User = Depends(get_current_user_or_apikey)):
return current_user
class ApiKeyPermissionResponse(BaseModel):
can_reset_self: bool
can_reset_any: bool
@router.get("/me/apikey-permissions", response_model=ApiKeyPermissionResponse)
async def get_apikey_permissions(
current_user: models.User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Return the current user's API key reset capabilities."""
def _has_perm(perm_name: str) -> bool:
if current_user.is_admin:
return True
if not current_user.role_id:
return False
perm = db.query(Permission).filter(Permission.name == perm_name).first()
if not perm:
return False
return db.query(RolePermission).filter(
RolePermission.role_id == current_user.role_id,
RolePermission.permission_id == perm.id,
).first() is not None
return ApiKeyPermissionResponse(
can_reset_self=_has_perm("user.reset-self-apikey"),
can_reset_any=_has_perm("user.reset-apikey"),
)
class PermissionIntrospectionResponse(BaseModel):
username: str
role_name: str | None
is_admin: bool
permissions: List[str]
@router.get("/me/permissions", response_model=PermissionIntrospectionResponse)
async def get_my_permissions(
current_user: models.User = Depends(get_current_user_or_apikey),
db: Session = Depends(get_db),
):
"""Return the current user's effective permissions for CLI help introspection."""
perms: List[str] = []
role_name: str | None = None
if current_user.is_admin:
# Admin gets all permissions
all_perms = db.query(Permission).order_by(Permission.name).all()
perms = [p.name for p in all_perms]
role_name = "admin"
elif current_user.role_id:
role = db.query(Role).filter(Role.id == current_user.role_id).first()
if role:
role_name = role.name
perm_ids = db.query(RolePermission.permission_id).filter(
RolePermission.role_id == role.id
).all()
if perm_ids:
pid_list = [p[0] for p in perm_ids]
matched = db.query(Permission).filter(Permission.id.in_(pid_list)).order_by(Permission.name).all()
perms = [p.name for p in matched]
return PermissionIntrospectionResponse(
username=current_user.username,
role_name=role_name,
is_admin=current_user.is_admin,
permissions=perms,
)