"""Helpers for managing an on-disk RSA keypair and decrypting payloads with it.

The keypair lives in ``KEY_DIR`` as two PEM files.  ``ensure_keypair`` creates
them on demand, ``get_public_key_info`` exposes the public half together with
a short fingerprint, ``decrypt_payload_b64`` decrypts RSA-OAEP ciphertexts
into JSON objects, and ``ts_within`` checks an ISO-8601 timestamp against the
current time.
"""

import base64
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any

from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding

# Directory holding the PEM files; overridable via the MONITOR_KEY_DIR
# environment variable.
KEY_DIR = Path(os.getenv('MONITOR_KEY_DIR', '/config/monitor_keys'))
PRIV_PATH = KEY_DIR / 'monitor_private.pem'
PUB_PATH = KEY_DIR / 'monitor_public.pem'


def _load_existing_private_key():
    """Return the RSA private key stored at PRIV_PATH, or None if unusable.

    "Unusable" covers an unreadable file, undecodable/encrypted PEM data and
    keys that are not RSA (which could not serve the OAEP decryption below).
    """
    try:
        key = serialization.load_pem_private_key(
            PRIV_PATH.read_bytes(), password=None
        )
    except (OSError, ValueError, TypeError):
        return None
    if not isinstance(key, rsa.RSAPrivateKey):
        return None
    return key


def _write_private_pem(private_pem: bytes) -> None:
    """Write *private_pem* to PRIV_PATH with owner-only permissions."""
    # Create the file with mode 0o600 up front so the unencrypted key is never
    # readable by other users, not even briefly.  O_BINARY only exists on
    # Windows, where it prevents newline translation.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
    fd = os.open(PRIV_PATH, flags, 0o600)
    with os.fdopen(fd, 'wb') as fh:
        fh.write(private_pem)
    # The mode passed to os.open() only applies to newly created files; when
    # an older file is being overwritten, tighten its permissions as well.
    try:
        os.chmod(PRIV_PATH, 0o600)
    except OSError:
        # Best effort: some filesystems do not support POSIX permissions.
        pass


def ensure_keypair() -> None:
    """Make sure both PEM files of the RSA keypair exist in ``KEY_DIR``.

    * Both files present: nothing happens.
    * Only the public file is missing: it is re-derived from the existing
      private key, so ciphertexts produced for that key stay decryptable.
    * Otherwise (private key missing or unusable): a fresh 2048-bit RSA
      keypair is generated and both files are written.

    The private key is stored unencrypted in traditional OpenSSL PEM format
    with owner-only file permissions; the public key is stored as
    SubjectPublicKeyInfo PEM.
    """
    KEY_DIR.mkdir(parents=True, exist_ok=True)

    private_exists = PRIV_PATH.exists()
    if private_exists and PUB_PATH.exists():
        return

    private_key = _load_existing_private_key() if private_exists else None
    if private_key is None:
        private_key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048
        )
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        _write_private_pem(private_pem)

    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    PUB_PATH.write_bytes(public_pem)


def get_public_key_info() -> Dict[str, str]:
    """Return the public key PEM and a short identifier derived from it.

    Returns:
        A dict with ``public_key_pem`` (the PEM text) and ``key_id`` (the
        first 16 hex characters of the SHA-256 digest of that text).
    """
    ensure_keypair()
    pem = PUB_PATH.read_text()
    kid = hashlib.sha256(pem.encode()).hexdigest()[:16]
    return {'public_key_pem': pem, 'key_id': kid}


def decrypt_payload_b64(ciphertext_b64: str) -> Dict[str, Any]:
    """Decrypt a base64-encoded RSA-OAEP ciphertext into a JSON object.

    The ciphertext must have been produced with the public key of this
    keypair using OAEP with SHA-256 for both the hash and MGF1, and no label.

    Args:
        ciphertext_b64: Base64 text of the ciphertext.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the input is not valid base64, decryption fails, the
            plaintext is not valid UTF-8 JSON, or the JSON document is not an
            object.
    """
    ensure_keypair()
    private_key = serialization.load_pem_private_key(
        PRIV_PATH.read_bytes(), password=None
    )
    ciphertext = base64.b64decode(ciphertext_b64)
    plaintext = private_key.decrypt(
        ciphertext,
        padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        ),
    )
    obj = json.loads(plaintext.decode())
    # json.loads can yield lists, strings, numbers, ...; enforce the declared
    # return type so callers do not fail later with an unrelated error.
    if not isinstance(obj, dict):
        raise ValueError('decrypted payload is not a JSON object')
    return obj


def ts_within(ts_iso: str, max_minutes: int = 10) -> bool:
    """Return True if *ts_iso* lies within *max_minutes* of the current time.

    The check is symmetric: timestamps up to *max_minutes* in the past or in
    the future are accepted.

    Args:
        ts_iso: ISO-8601 timestamp; a ``Z`` suffix is accepted as UTC.  A
            timestamp without any UTC offset is interpreted as UTC.
        max_minutes: Allowed distance from "now", in minutes.

    Returns:
        False for unparsable or non-string input, otherwise whether the
        timestamp falls inside the window.
    """
    try:
        # datetime.fromisoformat() only understands a literal 'Z' suffix from
        # Python 3.11 on, so normalise it to an explicit offset first.
        ts = datetime.fromisoformat(ts_iso.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        return False
    if ts.tzinfo is None:
        # Subtracting a naive datetime from an aware one raises TypeError;
        # treat offset-less timestamps as UTC instead of crashing.
        ts = ts.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    return abs((now - ts).total_seconds()) <= max_minutes * 60