Files
HarborForge.Backend/app/services/crypto_box.py

64 lines
2.0 KiB
Python

import base64
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
KEY_DIR = Path(os.getenv('MONITOR_KEY_DIR', '/config/monitor_keys'))
PRIV_PATH = KEY_DIR / 'monitor_private.pem'
PUB_PATH = KEY_DIR / 'monitor_public.pem'
def ensure_keypair() -> None:
KEY_DIR.mkdir(parents=True, exist_ok=True)
if PRIV_PATH.exists() and PUB_PATH.exists():
return
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
public_pem = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
PRIV_PATH.write_bytes(private_pem)
PUB_PATH.write_bytes(public_pem)
def get_public_key_info() -> Dict[str, str]:
ensure_keypair()
pem = PUB_PATH.read_text()
kid = hashlib.sha256(pem.encode()).hexdigest()[:16]
return {'public_key_pem': pem, 'key_id': kid}
def decrypt_payload_b64(ciphertext_b64: str) -> Dict[str, Any]:
ensure_keypair()
private_key = serialization.load_pem_private_key(PRIV_PATH.read_bytes(), password=None)
plaintext = private_key.decrypt(
base64.b64decode(ciphertext_b64),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
obj = json.loads(plaintext.decode())
return obj
def ts_within(ts_iso: str, max_minutes: int = 10) -> bool:
try:
ts = datetime.fromisoformat(ts_iso.replace('Z', '+00:00'))
except Exception:
return False
now = datetime.now(timezone.utc)
return abs((now - ts).total_seconds()) <= max_minutes * 60