Compare commits

...

11 Commits

Author SHA1 Message Date
h z
bfef232b8f Merge pull request 'feat: add 'agent' API key role (content CRUD + backup)' (#4) from feat/agent-role into main
Reviewed-on: #4
2026-05-17 14:10:29 +00:00
b31480bf25 feat: add 'agent' API key role (content CRUD + backup)
- ALLOWED_API_KEY_ROLES (+ apikey_cli ALLOWED_ROLES) gain 'agent'.
- 'agent' added to require_auth on markdown/patch/path create/update/
  delete/move and backup get/load. apikey mint, /backup/convert, logs,
  config, webhook and permission/template settings stay admin-only.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 15:06:17 +01:00
h z
9383f8cb03 Merge pull request 'feat: admin CLI for API key management (no admin login)' (#3) from feat/apikey-admin-cli into master
Reviewed-on: #3
2026-05-16 22:11:55 +00:00
67a04d67d9 feat: admin CLI for API key management (no admin login)
apikey_cli.py operates directly on the DB (run inside the backend
container). Subcommands: create (alias required; reusing an alias
renews — same key, validity reset, reactivated, name/roles updated;
roles allowlisted; configurable --ttl-days), list (masked keys,
--show-keys to reveal), revoke (by --alias or --key).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 23:10:31 +01:00
h z
f1584d1841 Merge pull request 'feat/apikey-alias-authorship' (#2) from feat/apikey-alias-authorship into master
Reviewed-on: #2
2026-05-16 22:06:14 +00:00
a3a6cbbec6 chore: standalone idempotent prod SQL migration (apikey alias + authorship)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 22:58:54 +01:00
bf4c0dbbbd feat: apikey alias/renewal + markdown/patch authorship
- APIKey.alias (unique, required). Creating with an existing alias
  renews that key: same key string kept, validity reset to 15d,
  reactivated, name/roles updated (response has renewed=true).
- get_actor(): X-API-Key -> key alias, Bearer -> 'admin'.
- markdown & patch create/update record author / created_at /
  updated_at / last_modified_by from the actor.
- Idempotent run_migrations() (information_schema-guarded ALTERs +
  backfill) so existing tables/data gain the new columns on startup;
  create_all still covers fresh DBs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 22:51:40 +01:00
h z
9e2477df8c Merge pull request 'fix/security-hardening' (#1) from fix/security-hardening into master
Reviewed-on: #1
2026-05-16 16:30:25 +00:00
155aa897c6 feat: markdown patch cards (model + API)
Add MarkdownPatch model (markdown_patch table, auto-created by
create_all) and /api/patch blueprint: list patches for a markdown
(inherits the parent's private/protected visibility), create/update
(admin|creator), delete (admin).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 17:28:04 +01:00
58f23ddcb8 Security hardening: fix RCE, auth and SSRF issues
Critical:
- backup: prevent Zip Slip path traversal and zip bombs in restore/convert
  via safe_extract(); serialize get_backup() with backup_lock and always
  restore CWD so concurrent requests can't corrupt the os.chdir state
- app: only enable the Werkzeug debugger/reloader when ENVIRONMENT=dev;
  always init rate limits (also under WSGI), not just under __main__
- apikey: fix create_key never committing (session.commit -> commit()),
  validate roles against an allowlist, and fix revoke_key/update_last_used
  operating on detached instances so revocation actually persists
- env_provider: redact DB_PASSWORD and SESSION_SECRET_KEY in summerize()

High:
- markdown: filter private/protected docs for non-admins in the listing,
  get_home, get_index and search endpoints (was an anonymous data leak);
  escape LIKE metacharacters and cap search results
- webhooks: validate target URL to block SSRF (loopback/private/link-local/
  metadata IPs), disable redirects, safely parse additional_header
- auth: validate JWT issuer and require exp/iat; add timeout to JWKS fetch;
  harden Authorization header parsing against malformed values
- log: require admin for GET /api/log and auth for POST; bound entry size

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:12:43 +01:00
1f4ca52a10 add: markdown deletion 2025-06-27 12:06:28 +01:00
16 changed files with 782 additions and 54 deletions

View File

@@ -37,7 +37,8 @@ def x5c_to_public_key(x5c):
def get_jwks():
url = f"{env_provider.KC_HOST}/realms/{env_provider.KC_REALM}/protocol/openid-connect/certs"
response = requests.get(url)
response = requests.get(url, timeout=5)
response.raise_for_status()
jwks = response.json()
return jwks
@@ -72,7 +73,9 @@ def verify_token(token):
token,
public_key,
algorithms=["RS256"],
audience=env_provider.KC_CLIENT_ID
audience=env_provider.KC_CLIENT_ID,
issuer=f"{env_provider.KC_HOST}/realms/{env_provider.KC_REALM}",
options={"require": ["exp", "iat"]},
)
return decoded
except ExpiredSignatureError as e:
@@ -86,7 +89,10 @@ def is_user_admin():
is_admin = False
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer'):
token = auth_header.split(" ")[1]
parts = auth_header.split(" ", 1)
if len(parts) != 2 or not parts[1].strip():
return is_admin
token = parts[1].strip()
decoded = verify_token(token)
if decoded:
user_roles = decoded.get("resource_access", {}).get(env_provider.KC_CLIENT_ID, {}).get("roles", [])
@@ -124,7 +130,10 @@ def require_auth(roles=[]):
if not auth_header or not auth_header.startswith('Bearer'):
return jsonify({"error": "Unauthorized"}), 401
token = auth_header.split(" ")[1]
parts = auth_header.split(" ", 1)
if len(parts) != 2 or not parts[1].strip():
return jsonify({"error": "Unauthorized"}), 401
token = parts[1].strip()
decoded = verify_token(token)
if not decoded:
@@ -207,6 +216,32 @@ def get_api_key(key):
return session.query(APIKey).filter_by(key=key, is_active=True).first()
def update_last_used(api_key):
# api_key comes from get_api_key()'s (now closed) session, so it is
# detached. Update by key in a fresh session instead of mutating the
# detached instance, which would never be persisted.
with get_db() as session:
api_key.last_used_at = datetime.now(UTC)
session.commit()
session.query(APIKey).filter_by(key=api_key.key).update(
{APIKey.last_used_at: datetime.now(UTC)}
)
session.commit()
def get_actor():
"""Identity string to record as author/last_modified_by.
- X-API-Key request -> the key's alias
- Keycloak Bearer request -> the literal 'admin' (the backend does not
track individual KC identities)
- otherwise -> None
Call only from endpoints already behind @require_auth.
"""
api_key_header = request.headers.get('X-API-Key')
if api_key_header:
api_key = get_api_key(api_key_header)
if api_key:
return api_key.alias
return None
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer'):
return 'admin'
return None

View File

@@ -1,36 +1,84 @@
from datetime import datetime, timedelta, UTC
from flask import Blueprint, request, jsonify
from api import get_api_key, generate_api_key
from api import generate_api_key
from db import get_db
from api import require_auth
from db.models.APIKey import APIKey
api_key_bp = Blueprint('apikey', __name__, url_prefix='/api/apikey')
# An API key must never be able to request a role broader than what the
# product defines, regardless of what the request body asks for.
ALLOWED_API_KEY_ROLES = {'admin', 'creator', 'user', 'agent'}
# Validity window applied on create and on every renewal.
KEY_TTL = timedelta(days=15)
@api_key_bp.route('/', methods=['POST'])
@require_auth(roles=['admin'])
def create_key():
data = request.get_json()
if not data or 'name' not in data:
"""Create an API key, or renew an existing one.
`alias` is required and unique. Creating with an alias that already
exists is treated as a RENEWAL of that key: the same key string is
kept (so existing integrations keep working), its validity window is
reset, it is reactivated, and name/roles are updated. The (unchanged)
key string is returned again.
"""
data = request.get_json(silent=True)
if not data:
return jsonify({"error": "invalid or missing JSON body"}), 400
alias = data.get('alias')
name = data.get('name')
if not alias or not str(alias).strip():
return jsonify({"error": "alias is required"}), 400
if not name:
return jsonify({"error": "Name is required"}), 400
alias = str(alias).strip()
roles = data.get('roles', [])
if not isinstance(roles, list) or any(r not in ALLOWED_API_KEY_ROLES for r in roles):
return jsonify({"error": f"roles must be a subset of {sorted(ALLOWED_API_KEY_ROLES)}"}), 400
try:
with get_db() as session:
apikey = APIKey(key=generate_api_key(),name=data['name'], roles=roles)
session.add(apikey);
session.commit
return jsonify(apikey.to_dict()), 201
existing = session.query(APIKey).filter_by(alias=alias).first()
if existing is not None:
# Renewal: keep the key string, reset validity, reactivate.
existing.name = name
existing.roles = roles
existing.is_active = True
existing.expire = datetime.now(UTC) + KEY_TTL
session.commit()
result = existing.to_dict()
result['renewed'] = True
return jsonify(result), 200
apikey = APIKey(
key=generate_api_key(),
alias=alias,
name=name,
roles=roles,
expire=datetime.now(UTC) + KEY_TTL,
)
session.add(apikey)
session.commit()
result = apikey.to_dict()
result['renewed'] = False
return jsonify(result), 201
except Exception as e:
return jsonify({"error": str(e)}), 500
@api_key_bp.route('/<key>', methods=['DELETE'])
@require_auth(roles=['admin'])
def revoke_key(key):
api_key = get_api_key(key)
# Query and mutate within the same session, otherwise the update is
# performed on a detached instance and silently never persists.
with get_db() as session:
api_key = session.query(APIKey).filter_by(key=key, is_active=True).first()
if not api_key:
return jsonify({"error": "API key not found"}), 404
api_key.is_active = False
session.commit()
return jsonify({"message": "API key revoked successfully"}), 200
return jsonify({"message": "API key revoked successfully"}), 200

View File

@@ -215,6 +215,40 @@ logger = logging.getLogger(__name__)
backup_bp = Blueprint('backup', __name__, url_prefix='/api/backup')
# Upper bounds to defend against zip bombs in uploaded backups.
_MAX_ARCHIVE_MEMBERS = 50000
_MAX_UNCOMPRESSED_BYTES = 1 * 1024 * 1024 * 1024 # 1 GiB
def safe_extract(zip_ref, dest_dir):
"""
Safely extract a zip archive into dest_dir.
Prevents "Zip Slip" path traversal (entries like ``../../etc/x`` or
absolute paths) and rejects zip bombs by capping the member count and
total uncompressed size. Backups are admin-uploaded but may originate
from an untrusted source (e.g. the /convert endpoint ingests foreign
backups), so the contents must be treated as hostile.
"""
dest_root = os.path.realpath(dest_dir)
members = zip_ref.infolist()
if len(members) > _MAX_ARCHIVE_MEMBERS:
raise ValueError("backup archive has too many entries")
total = 0
for member in members:
total += member.file_size
if total > _MAX_UNCOMPRESSED_BYTES:
raise ValueError("backup archive uncompressed size exceeds limit")
target = os.path.realpath(os.path.join(dest_root, member.filename))
if target != dest_root and not target.startswith(dest_root + os.sep):
raise ValueError(f"unsafe path in backup archive: {member.filename}")
zip_ref.extractall(dest_root)
def check_and_convert_backup_version(backup_dir):
"""
Check the backup version and convert it if necessary.
@@ -280,7 +314,7 @@ def convert_backup_endpoint():
uploaded_file.save(zip_path)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(backup_dir)
safe_extract(zip_ref, backup_dir)
success, error_response = check_and_convert_backup_version(backup_dir)
if not success:
@@ -312,7 +346,7 @@ def convert_backup_endpoint():
backup_lock = threading.Lock()
@backup_bp.route('/', methods=['GET'])
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
def get_backup():
"""
Create a backup of the application's data.
@@ -331,7 +365,12 @@ def get_backup():
Response Codes:
- 200: Backup created successfully
- 500: Failed to create backup
- 429: Another backup operation is in progress
"""
if not backup_lock.acquire(blocking=False):
return jsonify({"error": "Another backup operation is in progress. Please try again later."}), 429
original_cwd = os.getcwd()
try:
if os.path.exists('Root'):
shutil.rmtree('Root')
@@ -370,6 +409,11 @@ def get_backup():
except Exception as e:
logger.error(f"Failed to get backup: {e}")
return jsonify({"error": "failed to get backup"}), 500
finally:
# os.chdir is process-global; always restore CWD so a failure
# mid-traverse can't corrupt later requests.
os.chdir(original_cwd)
backup_lock.release()
def create_and_cd(path_name):
@@ -514,7 +558,7 @@ def traverse(path_id, paths):
@backup_bp.route('/load', methods=['POST'])
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
def load_backup():
"""
Restore data from a backup file.
@@ -552,7 +596,7 @@ def load_backup():
uploaded_file.save(zip_path)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
safe_extract(zip_ref, temp_dir)
root_dir = os.path.join(temp_dir, "Root")
if not os.path.exists(root_dir):

View File

@@ -1,12 +1,18 @@
#api/log.py
from flask import Blueprint, jsonify, request
from api import require_auth
from db import get_db
from db.models.Log import Log
from db.utils import insert_log
logs_bp = Blueprint('log', __name__, url_prefix='/api/log')
# Bound per-entry size so an authenticated-but-low-trust caller can't bloat
# the log table with multi-megabyte payloads.
_MAX_LOG_MESSAGE_LEN = 16 * 1024
@logs_bp.route('/', methods=['GET'])
@require_auth(roles=['admin'])
def get_logs():
level = request.args.get('level')
application = request.args.get('application')
@@ -29,14 +35,17 @@ def get_logs():
})
@logs_bp.route('/', methods=['POST'])
@require_auth()
def create_log():
data = request.json
data = request.get_json(silent=True)
if not data:
return jsonify({"error": "invalid or missing JSON body"}), 400
required_fields = ['level', 'message']
for field in required_fields:
if field not in data:
return jsonify({"error": f"missing {field} in request"}), 400
level = data.get('level')
message = data.get('message')
level = str(data.get('level'))[:64]
message = str(data.get('message'))[:_MAX_LOG_MESSAGE_LEN]
application = "frontend"
extra = data.get('extra', None)
log_entry = Log(level=level, message=message, application=application, extra=extra)

View File

@@ -1,8 +1,9 @@
from flask import Blueprint, request, jsonify
from sqlalchemy import or_
from api import limiter
from api import require_auth, etag_response, verify_token, is_user_admin
from api import require_auth, etag_response, verify_token, is_user_admin, get_actor
from contexts.RequestContext import RequestContext
from datetime import datetime, UTC
from db import get_db
from db.models.Markdown import Markdown
from db.models.MarkdownSetting import MarkdownSetting
@@ -15,6 +16,40 @@ logger = logging.getLogger(__name__)
markdown_bp = Blueprint('markdown', __name__, url_prefix='/api/markdown')
def _permission_of(session, markdown):
"""Return the effective permission string ('private'/'protected'/None)."""
if markdown.setting_id is None:
return None
setting = session.query(MarkdownSetting).get(markdown.setting_id)
if not setting or not setting.permission_setting_id:
return None
ps = session.query(MarkdownPermissionSetting).get(setting.permission_setting_id)
return ps.permission if ps else None
def _filter_visible(session, markdowns, is_admin):
"""
Apply markdown permission settings to a list for non-admin callers:
private documents are dropped entirely, protected documents have their
content masked. Admins see everything. Mirrors get_markdown()'s checks
so listing/search endpoints can no longer leak restricted content.
"""
if is_admin:
return [md.to_dict() for md in markdowns]
visible = []
for md in markdowns:
permission = _permission_of(session, md)
if permission == 'private':
continue
data = md.to_dict()
if permission == 'protected':
data['content'] = None
data['protected'] = True
visible.append(data)
return visible
@markdown_bp.route('/', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@etag_response
@@ -30,9 +65,10 @@ def get_markdowns():
Response Codes:
- 200: Success
"""
is_admin = is_user_admin()
with get_db() as session:
mds = session.query(Markdown).all()
return jsonify([md.to_dict() for md in mds]), 200
return jsonify(_filter_visible(session, mds, is_admin)), 200
@markdown_bp.route('/get_home', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@@ -51,11 +87,15 @@ def get_home():
- 200: Success
- 204: No content (home markdown not found)
"""
is_admin = is_user_admin()
with get_db() as session:
markdown = session.query(Markdown).filter(Markdown.path_id == 1, Markdown.title == "index").first()
if markdown is None:
return jsonify({}), 204
return jsonify(markdown.to_dict()), 200
visible = _filter_visible(session, [markdown], is_admin)
if not visible:
return jsonify({}), 204
return jsonify(visible[0]), 200
@markdown_bp.route('/by_path/<int:path_id>', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@@ -75,9 +115,10 @@ def get_markdowns_by_path(path_id):
Response Codes:
- 200: Success
"""
is_admin = is_user_admin()
with get_db() as session:
markdowns = session.query(Markdown).filter(Markdown.path_id == path_id).all()
return jsonify([md.to_dict() for md in markdowns]), 200
return jsonify(_filter_visible(session, markdowns, is_admin)), 200
@markdown_bp.route('/get_index/<int:path_id>', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@@ -99,11 +140,15 @@ def get_index(path_id):
- 200: Success
- 204: No content (index markdown not found)
"""
is_admin = is_user_admin()
with get_db() as session:
markdown = session.query(Markdown).filter(Markdown.path_id == path_id).filter(Markdown.title == "index").first()
if markdown is None:
return jsonify({}), 204
return jsonify(markdown.to_dict()), 200
visible = _filter_visible(session, [markdown], is_admin)
if not visible:
return jsonify({}), 204
return jsonify(visible[0]), 200
@@ -149,7 +194,7 @@ def get_markdown(markdown_id):
return jsonify(markdown.to_dict()), 200
@markdown_bp.route('/', methods=['POST'])
@require_auth(roles=['admin', 'creator'])
@require_auth(roles=['admin', 'creator', 'agent'])
@limiter.limit(api.get_rate_limit)
def create_markdown():
"""
@@ -181,7 +226,13 @@ def create_markdown():
setting_id = data.get('setting_id', None)
if not title or not content:
return jsonify({"error": "missing required fields"}), 400
new_markdown = Markdown(title=title, content=content, path_id=path_id, shortcut=shortcut, setting_id=setting_id)
actor = get_actor()
now = datetime.now(UTC)
new_markdown = Markdown(
title=title, content=content, path_id=path_id, shortcut=shortcut,
setting_id=setting_id, author=actor, last_modified_by=actor,
created_at=now, updated_at=now,
)
with get_db() as session:
try:
if shortcut != "":
@@ -199,7 +250,7 @@ def create_markdown():
return jsonify({"error": f"create failed - {errno}"}), 500
@markdown_bp.route('/<int:markdown_id>', methods=['PUT', 'PATCH'])
@require_auth(roles=['admin', 'creator'])
@require_auth(roles=['admin', 'creator', 'agent'])
@limiter.limit(api.get_rate_limit)
def update_markdown(markdown_id):
"""
@@ -257,12 +308,14 @@ def update_markdown(markdown_id):
markdown.shortcut = data.get('shortcut')
if 'setting_id' in data:
markdown.setting_id = data.get('setting_id')
markdown.updated_at = datetime.now(UTC)
markdown.last_modified_by = get_actor()
session.commit()
markdown_updated.send(None, payload=markdown.to_dict())
return jsonify(markdown.to_dict()), 200
@markdown_bp.route('/<int:markdown_id>', methods=['DELETE'])
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
@limiter.limit(api.get_rate_limit)
def delete_markdown(markdown_id):
"""
@@ -317,22 +370,28 @@ def delete_markdown(markdown_id):
session.delete(markdown_setting)
# Send webhook event before committing the transaction
# This ensures webhook handlers can still access related data
markdown_deleted.send(None, payload=md)
session.delete(markdown)
session.commit()
markdown_deleted.send(None, payload=md)
logger.info(f"Successfully deleted markdown {markdown_id} with cascade deletion")
return jsonify(md), 200
except Exception as e:
import traceback
logger.error(f"Failed to delete markdown {markdown_id}: {e}")
logger.error(f"Exception type: {type(e).__name__}")
logger.error(f"Full traceback:\n{traceback.format_exc()}")
errno = RequestContext.get_error_id()
session.rollback()
return jsonify({"error": f"delete failed - {errno}"}), 500
@markdown_bp.route('/move_forward/<int:markdown_id>', methods=['PATCH'])
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
@limiter.limit(api.get_rate_limit)
def move_forward(markdown_id):
"""
@@ -369,7 +428,7 @@ def move_forward(markdown_id):
@markdown_bp.route('/move_backward/<int:markdown_id>', methods=['PATCH'])
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
@limiter.limit(api.get_rate_limit)
def move_backward(markdown_id):
"""
@@ -423,11 +482,19 @@ def search_markdowns(keyword):
Response Codes:
- 200: Success
"""
is_admin = is_user_admin()
# Escape LIKE metacharacters so user input can't inject wildcards
# (full-table-scan DoS) and so search actually matches substrings.
escaped = keyword.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
pattern = f"%{escaped}%"
with get_db() as session:
res = session.query(Markdown).filter(
or_(Markdown.title.like(keyword), Markdown.content.like(keyword))
).all()
return jsonify([md.to_dict() for md in res]), 200
or_(
Markdown.title.like(pattern, escape='\\'),
Markdown.content.like(pattern, escape='\\'),
)
).limit(200).all()
return jsonify(_filter_visible(session, res, is_admin)), 200
@markdown_bp.route('/links', methods=['GET'])
@limiter.limit(api.get_rate_limit)

131
api/patch.py Normal file
View File

@@ -0,0 +1,131 @@
from datetime import datetime, UTC
from flask import Blueprint, request, jsonify
from api import limiter, require_auth, is_user_admin, get_actor
from contexts.RequestContext import RequestContext
from db import get_db
from db.models.Markdown import Markdown
from db.models.MarkdownPatch import MarkdownPatch
from db.models.MarkdownSetting import MarkdownSetting
from db.models.MarkdownPermissionSetting import MarkdownPermissionSetting
import api
import logging
logger = logging.getLogger(__name__)
patch_bp = Blueprint('patch', __name__, url_prefix='/api/patch')
def _parent_visible_to_caller(session, markdown, is_admin):
"""Patch cards inherit the parent markdown's visibility. Admins see all;
non-admins are denied for private or protected parents (so patches can't
be used to leak restricted content)."""
if is_admin or markdown.setting_id is None:
return True
setting = session.query(MarkdownSetting).get(markdown.setting_id)
if not setting or not setting.permission_setting_id:
return True
ps = session.query(MarkdownPermissionSetting).get(setting.permission_setting_id)
if ps and ps.permission in ('private', 'protected'):
return False
return True
@patch_bp.route('/by_markdown/<int:markdown_id>', methods=['GET'])
@limiter.limit(api.get_rate_limit)
def get_patches(markdown_id):
"""List patch cards for a markdown, ordered, respecting parent visibility."""
is_admin = is_user_admin()
with get_db() as session:
markdown = session.query(Markdown).get(markdown_id)
if markdown is None:
return jsonify({"error": "markdown not found"}), 404
if not _parent_visible_to_caller(session, markdown, is_admin):
return jsonify({"error": "permission denied"}), 403
patches = (
session.query(MarkdownPatch)
.filter(MarkdownPatch.markdown_id == markdown_id)
.order_by(MarkdownPatch.order.asc(), MarkdownPatch.id.asc())
.all()
)
return jsonify([p.to_dict() for p in patches]), 200
@patch_bp.route('/', methods=['POST'])
@require_auth(roles=['admin', 'creator', 'agent'])
@limiter.limit(api.get_rate_limit)
def create_patch():
"""Create a patch card. Body: markdown_id, content, title?, order?"""
data = request.get_json(silent=True)
if not data:
return jsonify({"error": "invalid or missing JSON body"}), 400
markdown_id = data.get('markdown_id')
content = data.get('content')
if not markdown_id or content is None or content == "":
return jsonify({"error": "missing required fields"}), 400
with get_db() as session:
if session.query(Markdown).get(markdown_id) is None:
return jsonify({"error": "markdown not found"}), 404
try:
actor = get_actor()
now = datetime.now(UTC)
patch = MarkdownPatch(
markdown_id=markdown_id,
title=data.get('title'),
content=content,
order=data.get('order', 0),
author=actor,
last_modified_by=actor,
created_at=now,
updated_at=now,
)
session.add(patch)
session.commit()
return jsonify(patch.to_dict()), 201
except Exception as e:
logger.error(f"failed to create patch: {e}")
errno = RequestContext.get_error_id()
session.rollback()
return jsonify({"error": f"create failed - {errno}"}), 500
@patch_bp.route('/<int:patch_id>', methods=['PUT', 'PATCH'])
@require_auth(roles=['admin', 'creator', 'agent'])
@limiter.limit(api.get_rate_limit)
def update_patch(patch_id):
"""Update a patch card (title/content/order)."""
data = request.get_json(silent=True)
if not data:
return jsonify({"error": "invalid or missing JSON body"}), 400
with get_db() as session:
patch = session.query(MarkdownPatch).get(patch_id)
if patch is None:
return jsonify({"error": "patch not found"}), 404
if request.method == "PUT":
patch.title = data.get('title')
patch.content = data.get('content')
patch.order = data.get('order', 0)
else:
if 'title' in data:
patch.title = data.get('title')
if 'content' in data:
patch.content = data.get('content')
if 'order' in data:
patch.order = data.get('order')
patch.updated_at = datetime.now(UTC)
patch.last_modified_by = get_actor()
session.commit()
return jsonify(patch.to_dict()), 200
@patch_bp.route('/<int:patch_id>', methods=['DELETE'])
@require_auth(roles=['admin', 'agent'])
@limiter.limit(api.get_rate_limit)
def delete_patch(patch_id):
"""Delete a patch card."""
with get_db() as session:
patch = session.query(MarkdownPatch).get(patch_id)
if patch is None:
return jsonify({"error": "patch not found"}), 404
session.delete(patch)
session.commit()
return jsonify({"message": "patch deleted"}), 200

View File

@@ -82,7 +82,7 @@ def get_path_by_parent(parent_id):
@path_bp.route('/', methods=['POST'])
@limiter.limit(api.get_rate_limit)
@require_auth(roles=['admin', 'creator'])
@require_auth(roles=['admin', 'creator', 'agent'])
def create_path():
"""
Create a new path.
@@ -119,7 +119,7 @@ def create_path():
@path_bp.route('/<int:path_id>', methods=['PUT'])
@limiter.limit(api.get_rate_limit)
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
def update_path(path_id):
"""
Update a path.
@@ -158,7 +158,7 @@ def update_path(path_id):
@path_bp.route('/<int:path_id>', methods=['PATCH'])
@limiter.limit(api.get_rate_limit)
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
def patch_path(path_id):
"""
Partially update a path.
@@ -205,7 +205,7 @@ def patch_path(path_id):
@path_bp.route('/<int:path_id>', methods=['DELETE'])
@limiter.limit(api.get_rate_limit)
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
def delete_path(path_id):
"""
Delete a path.
@@ -240,7 +240,7 @@ def delete_path(path_id):
@path_bp.route('/move_forward/<int:path_id>', methods=['PATCH'])
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
@limiter.limit(api.get_rate_limit)
def move_forward(path_id):
"""
@@ -277,7 +277,7 @@ def move_forward(path_id):
@path_bp.route('/move_backward/<int:path_id>', methods=['PATCH'])
@require_auth(roles=['admin'])
@require_auth(roles=['admin', 'agent'])
@limiter.limit(api.get_rate_limit)
def move_backward(path_id):
"""

134
apikey_cli.py Normal file
View File

@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""Admin CLI for API key management — no HTTP, no admin login.
Operates directly on the database (same env as the backend), so it must
run where the DB is reachable, e.g. inside the backend container:
docker compose exec backend python apikey_cli.py create \
--alias ci-bot --name "CI bot" --roles creator
docker compose exec backend python apikey_cli.py list
docker compose exec backend python apikey_cli.py revoke --alias ci-bot
`create` with an existing --alias renews that key (same key string,
validity reset, reactivated, name/roles updated) — matching the HTTP
POST /api/apikey behaviour.
"""
import argparse
import secrets
import string
import sys
from datetime import datetime, timedelta, UTC
from db import get_db
from db.models.APIKey import APIKey
# Keep in sync with api.apikey.ALLOWED_API_KEY_ROLES
ALLOWED_ROLES = {"admin", "creator", "user", "agent"}
KEY_TTL_DEFAULT_DAYS = 15
def _gen_key(length=32):
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))
def _validate_roles(roles):
bad = [r for r in roles if r not in ALLOWED_ROLES]
if bad:
sys.exit(f"error: invalid role(s) {bad}; allowed: {sorted(ALLOWED_ROLES)}")
def cmd_create(args):
alias = args.alias.strip()
if not alias:
sys.exit("error: --alias is required")
roles = args.roles or []
_validate_roles(roles)
expire = datetime.now(UTC) + timedelta(days=args.ttl_days)
with get_db() as session:
existing = session.query(APIKey).filter_by(alias=alias).first()
if existing is not None:
existing.name = args.name
existing.roles = roles
existing.is_active = True
existing.expire = expire
session.commit()
row, renewed = existing.to_dict(), True
else:
ak = APIKey(
key=_gen_key(), alias=alias, name=args.name,
roles=roles, expire=expire,
)
session.add(ak)
session.commit()
row, renewed = ak.to_dict(), False
print("renewed" if renewed else "created")
print(f" alias : {row['alias']}")
print(f" name : {row['name']}")
print(f" roles : {row['roles']}")
print(f" expire: {row['expire']}")
print(f" key : {row['key']}")
def cmd_list(args):
with get_db() as session:
keys = session.query(APIKey).order_by(APIKey.created_at).all()
rows = [k.to_dict() for k in keys]
if not rows:
print("(no API keys)")
return
for r in rows:
key = r["key"] if args.show_keys else (r["key"][:6] + "")
state = "active" if r["is_active"] else "revoked"
print(
f"{r['alias']!r:<22} {state:<8} roles={r['roles']} "
f"expire={r['expire']} last_used={r['last_used_at']} "
f"name={r['name']!r} key={key}"
)
def cmd_revoke(args):
with get_db() as session:
q = session.query(APIKey)
ak = (q.filter_by(alias=args.alias).first() if args.alias
else q.filter_by(key=args.key).first())
if ak is None:
sys.exit("error: API key not found")
ak.is_active = False
session.commit()
print(f"revoked: alias={ak.alias} name={ak.name!r}")
def main():
p = argparse.ArgumentParser(prog="apikey_cli", description=__doc__)
sub = p.add_subparsers(dest="cmd", required=True)
c = sub.add_parser("create", help="create or renew (by alias) an API key")
c.add_argument("--alias", required=True, help="unique alias; reuse to renew")
c.add_argument("--name", required=True, help="human-readable name")
c.add_argument("--roles", nargs="*", default=[],
help=f"subset of {sorted(ALLOWED_ROLES)}")
c.add_argument("--ttl-days", type=int, default=KEY_TTL_DEFAULT_DAYS,
dest="ttl_days", help="validity window in days")
c.set_defaults(func=cmd_create)
l = sub.add_parser("list", help="list all API keys")
l.add_argument("--show-keys", action="store_true",
help="print full key strings (default: masked)")
l.set_defaults(func=cmd_list)
r = sub.add_parser("revoke", help="deactivate an API key")
g = r.add_mutually_exclusive_group(required=True)
g.add_argument("--alias", help="revoke by alias")
g.add_argument("--key", help="revoke by key string")
r.set_defaults(func=cmd_revoke)
args = p.parse_args()
args.func(args)
if __name__ == "__main__":
main()

8
app.py
View File

@@ -53,8 +53,12 @@ def log_request():
logger.info(f"Request received: {request.method} {request.path} from {request.remote_addr}")
api.init_rate_limits(app)
if __name__ == '__main__':
api.init_rate_limits(app)
print("env")
pprint(env_provider.summerize())
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=True)
# The Werkzeug debugger allows remote code execution. Only enable it in
# an explicit dev environment, never in the published production image.
is_dev = env_provider.ENVIRONMENT == "dev"
app.run(host='0.0.0.0', port=5000, debug=is_dev, use_reloader=is_dev)

View File

@@ -59,12 +59,71 @@ def init_payload():
session.commit()
def _column_exists(conn, table, column):
row = conn.execute(text(
"SELECT 1 FROM information_schema.columns "
"WHERE table_schema = :db AND table_name = :t AND column_name = :c"
), {"db": DB_NAME, "t": table, "c": column}).first()
return row is not None
def _index_exists(conn, table, index):
row = conn.execute(text(
"SELECT 1 FROM information_schema.statistics "
"WHERE table_schema = :db AND table_name = :t AND index_name = :i"
), {"db": DB_NAME, "t": table, "i": index}).first()
return row is not None
def run_migrations():
"""Idempotent additive schema migrations for already-existing tables.
create_all() creates missing tables (with the new columns) for a fresh
DB, but never alters existing ones. This adds the new columns to legacy
tables and backfills sensible defaults. Safe to run on every startup.
"""
# (table, column, DDL, backfill SQL or None)
steps = [
("apikey", "alias", "ALTER TABLE apikey ADD COLUMN alias VARCHAR(255) NULL",
"UPDATE apikey SET alias = `key` WHERE alias IS NULL"),
("markdown", "updated_at", "ALTER TABLE markdown ADD COLUMN updated_at DATETIME NULL",
"UPDATE markdown SET updated_at = created_at WHERE updated_at IS NULL"),
("markdown", "author", "ALTER TABLE markdown ADD COLUMN author VARCHAR(255) NULL",
"UPDATE markdown SET author = 'admin' WHERE author IS NULL"),
("markdown", "last_modified_by", "ALTER TABLE markdown ADD COLUMN last_modified_by VARCHAR(255) NULL",
"UPDATE markdown SET last_modified_by = 'admin' WHERE last_modified_by IS NULL"),
("markdown_patch", "author", "ALTER TABLE markdown_patch ADD COLUMN author VARCHAR(255) NULL",
"UPDATE markdown_patch SET author = 'admin' WHERE author IS NULL"),
("markdown_patch", "last_modified_by", "ALTER TABLE markdown_patch ADD COLUMN last_modified_by VARCHAR(255) NULL",
"UPDATE markdown_patch SET last_modified_by = 'admin' WHERE last_modified_by IS NULL"),
]
try:
with engine.begin() as conn:
for table, column, ddl, backfill in steps:
if not _column_exists(conn, table, column):
conn.execute(text(ddl))
if backfill:
conn.execute(text(backfill))
print(f"[ x ] migrated {table}.{column}")
# Unique constraint on apikey.alias once it is populated.
if not _index_exists(conn, "apikey", "uq_apikey_alias"):
conn.execute(text(
"ALTER TABLE apikey ADD CONSTRAINT uq_apikey_alias UNIQUE (alias)"
))
print("[ x ] migrated apikey.alias unique constraint")
except Exception as e:
# Don't block startup on a migration hiccup; surface loudly.
print(f"[ ! ] run_migrations error (continuing): {e}")
def setup_db():
if DB_SCHEMA_UPDATED:
clear_db()
print("[ x ] db cleared")
create_all()
print("[ x ] db created")
run_migrations()
print("[ x ] db migrations applied")
run_scripts()
print("[ x ] db scripts executed")
init_payload()

View File

@@ -0,0 +1,79 @@
-- ============================================================================
-- Production migration: apikey.alias (unique) + markdown/patch authorship
-- ============================================================================
-- Idempotent. Safe to run multiple times. Target: MySQL 8 (no native
-- ADD COLUMN IF NOT EXISTS, so columns are guarded via information_schema
-- + a prepared statement). Mirrors the app's db.run_migrations(); running
-- both is harmless.
--
-- Apply against the application schema, e.g.:
-- docker exec -i mysql sh -c 'mysql -uroot -p"$MYSQL_ROOT_PASSWORD" hangmanlab' \
-- < 2026-05-16_apikey_alias_authorship.sql
-- ============================================================================
SET @schema := DATABASE();
-- ---- helper macro is not available in plain SQL; repeat the guarded block --
-- apikey.alias --------------------------------------------------------------
SET @c := (SELECT COUNT(*) FROM information_schema.columns
WHERE table_schema=@schema AND table_name='apikey' AND column_name='alias');
SET @ddl := IF(@c=0,
'ALTER TABLE apikey ADD COLUMN alias VARCHAR(255) NULL',
'DO 0');
PREPARE st FROM @ddl; EXECUTE st; DEALLOCATE PREPARE st;
-- backfill: existing keys get alias = their (unique) key string
UPDATE apikey SET alias = `key` WHERE alias IS NULL;
-- apikey unique constraint on alias -----------------------------------------
SET @i := (SELECT COUNT(*) FROM information_schema.statistics
WHERE table_schema=@schema AND table_name='apikey' AND index_name='uq_apikey_alias');
SET @ddl := IF(@i=0,
'ALTER TABLE apikey ADD CONSTRAINT uq_apikey_alias UNIQUE (alias)',
'DO 0');
PREPARE st FROM @ddl; EXECUTE st; DEALLOCATE PREPARE st;
-- markdown.updated_at -------------------------------------------------------
SET @c := (SELECT COUNT(*) FROM information_schema.columns
WHERE table_schema=@schema AND table_name='markdown' AND column_name='updated_at');
SET @ddl := IF(@c=0,
'ALTER TABLE markdown ADD COLUMN updated_at DATETIME NULL',
'DO 0');
PREPARE st FROM @ddl; EXECUTE st; DEALLOCATE PREPARE st;
UPDATE markdown SET updated_at = created_at WHERE updated_at IS NULL;
-- markdown.author -----------------------------------------------------------
SET @c := (SELECT COUNT(*) FROM information_schema.columns
WHERE table_schema=@schema AND table_name='markdown' AND column_name='author');
SET @ddl := IF(@c=0,
'ALTER TABLE markdown ADD COLUMN author VARCHAR(255) NULL',
'DO 0');
PREPARE st FROM @ddl; EXECUTE st; DEALLOCATE PREPARE st;
UPDATE markdown SET author = 'admin' WHERE author IS NULL;
-- markdown.last_modified_by -------------------------------------------------
SET @c := (SELECT COUNT(*) FROM information_schema.columns
WHERE table_schema=@schema AND table_name='markdown' AND column_name='last_modified_by');
SET @ddl := IF(@c=0,
'ALTER TABLE markdown ADD COLUMN last_modified_by VARCHAR(255) NULL',
'DO 0');
PREPARE st FROM @ddl; EXECUTE st; DEALLOCATE PREPARE st;
UPDATE markdown SET last_modified_by = 'admin' WHERE last_modified_by IS NULL;
-- markdown_patch.author -----------------------------------------------------
SET @c := (SELECT COUNT(*) FROM information_schema.columns
WHERE table_schema=@schema AND table_name='markdown_patch' AND column_name='author');
SET @ddl := IF(@c=0,
'ALTER TABLE markdown_patch ADD COLUMN author VARCHAR(255) NULL',
'DO 0');
PREPARE st FROM @ddl; EXECUTE st; DEALLOCATE PREPARE st;
UPDATE markdown_patch SET author = 'admin' WHERE author IS NULL;
-- markdown_patch.last_modified_by -------------------------------------------
SET @c := (SELECT COUNT(*) FROM information_schema.columns
WHERE table_schema=@schema AND table_name='markdown_patch' AND column_name='last_modified_by');
SET @ddl := IF(@c=0,
'ALTER TABLE markdown_patch ADD COLUMN last_modified_by VARCHAR(255) NULL',
'DO 0');
PREPARE st FROM @ddl; EXECUTE st; DEALLOCATE PREPARE st;
UPDATE markdown_patch SET last_modified_by = 'admin' WHERE last_modified_by IS NULL;

View File

@@ -6,6 +6,9 @@ class APIKey(Base):
__tablename__ = 'apikey'
key = Column(String(64), primary_key=True)
# Stable human identity of the key. Unique; creating with an existing
# alias is treated as a renewal of that key (see api/apikey).
alias = Column(String(255), nullable=False, unique=True)
name = Column(String(255), nullable=False)
created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC))
last_used_at = Column(DateTime)
@@ -16,6 +19,7 @@ class APIKey(Base):
def to_dict(self):
return {
"key": self.key,
"alias": self.alias,
"name": self.name,
"created_at": self.created_at.isoformat() if self.created_at else None,
"last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,

View File

@@ -9,7 +9,15 @@ class Markdown(Base):
id = Column(Integer, primary_key=True)
title = Column(String(255), nullable=False)
content = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.datetime.now(datetime.UTC))
created_at = Column(DateTime, default=lambda: datetime.datetime.now(datetime.UTC))
updated_at = Column(
DateTime,
default=lambda: datetime.datetime.now(datetime.UTC),
onupdate=lambda: datetime.datetime.now(datetime.UTC),
)
# Actor strings: alias of the API key, or 'admin' for KC-logged-in UI.
author = Column(String(255), nullable=True)
last_modified_by = Column(String(255), nullable=True)
path_id = Column(Integer, ForeignKey('path.id'), nullable=False)
order = Column(String(36), default=lambda: str(uuid.uuid4()))
shortcut = Column(String(36), default="")
@@ -21,6 +29,9 @@ class Markdown(Base):
'title': self.title,
'content': self.content,
'created_at': self.created_at,
'updated_at': self.updated_at,
'author': self.author,
'last_modified_by': self.last_modified_by,
'path_id': self.path_id,
'order': self.order,
'shortcut': self.shortcut,

View File

@@ -0,0 +1,42 @@
from sqlalchemy import Column, Text, Integer, String, DateTime, ForeignKey
from db.models import Base
import datetime
class MarkdownPatch(Base):
"""A markdown 'patch card' attached to a parent markdown document.
Its content is plain markdown text rendered below the parent's body.
Created/updated by admin|creator, deleted by admin; visibility is
inherited from the parent markdown (enforced in the API layer).
"""
__tablename__ = 'markdown_patch'
id = Column(Integer, primary_key=True)
markdown_id = Column(
Integer, ForeignKey('markdown.id'), nullable=False, index=True
)
title = Column(String(255), nullable=True)
content = Column(Text, nullable=False)
# Actor strings: alias of the API key, or 'admin' for KC-logged-in UI.
author = Column(String(255), nullable=True)
last_modified_by = Column(String(255), nullable=True)
order = Column(Integer, default=0)
created_at = Column(DateTime, default=lambda: datetime.datetime.now(datetime.UTC))
updated_at = Column(
DateTime,
default=lambda: datetime.datetime.now(datetime.UTC),
onupdate=lambda: datetime.datetime.now(datetime.UTC),
)
def to_dict(self):
return {
'id': self.id,
'markdown_id': self.markdown_id,
'title': self.title,
'content': self.content,
'author': self.author,
'last_modified_by': self.last_modified_by,
'order': self.order,
'created_at': self.created_at,
'updated_at': self.updated_at,
}

View File

@@ -23,6 +23,12 @@ KC_CLIENT_ID = os.getenv("KC_CLIENT_ID")
FRONTEND_HOST = os.getenv("FRONTEND_HOST")
BACKEND_HOST = os.getenv("BACKEND_HOST")
def _redact(value):
if not value:
return "<unset>"
return f"<set:{len(str(value))} chars>"
def summerize():
return {
"ENVIRONMENT": ENVIRONMENT,
@@ -30,9 +36,9 @@ def summerize():
'DB_PORT': DB_PORT,
'DB_NAME': DB_NAME,
'DB_USER': DB_USER,
'DB_PASSWORD': DB_PASSWORD,
'DB_PASSWORD': _redact(DB_PASSWORD),
'DB_SCHEMA_UPDATED': DB_SCHEMA_UPDATED,
'SESSION_SECRET_KEY': SESSION_SECRET_KEY,
'SESSION_SECRET_KEY': _redact(SESSION_SECRET_KEY),
'KC_HOST': KC_HOST,
'KC_REALM': KC_REALM,
'KC_CLIENT_ID': KC_CLIENT_ID,

View File

@@ -7,10 +7,55 @@ from events import MARKDOWN_CREATED_EVENT, MARKDOWN_UPDATED_EVENT, MARKDOWN_DELE
PATH_UPDATED_EVENT, PATH_DELETED_EVENT
import abc
import importlib
import ipaddress
import json
import pkgutil
import socket
import requests
import db
from urllib.parse import urlsplit
import logging
logger = logging.getLogger(__name__)
def is_safe_webhook_url(url):
"""
Reject webhook targets that could be used for SSRF: only http/https,
and the resolved host must not be loopback / private / link-local /
reserved. Defends internal services and cloud metadata endpoints
(e.g. 169.254.169.254) even when the stored URL came from a backup.
"""
try:
parts = urlsplit(url)
except Exception:
return False
if parts.scheme not in ("http", "https") or not parts.hostname:
return False
try:
infos = socket.getaddrinfo(parts.hostname, None)
except socket.gaierror:
return False
for info in infos:
ip = ipaddress.ip_address(info[4][0])
if (ip.is_private or ip.is_loopback or ip.is_link_local
or ip.is_reserved or ip.is_multicast or ip.is_unspecified):
return False
return True
def parse_additional_headers(raw):
"""Best-effort parse of the stored additional_header JSON object."""
if not raw:
return {}
try:
parsed = json.loads(raw)
except (ValueError, TypeError):
logger.warning("webhook additional_header is not valid JSON; ignoring")
return {}
if not isinstance(parsed, dict):
return {}
return {str(k): str(v) for k, v in parsed.items()}
event_type_map = {
@@ -36,15 +81,21 @@ class WebhookEventHandler(abc.ABC):
setting = self.get_setting(session, path_id)
if setting is None:
return
webhook_url = setting["webhook_url"]
if not is_safe_webhook_url(webhook_url):
logger.warning("blocked webhook to unsafe URL: %s", webhook_url)
return
headers = {'Content-Type': 'application/json', 'x-alchegos-event': event_type_map[self.event_type]}
if setting.get("additional_header", None) is not None:
headers.update(json.loads(setting["additional_header"]))
headers.update(parse_additional_headers(setting.get("additional_header")))
body = json.dumps(payload, default=str)
try:
response = requests.post(setting["webhook_url"], data=body, headers=headers, timeout=5)
response = requests.post(
webhook_url, data=body, headers=headers,
timeout=5, allow_redirects=False,
)
response.raise_for_status()
except Exception as e:
print(e)
logger.warning("webhook delivery failed: %s", e)
def get_setting(self, session: Session, path_id):
@@ -59,6 +110,10 @@ class WebhookEventHandler(abc.ABC):
if webhook_setting is None and p["parent_id"] != 1:
return self.get_setting(session, p["parent_id"])
# Check if webhook_setting is still None (e.g., when parent_id == 1 or no parent found)
if webhook_setting is None:
return None
setting = webhook_setting.to_dict()
if not setting["enabled"] or setting["on_events"] & self.event_type == 0: