""" HarborForge initialization from AbstractWizard config volume. Reads config from shared volume (written by AbstractWizard). On startup, creates admin user and default project if not exists. """ import os import json import logging from sqlalchemy.orm import Session from app.models import models from app.models.role_permission import Role, Permission, RolePermission from app.api.deps import get_password_hash logger = logging.getLogger("harborforge.init") CONFIG_DIR = os.getenv("CONFIG_DIR", "/config") CONFIG_FILE = os.getenv("CONFIG_FILE", "harborforge.json") def load_config() -> dict | None: """Load initialization config from shared volume.""" config_path = os.path.join(CONFIG_DIR, CONFIG_FILE) if not os.path.exists(config_path): logger.info("No config file at %s, skipping initialization", config_path) return None try: with open(config_path, "r") as f: return json.load(f) except Exception as e: logger.warning("Failed to read config %s: %s", config_path, e) return None def get_db_url(config: dict) -> str | None: """Build DATABASE_URL from wizard config, or fall back to env.""" db_cfg = config.get("database") if not db_cfg: return os.getenv("DATABASE_URL") host = db_cfg.get("host", "mysql") port = db_cfg.get("port", 3306) user = db_cfg.get("user", "harborforge") password = db_cfg.get("password", "harborforge_pass") database = db_cfg.get("database", "harborforge") return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}" def init_admin_user(db: Session, admin_cfg: dict) -> models.User | None: """Create admin user if not exists.""" username = admin_cfg.get("username", "admin") existing = db.query(models.User).filter(models.User.username == username).first() if existing: logger.info("Admin user '%s' already exists (id=%d), skipping", username, existing.id) return existing password = admin_cfg.get("password", "changeme") user = models.User( username=username, email=admin_cfg.get("email", f"{username}@harborforge.local"), full_name=admin_cfg.get("full_name", "Admin"), hashed_password=get_password_hash(password), is_admin=True, is_active=True, ) db.add(user) db.commit() db.refresh(user) logger.info("Created admin user '%s' (id=%d)", username, user.id) return user def init_default_project(db: Session, project_cfg: dict, owner_id: int, owner_name: str = "") -> None: """Create default project if configured and not exists.""" name = project_cfg.get("name") if not name: return existing = db.query(models.Project).filter(models.Project.name == name).first() if existing: logger.info("Project '%s' already exists (id=%d), skipping", name, existing.id) return project = models.Project( name=name, description=project_cfg.get("description", ""), owner_name=project_cfg.get("owner") or owner_name or "", owner_id=owner_id, ) db.add(project) db.commit() db.refresh(project) logger.info("Created default project '%s' (id=%d)", name, project.id) # Default permissions that will be created if not exist DEFAULT_PERMISSIONS = [ # Project permissions ("project.read", "View project", "project"), ("project.write", "Edit project", "project"), ("project.delete", "Delete project", "project"), ("project.manage_members", "Manage project members", "project"), # Task/Milestone permissions ("task.create", "Create tasks", "task"), ("task.read", "View tasks", "task"), ("task.write", "Edit tasks", "task"), ("task.delete", "Delete tasks", "task"), ("milestone.create", "Create milestones", "milestone"), ("milestone.read", "View milestones", "milestone"), ("milestone.write", "Edit milestones", "milestone"), ("milestone.delete", "Delete milestones", "milestone"), # Role/Permission management ("role.manage", "Manage roles and permissions", "admin"), # User management ("user.manage", "Manage users", "admin"), # Monitor ("monitor.read", "View monitor", "monitor"), ("monitor.manage", "Manage monitor", "monitor"), # Webhook ("webhook.manage", "Manage webhooks", "admin"), ] def init_default_permissions(db: Session) -> list[Permission]: """Create default permissions if they don't exist. Returns all permissions.""" created = [] for name, description, category in DEFAULT_PERMISSIONS: existing = db.query(Permission).filter(Permission.name == name).first() if not existing: perm = Permission(name=name, description=description, category=category) db.add(perm) created.append(perm) logger.info("Created permission '%s'", name) if created: db.commit() # Return all permissions return db.query(Permission).all() def init_admin_role(db: Session, admin_user: models.User) -> None: """Create admin role with all permissions and guest role with minimal permissions.""" # Check if admin role already exists admin_role = db.query(Role).filter(Role.name == "admin").first() if not admin_role: admin_role = Role( name="admin", description="Administrator - full access to all features", is_global=True ) db.add(admin_role) db.commit() db.refresh(admin_role) logger.info("Created admin role (id=%d)", admin_role.id) # Check if guest role already exists guest_role = db.query(Role).filter(Role.name == "guest").first() if not guest_role: guest_role = Role( name="guest", description="Guest - read-only access", is_global=True ) db.add(guest_role) db.commit() db.refresh(guest_role) logger.info("Created guest role (id=%d)", guest_role.id) # Get all permissions all_perms = db.query(Permission).all() # Assign all permissions to admin role existing_admin_perm_ids = {rp.permission_id for rp in admin_role.permissions} for perm in all_perms: if perm.id not in existing_admin_perm_ids: rp = RolePermission(role_id=admin_role.id, permission_id=perm.id) db.add(rp) if all_perms: db.commit() logger.info("Assigned %d permissions to admin role", len(all_perms)) # Assign only read permissions to guest role read_perms = db.query(Permission).filter(Permission.name.like("%.read")).all() existing_guest_perm_ids = {rp.permission_id for rp in guest_role.permissions} for perm in read_perms: if perm.id not in existing_guest_perm_ids: rp = RolePermission(role_id=guest_role.id, permission_id=perm.id) db.add(rp) if read_perms: db.commit() logger.info("Assigned %d read permissions to guest role", len(read_perms)) logger.info("Admin and guest roles setup complete") def run_init(db: Session) -> None: """Main initialization entry point. Reads config from shared volume.""" config = load_config() if not config: return logger.info("Running HarborForge initialization from wizard config") # Initialize default permissions and admin role (always run) all_perms = init_default_permissions(db) logger.info("Default permissions initialized: %d total", len(all_perms)) # Admin user admin_cfg = config.get("admin") admin_user = None if admin_cfg: admin_user = init_admin_user(db, admin_cfg) # Create admin role and assign to admin user if admin_user: init_admin_role(db, admin_user) # Default project project_cfg = config.get("default_project") if project_cfg and admin_user: init_default_project(db, project_cfg, admin_user.id, admin_user.username) logger.info("Initialization complete")