from flask import Blueprint, request, jsonify from sqlalchemy import or_ from api import limiter from api import require_auth, etag_response from contexts.RequestContext import RequestContext from db import get_db from db.models.Markdown import Markdown from events import markdown_created, markdown_updated, markdown_deleted import api import logging logger = logging.getLogger(__name__) markdown_bp = Blueprint('markdown', __name__, url_prefix='/api/markdown') @markdown_bp.route('/', methods=['GET']) @limiter.limit(api.get_rate_limit) @etag_response def get_markdowns(): with get_db() as session: mds = session.query(Markdown).all() return jsonify([md.to_dict() for md in mds]), 200 @markdown_bp.route('/get_home', methods=['GET']) @limiter.limit(api.get_rate_limit) @etag_response def get_home(): with get_db() as session: markdown = session.query(Markdown).filter(Markdown.path_id == 1, Markdown.title == "index").first() if markdown is None: return jsonify({}), 204 return jsonify(markdown.to_dict()), 200 @markdown_bp.route('/by_path/', methods=['GET']) @limiter.limit(api.get_rate_limit) @etag_response def get_markdowns_by_path(path_id): with get_db() as session: markdowns = session.query(Markdown).filter(Markdown.path_id == path_id).all() return jsonify([md.to_dict() for md in markdowns]), 200 @markdown_bp.route('/get_index/', methods=['GET']) @limiter.limit(api.get_rate_limit) @etag_response def get_index(path_id): with get_db() as session: markdown = session.query(Markdown).filter(Markdown.path_id == path_id).filter(Markdown.title == "index").first() if markdown is None: return jsonify({}), 204 return jsonify(markdown.to_dict()), 200 @markdown_bp.route('/', methods=['GET']) @limiter.limit(api.get_rate_limit) @etag_response def get_markdown(markdown_id): with get_db() as session: markdown = session.query(Markdown).get(markdown_id) if markdown is None: return jsonify({"error": "file not found"}), 404 return jsonify(markdown.to_dict()), 200 @markdown_bp.route('/', methods=['POST']) @require_auth(roles=['admin', 'creator']) @limiter.limit(api.get_rate_limit) def create_markdown(): data = request.json title = data.get('title') content = data.get('content') path_id = data.get('path_id') shortcut = data.get('shortcut', "") if not title or not content: return jsonify({"error": "missing required fields"}), 400 new_markdown = Markdown(title=title, content=content, path_id=path_id, shortcut=shortcut) with get_db() as session: try: if shortcut != "": r = session.query(Markdown).filter(Markdown.shortcut == shortcut).all() if len(r) > 0: return jsonify({"error": "duplicate shortcut"}), 400 session.add(new_markdown) session.commit() markdown_created.send(None, payload=new_markdown.to_dict()) return jsonify(new_markdown.to_dict()), 201 except Exception as e: logger.error(f"failed to create markdown: {e}") errno = RequestContext.get_error_id() session.rollback() return jsonify({"error": f"create failed - {errno}"}), 500 @markdown_bp.route('/', methods=['PUT', 'PATCH']) @require_auth(roles=['admin', 'creator']) @limiter.limit(api.get_rate_limit) def update_markdown(markdown_id): with get_db() as session: markdown = session.query(Markdown).get(markdown_id) if markdown is None: return jsonify({"error": "file not found"}), 404 data = request.json if data.get('shortcut', "") != "": r = session.query(Markdown).filter( Markdown.shortcut == data.get('shortcut') ).filter( Markdown.id != markdown_id ).all() if len(r) > 0: return jsonify({"error": "duplicate shortcut"}), 400 if request.method == "PUT": markdown.title = data.get('title') markdown.content = data.get('content') markdown.path_id = data.get('path_id') markdown.shortcut = data.get('shortcut', '') elif request.method == "PATCH": if 'title' in data: markdown.title = data.get('title') if 'content' in data: markdown.content = data.get('content') if 'path_id' in data: markdown.path_id = data.get('path_id') if 'shortcut' in data: markdown.shortcut = data.get('shortcut') session.commit() markdown_updated.send(None, payload=markdown.to_dict()) return jsonify(markdown.to_dict()), 200 @markdown_bp.route('/', methods=['DELETE']) @require_auth(roles=['admin']) @limiter.limit(api.get_rate_limit) def delete_markdown(markdown_id): with get_db() as session: markdown = session.query(Markdown).get(markdown_id) if markdown is None: logger.error(f"failed to delete markdown: {markdown_id}") errno = RequestContext.get_error_id() return jsonify({"error": f"file not found - {errno}"}), 404 md = markdown.to_dict() session.delete(markdown) session.commit() markdown_deleted.send(None, payload=md) return jsonify({"message": "deleted"}), 200 @markdown_bp.route('/move_forward/', methods=['PATCH']) @require_auth(roles=['admin']) @limiter.limit(api.get_rate_limit) def move_forward(markdown_id): with get_db() as session: markdown = session.query(Markdown).get(markdown_id) if not markdown: return jsonify({"error": "file not found"}), 404 siblings = session.query(Markdown).filter(Markdown.path_id == markdown.path_id).order_by(Markdown.order).all() current_index = siblings.index(markdown) if current_index == 0: return jsonify({"error": "already at the first position"}), 400 previous_markdown = siblings[current_index - 1] markdown.order, previous_markdown.order = previous_markdown.order, markdown.order session.commit() markdown_updated.send(None, payload=markdown.to_dict()) return jsonify(markdown.to_dict()), 200 @markdown_bp.route('/move_backward/', methods=['PATCH']) @require_auth(roles=['admin']) @limiter.limit(api.get_rate_limit) def move_backward(markdown_id): with get_db() as session: markdown = session.query(Markdown).get(markdown_id) if not markdown: return jsonify({"error": "file not found"}), 404 siblings = session.query(Markdown).filter(Markdown.path_id == markdown.path_id).order_by(Markdown.order).all() current_index = siblings.index(markdown) if current_index == len(siblings) - 1: return jsonify({"error": "already at the last position"}), 400 next_markdown = siblings[current_index + 1] markdown.order, next_markdown.order = next_markdown.order, markdown.order session.commit() markdown_updated.send(None, payload=markdown.to_dict()) return jsonify(markdown.to_dict()), 200 @markdown_bp.route('/search/', methods=['GET']) @limiter.limit(api.get_rate_limit) def search_markdowns(keyword): with get_db() as session: res = session.query(Markdown).filter( or_(Markdown.title.like(keyword), Markdown.content.like(keyword)) ).all() return jsonify([md.to_dict() for md in res]), 200 @markdown_bp.route('/links', methods=['GET']) @limiter.limit(api.get_rate_limit) def get_links(): with get_db() as session: mds = [md.to_dict() for md in session.query(Markdown).filter(Markdown.shortcut != "").all()] links = [f"[{md['shortcut']}]: {md['id']}" for md in mds] return jsonify(links), 200