Files
Dialectic.Backend/services/api_key_service.py
2026-02-12 15:45:48 +00:00

71 lines
2.1 KiB
Python

from cryptography.fernet import Fernet, InvalidToken
from sqlalchemy.orm import Session
from db_models import ApiKey
import os
# Initialize the encryption key from environment or generate a new one
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY", Fernet.generate_key().decode())
cipher_suite = Fernet(ENCRYPTION_KEY.encode())
class ApiKeyService:
"""
Service for managing API keys in the database
"""
@staticmethod
def encrypt_api_key(api_key: str) -> str:
"""
Encrypt an API key
"""
encrypted_key = cipher_suite.encrypt(api_key.encode())
return encrypted_key.decode()
@staticmethod
def decrypt_api_key(encrypted_api_key: str) -> str:
"""
Decrypt an API key
"""
decrypted_key = cipher_suite.decrypt(encrypted_api_key.encode())
return decrypted_key.decode()
@staticmethod
def get_api_key(db: Session, provider: str) -> str:
"""
Retrieve and decrypt an API key for a provider
"""
api_key_record = db.query(ApiKey).filter(ApiKey.provider == provider).first()
if not api_key_record or not api_key_record.api_key_encrypted:
return None
try:
return ApiKeyService.decrypt_api_key(api_key_record.api_key_encrypted)
except InvalidToken:
return None
@staticmethod
def set_api_key(db: Session, provider: str, api_key: str) -> bool:
"""
Encrypt and store an API key for a provider
"""
encrypted_key = ApiKeyService.encrypt_api_key(api_key)
# Check if record exists
api_key_record = db.query(ApiKey).filter(ApiKey.provider == provider).first()
if api_key_record:
# Update existing record
api_key_record.api_key_encrypted = encrypted_key
else:
# Create new record
api_key_record = ApiKey(
provider=provider,
api_key_encrypted=encrypted_key
)
db.add(api_key_record)
try:
db.commit()
return True
except Exception:
db.rollback()
return False