fix(security): RBAC on legacy create endpoints, hashed API keys, hardening

Addresses findings from the security audit:
- H1: add check_project_role to the legacy misc.py create endpoints
  (milestones=mgr, tasks/supports/meetings=dev) that previously required
  only authentication — closing a cross-project write bypass available to
  any logged-in user or agent API key.
- M2: comments are always attributed to the authenticated caller; the
  client-supplied author_id is dropped (no author spoofing).
- M3: API keys are stored as SHA-256 hashes (key_hash) plus a short
  key_prefix for display — never plaintext. Lookup hashes the presented
  key; listings never expose the secret. Includes an idempotent migration
  for existing deployments.
- M5: the OIDC session cookie's Secure flag is env-driven via
  SESSION_COOKIE_SECURE (default True; set false for plain-HTTP dev).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
h z
2026-05-31 20:16:11 +01:00
parent 88779d2db0
commit 3f5f813c65
8 changed files with 74 additions and 25 deletions

View File

@@ -1,4 +1,5 @@
"""Shared auth dependencies."""
import hashlib
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
@@ -59,11 +60,17 @@ async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = De
return user
def hash_api_key(raw: str) -> str:
"""SHA-256 of a raw API key. Keys are high-entropy random tokens, so a
fast hash (not bcrypt) is appropriate and allows O(1) lookup by hash."""
return hashlib.sha256(raw.encode()).hexdigest()
def _lookup_api_key(db: Session, key: str) -> models.User | None:
"""Resolve an API key string to a User; mark last_used_at on hit."""
if not key:
return None
key_obj = db.query(APIKey).filter(APIKey.key == key, APIKey.is_active == True).first() # noqa: E712
key_obj = db.query(APIKey).filter(APIKey.key_hash == hash_api_key(key), APIKey.is_active == True).first() # noqa: E712
if not key_obj:
return None
key_obj.last_used_at = datetime.utcnow()

View File

@@ -32,8 +32,12 @@ def create_comment(comment: schemas.CommentCreate, db: Session = Depends(get_db)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
check_project_role(db, current_user.id, task.project_id, min_role="viewer")
db_comment = models.Comment(**comment.model_dump())
# Always attribute the comment to the authenticated caller — never trust
# a client-supplied author_id (prevents author spoofing).
data = comment.model_dump()
data.pop("author_id", None)
db_comment = models.Comment(**data, author_id=current_user.id)
db.add(db_comment)
db.commit()
db.refresh(db_comment)

View File

@@ -12,7 +12,7 @@ from sqlalchemy import func as sqlfunc
from pydantic import BaseModel
from app.core.config import get_db
from app.api.deps import get_current_user_or_apikey, require_admin
from app.api.deps import get_current_user_or_apikey, require_admin, hash_api_key
from app.api.rbac import check_project_role, ensure_can_edit_milestone
from app.models import models
from app.models.apikey import APIKey
@@ -49,7 +49,8 @@ class APIKeyCreate(BaseModel):
class APIKeyResponse(BaseModel):
id: int
key: str
key: str | None = None # full secret — only populated on create/reset
key_prefix: str | None = None # masked display for listings
name: str
user_id: int
is_active: bool
@@ -66,11 +67,16 @@ def create_api_key(data: APIKeyCreate, db: Session = Depends(get_db),
if not user:
raise HTTPException(status_code=404, detail="User not found")
key = secrets.token_hex(32)
db_key = APIKey(key=key, name=data.name, user_id=data.user_id)
db_key = APIKey(key_hash=hash_api_key(key), key_prefix=key[:8], name=data.name, user_id=data.user_id)
db.add(db_key)
db.commit()
db.refresh(db_key)
return db_key
# Return the raw key once (it is never stored or shown again).
return {
"id": db_key.id, "key": key, "key_prefix": db_key.key_prefix,
"name": db_key.name, "user_id": db_key.user_id, "is_active": db_key.is_active,
"created_at": db_key.created_at, "last_used_at": db_key.last_used_at,
}
@router.get("/api-keys", response_model=List[APIKeyResponse], tags=["API Keys"])
@@ -80,11 +86,14 @@ def list_api_keys(user_id: int = None, db: Session = Depends(get_db),
if user_id:
query = query.filter(APIKey.user_id == user_id)
keys = query.all()
# Never expose the full secret on listing; show only a masked prefix.
for k in keys:
if k.key and len(k.key) > 8:
k.key = k.key[:6] + "" + k.key[-2:]
return keys
# Never expose the secret on listing — the raw key isn't stored. Show only
# the masked prefix.
return [{
"id": k.id, "key": None,
"key_prefix": (k.key_prefix + "") if k.key_prefix else None,
"name": k.name, "user_id": k.user_id, "is_active": k.is_active,
"created_at": k.created_at, "last_used_at": k.last_used_at,
} for k in keys]
@router.delete("/api-keys/{key_id}", status_code=status.HTTP_204_NO_CONTENT, tags=["API Keys"])
@@ -132,7 +141,10 @@ def list_activity(entity_type: str = None, entity_id: int = None, user_id: int =
def create_milestone(ms: schemas.MilestoneCreate, db: Session = Depends(get_db), current_user: models.User = Depends(get_current_user_or_apikey)):
import json
project = db.query(models.Project).filter(models.Project.id == ms.project_id).first()
project_code = project.project_code if project and project.project_code else f"P{ms.project_id}"
if not project:
raise HTTPException(status_code=404, detail="Project not found")
check_project_role(db, current_user.id, project.id, min_role="mgr")
project_code = project.project_code if project.project_code else f"P{ms.project_id}"
max_ms = db.query(MilestoneModel).filter(MilestoneModel.project_id == ms.project_id).order_by(MilestoneModel.id.desc()).first()
next_num = (max_ms.id + 1) if max_ms else 1
@@ -488,14 +500,15 @@ def create_milestone_task(project_code: str, milestone_id: str, task_data: dict,
project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
check_project_role(db, current_user.id, project.id, min_role="dev")
ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")
milestone_code = ms.milestone_code or f"m{ms.id}"
max_task = db.query(Task).filter(Task.milestone_id == ms.id).order_by(Task.id.desc()).first()
next_num = (max_task.id + 1) if max_task else 1
@@ -622,6 +635,7 @@ def create_support(project_code: str, milestone_id: str, support_data: dict, db:
project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
check_project_role(db, current_user.id, project.id, min_role="dev")
ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
if not ms:
@@ -768,14 +782,15 @@ def create_meeting(project_code: str, milestone_id: str, meeting_data: dict, db:
project = db.query(models.Project).filter(models.Project.project_code == project_code).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
check_project_role(db, current_user.id, project.id, min_role="dev")
ms = db.query(MilestoneModel).filter(MilestoneModel.milestone_code == milestone_id, MilestoneModel.project_id == project.id).first()
if not ms:
raise HTTPException(status_code=404, detail="Milestone not found")
if ms.status and hasattr(ms.status, "value") and ms.status.value == "undergoing":
raise HTTPException(status_code=400, detail="Cannot add items to a milestone that is undergoing")
milestone_code = ms.milestone_code or f"m{ms.id}"
max_meeting = db.query(Meeting).filter(Meeting.milestone_id == ms.id).order_by(Meeting.id.desc()).first()
next_num = (max_meeting.id + 1) if max_meeting else 1

View File

@@ -7,7 +7,7 @@ from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from app.api.deps import get_current_user, get_current_user_or_apikey, get_password_hash
from app.api.deps import get_current_user, get_current_user_or_apikey, get_password_hash, hash_api_key
from app.core.config import get_db, settings
from app.init_bootstrap import DELETED_USER_USERNAME
from app.models import models
@@ -464,9 +464,10 @@ def reset_user_apikey(
existing_key.is_active = False
db.flush()
# Create new key
# Create new key (store only the hash + a display prefix)
new_key = APIKey(
key=new_key_value,
key_hash=hash_api_key(new_key_value),
key_prefix=new_key_value[:8],
name=f"{target_user.username}-key",
user_id=target_user.id,
is_active=True,

View File

@@ -22,6 +22,10 @@ class Settings(BaseSettings):
# in via a bound OIDC identity / API keys), frontend hides password UI.
HARBORFORGE_OIDC_ONLY: bool = False
# Mark the OIDC state/session cookie Secure (HTTPS-only). Defaults to True
# for production; set SESSION_COOKIE_SECURE=false for plain-HTTP local dev.
SESSION_COOKIE_SECURE: bool = True
class Config:
env_file = ".env"

View File

@@ -27,7 +27,7 @@ app.add_middleware(
secret_key=settings.SECRET_KEY,
session_cookie="hf_oidc",
same_site="lax",
https_only=False,
https_only=settings.SESSION_COOKIE_SECURE,
max_age=600,
)
@@ -449,6 +449,19 @@ def _migrate_schema():
"CREATE INDEX idx_time_slots_special_slot_id ON time_slots (special_slot_id)"
))
# --- api_keys: migrate legacy plaintext `key` -> hashed `key_hash` ---
# Only runs on deployments that still have the old plaintext column;
# fresh installs get key_hash/key_prefix directly from create_all.
if _has_table(db, "api_keys") and _has_column(db, "api_keys", "key"):
if not _has_column(db, "api_keys", "key_hash"):
db.execute(text("ALTER TABLE api_keys ADD COLUMN key_hash VARCHAR(64) NULL"))
if not _has_column(db, "api_keys", "key_prefix"):
db.execute(text("ALTER TABLE api_keys ADD COLUMN key_prefix VARCHAR(16) NULL"))
db.execute(text("ALTER TABLE api_keys MODIFY COLUMN `key` VARCHAR(64) NULL"))
db.execute(text("UPDATE api_keys SET key_hash = SHA2(`key`, 256), key_prefix = LEFT(`key`, 8) WHERE key_hash IS NULL AND `key` IS NOT NULL"))
db.execute(text("UPDATE api_keys SET `key` = NULL WHERE `key` IS NOT NULL"))
_ensure_unique_index(db, "api_keys", "idx_api_keys_key_hash", "key_hash")
# --- schedule_type_special_slots: create-table is handled by
# Base.metadata.create_all on first boot; no migration needed here
# because there is no legacy table to evolve. Future schema bumps

View File

@@ -7,7 +7,10 @@ class APIKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
key = Column(String(64), unique=True, nullable=False, index=True)
# The raw key is never stored — only its SHA-256 hash. `key_prefix` holds
# the first few chars for human-readable display/masking in listings.
key_hash = Column(String(64), unique=True, nullable=False, index=True)
key_prefix = Column(String(16), nullable=True)
name = Column(String(100), nullable=False) # e.g. "agent-zhi", "agent-lyn"
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
is_active = Column(Boolean, default=True)

View File

@@ -105,7 +105,9 @@ class CommentBase(BaseModel):
class CommentCreate(CommentBase):
task_id: int
author_id: int
# author_id is NOT accepted from the client — the comment is always
# attributed to the authenticated caller (server-side) to prevent
# author spoofing.
class CommentUpdate(BaseModel):