Verified locally end-to-end (before: exploitable, after: blocked).

- config: refuse to start on weak/default/short SECRET_KEY (was trivially forgeable JWT -> full admin)
- deps: add reusable require_admin dependency (JWT or API key)
- api-keys: require admin to mint/list/revoke; mask key on list (was unauthenticated -> instant admin API key)
- webhooks: whole router now admin-only (was fully unauthenticated CRUD + readable logs)
- webhook delivery: validate URL scheme + reject hosts resolving to private/loopback/link-local/reserved IPs; disable redirects (was a readable SSRF primitive)
- rbac: implement a real project-role hierarchy in check_project_role (was a no-op: any member, even guest, passed admin/mgr gates)
- misc: auth on delete_milestone (+ensure_can_edit_milestone), worklog create/delete (force caller user_id, owner-only delete), /activity and /export/tasks (were unauthenticated data exposure)
- tasks: auth + ensure_can_edit_task on assign_task and batch_assign

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
76 lines
2.3 KiB
Python
76 lines
2.3 KiB
Python
import os
|
|
import json
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker
|
|
from pydantic_settings import BaseSettings
|
|
from typing import Optional
|
|
|
|
|
|
def _resolve_db_url(env_url: str) -> str:
    """Return the database URL, preferring the wizard config file over env.

    The config file is looked up at ``$CONFIG_DIR/$CONFIG_FILE`` (defaults:
    ``/config/harborforge.json``). When it contains a non-empty ``"database"``
    object, a ``mysql+pymysql://`` URL is built from its ``host``, ``port``,
    ``user``, ``password`` and ``database`` keys, each falling back to a
    built-in default when absent. In every other case *env_url* is returned
    unchanged.

    The user name and password are percent-encoded so that characters such
    as ``@``, ``:`` or ``/`` in a credential cannot corrupt the URL.

    Args:
        env_url: URL to fall back to when the config file is missing,
            unreadable, malformed, or has no usable ``"database"`` section.

    Returns:
        The URL built from the config file, or *env_url*.
    """
    import logging
    from urllib.parse import quote

    config_dir = os.getenv("CONFIG_DIR", "/config")
    config_file = os.getenv("CONFIG_FILE", "harborforge.json")
    config_path = os.path.join(config_dir, config_file)

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        # No wizard config present: the normal case for env-only deployments.
        return env_url
    except (OSError, ValueError) as exc:
        # Unreadable or malformed config stays best-effort, but is no longer
        # silent. json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
        logging.getLogger(__name__).warning(
            "Ignoring database config %s: %s", config_path, exc
        )
        return env_url

    db_cfg = cfg.get("database") if isinstance(cfg, dict) else None
    if not db_cfg or not isinstance(db_cfg, dict):
        return env_url

    host = db_cfg.get("host", "mysql")
    port = db_cfg.get("port", 3306)
    database = db_cfg.get("database", "harborforge")
    # safe="" also encodes "/" so no reserved character survives in the
    # credentials; %XX escapes decode identically under unquote/unquote_plus.
    user = quote(str(db_cfg.get("user", "harborforge")), safe="")
    password = quote(str(db_cfg.get("password", "harborforge_pass")), safe="")
    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"
|
|
|
|
|
|
class Settings(BaseSettings):
    """Application settings, declared as a pydantic-settings model.

    The values below are the defaults; pydantic-settings lets each field be
    overridden from the environment or from the ``.env`` file named in
    ``Config``.
    """

    # SQLAlchemy URL handed to _resolve_db_url() as the fallback when the
    # wizard config file does not supply database settings.
    DATABASE_URL: str = "mysql+pymysql://harborforge:harborforge_pass@mysql:3306/harborforge"
    # JWT signing key. This placeholder default is listed in _WEAK_SECRETS and
    # is rejected at import time, so a real value must always be provided.
    SECRET_KEY: str = "change-me-in-production"
    LOG_LEVEL: str = "INFO"
    # NOTE(review): presumably the JWT signing algorithm -- confirm against
    # the auth code, which is not visible here.
    ALGORITHM: str = "HS256"
    # NOTE(review): presumably the access-token lifetime in minutes -- confirm
    # against the auth code, which is not visible here.
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    class Config:
        # Dotenv file pydantic-settings reads in addition to the environment.
        env_file = ".env"
|
|
|
|
|
|
settings = Settings()

# Startup guard for the JWT signing key: a placeholder or short key would let
# anyone forge tokens, so the process refuses to come up with one.
_WEAK_SECRETS = {
    "",
    "secret",
    "changeme",
    "change-me-in-production",
    "change_me_in_production",
    "change-me-use-openssl-rand-hex-32",
}
if len(settings.SECRET_KEY) < 32 or settings.SECRET_KEY in _WEAK_SECRETS:
    raise RuntimeError(
        "Insecure SECRET_KEY: set a strong random value "
        "(e.g. `openssl rand -hex 32`) via the SECRET_KEY env var. "
        "Refusing to start with a default/short key."
    )
|
|
|
|
# Database wiring. URL precedence: wizard config volume, then env, then the
# default declared on Settings.
_db_url = _resolve_db_url(settings.DATABASE_URL)

# pool_pre_ping makes the pool test a connection before handing it out.
engine = create_engine(_db_url, pool_pre_ping=True)

# Session factory bound to the engine; flushing and committing stay explicit.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# Declarative base class for the ORM models.
Base = declarative_base()
|
|
|
|
|
|
def get_db():
    """Yield a fresh database session and close it when the consumer is done.

    A new session is created from ``SessionLocal`` on every call. The
    ``finally`` clause closes it whether the consumer finishes normally,
    raises into the generator, or the generator is closed early.

    Yields:
        The session created by ``SessionLocal()``.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|