Files
HangmanLab.Backend/api/markdown.py
hzhang 58f23ddcb8 Security hardening: fix RCE, auth and SSRF issues
Critical:
- backup: prevent Zip Slip path traversal and zip bombs in restore/convert
  via safe_extract(); serialize get_backup() with backup_lock and always
  restore CWD so concurrent requests can't corrupt the os.chdir state
- app: only enable the Werkzeug debugger/reloader when ENVIRONMENT=dev;
  always init rate limits (also under WSGI), not just under __main__
- apikey: fix create_key never committing (session.commit -> commit()),
  validate roles against an allowlist, and fix revoke_key/update_last_used
  operating on detached instances so revocation actually persists
- env_provider: redact DB_PASSWORD and SESSION_SECRET_KEY in summerize()

High:
- markdown: filter private/protected docs for non-admins in the listing,
  get_home, get_index and search endpoints (was an anonymous data leak);
  escape LIKE metacharacters and cap search results
- webhooks: validate target URL to block SSRF (loopback/private/link-local/
  metadata IPs), disable redirects, safely parse additional_header
- auth: validate JWT issuer and require exp/iat; add timeout to JWKS fetch;
  harden Authorization header parsing against malformed values
- log: require admin for GET /api/log and auth for POST; bound entry size

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:12:43 +01:00

509 lines
19 KiB
Python

from flask import Blueprint, request, jsonify
from sqlalchemy import or_
from api import limiter
from api import require_auth, etag_response, verify_token, is_user_admin
from contexts.RequestContext import RequestContext
from db import get_db
from db.models.Markdown import Markdown
from db.models.MarkdownSetting import MarkdownSetting
from db.models.MarkdownPermissionSetting import MarkdownPermissionSetting
from events import markdown_created, markdown_updated, markdown_deleted
import api
import env_provider
import logging
logger = logging.getLogger(__name__)
markdown_bp = Blueprint('markdown', __name__, url_prefix='/api/markdown')
def _permission_of(session, markdown):
"""Return the effective permission string ('private'/'protected'/None)."""
if markdown.setting_id is None:
return None
setting = session.query(MarkdownSetting).get(markdown.setting_id)
if not setting or not setting.permission_setting_id:
return None
ps = session.query(MarkdownPermissionSetting).get(setting.permission_setting_id)
return ps.permission if ps else None
def _filter_visible(session, markdowns, is_admin):
"""
Apply markdown permission settings to a list for non-admin callers:
private documents are dropped entirely, protected documents have their
content masked. Admins see everything. Mirrors get_markdown()'s checks
so listing/search endpoints can no longer leak restricted content.
"""
if is_admin:
return [md.to_dict() for md in markdowns]
visible = []
for md in markdowns:
permission = _permission_of(session, md)
if permission == 'private':
continue
data = md.to_dict()
if permission == 'protected':
data['content'] = None
data['protected'] = True
visible.append(data)
return visible
@markdown_bp.route('/', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@etag_response
def get_markdowns():
"""
Get all markdown documents.
This endpoint retrieves a list of all markdown documents in the system.
Returns:
A JSON array containing all markdown documents.
Response Codes:
- 200: Success
"""
is_admin = is_user_admin()
with get_db() as session:
mds = session.query(Markdown).all()
return jsonify(_filter_visible(session, mds, is_admin)), 200
@markdown_bp.route('/get_home', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@etag_response
def get_home():
"""
Get the home markdown document.
This endpoint retrieves the index markdown document from the root path (path_id=1).
This is typically the main landing page content.
Returns:
A JSON object containing the home markdown document.
Response Codes:
- 200: Success
- 204: No content (home markdown not found)
"""
is_admin = is_user_admin()
with get_db() as session:
markdown = session.query(Markdown).filter(Markdown.path_id == 1, Markdown.title == "index").first()
if markdown is None:
return jsonify({}), 204
visible = _filter_visible(session, [markdown], is_admin)
if not visible:
return jsonify({}), 204
return jsonify(visible[0]), 200
@markdown_bp.route('/by_path/<int:path_id>', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@etag_response
def get_markdowns_by_path(path_id):
"""
Get all markdown documents in a specific path.
This endpoint retrieves a list of all markdown documents that belong to the specified path.
Request:
- path_id (int): The ID of the path to get markdowns from
Returns:
A JSON array containing all markdown documents in the specified path.
Response Codes:
- 200: Success
"""
is_admin = is_user_admin()
with get_db() as session:
markdowns = session.query(Markdown).filter(Markdown.path_id == path_id).all()
return jsonify(_filter_visible(session, markdowns, is_admin)), 200
@markdown_bp.route('/get_index/<int:path_id>', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@etag_response
def get_index(path_id):
"""
Get the index markdown document for a specific path.
This endpoint retrieves the index markdown document from the specified path.
The index document typically serves as the main content or landing page for a path.
Request:
- path_id (int): The ID of the path to get the index markdown from
Returns:
A JSON object containing the index markdown document.
Response Codes:
- 200: Success
- 204: No content (index markdown not found)
"""
is_admin = is_user_admin()
with get_db() as session:
markdown = session.query(Markdown).filter(Markdown.path_id == path_id).filter(Markdown.title == "index").first()
if markdown is None:
return jsonify({}), 204
visible = _filter_visible(session, [markdown], is_admin)
if not visible:
return jsonify({}), 204
return jsonify(visible[0]), 200
@markdown_bp.route('/<int:markdown_id>', methods=['GET'])
@limiter.limit(api.get_rate_limit)
@etag_response
def get_markdown(markdown_id):
"""
Get a specific markdown document by ID.
This endpoint retrieves a markdown document by its ID. It includes permission checks
based on the user's role and the markdown's permission settings.
Request:
- markdown_id (int): The ID of the markdown document to retrieve
Returns:
A JSON object containing the markdown document.
Response Codes:
- 200: Success
- 203: Permission denied (for protected markdowns when user is not an admin)
- 403: Permission denied (for private markdowns when user is not an admin)
- 404: Markdown not found
"""
is_admin = is_user_admin()
with get_db() as session:
markdown = session.query(Markdown).get(markdown_id)
if markdown is None:
return jsonify({"error": "file not found"}), 404
if not is_admin and markdown.setting_id is not None:
setting = session.query(MarkdownSetting).get(markdown.setting_id)
if setting and setting.permission_setting_id:
permission_setting = session.query(MarkdownPermissionSetting).get(setting.permission_setting_id)
if permission_setting:
if permission_setting.permission == 'private':
return jsonify({"msg": "permission denied"}), 403
elif permission_setting.permission == 'protected':
return jsonify({"msg": "permission denied"}), 203
return jsonify(markdown.to_dict()), 200
@markdown_bp.route('/', methods=['POST'])
@require_auth(roles=['admin', 'creator'])
@limiter.limit(api.get_rate_limit)
def create_markdown():
"""
Create a new markdown document.
This endpoint creates a new markdown document with the provided data.
It requires authentication with either 'admin' or 'creator' role.
Request:
- title (str): The title of the markdown document
- content (str): The content of the markdown document
- path_id (int): The ID of the path where the markdown will be created
- shortcut (str, optional): A shortcut identifier for the markdown
- setting_id (int, optional): The ID of the markdown settings
Returns:
A JSON object containing the created markdown document.
Response Codes:
- 201: Created successfully
- 400: Bad request (missing required fields or duplicate shortcut)
- 500: Server error
"""
data = request.json
title = data.get('title')
content = data.get('content')
path_id = data.get('path_id')
shortcut = data.get('shortcut', "")
setting_id = data.get('setting_id', None)
if not title or not content:
return jsonify({"error": "missing required fields"}), 400
new_markdown = Markdown(title=title, content=content, path_id=path_id, shortcut=shortcut, setting_id=setting_id)
with get_db() as session:
try:
if shortcut != "":
r = session.query(Markdown).filter(Markdown.shortcut == shortcut).all()
if len(r) > 0:
return jsonify({"error": "duplicate shortcut"}), 400
session.add(new_markdown)
session.commit()
markdown_created.send(None, payload=new_markdown.to_dict())
return jsonify(new_markdown.to_dict()), 201
except Exception as e:
logger.error(f"failed to create markdown: {e}")
errno = RequestContext.get_error_id()
session.rollback()
return jsonify({"error": f"create failed - {errno}"}), 500
@markdown_bp.route('/<int:markdown_id>', methods=['PUT', 'PATCH'])
@require_auth(roles=['admin', 'creator'])
@limiter.limit(api.get_rate_limit)
def update_markdown(markdown_id):
"""
Update a markdown document.
This endpoint updates an existing markdown document with the provided data.
It requires authentication with either 'admin' or 'creator' role.
- PUT: Replaces the entire markdown document
- PATCH: Updates only the specified fields
Request:
- markdown_id (int): The ID of the markdown document to update
- title (str, optional for PATCH): The new title for the markdown
- content (str, optional for PATCH): The new content for the markdown
- path_id (int, optional for PATCH): The new path ID for the markdown
- shortcut (str, optional): A shortcut identifier for the markdown
- setting_id (int, optional): The ID of the markdown settings
Returns:
A JSON object containing the updated markdown document.
Response Codes:
- 200: Updated successfully
- 400: Bad request (duplicate shortcut)
- 404: Markdown not found
"""
with get_db() as session:
markdown = session.query(Markdown).get(markdown_id)
if markdown is None:
return jsonify({"error": "file not found"}), 404
data = request.json
if data.get('shortcut', "") != "":
r = session.query(Markdown).filter(
Markdown.shortcut == data.get('shortcut')
).filter(
Markdown.id != markdown_id
).all()
if len(r) > 0:
return jsonify({"error": "duplicate shortcut"}), 400
if request.method == "PUT":
markdown.title = data.get('title')
markdown.content = data.get('content')
markdown.path_id = data.get('path_id')
markdown.shortcut = data.get('shortcut', '')
markdown.setting_id = data.get('setting_id', None)
elif request.method == "PATCH":
if 'title' in data:
markdown.title = data.get('title')
if 'content' in data:
markdown.content = data.get('content')
if 'path_id' in data:
markdown.path_id = data.get('path_id')
if 'shortcut' in data:
markdown.shortcut = data.get('shortcut')
if 'setting_id' in data:
markdown.setting_id = data.get('setting_id')
session.commit()
markdown_updated.send(None, payload=markdown.to_dict())
return jsonify(markdown.to_dict()), 200
@markdown_bp.route('/<int:markdown_id>', methods=['DELETE'])
@require_auth(roles=['admin'])
@limiter.limit(api.get_rate_limit)
def delete_markdown(markdown_id):
"""
Delete a markdown document.
This endpoint deletes an existing markdown document and cascades the deletion
to related settings to avoid foreign key conflicts.
It requires authentication with the 'admin' role.
Request:
- markdown_id (int): The ID of the markdown document to delete
Returns:
A JSON object containing the deleted markdown document.
Response Codes:
- 200: Deleted successfully
- 404: Markdown not found
- 500: Server error during cascade deletion
"""
with get_db() as session:
try:
markdown = session.get(Markdown, markdown_id)
if markdown is None:
logger.error(f"failed to delete markdown: {markdown_id}")
errno = RequestContext.get_error_id()
return jsonify({"error": f"file not found - {errno}"}), 404
md = markdown.to_dict()
if markdown.setting_id:
markdown_setting = session.query(MarkdownSetting).get(markdown.setting_id)
if markdown_setting:
template_setting_id = markdown_setting.template_setting_id
permission_setting_id = markdown_setting.permission_setting_id
markdown_setting.template_setting_id = None
markdown_setting.permission_setting_id = None
session.flush()
if template_setting_id:
from db.models.MarkdownTemplateSetting import MarkdownTemplateSetting
template_setting = session.query(MarkdownTemplateSetting).get(template_setting_id)
if template_setting:
session.delete(template_setting)
if permission_setting_id:
permission_setting = session.query(MarkdownPermissionSetting).get(permission_setting_id)
if permission_setting:
session.delete(permission_setting)
session.delete(markdown_setting)
# Send webhook event before committing the transaction
# This ensures webhook handlers can still access related data
markdown_deleted.send(None, payload=md)
session.delete(markdown)
session.commit()
logger.info(f"Successfully deleted markdown {markdown_id} with cascade deletion")
return jsonify(md), 200
except Exception as e:
import traceback
logger.error(f"Failed to delete markdown {markdown_id}: {e}")
logger.error(f"Exception type: {type(e).__name__}")
logger.error(f"Full traceback:\n{traceback.format_exc()}")
errno = RequestContext.get_error_id()
session.rollback()
return jsonify({"error": f"delete failed - {errno}"}), 500
@markdown_bp.route('/move_forward/<int:markdown_id>', methods=['PATCH'])
@require_auth(roles=['admin'])
@limiter.limit(api.get_rate_limit)
def move_forward(markdown_id):
"""
Move a markdown document forward in display order.
This endpoint moves a markdown document one position forward in the display order by swapping its order value
with the previous markdown in the same path. This affects how markdowns are displayed in the UI.
It requires authentication with the 'admin' role.
Request:
- markdown_id (int): The ID of the markdown document to move forward
Returns:
A JSON object containing the updated markdown document.
Response Codes:
- 200: Moved successfully
- 400: Bad request (already at the first position)
- 404: Markdown not found
"""
with get_db() as session:
markdown = session.query(Markdown).get(markdown_id)
if not markdown:
return jsonify({"error": "file not found"}), 404
siblings = session.query(Markdown).filter(Markdown.path_id == markdown.path_id).order_by(Markdown.order).all()
current_index = siblings.index(markdown)
if current_index == 0:
return jsonify({"error": "already at the first position"}), 400
previous_markdown = siblings[current_index - 1]
markdown.order, previous_markdown.order = previous_markdown.order, markdown.order
session.commit()
markdown_updated.send(None, payload=markdown.to_dict())
return jsonify(markdown.to_dict()), 200
@markdown_bp.route('/move_backward/<int:markdown_id>', methods=['PATCH'])
@require_auth(roles=['admin'])
@limiter.limit(api.get_rate_limit)
def move_backward(markdown_id):
"""
Move a markdown document backward in display order.
This endpoint moves a markdown document one position backward in the display order by swapping its order value
with the next markdown in the same path. This affects how markdowns are displayed in the UI.
It requires authentication with the 'admin' role.
Request:
- markdown_id (int): The ID of the markdown document to move backward
Returns:
A JSON object containing the updated markdown document.
Response Codes:
- 200: Moved successfully
- 400: Bad request (already at the last position)
- 404: Markdown not found
"""
with get_db() as session:
markdown = session.get(Markdown, markdown_id)
if not markdown:
return jsonify({"error": "file not found"}), 404
siblings = session.query(Markdown).filter(Markdown.path_id == markdown.path_id).order_by(Markdown.order).all()
current_index = siblings.index(markdown)
if current_index == len(siblings) - 1:
return jsonify({"error": "already at the last position"}), 400
next_markdown = siblings[current_index + 1]
markdown.order, next_markdown.order = next_markdown.order, markdown.order
session.commit()
markdown_updated.send(None, payload=markdown.to_dict())
return jsonify(markdown.to_dict()), 200
@markdown_bp.route('/search/<string:keyword>', methods=['GET'])
@limiter.limit(api.get_rate_limit)
def search_markdowns(keyword):
"""
Search for markdown documents.
This endpoint searches for markdown documents that contain the specified keyword
in either their title or content.
Request:
- keyword (str): The search term to look for in markdown titles and content
Returns:
A JSON array containing all markdown documents that match the search criteria.
Response Codes:
- 200: Success
"""
is_admin = is_user_admin()
# Escape LIKE metacharacters so user input can't inject wildcards
# (full-table-scan DoS) and so search actually matches substrings.
escaped = keyword.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
pattern = f"%{escaped}%"
with get_db() as session:
res = session.query(Markdown).filter(
or_(
Markdown.title.like(pattern, escape='\\'),
Markdown.content.like(pattern, escape='\\'),
)
).limit(200).all()
return jsonify(_filter_visible(session, res, is_admin)), 200
@markdown_bp.route('/links', methods=['GET'])
@limiter.limit(api.get_rate_limit)
def get_links():
"""
Get all markdown shortcut links.
This endpoint retrieves a list of all markdown documents that have a shortcut defined,
formatted as links in the format "[shortcut]: id".
Returns:
A JSON array containing formatted links for all markdown documents with shortcuts.
Response Codes:
- 200: Success
"""
with get_db() as session:
mds = [md.to_dict() for md in session.query(Markdown).filter(Markdown.shortcut != "").all()]
links = [f"[{md['shortcut']}]: {md['id']}" for md in mds]
return jsonify(links), 200