Files
HangmanLab.Backend/api/backup.py

864 lines
31 KiB
Python

"""
Backup and Restore System
=========================
This module provides functionality for creating backups of the application's data and restoring from those backups.
Backup Structure
---------------
The backup is a zip file containing the following structure:
- Root/
  - version.json            # Backup format version ({"version": ...})
  - tree/                   # Contains the entire path hierarchy and markdown files
    - .json.meta            # Metadata for the root path
    - [markdown].json       # Markdown files directly under root
    - [folder]/             # Subfolders representing paths
      - .json.meta          # Metadata for this path
      - [markdown].json     # Markdown files in this path
  - template/               # Contains markdown templates
    - [template].json       # One JSON file per template, using title as filename
  - webhook.json            # Contains all webhooks and their settings
Data Format
-----------
1. Markdown files (.json):
{
"title": "...", # Title of the markdown
"content": "...", # Content of the markdown
"created_at": "...", # Creation timestamp
"order": "...", # Order value for sorting
"shortcut": "...", # Shortcut value if any
"backup_id": ..., # Reference ID for the backup
"setting": { # Optional settings object
"permission_setting": "...", # Permission value (private, protected, etc.)
"template_setting": { # Template settings if any
"template_id": ..., # Template ID reference
"template_ref": { # Reference to the template
"title": "..." # Title of the template for lookup
}
}
}
}
2. Path metadata (.json.meta):
{
"name": "...", # Name of the path
"order": "...", # Order value for sorting
"backup_id": ..., # Reference ID for the backup
"webhook_setting": { # Optional webhook settings
"recursive": true/false, # Whether webhook applies to subpaths
"additional_header": "...", # Additional headers for webhook
"enabled": true/false, # Whether webhook is enabled
"on_events": ... # Event types that trigger the webhook
},
"webhook_ref": { # Reference to the webhook
"hook_url": "..." # URL of the webhook for lookup
},
"template_setting": { # Template settings if any
"template_id": ... # Template ID reference
},
"template_ref": { # Reference to the template
"title": "..." # Title of the template for lookup
}
}
3. Template files (template/[name].json):
{
"title": "...", # Title of the template
"parameters": {...}, # Parameters for the template
"layout": "..." # Layout content of the template
}
4. Webhook file (webhook.json):
[
{
"backup_id": ..., # Reference ID for the backup
"hook_url": "...", # URL of the webhook
"settings": [ # Array of settings for this webhook
{
"recursive": true/false, # Whether webhook applies to subpaths
"additional_header": "...", # Additional headers for webhook
"enabled": true/false, # Whether webhook is enabled
"on_events": ... # Event types that trigger the webhook
}
]
}
]
How to Add New Information to Backup
-----------------------------------
To add new information to the backup system, follow these steps:
1. Adding a new field to an existing entity:
- For Path entities: Add the field to the path metadata in the traverse() function
- For Markdown entities: Add the field to the md_data dictionary in the traverse() function
- For Templates: Add the field to the template_dict in export_templates()
- For Webhooks: Add the field to the webhook_entry in export_webhooks()
2. Adding a new entity type:
- Create a new export_[entity]() function similar to export_webhooks() or export_templates()
- Call this function from get_backup()
- Create a corresponding import_[entity]() function for restoration
- Call this function from load_backup()
3. Example: Adding a new "tags" field to markdown:
In the traverse() function, modify the md_data creation:
md_data = {
"title": md.title,
"content": md.content,
"created_at": md.created_at,
"order": md.order,
"shortcut": md.shortcut,
"backup_id": md.id,
"tags": md.tags # New field
}
Then in process_markdown_file(), handle the new field:
tags = md_data.get("tags", []) # Get tags with default empty list
# Later when creating/updating the markdown:
if existing_md:
existing_md.tags = tags
else:
new_md = Markdown(
# other fields...
tags=tags
)
4. Example: Adding a new entity type "Comments":
Create export function:
def export_comments():
# Export all comments to comments.json file in the root directory
with get_db() as session:
comments = session.query(Comment).all()
comment_data = []
for comment in comments:
comment_dict = comment.to_dict()
# Process and add to comment_data
with open('comments.json', 'w') as f:
json.dump(comment_data, f, default=str, indent=2)
Call it from get_backup():
# After other exports
export_comments()
Create import function:
def import_comments(comments_file, session):
# Logic to import comments
Call it from load_backup():
# After other imports
import_comments(os.path.join(root_dir, "comments.json"), session)
Maintaining Backward Compatibility
---------------------------------
When adding new fields or entities:
1. Always use .get() with default values when reading JSON data
2. Check if fields exist before accessing them
3. Handle both old and new formats in import functions
4. Use conditional logic to process data based on available fields
5. Keep the basic structure of the backup intact
For example, in process_markdown_file():
# Handle both old and new formats
if "setting" in md_data:
# Process new format
else:
# Process old format for backward compatibility
ID Handling
----------
The backup system maintains its own ID references:
1. Database IDs are never reused on restore; for paths and markdowns the exported backup_id is the source database ID, kept for reference only
2. Each entity gets a backup_id for reference within the backup
3. When restoring, new database IDs are generated
4. References between entities use lookup by natural keys (e.g., title, URL)
"""
import shutil
from datetime import datetime
import tempfile
import zipfile
from flask import Blueprint, send_file, jsonify, request
import os
import json
from api import require_auth
from db import get_db
from db.models.Markdown import Markdown
from db.models.Path import Path
from db.models.MarkdownSetting import MarkdownSetting
from db.models.MarkdownTemplateSetting import MarkdownTemplateSetting
from db.models.MarkdownPermissionSetting import MarkdownPermissionSetting
from db.models.MarkdownTemplate import MarkdownTemplate
from db.models.PathSetting import PathSetting
from db.models.WebhookSetting import WebhookSetting
from db.models.PathTemplate import PathTemplate
from db.models.Webhook import Webhook
import threading
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Blueprint that groups the backup endpoints under the /api/backup URL prefix.
backup_bp = Blueprint('backup', __name__, url_prefix='/api/backup')
def check_and_convert_backup_version(backup_dir):
    """
    Check the backup version and convert it in place if necessary.

    Args:
        backup_dir (str): Path to the extracted backup directory.

    Returns:
        tuple: Always a 2-tuple ``(success, error_response)``.
            - success (bool): True if the backup already is, or was converted
              to, the current version; False if the conversion failed.
            - error_response: None on success, otherwise a
              ``(flask_response, 400)`` pair that a view can return directly.
    """
    from misc.backup_converters import get_backup_version, CURRENT_VERSION, convert_backup

    backup_version = get_backup_version(backup_dir)
    if backup_version == CURRENT_VERSION:
        return True, None

    logger.info(f"Converting backup from version {backup_version} to {CURRENT_VERSION}")
    try:
        convert_backup(backup_dir, CURRENT_VERSION)
    except ValueError as e:
        logger.error(f"Failed to convert backup: {e}")
        # The response and its status code are wrapped in ONE tuple element so
        # that callers doing `success, error_response = ...` keep working; a
        # flat 3-tuple would make that unpacking raise.
        return False, (jsonify({"error": f"Failed to convert backup: {e}"}), 400)
    return True, None
@backup_bp.route('/convert', methods=['POST'])
@require_auth(roles=['admin'])
def convert_backup_endpoint():
    """
    Convert an old version backup to the current version format.

    This endpoint accepts an uploaded backup file, converts it to the current
    version format, and returns the converted backup file as an attachment.
    All intermediate files live in a temporary directory that is removed when
    the request fails, or once the response has been closed on success.

    Request:
        - file: The backup file to convert (multipart/form-data)

    Returns:
        The converted backup file as an attachment.

    Response Codes:
        - 200: Conversion successful
        - 400: No file provided or invalid file
        - 429: Another backup operation is in progress
        - 500: Conversion failed
    """
    if not backup_lock.acquire(blocking=False):
        return jsonify({"error": "Another backup operation is in progress. Please try again later."}), 429

    # Directory that still has to be removed by the `finally` clause below.
    pending_cleanup = None
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file provided"}), 400
        uploaded_file = request.files['file']

        temp_dir = tempfile.mkdtemp()
        pending_cleanup = temp_dir
        backup_dir = os.path.join(temp_dir, "backup")
        os.makedirs(backup_dir)
        zip_path = os.path.join(temp_dir, "backup.zip")
        uploaded_file.save(zip_path)

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(backup_dir)
        except zipfile.BadZipFile:
            # A corrupt upload is a client error, not a server failure.
            return jsonify({"error": "Invalid file: not a valid zip archive"}), 400

        success, error_response = check_and_convert_backup_version(backup_dir)
        if not success:
            return error_response

        timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        archive_name = f"converted_backup_{timestamp}"
        archive_path = shutil.make_archive(
            base_name=os.path.join(temp_dir, archive_name),
            format='zip',
            root_dir=backup_dir
        )
        shutil.rmtree(backup_dir)

        response = send_file(
            archive_path,
            as_attachment=True,
            download_name=f"{archive_name}.zip",
        )
        # The archive must outlive this function because it is streamed to the
        # client afterwards; remove the directory once the response is closed.
        response.call_on_close(lambda: shutil.rmtree(temp_dir, ignore_errors=True))
        pending_cleanup = None
        return response
    except Exception as e:
        logger.error(f"Failed to convert backup: {e}")
        return jsonify({"error": f"Failed to convert backup: {e}"}), 500
    finally:
        if pending_cleanup is not None:
            shutil.rmtree(pending_cleanup, ignore_errors=True)
        backup_lock.release()
# Lock taken (non-blocking) by convert_backup_endpoint and load_backup so that
# only one of those operations runs at a time; get_backup does not take it.
backup_lock = threading.Lock()
@backup_bp.route('/', methods=['GET'])
@require_auth(roles=['admin'])
def get_backup():
    """
    Create a backup of the application's data.

    The backup contains:
        - The tree structure (paths and markdowns)
        - Templates
        - Webhooks
        - Version information

    The export helpers write relative to the current working directory, so
    this function temporarily changes into a scratch ``Root`` directory. The
    original working directory is always restored, even when the export fails.

    NOTE(review): the working directory is process-wide, so two concurrent
    backup requests would interfere with each other; consider guarding this
    endpoint with backup_lock as well.

    Returns:
        A zip file containing the backup data.

    Response Codes:
        - 200: Backup created successfully
        - 500: Failed to create backup
    """
    original_cwd = os.getcwd()
    try:
        if os.path.exists('Root'):
            shutil.rmtree('Root')
        os.makedirs('Root')
        os.chdir('Root')
        os.makedirs('tree')
        os.makedirs('template')

        from misc.backup_converters import CURRENT_VERSION
        with open('version.json', 'w') as f:
            json.dump({"version": CURRENT_VERSION}, f, indent=2)

        export_webhooks()
        export_templates()

        with get_db() as session:
            paths = {p.id: p for p in session.query(Path).all()}
            os.chdir('tree')
            # Path id 1 is the root of the hierarchy.
            traverse(1, paths)

        os.chdir(original_cwd)
        timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        # Make the path absolute: make_archive may return a relative name, and
        # send_file does not resolve relative paths against the cwd.
        archive = os.path.abspath(
            shutil.make_archive(base_name=timestamp, format='zip', root_dir='Root')
        )
        shutil.rmtree('Root')
        return send_file(
            archive,
            as_attachment=True,
            download_name=timestamp + '.zip',
        )
    except Exception as e:
        logger.error(f"Failed to get backup: {e}")
        # Best effort: do not leave a half-written export behind.
        shutil.rmtree(os.path.join(original_cwd, 'Root'), ignore_errors=True)
        return jsonify({"error": "failed to get backup"}), 500
    finally:
        # Never leak a changed working directory into later requests.
        os.chdir(original_cwd)
def create_and_cd(path_name):
    """Make sure directory *path_name* exists, then change into it."""
    # isdir() is False for both "missing" and "exists but is not a directory",
    # which are exactly the two cases in which the directory must be created.
    directory_missing = not os.path.isdir(path_name)
    if directory_missing:
        os.makedirs(path_name)
    os.chdir(path_name)
def cd_back():
    """Change the working directory to the parent of the current one."""
    parent_directory = '..'
    os.chdir(parent_directory)
def export_webhooks():
    """
    Write every webhook and its settings to ``webhook.json`` in the cwd.

    Each webhook becomes one entry holding a sequential ``backup_id``
    (starting at 1), its ``hook_url`` and the list of its settings with the
    database-only keys ``id`` and ``webhook_id`` removed.
    """
    with get_db() as session:
        exported = []
        for position, hook in enumerate(session.query(Webhook).all(), start=1):
            hook_fields = hook.to_dict()
            stripped_settings = []
            for hook_setting in session.query(WebhookSetting).filter_by(webhook_id=hook.id).all():
                fields = hook_setting.to_dict()
                for db_only_key in ('id', 'webhook_id'):
                    fields.pop(db_only_key, None)
                stripped_settings.append(fields)
            exported.append({
                'backup_id': position,
                'hook_url': hook_fields['hook_url'],
                'settings': stripped_settings
            })
        with open('webhook.json', 'w') as out_file:
            json.dump(exported, out_file, default=str, indent=2)
def export_templates():
    """
    Write each markdown template to ``template/<title>.json`` under the cwd.

    The exported JSON is the template's ``to_dict()`` output without its
    database ``id``.
    """
    with get_db() as session:
        for tpl in session.query(MarkdownTemplate).all():
            payload = tpl.to_dict()
            # Resolve the target name before dropping keys from the payload.
            target = os.path.join('template', f"{payload['title']}.json")
            payload.pop('id', None)
            with open(target, 'w') as out_file:
                json.dump(payload, out_file, default=str, indent=2)
def _backup_file_stem(title):
    """Return *title* as text that is safe to use as one file-name component."""
    stem = str(title)
    for separator in (os.sep, os.altsep, "/"):
        if separator:
            stem = stem.replace(separator, "_")
    return stem


def _collect_path_meta(session, current_path, path_id):
    """Build the ``.json.meta`` dictionary for one path."""
    path_meta = {
        "name": current_path.name,
        "order": current_path.order,
        "backup_id": path_id
    }
    if not current_path.setting_id:
        return path_meta
    path_setting = session.query(PathSetting).get(current_path.setting_id)
    if not path_setting:
        return path_meta

    if path_setting.webhook_setting_id:
        webhook_setting = session.query(WebhookSetting).get(path_setting.webhook_setting_id)
        if webhook_setting:
            if webhook_setting.webhook_id:
                webhook = session.query(Webhook).get(webhook_setting.webhook_id)
                if webhook:
                    # Webhooks are looked up by URL on restore, not by id.
                    path_meta["webhook_ref"] = {
                        "hook_url": webhook.hook_url
                    }
            path_meta["webhook_setting"] = {
                "recursive": webhook_setting.recursive,
                "additional_header": webhook_setting.additional_header,
                "enabled": webhook_setting.enabled,
                "on_events": webhook_setting.on_events
            }

    if path_setting.template_setting_id:
        path_template = session.query(PathTemplate).get(path_setting.template_setting_id)
        if path_template:
            # Templates are looked up by title on restore, not by id.
            path_meta["template_ref"] = {
                "title": path_template.title
            }
        path_meta["template_setting"] = {
            "template_id": path_setting.template_setting_id
        }
    return path_meta


def _collect_markdown_data(session, md):
    """Build the exported JSON dictionary for one markdown row."""
    md_data = {
        "title": md.title,
        "content": md.content,
        "created_at": md.created_at,
        "order": md.order,
        "shortcut": md.shortcut,
        "backup_id": md.id
    }
    if not md.setting_id:
        return md_data
    md_setting = session.query(MarkdownSetting).get(md.setting_id)
    if not md_setting:
        return md_data

    settings = {}
    if md_setting.template_setting_id:
        template_setting = session.query(MarkdownTemplateSetting).get(md_setting.template_setting_id)
        if template_setting and template_setting.template_id:
            template = session.query(MarkdownTemplate).get(template_setting.template_id)
            if template:
                settings["template_setting"] = {
                    "template_id": template_setting.template_id,
                    "template_ref": {
                        "title": template.title
                    }
                }
    if md_setting.permission_setting_id:
        permission_setting = session.query(MarkdownPermissionSetting).get(md_setting.permission_setting_id)
        if permission_setting:
            settings["permission_setting"] = permission_setting.permission
    if settings:
        md_data["setting"] = settings
    return md_data


def traverse(path_id, paths):
    """
    Export the path *path_id* and, recursively, all of its descendants.

    Files are written relative to the current working directory: a
    ``.json.meta`` file for the path itself and one ``<title>.json`` file per
    markdown in it. Every non-root path (root is id 1) gets its own
    sub-directory, which is entered for the duration of the export and always
    left again, even if the export raises.

    Args:
        path_id: Database id of the path to export.
        paths (dict): Mapping of path id to Path object for all paths.

    Raises:
        KeyError: If *path_id* is not a key of *paths*.
    """
    current_path = paths[path_id]
    is_root = path_id == 1
    if not is_root:
        create_and_cd(current_path.name)
    try:
        with get_db() as session:
            path_meta = _collect_path_meta(session, current_path, path_id)
            with open(".json.meta", "w") as meta_file:
                json.dump(path_meta, meta_file, default=str, indent=2)

            mds = session.query(Markdown).filter(Markdown.path_id == path_id).all()
            for md in mds:
                md_data = _collect_markdown_data(session, md)
                # The real title is stored inside the JSON, so the file name
                # only needs to be a valid single path component.
                with open(f"{_backup_file_stem(md.title)}.json", "w") as md_file:
                    json.dump(md_data, md_file, default=str, indent=2)

        children = [c for c in paths.values() if c.parent_id == path_id]
        for child in children:
            traverse(child.id, paths)
    finally:
        # Keep the working directory balanced for the caller on every exit.
        if not is_root:
            cd_back()
@backup_bp.route('/load', methods=['POST'])
@require_auth(roles=['admin'])
def load_backup():
    """
    Restore data from a backup file.

    This function restores data from a backup file, including:
        - The tree structure (paths and markdowns)
        - Templates
        - Webhooks

    If the backup version does not match the current version, the backup is
    converted to the current version before being restored. The uploaded
    archive is extracted into a temporary directory that is removed on every
    exit path.

    Request:
        - file: The backup file to restore (multipart/form-data)

    Returns:
        A JSON object with a success message.

    Response Codes:
        - 200: Backup restored successfully
        - 400: No file provided or invalid backup format
        - 429: Another backup restore is in progress
        - 500: Failed to restore backup
    """
    if not backup_lock.acquire(blocking=False):
        return jsonify({"error": "Another backup restore is in progress. Please try again later."}), 429

    temp_dir = None
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file provided"}), 400
        uploaded_file = request.files['file']

        temp_dir = tempfile.mkdtemp()
        zip_path = os.path.join(temp_dir, "backup.zip")
        uploaded_file.save(zip_path)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
        except zipfile.BadZipFile:
            # A corrupt upload is a client error, not a server failure.
            return jsonify({"error": "Invalid backup format: not a valid zip archive"}), 400

        # Archives may or may not wrap their content in a top-level "Root" folder.
        root_dir = os.path.join(temp_dir, "Root")
        if not os.path.exists(root_dir):
            root_dir = temp_dir

        success, error_response = check_and_convert_backup_version(root_dir)
        if not success:
            return error_response

        tree_dir = os.path.join(root_dir, "tree")
        template_dir = os.path.join(root_dir, "template")
        if not os.path.exists(tree_dir) or not os.path.exists(template_dir):
            return jsonify({"error": "Invalid backup format: missing tree or template directory"}), 400

        with get_db() as session:
            # Templates and webhooks go first: the tree refers to them by
            # title / URL while it is being restored.
            import_templates(template_dir, session)
            webhook_mapping = import_webhooks(os.path.join(root_dir, "webhook.json"), session)
            path_mapping = {}
            restore_tree(tree_dir, None, session, path_mapping, webhook_mapping)
            session.commit()

        return jsonify({"success": True, "message": "Backup restored and merged successfully"}), 200
    except Exception as e:
        logger.error(f"Failed to load backup: {e}")
        return jsonify({"error": f"Failed to load backup {e}"}), 500
    finally:
        # ignore_errors: a cleanup problem must not turn a committed restore
        # into an error response.
        if temp_dir is not None:
            shutil.rmtree(temp_dir, ignore_errors=True)
        backup_lock.release()
def import_templates(template_dir, session):
    """
    Import the markdown templates stored as ``*.json`` files in *template_dir*.

    A template whose title already exists in the database is reused; any
    other template is created. A file that fails to import is logged and
    skipped.

    Args:
        template_dir (str): Directory holding one JSON file per template.
        session: Database session used for lookups and inserts.

    Returns:
        dict: Mapping of template title to its database id.
    """
    ids_by_title = {}
    for entry in os.listdir(template_dir):
        if not entry.endswith('.json'):
            continue
        source_path = os.path.join(template_dir, entry)
        try:
            with open(source_path, 'r') as handle:
                payload = json.load(handle)
            # Fall back to the file name when the JSON carries no usable title.
            title = payload.get('title') or os.path.splitext(entry)[0]
            template = session.query(MarkdownTemplate).filter_by(title=title).first()
            if not template:
                template = MarkdownTemplate(
                    title=title,
                    parameters=payload.get('parameters'),
                    layout=payload.get('layout')
                )
                session.add(template)
                session.flush()
            ids_by_title[title] = template.id
        except Exception as e:
            logger.error(f"Error importing template {entry}: {e}")
    return ids_by_title
def import_webhooks(webhook_file, session):
    """
    Import webhooks and their settings from the JSON file *webhook_file*.

    A webhook whose URL already exists in the database is reused, otherwise
    it is created; a new WebhookSetting row is added for every exported
    setting. Entries without a ``hook_url`` are skipped. An error while
    importing is logged and ends the import, returning what was mapped so far.

    Args:
        webhook_file (str): Path of the exported ``webhook.json``.
        session: Database session used for lookups and inserts.

    Returns:
        dict: Mapping of the entry's ``backup_id`` to the webhook's database
        id; empty when the file does not exist.
    """
    ids_by_backup_id = {}
    if not os.path.exists(webhook_file):
        logger.warning(f"Webhook file not found: {webhook_file}")
        return ids_by_backup_id
    try:
        with open(webhook_file, 'r') as handle:
            entries = json.load(handle)
        for entry in entries:
            backup_id = entry.get('backup_id')
            hook_url = entry.get('hook_url')
            if not hook_url:
                continue

            hook = session.query(Webhook).filter_by(hook_url=hook_url).first()
            if not hook:
                hook = Webhook(hook_url=hook_url)
                session.add(hook)
                session.flush()
            webhook_id = hook.id
            ids_by_backup_id[backup_id] = webhook_id

            for exported_setting in entry.get('settings', []):
                session.add(WebhookSetting(
                    webhook_id=webhook_id,
                    recursive=exported_setting.get('recursive', False),
                    additional_header=exported_setting.get('additional_header'),
                    enabled=exported_setting.get('enabled', True),
                    on_events=exported_setting.get('on_events', 0)
                ))
    except Exception as e:
        logger.error(f"Error importing webhooks: {e}")
    return ids_by_backup_id
def _parse_created_at(raw_value):
    """Parse an exported ``created_at`` value, falling back to the current time."""
    if raw_value:
        try:
            # Timestamps are exported via str(datetime), which may include
            # microseconds; fromisoformat accepts both variants.
            return datetime.fromisoformat(raw_value)
        except (TypeError, ValueError):
            pass
        try:
            return datetime.strptime(raw_value, "%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError):
            pass
    return datetime.now()


def _new_template_setting_id(session, template_title):
    """Create a MarkdownTemplateSetting for the template titled *template_title*.

    Returns the new setting's id, or None when no title is given or no
    template with that title exists.
    """
    if not template_title:
        return None
    existing_template = session.query(MarkdownTemplate).filter_by(title=template_title).first()
    if not existing_template:
        return None
    new_template_setting = MarkdownTemplateSetting(template_id=existing_template.id)
    session.add(new_template_setting)
    session.flush()
    return new_template_setting.id


def _new_permission_setting_id(session, permission):
    """Create a MarkdownPermissionSetting for *permission*; None if it is empty."""
    if not permission:
        return None
    new_permission_setting = MarkdownPermissionSetting(permission=permission)
    session.add(new_permission_setting)
    session.flush()
    return new_permission_setting.id


def process_markdown_file(file_path, file_name, new_path_id, session):
    """
    Restore one exported markdown JSON file into the path *new_path_id*.

    A markdown with the same title in that path is updated in place,
    otherwise a new one is added. Both the current layout (settings nested
    under ``"setting"``) and the older flat layout (``"template_ref"`` /
    ``"permission"`` at top level) are understood. Nothing is committed here;
    committing is left to the caller. Any error is logged and the file is
    skipped.

    Args:
        file_path (str): Full path of the JSON file.
        file_name (str): File name, used as title fallback and in log output.
        new_path_id: Database id of the path that owns the markdown.
        session: Database session used for lookups and inserts.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            md_data = json.load(f)

        md_title = md_data.get("title") or os.path.splitext(file_name)[0]
        content = md_data.get("content", "")
        created_at = _parse_created_at(md_data.get("created_at"))
        order = md_data.get("order", "")
        shortcut = md_data.get("shortcut", "")

        if "setting" in md_data:
            # Current layout: everything lives under "setting".
            settings = md_data.get("setting")
            if not isinstance(settings, dict):
                settings = {}
            template_title = None
            template_setting_data = settings.get("template_setting")
            if isinstance(template_setting_data, dict):
                template_ref = template_setting_data.get("template_ref")
                if isinstance(template_ref, dict):
                    template_title = template_ref.get("title")
            permission = settings.get("permission_setting")
        else:
            # Older layout: references sit directly on the markdown object.
            template_ref = md_data.get("template_ref")
            template_title = template_ref.get("title") if isinstance(template_ref, dict) else None
            permission = md_data.get("permission")

        template_setting_id = _new_template_setting_id(session, template_title)
        permission_setting_id = _new_permission_setting_id(session, permission)

        setting_id = None
        if template_setting_id or permission_setting_id:
            md_setting = MarkdownSetting(
                template_setting_id=template_setting_id,
                permission_setting_id=permission_setting_id
            )
            session.add(md_setting)
            session.flush()
            setting_id = md_setting.id

        existing_md = session.query(Markdown).filter_by(path_id=new_path_id, title=md_title).first()
        if existing_md:
            existing_md.content = content
            existing_md.created_at = created_at
            existing_md.order = order
            existing_md.shortcut = shortcut
            existing_md.setting_id = setting_id
            # Flush instead of commit: the restore is committed once by the
            # caller, so a later failure cannot leave a half-applied backup.
            session.flush()
        else:
            new_md = Markdown(
                title=md_title,
                content=content,
                path_id=new_path_id,
                created_at=created_at,
                order=order,
                shortcut=shortcut,
                setting_id=setting_id
            )
            session.add(new_md)
    except Exception as e:
        logger.error(f"Error processing markdown file {file_name}: {e}")
def _load_path_settings(meta_file_path, session):
    """Read a path's ``.json.meta`` file and create the settings it describes.

    Returns ``(order, setting_id)``. When the file is missing the result is
    ``('', None)``; when parsing fails the error is logged and whatever was
    read before the failure is returned.
    """
    order = ''
    setting_id = None
    if not os.path.exists(meta_file_path):
        return order, setting_id
    try:
        with open(meta_file_path, "r") as meta_file:
            path_meta = json.load(meta_file)
        order = path_meta.get("order", '')

        webhook_setting_id = None
        if "webhook_ref" in path_meta and "webhook_setting" in path_meta:
            hook_url = path_meta["webhook_ref"].get("hook_url")
            # Without a URL there is nothing to look up or create.
            if hook_url:
                existing_webhook = session.query(Webhook).filter_by(hook_url=hook_url).first()
                if existing_webhook:
                    webhook_id = existing_webhook.id
                else:
                    new_webhook = Webhook(hook_url=hook_url)
                    session.add(new_webhook)
                    session.flush()
                    webhook_id = new_webhook.id
                if webhook_id:
                    webhook_setting_data = path_meta["webhook_setting"]
                    new_webhook_setting = WebhookSetting(
                        webhook_id=webhook_id,
                        recursive=webhook_setting_data.get("recursive", False),
                        additional_header=webhook_setting_data.get("additional_header"),
                        enabled=webhook_setting_data.get("enabled", True),
                        on_events=webhook_setting_data.get("on_events", 0)
                    )
                    session.add(new_webhook_setting)
                    session.flush()
                    webhook_setting_id = new_webhook_setting.id

        template_setting_id = None
        if "template_ref" in path_meta:
            template_title = path_meta["template_ref"].get("title")
            existing_template = session.query(PathTemplate).filter_by(title=template_title).first()
            if existing_template:
                template_setting_id = existing_template.id

        if webhook_setting_id or template_setting_id:
            path_setting = PathSetting(
                webhook_setting_id=webhook_setting_id,
                template_setting_id=template_setting_id
            )
            session.add(path_setting)
            session.flush()
            setting_id = path_setting.id
    except Exception as e:
        logger.error(f"Error parsing path metadata: {e}")
    return order, setting_id


def restore_tree(dir_path, parent_id, session, path_mapping, webhook_mapping=None):
    """
    Recursively restore the directory *dir_path* into the path hierarchy.

    The top-level call (``parent_id is None`` on a directory named "Root" or
    "tree") maps the directory onto the existing root path with id 1. Every
    other directory is matched to an existing path with the same name under
    *parent_id*, or created from its ``.json.meta`` file. Markdown files in a
    directory are restored before its sub-directories.

    Args:
        dir_path (str): Directory to restore.
        parent_id: Database id of the parent path; None for the top-level call.
        session: Database session used for lookups and inserts.
        path_mapping (dict): Filled with ``dir_path -> path id`` entries.
        webhook_mapping (dict, optional): Passed through to recursive calls.
    """
    if webhook_mapping is None:
        webhook_mapping = {}
    dir_name = os.path.basename(dir_path)

    # Only the top-level call may stand for the root; a user folder that just
    # happens to be called "tree" or "Root" must be restored as a normal path.
    if parent_id is None and dir_name in ("Root", "tree"):
        new_path_id = 1
        path_mapping[dir_path] = new_path_id
        for item in os.listdir(dir_path):
            item_path = os.path.join(dir_path, item)
            if os.path.isdir(item_path):
                restore_tree(item_path, 1, session, path_mapping, webhook_mapping)
            elif item.endswith(".json") and not item == ".json.meta":
                process_markdown_file(item_path, item, new_path_id, session)
        return

    existing_path = session.query(Path).filter_by(parent_id=parent_id, name=dir_name).first()
    if existing_path:
        # Merge into the existing path; its own settings are left untouched.
        new_path_id = existing_path.id
    else:
        order, setting_id = _load_path_settings(os.path.join(dir_path, ".json.meta"), session)
        new_path = Path(name=dir_name, parent_id=parent_id, order=order, setting_id=setting_id)
        session.add(new_path)
        session.flush()
        new_path_id = new_path.id
    path_mapping[dir_path] = new_path_id

    for file in os.listdir(dir_path):
        file_path = os.path.join(dir_path, file)
        if file.endswith(".json") and not file == ".json.meta":
            process_markdown_file(file_path, file, new_path_id, session)
    for item in os.listdir(dir_path):
        item_path = os.path.join(dir_path, item)
        if os.path.isdir(item_path):
            restore_tree(item_path, new_path_id, session, path_mapping, webhook_mapping)