"""
HarborForge initialization from AbstractWizard config volume.

Reads config from shared volume (written by AbstractWizard).
On startup, creates the default permissions and roles, the admin user,
the built-in acc-mgr user and the default project if they do not exist.
"""
import os
import json
import logging
from urllib.parse import quote

from sqlalchemy.orm import Session

from app.models import models
from app.models.role_permission import Role, Permission, RolePermission
from app.api.deps import get_password_hash

logger = logging.getLogger("harborforge.init")

CONFIG_DIR = os.getenv("CONFIG_DIR", "/config")
CONFIG_FILE = os.getenv("CONFIG_FILE", "harborforge.json")


def load_config() -> dict | None:
    """Load initialization config from shared volume.

    Returns:
        The parsed JSON object, or ``None`` when the file is missing,
        unreadable, not valid JSON, or its top-level value is not an object.
    """
    config_path = os.path.join(CONFIG_DIR, CONFIG_FILE)
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        logger.info("No config file at %s, skipping initialization", config_path)
        return None
    except Exception as e:
        # Deliberately best-effort: a broken config must not abort startup.
        logger.warning("Failed to read config %s: %s", config_path, e)
        return None

    if not isinstance(config, dict):
        # Callers use ``config.get(...)``; a list/str/number root would crash them.
        logger.warning(
            "Failed to read config %s: top-level JSON value must be an object, got %s",
            config_path,
            type(config).__name__,
        )
        return None
    return config


def get_db_url(config: dict) -> str | None:
    """Build DATABASE_URL from wizard config, or fall back to env.

    Args:
        config: Parsed wizard config; its optional ``"database"`` section may
            provide ``host``, ``port``, ``user``, ``password`` and ``database``.

    Returns:
        A ``mysql+pymysql://`` URL, or the ``DATABASE_URL`` environment
        variable (possibly ``None``) when the config has no database section.
    """
    db_cfg = config.get("database")
    if not db_cfg:
        return os.getenv("DATABASE_URL")

    host = db_cfg.get("host", "mysql")
    port = db_cfg.get("port", 3306)
    database = db_cfg.get("database", "harborforge")
    # Credentials may contain URL-reserved characters ('@', ':', '/', '%', ...).
    # Percent-encode them so the URL is parsed back to the original values;
    # ``safe=""`` also encodes '/', and spaces become %20 rather than '+'.
    user = quote(str(db_cfg.get("user", "harborforge")), safe="")
    password = quote(str(db_cfg.get("password", "harborforge_pass")), safe="")
    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}"


def init_admin_user(db: Session, admin_cfg: dict) -> models.User | None:
    """Create admin user if not exists.

    Args:
        db: Open database session; committed when a user is created.
        admin_cfg: The ``"admin"`` section of the wizard config.

    Returns:
        The existing or newly created admin user.
    """
    username = admin_cfg.get("username", "admin")
    existing = db.query(models.User).filter(models.User.username == username).first()
    if existing:
        logger.info("Admin user '%s' already exists (id=%d), skipping", username, existing.id)
        return existing

    if "password" not in admin_cfg:
        # Make the insecure fallback visible to operators instead of silent.
        logger.warning(
            "No password configured for admin user '%s'; using the default "
            "password - change it immediately",
            username,
        )
    password = admin_cfg.get("password", "changeme")

    user = models.User(
        username=username,
        email=admin_cfg.get("email", f"{username}@harborforge.local"),
        full_name=admin_cfg.get("full_name", "Admin"),
        hashed_password=get_password_hash(password),
        is_admin=True,
        is_active=True,
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    logger.info("Created admin user '%s' (id=%d)", username, user.id)
    return user


def init_default_project(db: Session, project_cfg: dict, owner_id: int, owner_name: str = "") -> None:
    """Create default project if configured and not exists.

    Args:
        db: Open database session; committed when a project is created.
        project_cfg: The ``"default_project"`` section of the wizard config.
            Nothing happens when it has no ``"name"``.
        owner_id: Id stored as the project's owner.
        owner_name: Owner name used when ``project_cfg`` has no ``"owner"``.
    """
    name = project_cfg.get("name")
    if not name:
        return

    existing = db.query(models.Project).filter(models.Project.name == name).first()
    if existing:
        logger.info("Project '%s' already exists (id=%d), skipping", name, existing.id)
        return

    project = models.Project(
        name=name,
        description=project_cfg.get("description", ""),
        owner_name=project_cfg.get("owner") or owner_name or "",
        owner_id=owner_id,
    )
    db.add(project)
    db.commit()
    db.refresh(project)
    logger.info("Created default project '%s' (id=%d)", name, project.id)


# Default permissions that will be created if not exist
# Each entry is (name, description, category).
DEFAULT_PERMISSIONS = [
    # Project permissions
    ("project.read", "View project", "project"),
    ("project.write", "Edit project", "project"),
    ("project.delete", "Delete project", "project"),
    ("project.manage_members", "Manage project members", "project"),
    # Task/Milestone permissions
    ("task.create", "Create tasks", "task"),
    ("task.read", "View tasks", "task"),
    ("task.write", "Edit tasks", "task"),
    ("task.delete", "Delete tasks", "task"),
    ("milestone.create", "Create milestones", "milestone"),
    ("milestone.read", "View milestones", "milestone"),
    ("milestone.write", "Edit milestones", "milestone"),
    ("milestone.delete", "Delete milestones", "milestone"),
    # Milestone actions
    ("milestone.freeze", "Freeze milestone scope", "milestone"),
    ("milestone.start", "Start milestone execution", "milestone"),
    ("milestone.close", "Close / abort milestone", "milestone"),
    # Task actions
    ("task.close", "Close / cancel a task", "task"),
    ("task.reopen_closed", "Reopen a closed task", "task"),
    ("task.reopen_completed", "Reopen a completed task", "task"),
    # Propose actions
    ("propose.accept", "Accept a propose into a milestone", "propose"),
    ("propose.reject", "Reject a propose", "propose"),
    ("propose.reopen", "Reopen a rejected propose", "propose"),
    # Role/Permission management
    ("role.manage", "Manage roles and permissions", "admin"),
    ("account.create", "Create HarborForge accounts", "account"),
    # User management
    ("user.manage", "Manage users", "admin"),
    # API key management
    ("user.reset-self-apikey", "Reset own API key", "user"),
    ("user.reset-apikey", "Reset any user's API key", "admin"),
    # Monitor
    ("monitor.read", "View monitor", "monitor"),
    ("monitor.manage", "Manage monitor", "monitor"),
    # Webhook
    ("webhook.manage", "Manage webhooks", "admin"),
]


def init_default_permissions(db: Session) -> list[Permission]:
    """Create default permissions if they don't exist. Returns all permissions.

    Existing permission names are fetched once up front rather than with one
    query per entry of ``DEFAULT_PERMISSIONS``.
    """
    known_names = {perm.name for perm in db.query(Permission).all()}

    created = 0
    for name, description, category in DEFAULT_PERMISSIONS:
        if name in known_names:
            continue
        db.add(Permission(name=name, description=description, category=category))
        known_names.add(name)
        created += 1
        logger.info("Created permission '%s'", name)

    if created:
        db.commit()

    # Return all permissions
    return db.query(Permission).all()


# ---------------------------------------------------------------------------
# Default role → permission mapping
# ---------------------------------------------------------------------------

# mgr: project management + all milestone/task/propose actions
_MGR_PERMISSIONS = {
    "project.read",
    "project.write",
    "project.manage_members",
    "task.create",
    "task.read",
    "task.write",
    "task.delete",
    "milestone.create",
    "milestone.read",
    "milestone.write",
    "milestone.delete",
    "milestone.freeze",
    "milestone.start",
    "milestone.close",
    "task.close",
    "task.reopen_closed",
    "task.reopen_completed",
    "propose.accept",
    "propose.reject",
    "propose.reopen",
    "monitor.read",
    "user.reset-self-apikey",
}

# dev: day-to-day development work — no freeze/start/close milestone, no accept/reject propose
_DEV_PERMISSIONS = {
    "project.read",
    "task.create",
    "task.read",
    "task.write",
    "milestone.read",
    "task.close",
    "task.reopen_closed",
    "task.reopen_completed",
    "monitor.read",
    "user.reset-self-apikey",
}

_ACCOUNT_MANAGER_PERMISSIONS = {
    "account.create",
}

# Role definitions: (name, description, permission_set)
_DEFAULT_ROLES = [
    ("admin", "Administrator - full access to all features", None),  # None ⇒ all perms
    ("account-manager", "Account manager - can only create accounts", _ACCOUNT_MANAGER_PERMISSIONS),
    ("mgr", "Manager - project & milestone management", _MGR_PERMISSIONS),
    ("dev", "Developer - task execution & daily work", _DEV_PERMISSIONS),
    ("guest", "Guest - read-only access", None),  # special: *.read only
]


def _ensure_role(db: Session, name: str, description: str, is_global: bool = True) -> Role:
    """Get or create a role by name.

    ``description`` and ``is_global`` are only applied when the role is
    created; an existing role is returned unchanged.
    """
    role = db.query(Role).filter(Role.name == name).first()
    if not role:
        role = Role(name=name, description=description, is_global=is_global)
        db.add(role)
        db.commit()
        db.refresh(role)
        logger.info("Created role '%s' (id=%d)", name, role.id)
    return role


def _sync_role_permissions(db: Session, role: Role, target_perm_names: set[str] | None) -> None:
    """Ensure *role* has at least the permissions in *target_perm_names*.

    * ``None`` means **all** permissions (admin).
    * Names with no matching ``Permission`` row are skipped with a warning.

    Only adds missing permissions; never removes manually-granted ones (additive).
    """
    all_perms = db.query(Permission).all()
    perm_by_name = {p.name: p for p in all_perms}

    if target_perm_names is None:
        wanted_ids = {p.id for p in all_perms}
    else:
        unknown = sorted(n for n in target_perm_names if n not in perm_by_name)
        if unknown:
            # Surface typos / missing seed data instead of dropping them silently.
            logger.warning(
                "Role '%s' references unknown permissions: %s", role.name, ", ".join(unknown)
            )
        wanted_ids = {perm_by_name[n].id for n in target_perm_names if n in perm_by_name}

    existing_ids = {rp.permission_id for rp in role.permissions}
    missing_ids = wanted_ids - existing_ids
    for pid in missing_ids:
        db.add(RolePermission(role_id=role.id, permission_id=pid))

    if missing_ids:
        db.commit()
        logger.info("Assigned %d new permissions to role '%s'", len(missing_ids), role.name)


def init_admin_role(db: Session, admin_user: models.User | None = None) -> None:
    """Create the default roles listed in ``_DEFAULT_ROLES`` with preset permissions.

    Args:
        db: Open database session.
        admin_user: Currently unused; kept (and optional) for backward
            compatibility with existing callers.
            NOTE(review): no role is assigned to the admin user here - confirm
            whether ``is_admin=True`` alone is the intended grant.
    """
    all_perms = db.query(Permission).all()
    # The guest role receives every permission whose name ends in ".read".
    read_perm_names = {p.name for p in all_perms if p.name.endswith(".read")}

    for name, description, perm_set in _DEFAULT_ROLES:
        role = _ensure_role(db, name, description)
        if name == "guest":
            _sync_role_permissions(db, role, read_perm_names)
        else:
            _sync_role_permissions(db, role, perm_set)

    logger.info(
        "Default roles setup complete (%s)",
        ", ".join(role_name for role_name, _, _ in _DEFAULT_ROLES),
    )


def init_acc_mgr_user(db: Session) -> models.User | None:
    """Create the built-in acc-mgr user if not exists.

    This user:
    - Has role 'account-manager' (can only create accounts)
    - Cannot log in (no password, hashed_password=None)
    - Cannot be deleted (enforced in delete endpoint)
    - Is created automatically after wizard initialization

    Returns:
        The existing or newly created user, or ``None`` when the
        'account-manager' role does not exist yet.
    """
    username = "acc-mgr"
    existing = db.query(models.User).filter(models.User.username == username).first()
    if existing:
        logger.info("acc-mgr user already exists (id=%d), skipping", existing.id)
        return existing

    # Find account-manager role
    acc_mgr_role = db.query(Role).filter(Role.name == "account-manager").first()
    if not acc_mgr_role:
        logger.warning("account-manager role not found, skipping acc-mgr user creation")
        return None

    user = models.User(
        username=username,
        email="acc-mgr@harborforge.internal",
        full_name="Account Manager",
        hashed_password=None,  # Cannot log in — no password
        is_admin=False,
        is_active=True,
        role_id=acc_mgr_role.id,
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    logger.info("Created acc-mgr user (id=%d) with account-manager role", user.id)
    return user


def run_init(db: Session) -> None:
    """Main initialization entry point. Reads config from shared volume.

    Does nothing when no usable config is found. Otherwise seeds, in order:
    default permissions, the admin user (if configured), default roles, the
    built-in acc-mgr user and the default project (if configured and an admin
    user is available to own it).
    """
    config = load_config()
    if not config:
        return

    logger.info("Running HarborForge initialization from wizard config")

    # Initialize default permissions (always run)
    all_perms = init_default_permissions(db)
    logger.info("Default permissions initialized: %d total", len(all_perms))

    # Admin user
    admin_cfg = config.get("admin")
    admin_user = None
    if admin_cfg:
        admin_user = init_admin_user(db, admin_cfg)

    # Default roles (always run). Role setup does not depend on the admin
    # user, and the acc-mgr user below needs the 'account-manager' role even
    # when the config has no "admin" section.
    init_admin_role(db, admin_user)

    # Built-in acc-mgr user (after roles are created)
    init_acc_mgr_user(db)

    # Default project
    project_cfg = config.get("default_project")
    if project_cfg and admin_user:
        init_default_project(db, project_cfg, admin_user.id, admin_user.username)

    logger.info("Initialization complete")