- APIKey.alias (unique, required). Creating with an existing alias renews that key: same key string kept, validity reset to 15d, reactivated, name/roles updated (response has renewed=true). - get_actor(): X-API-Key -> key alias, Bearer -> 'admin'. - markdown & patch create/update record author / created_at / updated_at / last_modified_by from the actor. - Idempotent run_migrations() (information_schema-guarded ALTERs + backfill) so existing tables/data gain the new columns on startup; create_all still covers fresh DBs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
132 lines
5.2 KiB
Python
132 lines
5.2 KiB
Python
from contextlib import contextmanager
|
|
from sqlalchemy.schema import CreateTable
|
|
|
|
from db.models import Base
|
|
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.dialects.mysql import insert
|
|
from sqlalchemy import create_engine, text, inspect
|
|
from env_provider import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_SCHEMA_UPDATED, ENVIRONMENT
|
|
# Shared engine for the MySQL database (PyMySQL driver); every connection
# setting comes from env_provider.
# NOTE(review): the credentials are interpolated into the URL verbatim, so a
# password containing URL-reserved characters (e.g. "@" or "/") would
# presumably need escaping first -- confirm against the deployed secrets.
engine = create_engine(
    f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Session factory bound to the engine. Sessions neither autoflush nor
# autocommit, which is why the helpers in this module call commit() themselves.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
|
|
|
@contextmanager
def get_db():
    """Yield a fresh ``SessionLocal`` session and close it on exit.

    The session is closed whether the ``with`` body finishes normally or
    raises. Nothing is committed here; callers commit themselves.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
def clear_db():
    """Drop every table the inspector reports for the engine's database.

    Foreign-key checks are switched off on the connection so the tables can
    be dropped in any order, and are always switched back on afterwards.

    Raises:
        Whatever the driver raises for a failing statement; the foreign-key
        setting is restored before the error propagates.
    """
    # Collect the names first so a failing inspection leaves the connection
    # settings untouched.
    table_names = inspect(engine).get_table_names()

    with engine.connect() as conn:
        conn.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
        try:
            for table_name in table_names:
                # Backtick-quote the identifier (doubling embedded backticks)
                # so reserved words or unusual names still produce valid SQL.
                quoted_name = "`" + table_name.replace("`", "``") + "`"
                conn.execute(text(f"DROP TABLE IF EXISTS {quoted_name}"))
        finally:
            # The connection is handed back to the engine when the block
            # exits; restore the setting even on failure so it is not left
            # disabled for whoever uses that connection next.
            conn.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))
|
|
|
|
|
|
def create_all():
    """Run ``Base.metadata.create_all`` on a single transactional connection.

    ``engine.begin()`` commits when the block exits normally and rolls back
    if it raises.
    """
    metadata = Base.metadata
    with engine.begin() as connection:
        metadata.create_all(bind=connection)
|
|
|
|
def run_scripts():
    """Execute each model's ``__scripts__`` statements and commit them.

    Models in ``db.models.table_models`` without a ``__scripts__`` attribute
    are skipped. All statements run on one session and are committed once,
    after the last model has been processed.
    """
    # Imported lazily, as in the rest of this module's setup helpers.
    from db.models import table_models

    with get_db() as session:
        for model in table_models:
            if not hasattr(model, "__scripts__"):
                continue
            for statement in model.__scripts__:
                session.execute(statement)
        session.commit()
|
|
|
|
def init_payload():
    """Insert each model's seed rows for the current ``ENVIRONMENT``.

    For every model in ``db.models.table_models`` the CREATE TABLE statement
    is printed; if the model defines ``__payload__``, the entry stored under
    ``ENVIRONMENT`` is inserted with ``INSERT IGNORE``. Foreign-key checks
    are disabled while the rows are loaded and always re-enabled afterwards,
    then the session is committed.

    Raises:
        KeyError: presumably, when a model's ``__payload__`` mapping has no
            entry for ``ENVIRONMENT`` (assumes a dict -- confirm).
        Any error raised by the driver while executing a statement; the
        foreign-key setting is restored before it propagates.
    """
    from db.models import table_models

    with get_db() as session:
        session.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
        try:
            for model in table_models:
                print(str(CreateTable(model.__table__)))

                # NOTE(review): the payload itself is echoed to stdout below;
                # confirm it never contains secrets such as API keys.
                print(f"MODEL -- {model}, {hasattr(model, '__payload__')}")
                if hasattr(model, "__payload__"):
                    payload = model.__payload__[ENVIRONMENT]
                    print(f"- - [ - ] hasattr, {ENVIRONMENT} - {payload}")
                    # IGNORE is added as a statement prefix on the INSERT.
                    stmt = insert(model.__table__).values(payload).prefix_with("IGNORE")
                    print(f"- - [ - ] {stmt}\n")
                    session.execute(stmt)
        finally:
            # Restore the setting even when a lookup or insert fails, so the
            # session's connection is not released with checks still off.
            session.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))
        session.commit()
|
|
|
|
|
|
def _column_exists(conn, table, column):
    """Return True if ``information_schema.columns`` lists ``table.column``.

    The lookup is restricted to the schema named by ``DB_NAME`` and runs on
    the supplied connection with bound parameters.
    """
    query = text(
        "SELECT 1 FROM information_schema.columns "
        "WHERE table_schema = :db AND table_name = :t AND column_name = :c"
    )
    params = {"db": DB_NAME, "t": table, "c": column}
    return conn.execute(query, params).first() is not None
|
|
|
|
|
|
def _index_exists(conn, table, index):
    """Return True if ``information_schema.statistics`` lists ``index`` on ``table``.

    The lookup is restricted to the schema named by ``DB_NAME`` and runs on
    the supplied connection with bound parameters.
    """
    query = text(
        "SELECT 1 FROM information_schema.statistics "
        "WHERE table_schema = :db AND table_name = :t AND index_name = :i"
    )
    params = {"db": DB_NAME, "t": table, "i": index}
    return conn.execute(query, params).first() is not None
|
|
|
|
|
|
def run_migrations():
    """Apply additive schema changes to tables that already exist.

    For each listed column that ``information_schema`` does not report yet,
    the column is added and, when a backfill statement is given, existing
    rows are populated. Afterwards the ``uq_apikey_alias`` unique constraint
    is added to ``apikey`` unless an index of that name is already present.

    Any exception is caught and printed rather than raised, so callers
    cannot tell from the return value (always ``None``) whether the
    migration succeeded.
    """
    # Each entry: (table, column, DDL adding the column, backfill SQL or None).
    column_steps = [
        ("apikey", "alias", "ALTER TABLE apikey ADD COLUMN alias VARCHAR(255) NULL",
         "UPDATE apikey SET alias = `key` WHERE alias IS NULL"),
        ("markdown", "updated_at", "ALTER TABLE markdown ADD COLUMN updated_at DATETIME NULL",
         "UPDATE markdown SET updated_at = created_at WHERE updated_at IS NULL"),
        ("markdown", "author", "ALTER TABLE markdown ADD COLUMN author VARCHAR(255) NULL",
         "UPDATE markdown SET author = 'admin' WHERE author IS NULL"),
        ("markdown", "last_modified_by", "ALTER TABLE markdown ADD COLUMN last_modified_by VARCHAR(255) NULL",
         "UPDATE markdown SET last_modified_by = 'admin' WHERE last_modified_by IS NULL"),
        ("markdown_patch", "author", "ALTER TABLE markdown_patch ADD COLUMN author VARCHAR(255) NULL",
         "UPDATE markdown_patch SET author = 'admin' WHERE author IS NULL"),
        ("markdown_patch", "last_modified_by", "ALTER TABLE markdown_patch ADD COLUMN last_modified_by VARCHAR(255) NULL",
         "UPDATE markdown_patch SET last_modified_by = 'admin' WHERE last_modified_by IS NULL"),
    ]
    unique_alias_sql = "ALTER TABLE apikey ADD CONSTRAINT uq_apikey_alias UNIQUE (alias)"

    try:
        # NOTE(review): MySQL commits implicitly on ALTER TABLE, so this
        # transaction presumably does not roll back columns already added
        # when a later statement fails -- confirm. A column added without
        # its backfill would then be skipped on the next run, because the
        # backfill only runs in the "column missing" branch.
        with engine.begin() as conn:
            for table, column, add_column_sql, backfill_sql in column_steps:
                if _column_exists(conn, table, column):
                    continue
                conn.execute(text(add_column_sql))
                if backfill_sql:
                    conn.execute(text(backfill_sql))
                print(f"[ x ] migrated {table}.{column}")

            # The unique constraint goes on only after alias has been
            # backfilled above.
            if not _index_exists(conn, "apikey", "uq_apikey_alias"):
                conn.execute(text(unique_alias_sql))
                print("[ x ] migrated apikey.alias unique constraint")
    except Exception as exc:
        # Deliberately best-effort: report the failure and let startup go on.
        print(f"[ ! ] run_migrations error (continuing): {exc}")
|
|
|
|
|
|
def setup_db():
    """Bring the database up to date, printing a marker after each stage.

    When ``DB_SCHEMA_UPDATED`` is truthy all tables are dropped first. The
    remaining stages always run, in order: create tables, apply migrations,
    run model scripts, load seed payloads.
    """
    # NOTE(review): only truthiness is checked here; if env_provider passes
    # the raw environment string through, "false" would also wipe the
    # database -- confirm it is parsed to a bool.
    if DB_SCHEMA_UPDATED:
        clear_db()
        print("[ x ] db cleared")

    # NOTE(review): run_migrations() swallows its own errors, so its marker
    # is printed even when a migration failed.
    stages = (
        (create_all, "[ x ] db created"),
        (run_migrations, "[ x ] db migrations applied"),
        (run_scripts, "[ x ] db scripts executed"),
        (init_payload, "[ x ] payload loaded"),
    )
    for stage, done_message in stages:
        stage()
        print(done_message)
|
|
|