from datetime import datetime, timedelta, UTC from flask import Blueprint, request, jsonify from api import generate_api_key from db import get_db from api import require_auth from db.models.APIKey import APIKey api_key_bp = Blueprint('apikey', __name__, url_prefix='/api/apikey') # An API key must never be able to request a role broader than what the # product defines, regardless of what the request body asks for. ALLOWED_API_KEY_ROLES = {'admin', 'creator', 'user'} # Validity window applied on create and on every renewal. KEY_TTL = timedelta(days=15) @api_key_bp.route('/', methods=['POST']) @require_auth(roles=['admin']) def create_key(): """Create an API key, or renew an existing one. `alias` is required and unique. Creating with an alias that already exists is treated as a RENEWAL of that key: the same key string is kept (so existing integrations keep working), its validity window is reset, it is reactivated, and name/roles are updated. The (unchanged) key string is returned again. """ data = request.get_json(silent=True) if not data: return jsonify({"error": "invalid or missing JSON body"}), 400 alias = data.get('alias') name = data.get('name') if not alias or not str(alias).strip(): return jsonify({"error": "alias is required"}), 400 if not name: return jsonify({"error": "Name is required"}), 400 alias = str(alias).strip() roles = data.get('roles', []) if not isinstance(roles, list) or any(r not in ALLOWED_API_KEY_ROLES for r in roles): return jsonify({"error": f"roles must be a subset of {sorted(ALLOWED_API_KEY_ROLES)}"}), 400 try: with get_db() as session: existing = session.query(APIKey).filter_by(alias=alias).first() if existing is not None: # Renewal: keep the key string, reset validity, reactivate. existing.name = name existing.roles = roles existing.is_active = True existing.expire = datetime.now(UTC) + KEY_TTL session.commit() result = existing.to_dict() result['renewed'] = True return jsonify(result), 200 apikey = APIKey( key=generate_api_key(), alias=alias, name=name, roles=roles, expire=datetime.now(UTC) + KEY_TTL, ) session.add(apikey) session.commit() result = apikey.to_dict() result['renewed'] = False return jsonify(result), 201 except Exception as e: return jsonify({"error": str(e)}), 500 @api_key_bp.route('/', methods=['DELETE']) @require_auth(roles=['admin']) def revoke_key(key): # Query and mutate within the same session, otherwise the update is # performed on a detached instance and silently never persists. with get_db() as session: api_key = session.query(APIKey).filter_by(key=key, is_active=True).first() if not api_key: return jsonify({"error": "API key not found"}), 404 api_key.is_active = False session.commit() return jsonify({"message": "API key revoked successfully"}), 200