82 lines
2.4 KiB
Python
82 lines
2.4 KiB
Python
#db/__init__.py
|
|
import os
|
|
import pkgutil
|
|
import subprocess
|
|
from contextlib import contextmanager
|
|
|
|
|
|
from db.models import Base
|
|
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.dialects.mysql import insert
|
|
from sqlalchemy import create_engine, text, inspect
|
|
from env_provider import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_SCHEMA_UPDATED, ENVIRONMENT
|
|
|
|
engine = create_engine(f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
|
|
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
@contextmanager
|
|
def get_db():
|
|
db = SessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
def dump_db():
|
|
try:
|
|
os.environ['MYSQL_PWD'] = DB_PASSWORD
|
|
dump_cmd = f"mysqldump --no-tablespaces -h {DB_HOST} -P {DB_PORT} -u {DB_USER} {DB_NAME} > /app/dump/db_dump.sql"
|
|
subprocess.run(dump_cmd, shell=True, check=True)
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Failed to dump database: {e}")
|
|
raise e
|
|
|
|
def clear_db():
|
|
with engine.connect() as conn:
|
|
conn.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
|
|
inspector = inspect(engine)
|
|
for table_name in inspector.get_table_names():
|
|
conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
|
|
conn.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))
|
|
|
|
|
|
def create_all():
|
|
with engine.begin() as conn:
|
|
Base.metadata.create_all(bind=conn)
|
|
|
|
|
|
def init_payload():
|
|
|
|
from db.models import table_models
|
|
|
|
with get_db() as session:
|
|
session.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
|
|
for model in table_models:
|
|
print(f"MODEL -- {model}, {hasattr(model, '__pay_load__')}")
|
|
if hasattr(model, "__pay_load__"):
|
|
|
|
payload =model.__pay_load__[ENVIRONMENT]
|
|
print(f"- - [ - ] hasattr, {ENVIRONMENT} - {payload}")
|
|
stmt = insert(model.__table__).values(payload).prefix_with("IGNORE")
|
|
print(f"- - [ - ] {stmt}\n")
|
|
session.execute(stmt)
|
|
session.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))
|
|
session.commit()
|
|
|
|
|
|
def setup_db():
|
|
if DB_SCHEMA_UPDATED:
|
|
try:
|
|
dump_db()
|
|
print("[ x ] db dumped")
|
|
except Exception as e:
|
|
print(f"[ x ] Failed to dump database: {e}")
|
|
clear_db()
|
|
print("[ x ] db cleared")
|
|
create_all()
|
|
print("[ x ] db created")
|
|
init_payload()
|
|
print("[ x ] payload loaded")
|
|
|