feat: apikey alias/renewal + markdown/patch authorship

- APIKey.alias (unique, required). Creating with an existing alias
  renews that key: same key string kept, validity reset to 15d,
  reactivated, name/roles updated (response has renewed=true).
- get_actor(): X-API-Key -> key alias, Bearer -> 'admin'.
- markdown & patch create/update record author / created_at /
  updated_at / last_modified_by from the actor.
- Idempotent run_migrations() (information_schema-guarded ALTERs +
  backfill) so existing tables/data gain the new columns on startup;
  create_all still covers fresh DBs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
h z
2026-05-16 22:51:40 +01:00
parent 9e2477df8c
commit bf4c0dbbbd
8 changed files with 164 additions and 8 deletions

View File

@@ -223,4 +223,25 @@ def update_last_used(api_key):
session.query(APIKey).filter_by(key=api_key.key).update(
{APIKey.last_used_at: datetime.now(UTC)}
)
session.commit()
session.commit()
def get_actor():
"""Identity string to record as author/last_modified_by.
- X-API-Key request -> the key's alias
- Keycloak Bearer request -> the literal 'admin' (the backend does not
track individual KC identities)
- otherwise -> None
Call only from endpoints already behind @require_auth.
"""
api_key_header = request.headers.get('X-API-Key')
if api_key_header:
api_key = get_api_key(api_key_header)
if api_key:
return api_key.alias
return None
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer'):
return 'admin'
return None

View File

@@ -1,3 +1,4 @@
from datetime import datetime, timedelta, UTC
from flask import Blueprint, request, jsonify
from api import generate_api_key
from db import get_db
@@ -10,13 +11,31 @@ api_key_bp = Blueprint('apikey', __name__, url_prefix='/api/apikey')
# product defines, regardless of what the request body asks for.
ALLOWED_API_KEY_ROLES = {'admin', 'creator', 'user'}
# Validity window applied on create and on every renewal.
KEY_TTL = timedelta(days=15)
@api_key_bp.route('/', methods=['POST'])
@require_auth(roles=['admin'])
def create_key():
data = request.get_json(silent=True)
"""Create an API key, or renew an existing one.
if not data or 'name' not in data:
`alias` is required and unique. Creating with an alias that already
exists is treated as a RENEWAL of that key: the same key string is
kept (so existing integrations keep working), its validity window is
reset, it is reactivated, and name/roles are updated. The (unchanged)
key string is returned again.
"""
data = request.get_json(silent=True)
if not data:
return jsonify({"error": "invalid or missing JSON body"}), 400
alias = data.get('alias')
name = data.get('name')
if not alias or not str(alias).strip():
return jsonify({"error": "alias is required"}), 400
if not name:
return jsonify({"error": "Name is required"}), 400
alias = str(alias).strip()
roles = data.get('roles', [])
if not isinstance(roles, list) or any(r not in ALLOWED_API_KEY_ROLES for r in roles):
@@ -24,10 +43,29 @@ def create_key():
try:
with get_db() as session:
apikey = APIKey(key=generate_api_key(), name=data['name'], roles=roles)
existing = session.query(APIKey).filter_by(alias=alias).first()
if existing is not None:
# Renewal: keep the key string, reset validity, reactivate.
existing.name = name
existing.roles = roles
existing.is_active = True
existing.expire = datetime.now(UTC) + KEY_TTL
session.commit()
result = existing.to_dict()
result['renewed'] = True
return jsonify(result), 200
apikey = APIKey(
key=generate_api_key(),
alias=alias,
name=name,
roles=roles,
expire=datetime.now(UTC) + KEY_TTL,
)
session.add(apikey)
session.commit()
result = apikey.to_dict()
result['renewed'] = False
return jsonify(result), 201
except Exception as e:
return jsonify({"error": str(e)}), 500

View File

@@ -1,8 +1,9 @@
from flask import Blueprint, request, jsonify
from sqlalchemy import or_
from api import limiter
from api import require_auth, etag_response, verify_token, is_user_admin
from api import require_auth, etag_response, verify_token, is_user_admin, get_actor
from contexts.RequestContext import RequestContext
from datetime import datetime, UTC
from db import get_db
from db.models.Markdown import Markdown
from db.models.MarkdownSetting import MarkdownSetting
@@ -225,7 +226,13 @@ def create_markdown():
setting_id = data.get('setting_id', None)
if not title or not content:
return jsonify({"error": "missing required fields"}), 400
new_markdown = Markdown(title=title, content=content, path_id=path_id, shortcut=shortcut, setting_id=setting_id)
actor = get_actor()
now = datetime.now(UTC)
new_markdown = Markdown(
title=title, content=content, path_id=path_id, shortcut=shortcut,
setting_id=setting_id, author=actor, last_modified_by=actor,
created_at=now, updated_at=now,
)
with get_db() as session:
try:
if shortcut != "":
@@ -301,6 +308,8 @@ def update_markdown(markdown_id):
markdown.shortcut = data.get('shortcut')
if 'setting_id' in data:
markdown.setting_id = data.get('setting_id')
markdown.updated_at = datetime.now(UTC)
markdown.last_modified_by = get_actor()
session.commit()
markdown_updated.send(None, payload=markdown.to_dict())
return jsonify(markdown.to_dict()), 200

View File

@@ -1,5 +1,6 @@
from datetime import datetime, UTC
from flask import Blueprint, request, jsonify
from api import limiter, require_auth, is_user_admin
from api import limiter, require_auth, is_user_admin, get_actor
from contexts.RequestContext import RequestContext
from db import get_db
from db.models.Markdown import Markdown
@@ -65,11 +66,17 @@ def create_patch():
if session.query(Markdown).get(markdown_id) is None:
return jsonify({"error": "markdown not found"}), 404
try:
actor = get_actor()
now = datetime.now(UTC)
patch = MarkdownPatch(
markdown_id=markdown_id,
title=data.get('title'),
content=content,
order=data.get('order', 0),
author=actor,
last_modified_by=actor,
created_at=now,
updated_at=now,
)
session.add(patch)
session.commit()
@@ -104,6 +111,8 @@ def update_patch(patch_id):
patch.content = data.get('content')
if 'order' in data:
patch.order = data.get('order')
patch.updated_at = datetime.now(UTC)
patch.last_modified_by = get_actor()
session.commit()
return jsonify(patch.to_dict()), 200