Files
HarborForge.Backend/app/api/deps.py
hzhang 51fb8ca073 fix(security): close critical auth/SSRF/RBAC holes
Verified locally end-to-end (before: exploitable, after: blocked).

- config: refuse to start on weak/default/short SECRET_KEY (was
  trivially forgeable JWT -> full admin)
- deps: add reusable require_admin dependency (JWT or API key)
- api-keys: require admin to mint/list/revoke; mask key on list
  (was unauthenticated -> instant admin API key)
- webhooks: whole router now admin-only (was fully unauthenticated
  CRUD + readable logs)
- webhook delivery: validate URL scheme + reject hosts resolving to
  private/loopback/link-local/reserved IPs; disable redirects
  (was a readable SSRF primitive)
- rbac: implement a real project-role hierarchy in check_project_role
  (was a no-op: any member, even guest, passed admin/mgr gates)
- misc: auth on delete_milestone (+ensure_can_edit_milestone),
  worklog create/delete (force caller user_id, owner-only delete),
  /activity and /export/tasks (were unauthenticated data exposure)
- tasks: auth + ensure_can_edit_task on assign_task and batch_assign

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:53:14 +01:00

86 lines
3.0 KiB
Python

"""Shared auth dependencies."""
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.core.config import get_db, settings
from app.models import models
from app.models.apikey import APIKey
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token", auto_error=False)
apikey_header = APIKeyHeader(name="X-API-Key", auto_error=False)
class Token(BaseModel):
access_token: str
token_type: str
def verify_password(plain_password: str, hashed_password: str) -> bool:
if not hashed_password:
return False
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password[:72])
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=30))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not token:
raise credentials_exception
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(models.User).filter(models.User.id == user_id).first()
if user is None:
raise credentials_exception
return user
async def get_current_user_or_apikey(
token: str = Depends(oauth2_scheme),
api_key: str = Depends(apikey_header),
db: Session = Depends(get_db)
):
"""Authenticate via JWT token OR API key."""
if api_key:
key_obj = db.query(APIKey).filter(APIKey.key == api_key, APIKey.is_active == True).first()
if key_obj:
key_obj.last_used_at = datetime.utcnow()
db.commit()
user = db.query(models.User).filter(models.User.id == key_obj.user_id).first()
if user:
return user
if token:
return await get_current_user(token=token, db=db)
raise HTTPException(status_code=401, detail="Not authenticated")
def require_admin(current_user: models.User = Depends(get_current_user_or_apikey)) -> models.User:
"""Dependency: caller must be a global admin (JWT or API key)."""
if not getattr(current_user, "is_admin", False):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required")
return current_user