- ALLOWED_API_KEY_ROLES (+ apikey_cli ALLOWED_ROLES) gain 'agent'. - 'agent' added to require_auth on markdown/patch/path create/update/delete/move and backup get/load. apikey mint, /backup/convert, logs, config, webhook and permission/template settings stay admin-only. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
85 lines
3.2 KiB
Python
85 lines
3.2 KiB
Python
from datetime import datetime, timedelta, UTC
|
|
from flask import Blueprint, request, jsonify
|
|
from api import generate_api_key
|
|
from db import get_db
|
|
from api import require_auth
|
|
from db.models.APIKey import APIKey
|
|
|
|
# Blueprint grouping every API-key management route under /api/apikey.
api_key_bp = Blueprint(
    name='apikey',
    import_name=__name__,
    url_prefix='/api/apikey',
)
|
|
|
|
# Closed list of roles an API key may carry. Whatever a request body asks
# for, any role outside this set is rejected, so a key can never be granted
# more than the product defines.
ALLOWED_API_KEY_ROLES = {
    'admin',
    'agent',
    'creator',
    'user',
}
|
|
|
|
# Lifetime granted to a key: used to compute its expiry both when the key
# is first created and each time it is renewed.
KEY_TTL = timedelta(days=15)
|
|
|
|
@api_key_bp.route('/', methods=['POST'])
@require_auth(roles=['admin'])
def create_key():
    """Create an API key, or renew an existing one.

    `alias` is required and unique. Creating with an alias that already
    exists is treated as a RENEWAL of that key: the same key string is
    kept (so existing integrations keep working), its validity window is
    reset, it is reactivated, and name/roles are updated. The (unchanged)
    key string is returned again.

    Request body (JSON object):
        alias: required; coerced to ``str`` and stripped, must be non-empty.
        name:  required; must be truthy.
        roles: optional list of role strings, each one a member of
               ALLOWED_API_KEY_ROLES. Defaults to an empty list.

    Returns:
        201 and the key's ``to_dict()`` plus ``"renewed": False`` on creation.
        200 and the key's ``to_dict()`` plus ``"renewed": True`` on renewal.
        400 with an ``error`` message when the body is malformed.
        500 with the exception text when the database work raises.
    """
    data = request.get_json(silent=True)
    # get_json() happily returns lists, strings or numbers for valid JSON.
    # Only an object has `.get`, so anything else is rejected here rather
    # than crashing below with an AttributeError (an unhandled 500).
    if not data or not isinstance(data, dict):
        return jsonify({"error": "invalid or missing JSON body"}), 400

    alias = data.get('alias')
    name = data.get('name')
    if not alias or not str(alias).strip():
        return jsonify({"error": "alias is required"}), 400
    if not name:
        return jsonify({"error": "Name is required"}), 400
    alias = str(alias).strip()

    roles = data.get('roles', [])
    # The isinstance(str) test must run before the membership test: an
    # unhashable element (e.g. a nested list or dict) would otherwise make
    # `r in ALLOWED_API_KEY_ROLES` raise TypeError instead of yielding 400.
    roles_ok = isinstance(roles, list) and all(
        isinstance(r, str) and r in ALLOWED_API_KEY_ROLES for r in roles
    )
    if not roles_ok:
        return jsonify({"error": f"roles must be a subset of {sorted(ALLOWED_API_KEY_ROLES)}"}), 400

    try:
        with get_db() as session:
            # Same validity window whether the key is new or renewed.
            expires_at = datetime.now(UTC) + KEY_TTL

            existing = session.query(APIKey).filter_by(alias=alias).first()
            if existing is not None:
                # Renewal: keep the key string, reset validity, reactivate.
                existing.name = name
                existing.roles = roles
                existing.is_active = True
                existing.expire = expires_at
                session.commit()
                result = existing.to_dict()
                result['renewed'] = True
                return jsonify(result), 200

            apikey = APIKey(
                key=generate_api_key(),
                alias=alias,
                name=name,
                roles=roles,
                expire=expires_at,
            )
            session.add(apikey)
            session.commit()
            result = apikey.to_dict()
            result['renewed'] = False
            return jsonify(result), 201
    except Exception as e:
        # Boundary handler: report any database failure as a JSON 500.
        return jsonify({"error": str(e)}), 500
|
|
|
|
@api_key_bp.route('/<key>', methods=['DELETE'])
@require_auth(roles=['admin'])
def revoke_key(key):
    """Deactivate the active API key whose key string equals `key`.

    Returns 404 with an ``error`` message when no active key matches,
    otherwise marks the key inactive, commits, and returns 200.
    """
    # The lookup and the update share one session on purpose: changing a
    # detached instance would silently never be written to the database.
    with get_db() as session:
        active_matches = session.query(APIKey).filter_by(key=key, is_active=True)
        record = active_matches.first()
        if not record:
            return jsonify({"error": "API key not found"}), 404

        record.is_active = False
        session.commit()
        return jsonify({"message": "API key revoked successfully"}), 200
|