"""Backup endpoints: export the path/markdown tree as a zip and merge one back in."""
import io
import logging
import os
import shutil
import tempfile
import threading
import zipfile
from datetime import datetime

from flask import Blueprint, send_file, jsonify, request

from api import require_auth
from db import get_db
from db.models.Markdown import Markdown
from db.models.Path import Path

logger = logging.getLogger(__name__)

backup_bp = Blueprint('backup', __name__, url_prefix='/api/backup')
# Serialises restores: only one /load request may run at a time.
backup_lock = threading.Lock()

# The tree root is addressed by this fixed id on both export and restore.
_ROOT_PATH_ID = 1
# Directory name used for the root path inside the export working directory.
_ROOT_DIR_NAME = "Root"


@backup_bp.route('/', methods=['GET'])
@require_auth(roles=['admin'])
def get_backup():
    """Export every path and markdown as a timestamped zip download.

    The tree is written into a private temporary directory (not the process
    working directory), zipped, loaded into memory and then the temporary
    directory is removed, so no files are left behind and concurrent
    requests cannot clobber each other's export tree.

    Returns:
        The zip as an attachment, or a JSON error with status 500.
    """
    work_dir = None
    try:
        with get_db() as session:
            paths = {p.id: p for p in session.query(Path).all()}

        work_dir = tempfile.mkdtemp()
        traverse(_ROOT_PATH_ID, paths, base_dir=work_dir)

        timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        # The archive lives next to (not inside) the exported root, so it
        # never ends up containing itself.
        archive = shutil.make_archive(
            base_name=os.path.join(work_dir, timestamp),
            format='zip',
            root_dir=os.path.join(work_dir, _ROOT_DIR_NAME),
        )
        # Read the archive into memory so the temp dir can be deleted before
        # the response is streamed.
        with open(archive, 'rb') as archive_file:
            payload = io.BytesIO(archive_file.read())

        return send_file(
            payload,
            as_attachment=True,
            download_name=timestamp + '.zip',
            mimetype='application/zip',
        )
    except Exception as e:
        logger.exception("Failed to get backup: %s", e)
        return jsonify({"error": "failed to get backup"}), 500
    finally:
        if work_dir is not None:
            shutil.rmtree(work_dir, ignore_errors=True)


def create_and_cd(path_name):
    """Create directory *path_name* if needed and make it the working directory.

    NOTE: changes the process-wide working directory. ``traverse`` no longer
    uses this helper; it is kept so existing callers keep working.
    """
    os.makedirs(path_name, exist_ok=True)
    os.chdir(path_name)


def cd_back():
    """Move the process working directory one level up (see ``create_and_cd``)."""
    os.chdir('..')


def _safe_name(name, *, directory=False):
    """Return *name* reduced to a single, non-escaping path component.

    Path separators and NUL are replaced with ``_`` so a title or path name
    cannot point outside the export directory. For directories the special
    names ``""``, ``"."`` and ``".."`` are replaced as well.
    """
    cleaned = str(name)
    for forbidden in ("/", "\\", "\0"):
        cleaned = cleaned.replace(forbidden, "_")
    if directory and cleaned in ("", ".", ".."):
        return "_"
    return cleaned


def _meta_value(value):
    """Render a metadata value, writing ``None`` as empty instead of 'None'."""
    return "" if value is None else value


def traverse(path_id, paths, base_dir=None):
    """Write the subtree rooted at *path_id* to disk.

    For each path a directory is created containing a ``.meta`` file
    (``order``), and for each markdown in that path a ``<title>.md`` file
    with its content plus a ``<title>.mdmeta`` file (``created_at``,
    ``order``, ``shortcut``). Child paths are exported recursively.

    Args:
        path_id: id of the path to export; the root id is written as ``Root``.
        paths: mapping of path id to ``Path`` object for the whole tree.
        base_dir: directory in which this path's directory is created.
            Defaults to the current working directory.

    Raises:
        KeyError: if *path_id* is not present in *paths*.
    """
    if base_dir is None:
        base_dir = os.getcwd()

    current_path = paths[path_id]
    if path_id == _ROOT_PATH_ID:
        dir_name = _ROOT_DIR_NAME
    else:
        dir_name = _safe_name(current_path.name, directory=True)
    dir_path = os.path.join(base_dir, dir_name)
    os.makedirs(dir_path, exist_ok=True)

    # Everything is written as UTF-8 because the restore side reads UTF-8.
    with open(os.path.join(dir_path, ".meta"), "w", encoding="utf-8") as meta_file:
        meta_file.write(f"order: {_meta_value(current_path.order)}\n")

    with get_db() as session:
        mds = session.query(Markdown).filter(Markdown.path_id == path_id).all()
        for md in mds:
            file_stem = os.path.join(dir_path, _safe_name(md.title))
            with open(f"{file_stem}.md", "w", encoding="utf-8") as md_file:
                md_file.write(md.content or "")
            with open(f"{file_stem}.mdmeta", "w", encoding="utf-8") as meta_file:
                meta_file.write(f"created_at: {_meta_value(md.created_at)}\n")
                meta_file.write(f"order: {_meta_value(md.order)}\n")
                meta_file.write(f"shortcut: {_meta_value(md.shortcut)}\n")

    for child in paths.values():
        if child.parent_id == path_id:
            traverse(child.id, paths, base_dir=dir_path)


@backup_bp.route('/load', methods=['POST'])
@require_auth(roles=['admin'])
def load_backup():
    """Restore an uploaded backup zip, merging it into the existing tree.

    Expects the archive in the multipart field ``file``. Only one restore
    runs at a time; a concurrent request gets status 429. The temporary
    working directory is always removed, on success and on failure.

    Returns:
        JSON success message, 400 for a missing/invalid upload, 429 when a
        restore is already running, or 500 on any other failure.
    """
    if not backup_lock.acquire(blocking=False):
        return jsonify({"error": "Another backup restore is in progress. Please try again later."}), 429

    temp_dir = None
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file provided"}), 400
        uploaded_file = request.files['file']

        temp_dir = tempfile.mkdtemp()
        zip_path = os.path.join(temp_dir, "backup.zip")
        uploaded_file.save(zip_path)

        # Extract into a dedicated subdirectory so the uploaded zip itself is
        # not part of (or overwritten by) the restored tree.
        root_dir = os.path.join(temp_dir, "extracted")
        os.makedirs(root_dir)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(root_dir)
        except zipfile.BadZipFile:
            return jsonify({"error": "Invalid backup format"}), 400

        with get_db() as session:
            path_mapping = {}
            restore_tree(root_dir, None, session, path_mapping)
            session.commit()

        return jsonify({"success": True, "message": "Backup restored and merged successfully"})
    except Exception as e:
        logger.exception("Failed to load backup: %s", e)
        return jsonify({"error": f"Failed to load backup {e}"}), 500
    finally:
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        backup_lock.release()


def _read_meta(meta_path):
    """Parse a ``key: value`` metadata file into a dict.

    Returns an empty dict when the file does not exist. Lines without a
    colon are skipped, and a key with nothing after the colon maps to ``""``
    (the exporter writes exactly that for empty values).
    """
    values = {}
    if not os.path.isfile(meta_path):
        return values
    with open(meta_path, "r", encoding="utf-8") as meta_file:
        for line in meta_file:
            key, separator, value = line.rstrip("\r\n").partition(":")
            if not separator:
                continue
            values[key.strip()] = value.strip()
    return values


def _parse_created_at(value):
    """Parse an exported ``created_at`` value, falling back to the current time.

    Accepts what ``str(datetime)`` produces, including microseconds and a
    UTC offset. Missing or unparseable values yield ``datetime.now()``.
    """
    if value:
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            logger.warning("Unparseable created_at %r in backup; using current time", value)
    return datetime.now()


def restore_tree(dir_path, parent_id, session, path_mapping):
    """Recursively merge the directory *dir_path* into the database.

    The top-level call (``parent_id is None``) maps onto the root path id.
    Below that, a directory is matched to an existing ``Path`` with the same
    parent and name, or a new ``Path`` is created using ``order`` from the
    directory's ``.meta`` file. Every ``*.md`` file becomes a new ``Markdown``
    (title made unique within the path); sub-directories are then restored.

    Args:
        dir_path: directory on disk to restore.
        parent_id: id of the parent path, or ``None`` for the backup root.
        session: database session used for queries and inserts (not committed here).
        path_mapping: dict filled with ``dir_path -> path id`` for each directory visited.
    """
    dir_name = os.path.basename(dir_path)

    if parent_id is None:
        new_path_id = _ROOT_PATH_ID
    else:
        existing_path = session.query(Path).filter_by(parent_id=parent_id, name=dir_name).first()
        if existing_path:
            new_path_id = existing_path.id
        else:
            path_meta = _read_meta(os.path.join(dir_path, ".meta"))
            new_path = Path(name=dir_name, parent_id=parent_id, order=path_meta.get("order", ""))
            session.add(new_path)
            # Flush so the database assigns the id needed by children below.
            session.flush()
            new_path_id = new_path.id

    path_mapping[dir_path] = new_path_id

    entries = sorted(os.listdir(dir_path))

    # Markdown files first, then sub-directories.
    for entry in entries:
        file_path = os.path.join(dir_path, entry)
        if not entry.endswith(".md") or not os.path.isfile(file_path):
            continue

        md_title = entry[:-3]
        # "<title>.md" + "meta" -> "<title>.mdmeta"
        md_meta = _read_meta(file_path + "meta")
        created_at = _parse_created_at(md_meta.get("created_at"))
        order = md_meta.get("order", "")
        shortcut = md_meta.get("shortcut", "")

        with open(file_path, "r", encoding="utf-8") as md_file:
            content = md_file.read()

        unique_title = get_unique_markdown_title(session, md_title, new_path_id)
        new_md = Markdown(
            title=unique_title,
            content=content,
            path_id=new_path_id,
            created_at=created_at,
            order=order,
            shortcut=shortcut,
        )
        session.add(new_md)

    for entry in entries:
        item_path = os.path.join(dir_path, entry)
        if os.path.isdir(item_path):
            restore_tree(item_path, new_path_id, session, path_mapping)


def get_unique_markdown_title(session, title, path_id):
    """Return *title*, suffixed with ``.bp`` until it is unused in *path_id*.

    Args:
        session: database session used to look up existing titles.
        title: desired markdown title.
        path_id: id of the path the markdown will be stored in.
    """
    existing_titles = {
        row.title for row in session.query(Markdown.title).filter_by(path_id=path_id).all()
    }
    unique_title = title
    while unique_title in existing_titles:
        unique_title += ".bp"
    return unique_title