Verified locally end-to-end (before: exploitable, after: blocked). - config: refuse to start on weak/default/short SECRET_KEY (was trivially forgeable JWT -> full admin) - deps: add reusable require_admin dependency (JWT or API key) - api-keys: require admin to mint/list/revoke; mask key on list (was unauthenticated -> instant admin API key) - webhooks: whole router now admin-only (was fully unauthenticated CRUD + readable logs) - webhook delivery: validate URL scheme + reject hosts resolving to private/loopback/link-local/reserved IPs; disable redirects (was a readable SSRF primitive) - rbac: implement a real project-role hierarchy in check_project_role (was a no-op: any member, even guest, passed admin/mgr gates) - misc: auth on delete_milestone (+ensure_can_edit_milestone), worklog create/delete (force caller user_id, owner-only delete), /activity and /export/tasks (were unauthenticated data exposure) - tasks: auth + ensure_can_edit_task on assign_task and batch_assign Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
88 lines
2.8 KiB
Python
88 lines
2.8 KiB
Python
import json
|
|
import hmac
|
|
import hashlib
|
|
import logging
|
|
import socket
|
|
import ipaddress
|
|
from urllib.parse import urlparse
|
|
from sqlalchemy.orm import Session
|
|
from app.models.webhook import Webhook, WebhookLog
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _validate_webhook_url(url: str) -> None:
    """Raise ValueError if the URL would target a non-public address (SSRF guard).

    The URL must use the ``http`` or ``https`` scheme and name a host. Every
    address the host resolves to is inspected, and the URL is rejected if any
    one of them is not globally routable: private, loopback, link-local,
    multicast, reserved, unspecified, or shared address space such as
    100.64.0.0/10.

    Args:
        url: Webhook target URL to validate.

    Raises:
        ValueError: If the scheme is unsupported, the host is missing, DNS
            resolution fails, or any resolved address is non-public.

    Note:
        Only the addresses visible at call time are checked. The HTTP client
        resolves the host again when it connects, so this check alone does
        not defend against DNS rebinding.
    """
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")

    host = parsed.hostname
    if not host:
        raise ValueError("missing host")

    port = parsed.port or (443 if parsed.scheme == "https" else 80)

    # Resolve every address the host maps to and reject non-global ranges.
    try:
        infos = socket.getaddrinfo(host, port)
    except socket.gaierror as e:
        raise ValueError(f"DNS resolution failed: {e}") from e

    for info in infos:
        # Scoped IPv6 results can carry a zone suffix ("fe80::1%eth0"); drop
        # it so the address parses the same way on every Python version.
        ip = ipaddress.ip_address(info[4][0].split("%", 1)[0])
        if (
            # is_global catches ranges the flags below miss, notably the
            # shared address space 100.64.0.0/10, which is neither "private"
            # nor "reserved" yet is not publicly routable.
            not ip.is_global
            or ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            # Kept explicitly: some multicast addresses report as global.
            or ip.is_multicast
            or ip.is_reserved
            or ip.is_unspecified
        ):
            raise ValueError(f"host resolves to non-public address {ip}")
|
|
|
|
|
|
def fire_webhooks_sync(event: str, payload: dict, project_id: int, db: Session) -> None:
    """Deliver ``payload`` to every active webhook subscribed to ``event`` (sync).

    A webhook matches when ``event`` appears in its comma-separated ``events``
    list and its ``project_id`` is either unset or equal to ``project_id``.
    Each delivery attempt is recorded as a ``WebhookLog`` row and committed,
    whether it succeeded or failed.

    Args:
        event: Name of the event being fired.
        payload: Data to send; serialized to JSON, with values the encoder
            does not know converted through ``str``.
        project_id: Project the event belongs to.
        db: Database session used to load webhooks and persist delivery logs.

    Delivery details:
        * The target URL is checked with ``_validate_webhook_url`` first; a
          rejected URL is logged as a failed delivery and nothing is sent.
        * When the webhook has a secret, the hex HMAC-SHA256 of the JSON body
          is sent in the ``X-Webhook-Signature`` header.
        * Requests use a 10 second timeout and never follow redirects.
        * A delivery counts as successful only for a 2xx response; at most
          the first 1000 characters of the response (or of the error text)
          are stored.
    """
    import httpx

    active_hooks = db.query(Webhook).filter(Webhook.is_active == True).all()

    matched = []
    for wh in active_hooks:
        # A webhook with no event list subscribes to nothing. Treating a
        # missing value as empty keeps one malformed row from aborting
        # delivery to all the other webhooks.
        subscribed = {name.strip() for name in (wh.events or "").split(",")}
        if event not in subscribed:
            continue
        if wh.project_id is not None and wh.project_id != project_id:
            continue
        matched.append(wh)

    if not matched:
        return

    payload_json = json.dumps(payload, default=str)
    # Encode once so the bytes that get signed are exactly the bytes sent.
    payload_bytes = payload_json.encode()

    for wh in matched:
        log = WebhookLog(
            webhook_id=wh.id,
            event=event,
            payload=payload_json,
        )
        try:
            _validate_webhook_url(wh.url)

            headers = {"Content-Type": "application/json"}
            if wh.secret:
                headers["X-Webhook-Signature"] = hmac.new(
                    wh.secret.encode(), payload_bytes, hashlib.sha256
                ).hexdigest()

            # Redirects stay disabled: a redirect target would bypass the
            # URL validation performed above.
            with httpx.Client(timeout=10.0, follow_redirects=False) as client:
                resp = client.post(wh.url, content=payload_bytes, headers=headers)
            log.response_status = resp.status_code
            log.response_body = resp.text[:1000]
            log.success = 200 <= resp.status_code < 300
        except Exception as e:
            # Best-effort delivery: record the failure and move on to the
            # next webhook instead of propagating to the caller.
            log.response_body = str(e)[:1000]
            log.success = False
            logger.warning("Webhook delivery failed for %s: %s", wh.url, e)

        db.add(log)
        db.commit()
|