fix(security): close critical auth/SSRF/RBAC holes
Verified locally end-to-end (before: exploitable, after: blocked). - config: refuse to start on weak/default/short SECRET_KEY (was trivially forgeable JWT -> full admin) - deps: add reusable require_admin dependency (JWT or API key) - api-keys: require admin to mint/list/revoke; mask key on list (was unauthenticated -> instant admin API key) - webhooks: whole router now admin-only (was fully unauthenticated CRUD + readable logs) - webhook delivery: validate URL scheme + reject hosts resolving to private/loopback/link-local/reserved IPs; disable redirects (was a readable SSRF primitive) - rbac: implement a real project-role hierarchy in check_project_role (was a no-op: any member, even guest, passed admin/mgr gates) - misc: auth on delete_milestone (+ensure_can_edit_milestone), worklog create/delete (force caller user_id, owner-only delete), /activity and /export/tasks (were unauthenticated data exposure) - tasks: auth + ensure_can_edit_task on assign_task and batch_assign Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2,12 +2,41 @@ import json
|
||||
import hmac
|
||||
import hashlib
|
||||
import logging
|
||||
import socket
|
||||
import ipaddress
|
||||
from urllib.parse import urlparse
|
||||
from sqlalchemy.orm import Session
|
||||
from app.models.webhook import Webhook, WebhookLog
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _validate_webhook_url(url: str) -> None:
|
||||
"""Raise ValueError if the URL would target a non-public address (SSRF guard)."""
|
||||
parsed = urlparse(url)
|
||||
if parsed.scheme not in ("http", "https"):
|
||||
raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
|
||||
host = parsed.hostname
|
||||
if not host:
|
||||
raise ValueError("missing host")
|
||||
# Resolve every address the host maps to and reject non-global ranges.
|
||||
try:
|
||||
infos = socket.getaddrinfo(host, parsed.port or (443 if parsed.scheme == "https" else 80))
|
||||
except socket.gaierror as e:
|
||||
raise ValueError(f"DNS resolution failed: {e}")
|
||||
for info in infos:
|
||||
ip = ipaddress.ip_address(info[4][0])
|
||||
if (
|
||||
ip.is_private
|
||||
or ip.is_loopback
|
||||
or ip.is_link_local
|
||||
or ip.is_multicast
|
||||
or ip.is_reserved
|
||||
or ip.is_unspecified
|
||||
):
|
||||
raise ValueError(f"host resolves to non-public address {ip}")
|
||||
|
||||
|
||||
def fire_webhooks_sync(event: str, payload: dict, project_id: int, db: Session):
|
||||
"""Find matching webhooks and send payloads (sync version)."""
|
||||
import httpx
|
||||
@@ -35,6 +64,8 @@ def fire_webhooks_sync(event: str, payload: dict, project_id: int, db: Session):
|
||||
payload=payload_json,
|
||||
)
|
||||
try:
|
||||
_validate_webhook_url(wh.url)
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if wh.secret:
|
||||
sig = hmac.new(
|
||||
@@ -42,7 +73,7 @@ def fire_webhooks_sync(event: str, payload: dict, project_id: int, db: Session):
|
||||
).hexdigest()
|
||||
headers["X-Webhook-Signature"] = sig
|
||||
|
||||
with httpx.Client(timeout=10.0) as client:
|
||||
with httpx.Client(timeout=10.0, follow_redirects=False) as client:
|
||||
resp = client.post(wh.url, content=payload_json, headers=headers)
|
||||
log.response_status = resp.status_code
|
||||
log.response_body = resp.text[:1000]
|
||||
|
||||
Reference in New Issue
Block a user