"""Database engine, session factory, and startup schema/bootstrap helpers."""

from contextlib import contextmanager

from sqlalchemy import create_engine, text, inspect
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import CreateTable

from db.models import Base
from env_provider import (
    DB_HOST,
    DB_PORT,
    DB_NAME,
    DB_USER,
    DB_PASSWORD,
    DB_SCHEMA_UPDATED,
    ENVIRONMENT,
)

engine = create_engine(
    f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


@contextmanager
def get_db():
    """Yield a new ``SessionLocal`` session and close it on exit.

    The session is closed whether or not the ``with`` body raises.
    Nothing is committed here; callers commit explicitly.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def clear_db() -> None:
    """Drop every table reported by the inspector for ``engine``.

    Foreign-key checks are switched off for the duration so tables can be
    dropped in any order, and are switched back on in a ``finally`` so the
    pooled connection is not returned with checks still disabled when a
    ``DROP`` fails.
    """
    # Quote through the dialect so reserved words / unusual names are valid.
    quote = engine.dialect.identifier_preparer.quote
    with engine.connect() as conn:
        conn.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
        try:
            inspector = inspect(engine)
            for table_name in inspector.get_table_names():
                conn.execute(text(f"DROP TABLE IF EXISTS {quote(table_name)}"))
        finally:
            conn.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))


def create_all() -> None:
    """Create all tables registered on ``Base.metadata`` in one transaction."""
    with engine.begin() as conn:
        Base.metadata.create_all(bind=conn)


def run_scripts() -> None:
    """Execute every entry of ``__scripts__`` on each model that defines it.

    All scripts run in one session, which is committed once at the end.
    """
    # Imported lazily, as in the original code, rather than at module import.
    from db.models import table_models

    with get_db() as session:
        for model in table_models:
            if hasattr(model, "__scripts__"):
                scripts = model.__scripts__
                for script in scripts:
                    session.execute(script)
        session.commit()


def init_payload() -> None:
    """Insert each model's ``__payload__[ENVIRONMENT]`` rows with INSERT IGNORE.

    Foreign-key checks are disabled while the rows are inserted and are
    re-enabled in a ``finally`` so a failing insert does not leave the
    connection with checks turned off. The session is committed only when
    every insert succeeded.

    Raises:
        KeyError: if a model's ``__payload__`` has no entry for ``ENVIRONMENT``.
    """
    # Imported lazily, as in the original code, rather than at module import.
    from db.models import table_models

    with get_db() as session:
        session.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
        try:
            for model in table_models:
                print(str(CreateTable(model.__table__)))
                print(f"MODEL -- {model}, {hasattr(model, '__payload__')}")
                if hasattr(model, "__payload__"):
                    payload = model.__payload__[ENVIRONMENT]
                    print(f"- - [ - ] hasattr, {ENVIRONMENT} - {payload}")
                    stmt = (
                        insert(model.__table__)
                        .values(payload)
                        .prefix_with("IGNORE")
                    )
                    print(f"- - [ - ] {stmt}\n")
                    session.execute(stmt)
        finally:
            session.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))
        session.commit()


def _column_exists(conn, table, column) -> bool:
    """Return True if ``table.column`` is listed in information_schema for DB_NAME."""
    row = conn.execute(
        text(
            "SELECT 1 FROM information_schema.columns "
            "WHERE table_schema = :db AND table_name = :t AND column_name = :c"
        ),
        {"db": DB_NAME, "t": table, "c": column},
    ).first()
    return row is not None


def _index_exists(conn, table, index) -> bool:
    """Return True if ``index`` on ``table`` is listed in information_schema for DB_NAME."""
    row = conn.execute(
        text(
            "SELECT 1 FROM information_schema.statistics "
            "WHERE table_schema = :db AND table_name = :t AND index_name = :i"
        ),
        {"db": DB_NAME, "t": table, "i": index},
    ).first()
    return row is not None


def run_migrations() -> None:
    """Idempotent additive schema migrations for already-existing tables.

    create_all() creates missing tables (with the new columns) for a fresh DB,
    but never alters existing ones. This adds the new columns to legacy tables
    and backfills sensible defaults. Safe to run on every startup.

    Any exception is caught and printed so that startup continues.
    """
    # (table, column, DDL, backfill SQL or None)
    steps = [
        (
            "apikey",
            "alias",
            "ALTER TABLE apikey ADD COLUMN alias VARCHAR(255) NULL",
            "UPDATE apikey SET alias = `key` WHERE alias IS NULL",
        ),
        (
            "markdown",
            "updated_at",
            "ALTER TABLE markdown ADD COLUMN updated_at DATETIME NULL",
            "UPDATE markdown SET updated_at = created_at WHERE updated_at IS NULL",
        ),
        (
            "markdown",
            "author",
            "ALTER TABLE markdown ADD COLUMN author VARCHAR(255) NULL",
            "UPDATE markdown SET author = 'admin' WHERE author IS NULL",
        ),
        (
            "markdown",
            "last_modified_by",
            "ALTER TABLE markdown ADD COLUMN last_modified_by VARCHAR(255) NULL",
            "UPDATE markdown SET last_modified_by = 'admin' WHERE last_modified_by IS NULL",
        ),
        (
            "markdown_patch",
            "author",
            "ALTER TABLE markdown_patch ADD COLUMN author VARCHAR(255) NULL",
            "UPDATE markdown_patch SET author = 'admin' WHERE author IS NULL",
        ),
        (
            "markdown_patch",
            "last_modified_by",
            "ALTER TABLE markdown_patch ADD COLUMN last_modified_by VARCHAR(255) NULL",
            "UPDATE markdown_patch SET last_modified_by = 'admin' WHERE last_modified_by IS NULL",
        ),
    ]
    try:
        with engine.begin() as conn:
            for table, column, ddl, backfill in steps:
                # The backfill is tied to the column being newly added.
                # NOTE(review): if the backfill fails after the ALTER has
                # taken effect, a later run sees the column and skips the
                # backfill -- confirm whether that matters here.
                if not _column_exists(conn, table, column):
                    conn.execute(text(ddl))
                    if backfill:
                        conn.execute(text(backfill))
                    print(f"[ x ] migrated {table}.{column}")
            # Unique constraint on apikey.alias once it is populated.
            if not _index_exists(conn, "apikey", "uq_apikey_alias"):
                conn.execute(
                    text(
                        "ALTER TABLE apikey ADD CONSTRAINT uq_apikey_alias UNIQUE (alias)"
                    )
                )
                print("[ x ] migrated apikey.alias unique constraint")
    except Exception as e:
        # Don't block startup on a migration hiccup; surface loudly.
        print(f"[ ! ] run_migrations error (continuing): {e}")


def setup_db() -> None:
    """Run the full startup sequence against the database.

    Order: optional wipe (only when ``DB_SCHEMA_UPDATED`` is truthy), table
    creation, additive migrations, per-model scripts, then payload loading.
    """
    if DB_SCHEMA_UPDATED:
        clear_db()
        print("[ x ] db cleared")
    create_all()
    print("[ x ] db created")
    run_migrations()
    print("[ x ] db migrations applied")
    run_scripts()
    print("[ x ] db scripts executed")
    init_payload()
    print("[ x ] payload loaded")