Files
HarborForge.Backend/app/services/webhook.py
hzhang 51fb8ca073 fix(security): close critical auth/SSRF/RBAC holes
Verified locally end-to-end (before: exploitable, after: blocked).

- config: refuse to start on weak/default/short SECRET_KEY (was
  trivially forgeable JWT -> full admin)
- deps: add reusable require_admin dependency (JWT or API key)
- api-keys: require admin to mint/list/revoke; mask key on list
  (was unauthenticated -> instant admin API key)
- webhooks: whole router now admin-only (was fully unauthenticated
  CRUD + readable logs)
- webhook delivery: validate URL scheme + reject hosts resolving to
  private/loopback/link-local/reserved IPs; disable redirects
  (was a readable SSRF primitive)
- rbac: implement a real project-role hierarchy in check_project_role
  (was a no-op: any member, even guest, passed admin/mgr gates)
- misc: auth on delete_milestone (+ensure_can_edit_milestone),
  worklog create/delete (force caller user_id, owner-only delete),
  /activity and /export/tasks (were unauthenticated data exposure)
- tasks: auth + ensure_can_edit_task on assign_task and batch_assign

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:53:14 +01:00

88 lines
2.8 KiB
Python

import json
import hmac
import hashlib
import logging
import socket
import ipaddress
from urllib.parse import urlparse
from sqlalchemy.orm import Session
from app.models.webhook import Webhook, WebhookLog
logger = logging.getLogger(__name__)
def _validate_webhook_url(url: str) -> None:
"""Raise ValueError if the URL would target a non-public address (SSRF guard)."""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
host = parsed.hostname
if not host:
raise ValueError("missing host")
# Resolve every address the host maps to and reject non-global ranges.
try:
infos = socket.getaddrinfo(host, parsed.port or (443 if parsed.scheme == "https" else 80))
except socket.gaierror as e:
raise ValueError(f"DNS resolution failed: {e}")
for info in infos:
ip = ipaddress.ip_address(info[4][0])
if (
ip.is_private
or ip.is_loopback
or ip.is_link_local
or ip.is_multicast
or ip.is_reserved
or ip.is_unspecified
):
raise ValueError(f"host resolves to non-public address {ip}")
def fire_webhooks_sync(event: str, payload: dict, project_id: int, db: Session):
"""Find matching webhooks and send payloads (sync version)."""
import httpx
webhooks = db.query(Webhook).filter(Webhook.is_active == True).all()
matched = []
for wh in webhooks:
events = [e.strip() for e in wh.events.split(",")]
if event not in events:
continue
if wh.project_id is not None and wh.project_id != project_id:
continue
matched.append(wh)
if not matched:
return
payload_json = json.dumps(payload, default=str)
for wh in matched:
log = WebhookLog(
webhook_id=wh.id,
event=event,
payload=payload_json,
)
try:
_validate_webhook_url(wh.url)
headers = {"Content-Type": "application/json"}
if wh.secret:
sig = hmac.new(
wh.secret.encode(), payload_json.encode(), hashlib.sha256
).hexdigest()
headers["X-Webhook-Signature"] = sig
with httpx.Client(timeout=10.0, follow_redirects=False) as client:
resp = client.post(wh.url, content=payload_json, headers=headers)
log.response_status = resp.status_code
log.response_body = resp.text[:1000]
log.success = 200 <= resp.status_code < 300
except Exception as e:
log.response_body = str(e)[:1000]
log.success = False
logger.warning(f"Webhook delivery failed for {wh.url}: {e}")
db.add(log)
db.commit()