from sqlalchemy.orm import Session from db.models.Path import Path from db.models.PathSetting import PathSetting from db.models.Webhook import Webhook from db.models.WebhookSetting import WebhookSetting from events import MARKDOWN_CREATED_EVENT, MARKDOWN_UPDATED_EVENT, MARKDOWN_DELETED_EVENT, PATH_CREATED_EVENT, \ PATH_UPDATED_EVENT, PATH_DELETED_EVENT import abc import importlib import json import pkgutil import requests import db event_type_map = { MARKDOWN_CREATED_EVENT: 'markdown_created_event', MARKDOWN_UPDATED_EVENT: 'markdown_updated_event', MARKDOWN_DELETED_EVENT: 'markdown_deleted_event', PATH_CREATED_EVENT: 'path_created_event', PATH_UPDATED_EVENT: 'path_updated_event', PATH_DELETED_EVENT: 'path_deleted_event', } class WebhookEventHandler(abc.ABC): def __init__(self, event_type=0): self.event_type = event_type @abc.abstractmethod def get_path_id(self, payload): pass def __call__(self, *args, **kwargs): payload = kwargs['payload'] path_id = self.get_path_id(payload) with db.get_db() as session: setting = self.get_setting(session, path_id) if setting is None: return headers = {'Content-Type': 'application/json', 'x-alchegos-event': event_type_map[self.event_type]} if setting.get("additional_header", None) is not None: headers.update(json.loads(setting["additional_header"])) body = json.dumps(payload, default=str) try: response = requests.post(setting["webhook_url"], data=body, headers=headers, timeout=5) response.raise_for_status() except Exception as e: print(e) def get_setting(self, session: Session, path_id): path = session.query(Path).filter(Path.id == path_id).first() if path is None: return None p = path.to_dict() path_setting = session.query(PathSetting).get(path.setting_id) if path_setting is None: return None webhook_setting = session.query(WebhookSetting).get(path_setting.webhook_setting_id) if webhook_setting is None and p["parent_id"] != 1: return self.get_setting(session, p["parent_id"]) setting = webhook_setting.to_dict() if not setting["enabled"] or setting["on_events"] & self.event_type == 0: return None webhook = session.query(Webhook).get(webhook_setting.webhook_id) if webhook is None: return None setting["webhook_url"] = webhook.to_dict()["hook_url"] return setting _auto_instantiate_classes = set() def auto_instantiate(cls): _auto_instantiate_classes.add(cls) return cls def register_all_webhook_event_handlers(): package = __name__ package_path = __path__ for finder, name, ispkg in pkgutil.walk_packages(package_path, package+"."): importlib.import_module(name) for cls in _auto_instantiate_classes: cls()