# File: HarborForge.Backend/app/main.py
# (web listing header: 425 lines, 19 KiB, Python)

"""HarborForge API — Agent/人类协同任务管理平台"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
    title="HarborForge API",
    description="Agent/人类协同任务管理平台 API",
    version="0.3.0",
)


def _cors_allowed_origins():
    """Return the list of origins allowed to make cross-origin requests.

    The list is read from the ``CORS_ALLOW_ORIGINS`` environment variable as
    a comma-separated string (for example
    ``https://app.example.com,https://admin.example.com``). When the variable
    is unset or contains no usable entry, the result is ``["*"]``.
    """
    import os

    raw = os.getenv("CORS_ALLOW_ORIGINS", "*")
    origins = [origin.strip() for origin in raw.split(",") if origin.strip()]
    return origins or ["*"]


# CORS
# NOTE(review): the default keeps a wildcard origin together with
# allow_credentials=True. FastAPI's CORS guide advises listing origins
# explicitly when credentials are enabled, so production deployments should
# set CORS_ALLOW_ORIGINS.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allowed_origins(),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Health & version (kept at top level)
@app.get("/health", tags=["System"])
def health_check():
    """Liveness probe: unconditionally report the service as healthy."""
    payload = dict(status="healthy")
    return payload
@app.get("/version", tags=["System"])
def version():
    """Return the product name, version string and description."""
    return dict(
        name="HarborForge",
        version="0.3.0",
        description="Agent/人类协同任务管理平台",
    )
@app.get("/config/status", tags=["System"])
def config_status():
    """Check if HarborForge has been initialized (reads from config volume).

    Frontend uses this instead of contacting the wizard directly.

    The config file is ``$CONFIG_DIR/$CONFIG_FILE`` (defaults: ``/config`` and
    ``harborforge.json``) and is parsed as UTF-8 JSON.

    Returns:
        ``{"initialized": False}`` when the file is missing, unreadable, not
        valid JSON, or not a JSON object. Otherwise a dict holding the file's
        ``initialized`` value (``False`` when absent) and its ``backend_url``
        value (``None`` when absent).
    """
    import json
    import logging
    import os

    config_path = os.path.join(
        os.getenv("CONFIG_DIR", "/config"),
        os.getenv("CONFIG_FILE", "harborforge.json"),
    )

    try:
        # Open directly instead of checking os.path.exists() first: one less
        # filesystem call and no window between the check and the open.
        with open(config_path, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        # Not an error: there is simply no config file yet.
        return {"initialized": False}
    except (OSError, ValueError) as exc:
        # OSError: permissions / path is a directory / I/O failure.
        # ValueError: invalid JSON or undecodable bytes.
        logging.getLogger(__name__).warning(
            "Cannot read config file %s: %s", config_path, exc
        )
        return {"initialized": False}

    if not isinstance(cfg, dict):
        # Valid JSON but not an object (e.g. a list): nothing to look up.
        return {"initialized": False}

    return {
        "initialized": cfg.get("initialized", False),
        "backend_url": cfg.get("backend_url"),
    }
# Register routers
from app.api.routers.auth import router as auth_router
from app.api.routers.tasks import router as tasks_router
from app.api.routers.projects import router as projects_router
from app.api.routers.users import router as users_router
from app.api.routers.comments import router as comments_router
from app.api.routers.webhooks import router as webhooks_router
from app.api.routers.misc import router as misc_router
from app.api.routers.monitor import router as monitor_router
from app.api.routers.milestones import router as milestones_router
from app.api.routers.roles import router as roles_router
from app.api.routers.proposals import router as proposals_router
from app.api.routers.proposes import router as proposes_router # legacy compat
from app.api.routers.milestone_actions import router as milestone_actions_router
from app.api.routers.meetings import router as meetings_router
from app.api.routers.essentials import router as essentials_router
from app.api.routers.calendar import router as calendar_router
# Mount every router on the application; the tuple order is the registration order.
for _router in (
    auth_router,
    tasks_router,
    projects_router,
    users_router,
    comments_router,
    webhooks_router,
    misc_router,
    monitor_router,
    milestones_router,
    roles_router,
    proposals_router,
    proposes_router,  # legacy compat
    milestone_actions_router,
    meetings_router,
    essentials_router,
    calendar_router,
):
    app.include_router(_router)
# Auto schema migration for lightweight deployments
def _migrate_schema():
    """Apply in-place schema migrations to an existing MySQL database.

    Every step inspects the live schema first (``SHOW TABLES``,
    ``SHOW COLUMNS`` or ``information_schema``) and only then issues DDL, so
    the function is meant to be safe to run on every start.

    All steps share one session. If any step raises, the session is rolled
    back, the error is logged, the remaining steps are skipped and the
    function returns normally (it never raises).

    NOTE: MySQL commits implicitly around DDL statements, so the rollback can
    only undo pending ``UPDATE`` statements, not earlier ``ALTER``/``CREATE``.

    NOTE: table, column and constraint names are interpolated into SQL text.
    Every call site in this function passes literals; never route untrusted
    input through these helpers.
    """
    import logging

    from sqlalchemy import text
    from app.core.config import SessionLocal

    def _has_table(db, table_name: str) -> bool:
        """Return True when ``SHOW TABLES LIKE`` matches ``table_name``."""
        row = db.execute(
            text("SHOW TABLES LIKE :table_name"), {"table_name": table_name}
        ).fetchone()
        return row is not None

    def _has_column(db, table_name: str, column_name: str) -> bool:
        """Return True when ``table_name`` has a column matching ``column_name``.

        The name is used as a LIKE pattern, so ``_`` matches any single
        character.
        """
        row = db.execute(
            text(f"SHOW COLUMNS FROM {table_name} LIKE :column_name"),
            {"column_name": column_name},
        ).fetchone()
        return row is not None

    def _add_column_if_missing(db, table_name: str, column_name: str, definition: str) -> bool:
        """Add ``column_name`` to ``table_name`` unless it already exists.

        ``definition`` is the SQL after the column name (type, nullability,
        default). Returns True when the column was added, False otherwise.
        """
        if _has_column(db, table_name, column_name):
            return False
        db.execute(text(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {definition}"))
        return True

    def _has_index(db, table_name: str, index_name: str) -> bool:
        """Return True when the current database has ``index_name`` on ``table_name``."""
        return db.execute(
            text(
                """
SELECT 1
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = :table_name
AND INDEX_NAME = :index_name
LIMIT 1
"""
            ),
            {"table_name": table_name, "index_name": index_name},
        ).fetchone() is not None

    def _ensure_unique_index(db, table_name: str, index_name: str, columns_sql: str):
        """Create the unique index ``index_name`` unless one with that name exists."""
        if not _has_index(db, table_name, index_name):
            db.execute(text(f"CREATE UNIQUE INDEX {index_name} ON {table_name} ({columns_sql})"))

    def _drop_fk_constraints(db, table_name: str, referenced_table: str):
        """Drop every foreign key on ``table_name`` that references ``referenced_table``."""
        rows = db.execute(text(
            """
SELECT CONSTRAINT_NAME
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = :table_name
AND REFERENCED_TABLE_NAME = :referenced_table
AND CONSTRAINT_NAME <> 'PRIMARY'
"""
        ), {"table_name": table_name, "referenced_table": referenced_table}).fetchall()
        # KEY_COLUMN_USAGE has one row per column, so a multi-column key would
        # repeat its name; de-duplicate (order kept) to drop each key once.
        for constraint_name in dict.fromkeys(name for (name,) in rows):
            db.execute(text(f"ALTER TABLE {table_name} DROP FOREIGN KEY `{constraint_name}`"))

    def _ensure_fk(db, table_name: str, column_name: str, referenced_table: str, referenced_column: str, constraint_name: str):
        """Add a foreign key unless one already links the two columns.

        Existence is decided by the column pair, not by ``constraint_name``,
        so a key created under a different name also counts.
        """
        exists = db.execute(text(
            """
SELECT 1
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = :table_name
AND COLUMN_NAME = :column_name
AND REFERENCED_TABLE_NAME = :referenced_table
AND REFERENCED_COLUMN_NAME = :referenced_column
LIMIT 1
"""
        ), {
            "table_name": table_name,
            "column_name": column_name,
            "referenced_table": referenced_table,
            "referenced_column": referenced_column,
        }).fetchone()
        if not exists:
            db.execute(text(
                f"ALTER TABLE {table_name} ADD CONSTRAINT `{constraint_name}` FOREIGN KEY ({column_name}) REFERENCES {referenced_table}({referenced_column})"
            ))

    db = SessionLocal()
    try:
        # --- projects: extra columns ---
        _add_column_if_missing(db, "projects", "project_code", "VARCHAR(16) NULL")
        _ensure_unique_index(db, "projects", "idx_projects_project_code", "project_code")
        _add_column_if_missing(db, "projects", "owner_name", "VARCHAR(128) NOT NULL DEFAULT ''")
        _add_column_if_missing(db, "projects", "sub_projects", "VARCHAR(512) NULL")
        _add_column_if_missing(db, "projects", "related_projects", "VARCHAR(512) NULL")

        # --- tasks: extra columns ---
        _add_column_if_missing(db, "tasks", "task_type", "VARCHAR(32) DEFAULT 'issue'")
        _add_column_if_missing(db, "tasks", "task_subtype", "VARCHAR(64) NULL")
        _add_column_if_missing(db, "tasks", "tags", "VARCHAR(500) NULL")
        # Each of the three text columns gets its own existence check so a
        # re-run, or a schema that has only some of them, does not hit a
        # duplicate-column error.
        _add_column_if_missing(db, "tasks", "resolution_summary", "TEXT NULL")
        _add_column_if_missing(db, "tasks", "positions", "TEXT NULL")
        _add_column_if_missing(db, "tasks", "pending_matters", "TEXT NULL")
        _add_column_if_missing(db, "tasks", "created_by_id", "INTEGER NULL")
        _ensure_fk(db, "tasks", "created_by_id", "users", "id", "fk_tasks_created_by_id")
        if _has_column(db, "tasks", "task_code"):
            _ensure_unique_index(db, "tasks", "idx_tasks_task_code", "task_code")

        # --- milestones: creator column ---
        _add_column_if_missing(db, "milestones", "created_by_id", "INTEGER NULL")
        _ensure_fk(db, "milestones", "created_by_id", "users", "id", "fk_milestones_created_by_id")

        # --- comments / work_logs: issue_id -> task_id ---
        for child_table, fk_name in (
            ("comments", "fk_comments_task_id"),
            ("work_logs", "fk_work_logs_task_id"),
        ):
            if not _has_table(db, child_table):
                continue
            # Keys pointing at the old issues table must go before the rename.
            _drop_fk_constraints(db, child_table, "issues")
            if _has_column(db, child_table, "issue_id") and not _has_column(db, child_table, "task_id"):
                db.execute(text(f"ALTER TABLE {child_table} CHANGE COLUMN issue_id task_id INTEGER NOT NULL"))
            if _has_column(db, child_table, "task_id"):
                _ensure_fk(db, child_table, "task_id", "tasks", "id", fk_name)

        # Drop issues table if it exists (no longer used anywhere)
        if _has_table(db, "issues"):
            db.execute(text("DROP TABLE issues"))

        # --- Milestone status enum migration (old -> new) ---
        if _has_table(db, "milestones"):
            if _has_column(db, "milestones", "milestone_code"):
                _ensure_unique_index(db, "milestones", "idx_milestones_milestone_code", "milestone_code")
            # Widen the enum so old and new values are both legal while rows
            # are rewritten.
            db.execute(text(
                "ALTER TABLE milestones MODIFY COLUMN status "
                "ENUM('open','pending','deferred','progressing','freeze','undergoing','completed','closed') "
                "DEFAULT 'open'"
            ))
            # Migrate old values
            db.execute(text("UPDATE milestones SET status='open' WHERE status='pending'"))
            db.execute(text("UPDATE milestones SET status='closed' WHERE status='deferred'"))
            db.execute(text("UPDATE milestones SET status='undergoing' WHERE status='progressing'"))
            # Shrink enum to new-only values
            db.execute(text(
                "ALTER TABLE milestones MODIFY COLUMN status "
                "ENUM('open','freeze','undergoing','completed','closed') "
                "DEFAULT 'open'"
            ))
            _add_column_if_missing(db, "milestones", "started_at", "DATETIME NULL")

        # --- P7.1: Migrate task_type='task' to 'issue' ---
        if _has_table(db, "tasks") and _has_column(db, "tasks", "task_type"):
            db.execute(text("UPDATE tasks SET task_type='issue' WHERE task_type='task'"))

        # --- Task status enum migration (old -> new) ---
        if _has_table(db, "tasks"):
            # Widen enum first
            db.execute(text(
                "ALTER TABLE tasks MODIFY COLUMN status "
                "ENUM('open','pending','progressing','undergoing','completed','closed') "
                "DEFAULT 'open'"
            ))
            # Migrate old values
            db.execute(text("UPDATE tasks SET status='undergoing' WHERE status='progressing'"))
            # Shrink enum to new-only values
            db.execute(text(
                "ALTER TABLE tasks MODIFY COLUMN status "
                "ENUM('open','pending','undergoing','completed','closed') "
                "DEFAULT 'open'"
            ))

        # --- users.role_id for single global account role ---
        if _has_table(db, "users"):
            # The foreign key is only added together with the column.
            if _add_column_if_missing(db, "users", "role_id", "INTEGER NULL"):
                _ensure_fk(db, "users", "role_id", "roles", "id", "fk_users_role_id")
            _add_column_if_missing(db, "users", "discord_user_id", "VARCHAR(32) NULL")

        # --- monitored_servers.api_key for heartbeat v2 ---
        if _has_table(db, "monitored_servers"):
            if _add_column_if_missing(db, "monitored_servers", "api_key", "VARCHAR(64) NULL"):
                _ensure_unique_index(db, "monitored_servers", "idx_monitored_servers_api_key", "api_key")

        # --- server_states.plugin_version for monitor plugin telemetry ---
        if _has_table(db, "server_states"):
            _add_column_if_missing(db, "server_states", "plugin_version", "VARCHAR(64) NULL")

        # --- unique indexes on the human-readable code columns ---
        for coded_table, code_column in (
            ("meetings", "meeting_code"),
            ("supports", "support_code"),
            ("proposes", "propose_code"),
            ("essentials", "essential_code"),
        ):
            if _has_table(db, coded_table) and _has_column(db, coded_table, code_column):
                _ensure_unique_index(db, coded_table, f"idx_{coded_table}_{code_column}", code_column)

        # --- server_states nginx telemetry for generic monitor client ---
        if _has_table(db, "server_states"):
            _add_column_if_missing(db, "server_states", "nginx_installed", "BOOLEAN NULL")
            _add_column_if_missing(db, "server_states", "nginx_sites_json", "TEXT NULL")

        # --- agents table (BE-CAL-003) ---
        if not _has_table(db, "agents"):
            db.execute(text("""
CREATE TABLE agents (
id INTEGER NOT NULL AUTO_INCREMENT,
user_id INTEGER NOT NULL,
agent_id VARCHAR(128) NOT NULL,
claw_identifier VARCHAR(128) NOT NULL,
status ENUM('idle','on_call','busy','exhausted','offline') NOT NULL DEFAULT 'idle',
last_heartbeat DATETIME NULL,
exhausted_at DATETIME NULL,
recovery_at DATETIME NULL,
exhaust_reason ENUM('rate_limit','billing') NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE INDEX idx_agents_user_id (user_id),
UNIQUE INDEX idx_agents_agent_id (agent_id),
CONSTRAINT fk_agents_user_id FOREIGN KEY (user_id) REFERENCES users(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""))

        # --- essentials table (BE-PR-003) ---
        if not _has_table(db, "essentials"):
            db.execute(text("""
CREATE TABLE essentials (
id INTEGER NOT NULL AUTO_INCREMENT,
essential_code VARCHAR(64) NOT NULL,
proposal_id INTEGER NOT NULL,
type ENUM('feature','improvement','refactor') NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT NULL,
created_by_id INTEGER NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE INDEX idx_essentials_code (essential_code),
INDEX idx_essentials_proposal_id (proposal_id),
CONSTRAINT fk_essentials_proposal_id FOREIGN KEY (proposal_id) REFERENCES proposes(id),
CONSTRAINT fk_essentials_created_by_id FOREIGN KEY (created_by_id) REFERENCES users(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""))

        # --- minimum_workloads table (BE-CAL-004) ---
        if not _has_table(db, "minimum_workloads"):
            db.execute(text("""
CREATE TABLE minimum_workloads (
id INTEGER NOT NULL AUTO_INCREMENT,
user_id INTEGER NOT NULL,
config JSON NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE INDEX idx_minimum_workloads_user_id (user_id),
CONSTRAINT fk_minimum_workloads_user_id FOREIGN KEY (user_id) REFERENCES users(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""))

        db.commit()
    except Exception as e:
        # Best effort: a failed migration must not stop the application from
        # starting, but it must be visible, so log it with the traceback.
        db.rollback()
        logging.getLogger(__name__).warning("Migration warning: %s", e, exc_info=True)
    finally:
        db.close()
def _sync_default_user_roles(db):
    """Assign default roles to users based on their ``is_admin`` flag.

    Every user with ``is_admin`` set is pointed at the role named "admin".
    Every non-admin user whose ``role_id`` is NULL is pointed at the role
    named "guest". A role that is not found is skipped. The session is
    committed before returning.

    Args:
        db: the database session to query, update and commit.
    """
    from app.models import models
    from app.models.role_permission import Role

    def _role_by_name(role_name):
        # First role with this exact name, or None.
        return db.query(Role).filter(Role.name == role_name).first()

    admin_role = _role_by_name("admin")
    guest_role = _role_by_name("guest")
    user_model = models.User

    if admin_role:
        # `== True` is a SQL expression here, not a Python comparison.
        admin_users = db.query(user_model).filter(user_model.is_admin == True)  # noqa: E712
        admin_users.update(
            {user_model.role_id: admin_role.id},
            synchronize_session=False,
        )

    if guest_role:
        # Only users that have no role yet and are not admins.
        roleless_users = db.query(user_model).filter(
            user_model.role_id == None,  # noqa: E711
            user_model.is_admin == False,  # noqa: E712
        )
        roleless_users.update(
            {user_model.role_id: guest_role.id},
            synchronize_session=False,
        )

    db.commit()
# Run database migration on startup
@app.on_event("startup")
def startup():
    """Prepare the database and start background polling when the app starts.

    Steps, in order:
      1. Create any tables missing from the database (``Base.metadata.create_all``).
      2. Run the in-place schema migrations (``_migrate_schema``).
      3. Run the wizard-driven initialization (``run_init``) and sync default
         user roles, using a session that is always closed afterwards.
      4. Start a daemon thread that calls ``refresh_provider_usage_once``
         every 600 seconds with a fresh session per run.

    An exception from steps 1-3 propagates to the caller. Exceptions raised
    by a polling run are logged and the loop carries on.
    """
    import logging
    import threading
    import time

    from app.core.config import Base, engine, SessionLocal
    # The model modules are not referenced below; presumably they are imported
    # so their tables are registered on Base.metadata before create_all().
    from app.models import models, webhook, apikey, activity, milestone, notification, worklog, monitor, role_permission, task, support, meeting, proposal, propose, essential, agent, calendar, minimum_workload  # noqa: F401

    Base.metadata.create_all(bind=engine)
    _migrate_schema()

    # Initialize from AbstractWizard (admin user, default project, etc.)
    from app.init_wizard import run_init

    db = SessionLocal()
    try:
        run_init(db)
        _sync_default_user_roles(db)
    finally:
        db.close()

    # Start lightweight monitor polling thread (every 10 minutes)
    from app.services.monitoring import refresh_provider_usage_once

    logger = logging.getLogger(__name__)

    def _monitor_poll_loop():
        """Refresh provider usage forever, sleeping 600 seconds between runs."""
        while True:
            db2 = SessionLocal()
            try:
                refresh_provider_usage_once(db2)
            except Exception:
                # Keep the loop alive, but leave a trace of what went wrong
                # instead of discarding the error.
                logger.exception("Provider usage refresh failed; retrying in 600 seconds")
            finally:
                db2.close()
            time.sleep(600)

    # Daemon thread: it must not keep the process alive at shutdown.
    t = threading.Thread(target=_monitor_poll_loop, daemon=True)
    t.start()