add: backend api auth by apikey/apikey gen/apikey revoke

This commit is contained in:
h z
2025-05-06 18:54:10 +01:00
parent 85d8124a0c
commit 1a160c9415
3 changed files with 109 additions and 7 deletions

View File

@@ -1,22 +1,25 @@
import base64
import os
import pkgutil
from functools import wraps
from datetime import datetime, UTC
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from flask import jsonify, Blueprint, request, make_response
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from jwt import decode, ExpiredSignatureError, InvalidTokenError, get_unverified_header
import importlib
import requests
from threading import Lock
from db.models.APIKey import APIKey
from db import get_db
import base64
import os
import pkgutil
import secrets
import string
import env_provider
import hashlib
import json
import importlib
import requests
_public_key_cache = {}
_lock = Lock()
@@ -96,18 +99,42 @@ def require_auth(roles=[]):
def wrapper(*args, **kwargs):
if request.method == "OPTIONS":
return '', 200
auth_header = request.headers.get('Authorization')
api_key_header = request.headers.get('X-API-Key')
if auth_header and api_key_header:
return jsonify({"error": "Cannot use both Bearer token and API Key authentication"}), 403
if api_key_header:
api_key = get_api_key(api_key_header)
if not api_key:
return jsonify({"error": "Invalid API key"}), 401
expire_time = api_key.expire.replace(tzinfo=UTC) if api_key.expire.tzinfo is None else api_key.expire
if datetime.now(UTC) > expire_time:
return jsonify({"error": "API key has expired"}), 401
if roles and not (set(roles) & set(api_key.roles)):
return jsonify({"error": "Forbidden, permission denied"}), 403
update_last_used(api_key)
return func(*args, **kwargs)
if not auth_header or not auth_header.startswith('Bearer'):
return jsonify({"error": "Unauthorized"}), 401
token = auth_header.split(" ")[1]
decoded = verify_token(token)
if not decoded:
return jsonify({"error": "Invalid or expired token"}), 401
user_roles = decoded.get("resource_access", {}).get(env_provider.KC_CLIENT_ID, {}).get("roles", [])
if roles and not (set(roles) & set(user_roles)):
print("auth failed")
return jsonify({"error": "Forbidden, permission denied"}), 403
print("auth success")
return func(*args, **kwargs)
return wrapper
@@ -169,3 +196,17 @@ def etag_response(f):
return resp
return response
return decorator
def generate_api_key(length=32):
alphabet = string.ascii_letters + string.digits
return ''.join(secrets.choice(alphabet) for _ in range(length))
def get_api_key(key):
with get_db() as session:
return session.query(APIKey).filter_by(key=key, is_active=True).first()
def update_last_used(api_key):
with get_db() as session:
api_key.last_used_at = datetime.now(UTC)
session.commit()

36
api/apikey/__init__.py Normal file
View File

@@ -0,0 +1,36 @@
from flask import Blueprint, request, jsonify
from api import get_api_key, generate_api_key
from db import get_db
from api import require_auth
from db.models.APIKey import APIKey
api_key_bp = Blueprint('apikey', __name__, url_prefix='/api/apikey')
@api_key_bp.route('/', methods=['POST'])
@require_auth(roles=['admin'])
def create_key():
data = request.get_json()
if not data or 'name' not in data:
return jsonify({"error": "Name is required"}), 400
roles = data.get('roles', [])
try:
with get_db() as session:
apikey = APIKey(key=generate_api_key(),name=data['name'], roles=roles)
session.add(apikey);
session.commit
return jsonify(apikey.to_dict()), 201
except Exception as e:
return jsonify({"error": str(e)}), 500
@api_key_bp.route('/<key>', methods=['DELETE'])
@require_auth(roles=['admin'])
def revoke_key(key):
api_key = get_api_key(key)
with get_db() as session:
if not api_key:
return jsonify({"error": "API key not found"}), 404
api_key.is_active = False
session.commit()
return jsonify({"message": "API key revoked successfully"}), 200

25
db/models/APIKey.py Normal file
View File

@@ -0,0 +1,25 @@
from datetime import datetime, timedelta, UTC
from sqlalchemy import Column, String, DateTime, Boolean, JSON
from db.models import Base
class APIKey(Base):
__tablename__ = 'apikey'
key = Column(String(64), primary_key=True)
name = Column(String(255), nullable=False)
created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC))
last_used_at = Column(DateTime)
is_active = Column(Boolean, default=True)
roles = Column(JSON, nullable=False, default=list)
expire = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC) + timedelta(days=15))
def to_dict(self):
return {
"key": self.key,
"name": self.name,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,
"is_active": self.is_active,
"roles": self.roles,
"expire": self.expire.isoformat() if self.expire else None
}