diff --git a/.gitignore b/.gitignore index 85e7c1d..c67cf86 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,6 @@ +/dialectic-backend +/dist/ /.idea/ +/.vscode/ +*.swp +.DS_Store diff --git a/Dockerfile b/Dockerfile index 5c4f487..3e3eb92 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,22 +1,19 @@ -FROM python:3.13-slim +# syntax=docker/dockerfile:1.7 +# Dialectic.Backend.Go — multi-stage build (compile static binary, run on distroless). -ENV PYTHONDONTWRITEBYTECODE=1 -ENV PYTHONUNBUFFERED=1 +FROM golang:1.23-bookworm AS build +WORKDIR /src +COPY go.mod go.sum ./ +RUN go mod download +COPY . . +ARG VERSION=dev +RUN CGO_ENABLED=0 GOOS=linux go build \ + -ldflags="-s -w -X main.Version=${VERSION}" \ + -o /out/dialectic-backend . +FROM gcr.io/distroless/static-debian12:nonroot WORKDIR /app - -RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential gcc \ - python3-dev \ - libssl-dev libffi-dev \ - && rm -rf /var/lib/apt/lists/* - -RUN python -m pip install --upgrade pip setuptools wheel - -COPY requirements.txt ./requirements.txt -RUN pip install --no-cache-dir -r requirements.txt - -COPY . /app/ - -EXPOSE 8000 -CMD ["python3", "app.py"] \ No newline at end of file +COPY --from=build /out/dialectic-backend /app/dialectic-backend +EXPOSE 8090 +USER nonroot:nonroot +ENTRYPOINT ["/app/dialectic-backend"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..9ccfd04 --- /dev/null +++ b/README.md @@ -0,0 +1,102 @@ +# Dialectic.Backend — v2 (Go) + +Greenfield Go rewrite of the Python v1 backend. Agent-native debate +platform per [`/home/hzhang/arch/DIALECTIC-V2-DESIGN.md`](../DIALECTIC-V2-DESIGN.md). + +Python v1 history is preserved on branch `archive/python-v1`. + +## What's here (Phase 2A + 2B + 2C, 2026-05-23) + +| Subsystem | Status | +|-----------|--------| +| HTTP server (`chi` router) | ✅ | +| Config from env (`internal/config`) | ✅ | +| MySQL via `sqlx` + embedded SQL migrations | ✅ | +| Schema: `topics`, `signups`, `camps`, `rounds`, `arguments`, `verdicts`, `agent_keys`, `system_keys`, `verdict_schemas` | ✅ | +| Auth middlewares: agent bearer (real), OIDC browser (Phase 2 stub w/ dev bypass) | ✅ | +| `/api/healthz` | ✅ | +| `/api/topics` list / get / create / set-visibility | ✅ | +| `/api/topics/{id}/signups` list / create (agent self-enroll) | ✅ | +| Orchestration engine (camp allocation, round driver, judge invocation) | ⬜ Phase 2D | +| SSE live transcripts | ⬜ Phase 2D | +| Full OIDC + Keycloak JWKS verification | ⬜ Phase 4 | +| Nginx + CF Origin Cert on server.t3 | ⬜ Phase 2E | + +## Layout + +``` +main.go entrypoint (load → wire → serve) +go.mod +Dockerfile +docker-compose.dev.yml backend + mysql for local iteration +internal/ + config/ 12-factor env loader + db/ + db.go sqlx + embedded migration runner + migrations/001_init.sql v2 schema, idempotent + models/ entity types (sqlx + json tags) + store/ query layer (per-entity) + auth/ agent api-key + oidc middlewares + httpapi/ + routes.go chi router + auth chains + handlers/ per-endpoint handlers +``` + +## Run locally + +``` +docker compose -f docker-compose.dev.yml up --build +# backend on http://localhost:8090 +curl http://localhost:8090/api/healthz +``` + +Env vars (see `internal/config/config.go` for the full list): + +| Var | Default (dev) | Required in prod | +|-----|---------------|-------------------| +| `ENV_MODE` | `dev` | must be `prod` | +| `HTTP_ADDR` | `0.0.0.0:8090` | — | +| `CORS_ALLOW_ORIGINS` | `*` | concrete list (no `*`) | +| `DB_HOST/PORT/NAME/USER/PASSWORD` | dev defaults | ✓ password required | +| `AGENT_API_KEY_PEPPER` | — | ✓ | +| `OIDC_ISSUER` / `OIDC_CLIENT_ID` | — | ✓ | +| `OIDC_DEV_BYPASS_TOKEN` | unset | ignored in prod | +| `SYSTEM_API_KEY` | unset | populate when announce-channel push lands | + +## Dev bypass for browser routes + +In `ENV_MODE=dev` with `OIDC_DEV_BYPASS_TOKEN=` set: + +``` +curl -H "x-dev-bypass: " http://localhost:8090/api/topics +# attached as user 'dev-operator' with role 'dialectic-admin' +``` + +In `prod`, this header is ignored regardless of value. + +## Agent bearer for plugin routes + +The OpenClaw plugin (`Dialectic.OpenclawPlugin`, Phase 3) calls with: + +``` +Authorization: Bearer +``` + +The key is hashed with `AGENT_API_KEY_PEPPER` and matched against +`agent_keys.key_hash`. To provision an agent's key (Phase 3 will add a +proper `hf user create-dialectic-key` CLI; for now, manual SQL): + +```sql +INSERT INTO agent_keys (agent_id, key_hash) +VALUES ('manager', SHA2(CONCAT(':', ''), 256)); +``` + +## What's next + +- **Phase 2D**: camp allocation algorithm + round driver + judge + invocation. Wired to Fabric announce channel (via system-api-key) + + the Dialectic.OpenclawPlugin's tool for agent argument submission. +- **Phase 2E**: nginx config + CF Origin Cert + deploy to server.t3. +- **Phase 3**: Dialectic.OpenclawPlugin — agent-facing tools. +- **Phase 4**: frontend rewrite (STYLE.md + real Keycloak OIDC + visibility toggle UI). +- **Phase 5**: end-to-end integration with `analyze-intel` workflow. diff --git a/api/__init__.py b/api/__init__.py deleted file mode 100644 index cdca84d..0000000 --- a/api/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from api.debates import router as debates_router -from api.api_keys import router as api_keys_router -from api.models import router as models_router -from api.setup import router as setup_router diff --git a/api/api_keys.py b/api/api_keys.py deleted file mode 100644 index e325bf6..0000000 --- a/api/api_keys.py +++ /dev/null @@ -1,113 +0,0 @@ -from fastapi import APIRouter, Depends, Form, HTTPException - -from middleware.auth import require_auth -import aiohttp - -from services.api_key_service import ApiKeyService -from db_models import get_db - -router = APIRouter(tags=["api-keys"]) - - -async def validate_api_key(provider: str, api_key: str): - """ - Validate an API key by listing models from the provider. - All providers validated the same way: if we can list models, the key is valid. - """ - try: - if provider == "openai": - import openai - async with openai.AsyncOpenAI(api_key=api_key, timeout=10.0) as client: - await client.models.list() - return True - - # Claude, Qwen, DeepSeek: GET their models endpoint via aiohttp - endpoints = { - "claude": { - "url": "https://api.anthropic.com/v1/models", - "headers": { - "x-api-key": api_key, - "anthropic-version": "2023-06-01" - } - }, - "qwen": { - "url": "https://dashscope.aliyuncs.com/compatible-mode/v1/models", - "headers": {"Authorization": f"Bearer {api_key}"} - }, - "deepseek": { - "url": "https://api.deepseek.com/v1/models", - "headers": {"Authorization": f"Bearer {api_key}"} - } - } - - if provider in endpoints: - timeout = aiohttp.ClientTimeout(total=10) - async with aiohttp.ClientSession(timeout=timeout) as session: - ep = endpoints[provider] - async with session.get(ep["url"], headers=ep["headers"]) as response: - return response.status == 200 - - # Tavily: validate by calling REST API directly (no tavily package needed) - if provider == "tavily": - timeout = aiohttp.ClientTimeout(total=10) - async with aiohttp.ClientSession(timeout=timeout) as session: - async with session.post( - "https://api.tavily.com/search", - json={"api_key": api_key, "query": "test", "max_results": 1} - ) as response: - if response.status == 200: - data = await response.json() - return bool(data.get("results")) - return False - - return False - except Exception: - return False - - -@router.post("/api-keys/{provider}", dependencies=[Depends(require_auth)]) -async def set_api_key(provider: str, api_key: str = Form(...), db=Depends(get_db)): - """ - Set API key for a specific provider - """ - try: - success = ApiKeyService.set_api_key(db, provider, api_key) - if success: - return {"message": f"API key for {provider} updated successfully"} - else: - raise HTTPException(status_code=500, detail="Failed to update API key") - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - -@router.get("/api-keys/{provider}") -async def get_api_key(provider: str, db=Depends(get_db)): - """ - Get API key for a specific provider - """ - try: - api_key = ApiKeyService.get_api_key(db, provider) - if api_key: - return {"provider": provider, "api_key": api_key} - else: - raise HTTPException(status_code=404, detail=f"No API key found for {provider}") - except HTTPException: - raise - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - -@router.post("/validate-api-key/{provider}", dependencies=[Depends(require_auth)]) -async def validate_api_key_endpoint(provider: str, api_key: str = Form(...)): - """ - Validate an API key by making a test request to the provider - This endpoint is used by the frontend to validate API keys without CORS issues - """ - try: - is_valid = await validate_api_key(provider, api_key) - if is_valid: - return {"valid": True, "message": f"Valid {provider} API key"} - else: - return {"valid": False, "message": f"Invalid {provider} API key"} - except Exception as e: - return {"valid": False, "message": str(e)} diff --git a/api/debates.py b/api/debates.py deleted file mode 100644 index 0825d59..0000000 --- a/api/debates.py +++ /dev/null @@ -1,158 +0,0 @@ -from fastapi import APIRouter, Depends, HTTPException - -from middleware.auth import require_auth -from sse_starlette.sse import EventSourceResponse -from typing import Dict, Any -import asyncio -import json - -from orchestrator.debate_orchestrator import DebateOrchestrator -from models.debate import DebateRequest -from storage.session_manager import SessionManager -from db_models import get_db - -router = APIRouter(tags=["debates"]) - - -@router.post("/debate/create", dependencies=[Depends(require_auth)]) -async def create_debate(debate_request: DebateRequest, db=Depends(get_db)) -> Dict[str, Any]: - """ - Create a new debate session with specified parameters - """ - try: - orchestrator = DebateOrchestrator(db) - session_id = await orchestrator.create_session(debate_request) - - return { - "session_id": session_id, - "status": "created", - "message": f"Debate session {session_id} created successfully" - } - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - -@router.get("/debate/{session_id}") -async def get_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]: - """ - Get the current state of a debate session - """ - try: - orchestrator = DebateOrchestrator(db) - session = await orchestrator.get_session_status(session_id) - - if not session: - raise HTTPException(status_code=404, detail=f"Debate session {session_id} not found") - - return session.dict() - except HTTPException: - raise - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - -@router.post("/debate/{session_id}/start", dependencies=[Depends(require_auth)]) -async def start_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]: - """ - Start a debate session and stream the results - """ - try: - orchestrator = DebateOrchestrator(db) - session = await orchestrator.run_debate(session_id) - - return { - "session_id": session_id, - "status": session.status, - "message": f"Debate session {session_id} completed" - } - except Exception as e: - import traceback - traceback.print_exc() - raise HTTPException(status_code=500, detail=str(e)) - - -@router.delete("/debate/{session_id}", dependencies=[Depends(require_auth)]) -async def end_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]: - """ - End a debate session prematurely - """ - try: - orchestrator = DebateOrchestrator(db) - await orchestrator.terminate_session(session_id) - - return {"session_id": session_id, "status": "terminated"} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - - -@router.get("/debate/{session_id}/stream") -async def stream_debate(session_id: str, db=Depends(get_db)) -> EventSourceResponse: - """ - Stream debate updates in real-time using Server-Sent Events - """ - async def event_generator(): - session_manager = SessionManager() - - # Check if the session exists - session = await session_manager.get_session(db, session_id) - if not session: - yield {"event": "error", "data": json.dumps({"error": f"Session {session_id} not found"})} - return - - # Yield initial state - yield {"event": "update", "data": json.dumps({ - "session_id": session_id, - "status": session.status, - "rounds": [round.dict() for round in session.rounds], - "evidence_library": [e.dict() for e in session.evidence_library], - "message": "Debate session initialized" - }, default=str)} - - last_round_count = len(session.rounds) - # Poll until debate completes or times out (max 5 min) - for _ in range(150): # 150 × 2s = 300s timeout - await asyncio.sleep(2) - - # Reset transaction so we see commits from the run_debate request - db.commit() - - updated_session = await session_manager.get_session(db, session_id) - if not updated_session: - break - - # Only yield update when rounds actually changed - if len(updated_session.rounds) != last_round_count or updated_session.status != session.status: - last_round_count = len(updated_session.rounds) - session = updated_session - yield {"event": "update", "data": json.dumps({ - "session_id": session_id, - "status": updated_session.status, - "rounds": [round.dict() for round in updated_session.rounds], - "evidence_library": [e.dict() for e in updated_session.evidence_library], - "current_round": len(updated_session.rounds) - }, default=str)} - - if updated_session.status in ("completed", "terminated"): - yield {"event": "complete", "data": json.dumps({ - "session_id": session_id, - "status": updated_session.status, - "summary": updated_session.summary, - "rounds": [round.dict() for round in updated_session.rounds], - "evidence_library": [e.dict() for e in updated_session.evidence_library] - }, default=str)} - break - - return EventSourceResponse(event_generator()) - - -@router.get("/sessions") -async def list_sessions(db=Depends(get_db)) -> Dict[str, Any]: - """ - List all debate sessions - """ - try: - session_manager = SessionManager() - sessions = await session_manager.list_sessions(db) - return {"sessions": sessions} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) diff --git a/api/models.py b/api/models.py deleted file mode 100644 index 0c82db3..0000000 --- a/api/models.py +++ /dev/null @@ -1,173 +0,0 @@ -from fastapi import APIRouter, Depends, HTTPException -import aiohttp - -from services.api_key_service import ApiKeyService -from db_models import get_db -from api.api_keys import validate_api_key - -router = APIRouter(tags=["models"]) - - -@router.get("/models/{provider}") -async def get_available_models(provider: str, db=Depends(get_db)): - """ - Get available models for a specific provider by fetching from their API. - Falls back to a curated default list if the API key is missing or the request fails. - """ - # Curated defaults for each provider — used as fallback - default_models = { - "openai": [ - {"model_identifier": "gpt-4o", "display_name": "gpt-4o"}, - {"model_identifier": "gpt-4o-mini", "display_name": "gpt-4o-mini"}, - {"model_identifier": "gpt-4-turbo", "display_name": "gpt-4-turbo"}, - {"model_identifier": "gpt-4", "display_name": "gpt-4"}, - {"model_identifier": "o3-mini", "display_name": "o3-mini"}, - {"model_identifier": "gpt-3.5-turbo", "display_name": "gpt-3.5-turbo"} - ], - "claude": [ - {"model_identifier": "claude-opus-4-5", "display_name": "claude-opus-4-5"}, - {"model_identifier": "claude-sonnet-4-5", "display_name": "claude-sonnet-4-5"}, - {"model_identifier": "claude-3-5-sonnet-20241022", "display_name": "claude-3-5-sonnet-20241022"}, - {"model_identifier": "claude-3-5-haiku-20241022", "display_name": "claude-3-5-haiku-20241022"}, - {"model_identifier": "claude-3-opus-20240229", "display_name": "claude-3-opus-20240229"} - ], - "qwen": [ - {"model_identifier": "qwen3-max", "display_name": "qwen3-max"}, - {"model_identifier": "qwen3-plus", "display_name": "qwen3-plus"}, - {"model_identifier": "qwen3-flash", "display_name": "qwen3-flash"}, - {"model_identifier": "qwen-max", "display_name": "qwen-max"}, - {"model_identifier": "qwen-plus", "display_name": "qwen-plus"}, - {"model_identifier": "qwen-turbo", "display_name": "qwen-turbo"} - ], - "deepseek": [ - {"model_identifier": "deepseek-chat", "display_name": "deepseek-chat"}, - {"model_identifier": "deepseek-reasoner", "display_name": "deepseek-reasoner"}, - {"model_identifier": "deepseek-v3", "display_name": "deepseek-v3"}, - {"model_identifier": "deepseek-r1", "display_name": "deepseek-r1"} - ] - } - - defaults = default_models.get(provider, []) - - try: - # Retrieve and decrypt API key - decrypted_key = ApiKeyService.get_api_key(db, provider) - if not decrypted_key: - return {"provider": provider, "models": defaults} - - # ---------- OpenAI ---------- - if provider == "openai": - import openai - async with openai.AsyncOpenAI(api_key=decrypted_key, timeout=10.0) as client: - response = await client.models.list() - - # Keep only chat / reasoning models, sorted newest-first by created timestamp - chat_prefixes = ('gpt-', 'o1', 'o3', 'o4', 'chatgpt') - models = [] - seen = set() - for m in sorted(response.data, key=lambda x: x.created, reverse=True): - if m.id not in seen and m.id.startswith(chat_prefixes): - seen.add(m.id) - models.append({"model_identifier": m.id, "display_name": m.id}) - - if models: - return {"provider": provider, "models": models} - - # ---------- Claude ---------- - elif provider == "claude": - timeout = aiohttp.ClientTimeout(total=10) - async with aiohttp.ClientSession(timeout=timeout) as session: - headers = { - "x-api-key": decrypted_key, - "anthropic-version": "2023-06-01" - } - async with session.get("https://api.anthropic.com/v1/models", headers=headers) as resp: - if resp.status == 200: - data = await resp.json() - models = [] - for m in data.get("data", []): - model_id = m.get("id", "") - if model_id.startswith("claude-"): - models.append({"model_identifier": model_id, "display_name": model_id}) - if models: - return {"provider": provider, "models": models} - else: - print(f"Claude models API returned {resp.status}: {await resp.text()}") - - # ---------- Qwen ---------- - elif provider == "qwen": - timeout = aiohttp.ClientTimeout(total=10) - async with aiohttp.ClientSession(timeout=timeout) as session: - headers = {"Authorization": f"Bearer {decrypted_key}"} - async with session.get("https://dashscope.aliyuncs.com/compatible-mode/v1/models", headers=headers) as resp: - if resp.status == 200: - data = await resp.json() - exclude_keywords = [ - 'tts', 'vl', 'ocr', 'image', 'asr', '-mt-', '-mt', - 'math', 'embed', 'rerank', 'coder', 'translate', - 's2s', 'deep-search', 'omni', 'gui-' - ] - models = [] - for m in data.get("data", []): - model_id = m.get("id", "") - if not model_id: - continue - if not (model_id.startswith("qwen") or model_id.startswith("qwq")): - continue - if any(kw in model_id for kw in exclude_keywords): - continue - models.append({"model_identifier": model_id, "display_name": model_id}) - if models: - return {"provider": provider, "models": models} - else: - print(f"Qwen models API returned {resp.status}: {await resp.text()}") - - # ---------- DeepSeek ---------- - elif provider == "deepseek": - timeout = aiohttp.ClientTimeout(total=10) - async with aiohttp.ClientSession(timeout=timeout) as session: - headers = {"Authorization": f"Bearer {decrypted_key}"} - async with session.get("https://api.deepseek.com/v1/models", headers=headers) as resp: - if resp.status == 200: - data = await resp.json() - models = [] - for m in data.get("data", []): - model_id = m.get("id", "") - if model_id.startswith("deepseek"): - models.append({"model_identifier": model_id, "display_name": model_id}) - if models: - return {"provider": provider, "models": models} - else: - print(f"DeepSeek models API returned {resp.status}: {await resp.text()}") - - # API fetch succeeded but returned empty list, or unknown provider — use defaults - return {"provider": provider, "models": defaults} - - except Exception as e: - print(f"Error fetching models for {provider}: {e}") - import traceback - traceback.print_exc() - return {"provider": provider, "models": defaults} - - -@router.get("/providers") -async def get_available_providers(db=Depends(get_db)): - """ - Get all providers that have valid API keys set - """ - try: - available_providers = [] - for provider in ("openai", "claude", "qwen", "deepseek", "tavily"): - decrypted_key = ApiKeyService.get_api_key(db, provider) - if not decrypted_key: - continue - is_valid = await validate_api_key(provider, decrypted_key) - if is_valid: - available_providers.append({ - "provider": provider, - "has_valid_key": True - }) - - return {"providers": available_providers} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) diff --git a/api/setup.py b/api/setup.py deleted file mode 100644 index 43342ba..0000000 --- a/api/setup.py +++ /dev/null @@ -1,192 +0,0 @@ -import os -from typing import Optional -from urllib.parse import quote_plus - -from fastapi import APIRouter, Depends, HTTPException, Request -from pydantic import BaseModel - -from services.config_service import ConfigService - -router = APIRouter(prefix="/api/setup", tags=["setup"]) - -PASSWORD_PLACEHOLDER = "********" - - -# --------------------------------------------------------------------------- -# Request / response models -# --------------------------------------------------------------------------- -class DatabaseConfig(BaseModel): - host: str - port: int = 3306 - user: str = "root" - password: str = "" - database: str = "dialectica" - - -class KeycloakConfig(BaseModel): - host: str = "" - realm: str = "" - client_id: str = "" - - -class TlsConfig(BaseModel): - cert_path: str = "" - key_path: str = "" - force_https: bool = False - - -class FullConfig(BaseModel): - database: Optional[DatabaseConfig] = None - keycloak: Optional[KeycloakConfig] = None - tls: Optional[TlsConfig] = None - - -# --------------------------------------------------------------------------- -# Access control dependency -# --------------------------------------------------------------------------- -async def setup_guard(request: Request): - """Three-phase access control for setup routes. - - 1. Not initialized → only localhost allowed - 2. ENV_MODE=dev → open - 3. ENV_MODE=prod → Keycloak admin JWT required - """ - config = ConfigService.load() - initialized = config.get("initialized", False) - env_mode = os.getenv("ENV_MODE", "dev") - - if env_mode == "dev": - return # dev mode: no auth needed, even before initialisation - - if not initialized: - # prod + not initialised: only localhost may configure - client_ip = request.client.host - if client_ip not in ("127.0.0.1", "::1"): - raise HTTPException( - status_code=403, - detail="初次设置仅允许从本机访问", - ) - return - - # prod → delegate to Keycloak middleware (Phase 3) - from app.middleware.auth import require_admin - await require_admin(request, config) - - -# --------------------------------------------------------------------------- -# Routes -# --------------------------------------------------------------------------- -@router.get("/status") -async def setup_status(): - """Return current system initialisation state, including KC info for OIDC.""" - config = ConfigService.load() - env_mode = os.getenv("ENV_MODE", "dev") - - result = { - "initialized": config.get("initialized", False), - "env_mode": env_mode, - "db_configured": ConfigService.is_db_configured(), - } - - # Include Keycloak info so the frontend can build OIDC config - kc = config.get("keycloak", {}) - if env_mode == "prod" and kc.get("host") and kc.get("realm"): - result["keycloak"] = { - "authority": f"{kc['host']}/realms/{kc['realm']}", - "client_id": kc.get("client_id", ""), - } - - return result - - -@router.get("/config", dependencies=[Depends(setup_guard)]) -async def get_config(): - """Return full config with passwords replaced by placeholder.""" - config = ConfigService.load() - if "database" in config and config["database"].get("password"): - config["database"]["password"] = PASSWORD_PLACEHOLDER - return config - - -@router.put("/config", dependencies=[Depends(setup_guard)]) -async def update_config(payload: FullConfig): - """Merge submitted config sections into the YAML file.""" - config = ConfigService.load() - - if payload.database is not None: - db_dict = payload.database.model_dump() - # If password is the placeholder, keep the existing real password - if db_dict.get("password") == PASSWORD_PLACEHOLDER: - db_dict["password"] = config.get("database", {}).get("password", "") - config["database"] = db_dict - if payload.keycloak is not None: - config["keycloak"] = payload.keycloak.model_dump() - if payload.tls is not None: - config["tls"] = payload.tls.model_dump() - - ConfigService.save(config) - return {"message": "配置已保存"} - - -@router.post("/test-db", dependencies=[Depends(setup_guard)]) -async def test_db_connection(db_config: DatabaseConfig): - """Test a database connection with the provided parameters (no save).""" - from sqlalchemy import create_engine, text - - password = db_config.password - # If password is the placeholder, use the real password from config - if password == PASSWORD_PLACEHOLDER: - password = ConfigService.load().get("database", {}).get("password", "") - - url = ( - f"mysql+pymysql://{quote_plus(db_config.user)}:{quote_plus(password)}" - f"@{db_config.host}:{db_config.port}/{db_config.database}" - ) - try: - engine = create_engine(url, pool_pre_ping=True) - with engine.connect() as conn: - conn.execute(text("SELECT 1")) - engine.dispose() - return {"success": True, "message": "数据库连接成功"} - except Exception as e: - return {"success": False, "message": str(e)} - - -@router.post("/test-keycloak", dependencies=[Depends(setup_guard)]) -async def test_keycloak(kc_config: KeycloakConfig): - """Test Keycloak connectivity by fetching the OIDC discovery document.""" - import httpx - - well_known = ( - f"{kc_config.host}/realms/{kc_config.realm}" - f"/.well-known/openid-configuration" - ) - try: - async with httpx.AsyncClient(timeout=10) as client: - resp = await client.get(well_known) - if resp.status_code == 200: - return {"success": True, "message": "Keycloak 连通正常"} - return {"success": False, "message": f"HTTP {resp.status_code}"} - except Exception as e: - return {"success": False, "message": str(e)} - - -@router.post("/initialize", dependencies=[Depends(setup_guard)]) -async def initialize(): - """Mark system as initialised and reload DB connection.""" - config = ConfigService.load() - - if not ConfigService.is_db_configured(): - raise HTTPException(status_code=400, detail="请先配置数据库连接") - - # Reload DB engine so business routes can start working - from app.db_models import reload_db_connection - from app.storage.database import init_db - - reload_db_connection() - init_db() - - config["initialized"] = True - ConfigService.save(config) - - return {"message": "系统初始化完成", "initialized": True} diff --git a/app.py b/app.py deleted file mode 100644 index 58dd26e..0000000 --- a/app.py +++ /dev/null @@ -1,60 +0,0 @@ -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware -from starlette.responses import JSONResponse -import uvicorn - -from storage.database import init_db -from exceptions import ServiceNotConfiguredError -from middleware.config_guard import ConfigGuardMiddleware -from api import debates_router, api_keys_router, models_router, setup_router - -app = FastAPI( - title="Dialectica - Multi-Model Debate Framework", - description="A framework for structured debates between multiple language models", - version="0.1.0" -) - -# Add CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # Allow all origins for development - allow_credentials=True, - allow_methods=["*"], # Allow all methods - allow_headers=["*"], # Allow all headers - # Add exposed headers to allow frontend to access response headers - expose_headers=["Access-Control-Allow-Origin", "Access-Control-Allow-Credentials"] -) - -# Config guard: return 503 for business routes when DB not configured -app.add_middleware(ConfigGuardMiddleware) - - -@app.exception_handler(ServiceNotConfiguredError) -async def not_configured_handler(request, exc): - return JSONResponse( - status_code=503, - content={"error_code": "SERVICE_NOT_CONFIGURED", "detail": str(exc)}, - ) - - -@app.on_event("startup") -def startup_event(): - """Initialize database on startup (skipped if not configured).""" - init_db() - - -# Register routers -app.include_router(debates_router) -app.include_router(api_keys_router) -app.include_router(models_router) -app.include_router(setup_router) - - -@app.get("/") -async def root(): - return {"message": "Welcome to Dialectica - Multi-Model Debate Framework"} - - -if __name__ == "__main__": - uvicorn.run(app, host="0.0.0.0", port=8000) - diff --git a/db_models/__init__.py b/db_models/__init__.py deleted file mode 100644 index c05a4b6..0000000 --- a/db_models/__init__.py +++ /dev/null @@ -1,79 +0,0 @@ -from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker -from datetime import datetime - -from exceptions import ServiceNotConfiguredError -from services.config_service import ConfigService - -Base = declarative_base() - - -class ApiKey(Base): - __tablename__ = "api_keys" - - id = Column(Integer, primary_key=True, index=True) - provider = Column(String(50), unique=True, index=True, nullable=False) - api_key_encrypted = Column(Text, nullable=False) # Encrypted API key - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow) - - -class ModelConfig(Base): - __tablename__ = "model_configs" - - id = Column(Integer, primary_key=True, index=True) - provider = Column(String(50), nullable=False) - model_name = Column(String(100), nullable=False) - display_name = Column(String(100)) # Optional display name - is_active = Column(Boolean, default=True) - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow) - - __table_args__ = {'mysql_charset': 'utf8mb4'} - - -# --------------------------------------------------------------------------- -# Lazy engine / session factory — only created when first requested -# --------------------------------------------------------------------------- -_engine = None -_SessionLocal = None - - -def _get_engine(): - from sqlalchemy import create_engine - global _engine - if _engine is None: - db_url = ConfigService.get_database_url() - if not db_url: - raise ServiceNotConfiguredError("数据库未配置") - _engine = create_engine(db_url) - return _engine - - -def _get_session_factory(): - global _SessionLocal - if _SessionLocal is None: - _SessionLocal = sessionmaker( - autocommit=False, autoflush=False, bind=_get_engine() - ) - return _SessionLocal - - -def reload_db_connection(): - """Dispose current engine and reset so next call rebuilds from config.""" - global _engine, _SessionLocal - if _engine is not None: - _engine.dispose() - _engine = None - _SessionLocal = None - - -def get_db(): - """FastAPI dependency that yields a DB session.""" - session_factory = _get_session_factory() - db = session_factory() - try: - yield db - finally: - db.close() diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml new file mode 100644 index 0000000..df1a4dc --- /dev/null +++ b/docker-compose.dev.yml @@ -0,0 +1,45 @@ +# Dev docker-compose: backend + MySQL only, exposed on localhost. +# Frontend / nginx are in the sibling top-level Dialectic repo's compose. +# For end-to-end dev: run that compose; for backend-only iteration, this. + +services: + backend: + build: + context: . + args: + VERSION: dev-local + environment: + ENV_MODE: dev + HTTP_ADDR: 0.0.0.0:8090 + CORS_ALLOW_ORIGINS: "*" + DB_HOST: mysql + DB_PORT: "3306" + DB_NAME: dialectic + DB_USER: dialectic + DB_PASSWORD: dialectic + AGENT_API_KEY_PEPPER: dev-pepper + OIDC_DEV_BYPASS_TOKEN: dev-bypass-token + ports: ["8090:8090"] + depends_on: + mysql: + condition: service_healthy + restart: unless-stopped + + mysql: + image: mysql:8.4 + environment: + MYSQL_ROOT_PASSWORD: rootpassword + MYSQL_DATABASE: dialectic + MYSQL_USER: dialectic + MYSQL_PASSWORD: dialectic + ports: ["3306:3306"] + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-pdialectic"] + interval: 5s + timeout: 3s + retries: 20 + volumes: + - dialectic_mysql_data:/var/lib/mysql + +volumes: + dialectic_mysql_data: diff --git a/exceptions/__init__.py b/exceptions/__init__.py deleted file mode 100644 index 4a171f7..0000000 --- a/exceptions/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -class ServiceNotConfiguredError(Exception): - """Raised when a required service (database, etc.) is not configured.""" - pass diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4cf4ee0 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module git.hangman-lab.top/hzhang/Dialectic.Backend + +go 1.23 + +require ( + github.com/go-chi/chi/v5 v5.1.0 + github.com/go-chi/cors v1.2.1 + github.com/go-sql-driver/mysql v1.8.1 + github.com/google/uuid v1.6.0 + github.com/jmoiron/sqlx v1.4.0 +) + +require filippo.io/edwards25519 v1.1.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b26d9bb --- /dev/null +++ b/go.sum @@ -0,0 +1,16 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= +github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4= +github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/internal/auth/auth.go b/internal/auth/auth.go new file mode 100644 index 0000000..01b02b3 --- /dev/null +++ b/internal/auth/auth.go @@ -0,0 +1,144 @@ +// Package auth holds the two middlewares Dialectic v2 uses: +// +// - AgentAPIKey: validates `Authorization: Bearer ` against +// the `agent_keys` table (hashed with the configured pepper). +// Used by Dialectic.OpenclawPlugin → backend calls. +// +// - OIDCBrowser: validates a Keycloak-issued JWT in the +// `dialectic_session` cookie. Used by the React frontend. +// Phase 2C ships a stub that accepts a dev-mode bypass token; the +// real JWKS verification + claim mapping lands with Phase 4. +// +// Both middlewares attach a typed Caller to the request context so +// downstream handlers can read identity uniformly. +package auth + +import ( + "context" + "crypto/sha256" + "crypto/subtle" + "database/sql" + "encoding/hex" + "errors" + "net/http" + "strings" + + "github.com/jmoiron/sqlx" +) + +type CallerKind string + +const ( + CallerAgent CallerKind = "agent" + CallerUser CallerKind = "user" + CallerSystem CallerKind = "system" +) + +type Caller struct { + Kind CallerKind + ID string // agentId for CallerAgent; userId for CallerUser; key-name for CallerSystem + Roles []string // populated for CallerUser (from JWT claims); empty otherwise +} + +type ctxKey struct{} + +func WithCaller(ctx context.Context, c Caller) context.Context { + return context.WithValue(ctx, ctxKey{}, c) +} + +// FromContext returns the caller attached by an auth middleware. The +// zero Caller (Kind == "") indicates an unauthenticated request reached +// a public route. +func FromContext(ctx context.Context) Caller { + c, _ := ctx.Value(ctxKey{}).(Caller) + return c +} + +// HashKey peppers + sha256-hashes a raw API key. Constant pepper; same +// raw key always produces the same hash so lookups can equal-match on +// the key_hash column. +func HashKey(pepper, raw string) string { + h := sha256.Sum256([]byte(pepper + ":" + raw)) + return hex.EncodeToString(h[:]) +} + +// AgentAPIKey middleware: extracts Bearer token, looks up agent_keys, +// 401 on miss. Updates last_used_at lazily (best-effort; failure here +// doesn't block the request). +func AgentAPIKey(db *sqlx.DB, pepper string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + raw := bearerToken(r) + if raw == "" { + http.Error(w, "missing bearer token", http.StatusUnauthorized) + return + } + hash := HashKey(pepper, raw) + var agentID string + err := db.GetContext(r.Context(), &agentID, + `SELECT agent_id FROM agent_keys WHERE key_hash = ? AND revoked_at IS NULL`, hash) + if errors.Is(err, sql.ErrNoRows) { + http.Error(w, "invalid agent key", http.StatusUnauthorized) + return + } + if err != nil { + http.Error(w, "auth lookup failed", http.StatusInternalServerError) + return + } + go func(h string) { + // best-effort touch — independent ctx so it survives + // even if the request was cancelled mid-handler. + _, _ = db.Exec( + `UPDATE agent_keys SET last_used_at = CURRENT_TIMESTAMP WHERE key_hash = ?`, h) + }(hash) + + ctx := WithCaller(r.Context(), Caller{Kind: CallerAgent, ID: agentID}) + next.ServeHTTP(w, r.WithContext(ctx)) + }) + } +} + +// OIDCBrowser middleware (Phase 2C stub): +// - Dev mode + `x-dev-bypass: ` header → admit as a fake user. +// - Otherwise: 401 with a hint pointing to the (not-yet-wired) +// Keycloak redirect path. The real JWKS-verifying middleware lands +// when the frontend is wired up; until then, browser callers can +// only reach the API via the dev bypass. +func OIDCBrowser(devMode bool, devBypassToken string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if devMode && devBypassToken != "" { + if subtleEqual(r.Header.Get("x-dev-bypass"), devBypassToken) { + ctx := WithCaller(r.Context(), Caller{ + Kind: CallerUser, + ID: "dev-operator", + Roles: []string{"dialectic-admin"}, + }) + next.ServeHTTP(w, r.WithContext(ctx)) + return + } + } + // Production path goes through Keycloak — Phase 4. + http.Error(w, "oidc login required (Phase 4: not yet wired)", http.StatusUnauthorized) + }) + } +} + +func bearerToken(r *http.Request) string { + h := r.Header.Get("authorization") + const prefix = "Bearer " + if strings.HasPrefix(h, prefix) { + return strings.TrimSpace(h[len(prefix):]) + } + if strings.HasPrefix(h, "bearer ") { + return strings.TrimSpace(h[len("bearer "):]) + } + return "" +} + +func subtleEqual(a, b string) bool { + if len(a) != len(b) { + return false + } + return subtle.ConstantTimeCompare([]byte(a), []byte(b)) == 1 +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..5fee442 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,141 @@ +// Package config loads runtime configuration from environment variables. +// +// Conventions: +// - 12-factor: every config knob is an env var; no config files. +// - Sensible dev defaults for local docker-compose; prod sets via env. +// - Sensitive values (DB password, system api key) are *required* in +// prod; LoadFromEnv() fails fast if absent and ENV_MODE != "dev". +package config + +import ( + "fmt" + "os" + "strings" +) + +type Config struct { + // "dev" | "prod". Dev relaxes required-field checks and enables a + // dev-mode auth bypass token. Prod requires every sensitive field. + Mode string + + // HTTP server bind. e.g. "0.0.0.0:8090". + HTTPAddr string + + // CORS allowed origins (comma-separated; "*" allowed only in dev). + CORSAllowOrigins []string + + // MySQL DSN parts. + DBHost string + DBPort string + DBName string + DBUser string + DBPassword string + + // Auth. + // + // SystemAPIKey: Phase-1 system key for posting to announce channels + // in Fabric. Mirrored here so Dialectic backend itself can post topic + // announcements via Fabric's POST /channels/:id/messages with + // x-fabric-system-key header. + // + // AgentAPIKeyPepper: HMAC pepper for hashing agent API keys at rest + // (we store sha256(pepper || raw) not the raw key). Rotating the + // pepper invalidates all keys — that's intentional, an emergency + // kill switch. + // + // OIDCDevBypassToken: dev-mode only. If set AND Mode == "dev", a + // browser request with header `x-dev-bypass: ` bypasses OIDC + // and is treated as user "dev-operator" with role "dialectic-admin". + // Prod ignores this even if set. + SystemAPIKey string + AgentAPIKeyPepper string + OIDCDevBypassToken string + + // OIDC issuer URL (Keycloak realm endpoint). e.g. + // https://auth.hangman-lab.top/realms/hangman-lab + // Phase 2C ships this as configured-but-not-verified; Phase 4 wires + // real JWKS validation. + OIDCIssuer string + OIDCClientID string +} + +func LoadFromEnv() (*Config, error) { + c := &Config{ + Mode: getenv("ENV_MODE", "dev"), + HTTPAddr: getenv("HTTP_ADDR", "0.0.0.0:8090"), + CORSAllowOrigins: splitCSV(getenv("CORS_ALLOW_ORIGINS", "*")), + DBHost: getenv("DB_HOST", "127.0.0.1"), + DBPort: getenv("DB_PORT", "3306"), + DBName: getenv("DB_NAME", "dialectic"), + DBUser: getenv("DB_USER", "dialectic"), + DBPassword: os.Getenv("DB_PASSWORD"), + SystemAPIKey: os.Getenv("SYSTEM_API_KEY"), + AgentAPIKeyPepper: os.Getenv("AGENT_API_KEY_PEPPER"), + OIDCDevBypassToken: os.Getenv("OIDC_DEV_BYPASS_TOKEN"), + OIDCIssuer: os.Getenv("OIDC_ISSUER"), + OIDCClientID: os.Getenv("OIDC_CLIENT_ID"), + } + + if c.Mode != "dev" && c.Mode != "prod" { + return nil, fmt.Errorf("ENV_MODE must be dev|prod, got %q", c.Mode) + } + + if c.Mode == "prod" { + var missing []string + if c.DBPassword == "" { + missing = append(missing, "DB_PASSWORD") + } + if c.AgentAPIKeyPepper == "" { + missing = append(missing, "AGENT_API_KEY_PEPPER") + } + if c.OIDCIssuer == "" { + missing = append(missing, "OIDC_ISSUER") + } + if c.OIDCClientID == "" { + missing = append(missing, "OIDC_CLIENT_ID") + } + if len(missing) > 0 { + return nil, fmt.Errorf("prod mode requires env: %s", strings.Join(missing, ", ")) + } + // In prod, "*" CORS is never accepted. + for _, o := range c.CORSAllowOrigins { + if o == "*" { + return nil, fmt.Errorf("prod mode forbids CORS_ALLOW_ORIGINS='*'") + } + } + } + + return c, nil +} + +func (c *Config) IsDev() bool { return c.Mode == "dev" } + +func (c *Config) DSN() string { + // MySQL DSN: user:pass@tcp(host:port)/dbname?params + return fmt.Sprintf( + "%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4&collation=utf8mb4_unicode_ci", + c.DBUser, c.DBPassword, c.DBHost, c.DBPort, c.DBName, + ) +} + +func getenv(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func splitCSV(s string) []string { + if s == "" { + return nil + } + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + p = strings.TrimSpace(p) + if p != "" { + out = append(out, p) + } + } + return out +} diff --git a/internal/db/db.go b/internal/db/db.go new file mode 100644 index 0000000..48e8d24 --- /dev/null +++ b/internal/db/db.go @@ -0,0 +1,119 @@ +// Package db wraps sqlx and runs embedded SQL migrations on startup. +// +// Migrations are flat files in migrations/, named NNN_*.sql. They run in +// lexical order. Each is executed in its own transaction; a missing +// schema_migrations row indicates "not yet applied". This is a +// deliberately simple migration runner — for this project's size + team +// size, pulling in golang-migrate or atlas adds complexity without +// payback. If migration count grows past ~20, revisit. +package db + +import ( + "context" + "database/sql" + "embed" + "fmt" + "sort" + "strings" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" +) + +//go:embed migrations/*.sql +var migrationsFS embed.FS + +func Open(ctx context.Context, dsn string) (*sqlx.DB, error) { + d, err := sqlx.ConnectContext(ctx, "mysql", dsn) + if err != nil { + return nil, fmt.Errorf("connect mysql: %w", err) + } + d.SetMaxOpenConns(25) + d.SetMaxIdleConns(5) + d.SetConnMaxLifetime(5 * time.Minute) + return d, nil +} + +// RunMigrations applies any migrations that aren't yet present in the +// schema_migrations table. Idempotent — safe to call on every startup. +func RunMigrations(ctx context.Context, d *sqlx.DB) error { + // Bootstrap the tracker table itself. + if _, err := d.ExecContext(ctx, ` + CREATE TABLE IF NOT EXISTS schema_migrations ( + name VARCHAR(255) PRIMARY KEY, + applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`); err != nil { + return fmt.Errorf("ensure schema_migrations: %w", err) + } + + entries, err := migrationsFS.ReadDir("migrations") + if err != nil { + return fmt.Errorf("list migrations: %w", err) + } + var files []string + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") { + files = append(files, e.Name()) + } + } + sort.Strings(files) + + for _, name := range files { + var found string + err := d.GetContext(ctx, &found, `SELECT name FROM schema_migrations WHERE name = ?`, name) + if err == nil { + continue // already applied + } + if err != sql.ErrNoRows { + return fmt.Errorf("check migration %s: %w", name, err) + } + + content, err := migrationsFS.ReadFile("migrations/" + name) + if err != nil { + return fmt.Errorf("read migration %s: %w", name, err) + } + + // MySQL doesn't support multi-statement in a single Exec by default + // — split on ';' boundaries and run each individually. Comments are + // passed through (server-side parser handles). + statements := splitSQL(string(content)) + + tx, err := d.BeginTxx(ctx, nil) + if err != nil { + return fmt.Errorf("tx for %s: %w", name, err) + } + for _, stmt := range statements { + stmt = strings.TrimSpace(stmt) + if stmt == "" { + continue + } + if _, err := tx.ExecContext(ctx, stmt); err != nil { + _ = tx.Rollback() + return fmt.Errorf("apply %s [statement: %q]: %w", name, firstLine(stmt), err) + } + } + if _, err := tx.ExecContext(ctx, `INSERT INTO schema_migrations(name) VALUES (?)`, name); err != nil { + _ = tx.Rollback() + return fmt.Errorf("record %s: %w", name, err) + } + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit %s: %w", name, err) + } + } + return nil +} + +func splitSQL(s string) []string { + // Crude but adequate for our migrations (no string-literal semicolons). + // If we ever need to embed semicolons inside strings, switch to a + // proper SQL tokenizer. + return strings.Split(s, ";") +} + +func firstLine(s string) string { + if i := strings.IndexByte(s, '\n'); i >= 0 { + return s[:i] + } + return s +} diff --git a/internal/db/migrations/001_init.sql b/internal/db/migrations/001_init.sql new file mode 100644 index 0000000..2ced584 --- /dev/null +++ b/internal/db/migrations/001_init.sql @@ -0,0 +1,141 @@ +-- 001_init.sql — Dialectic v2 schema (greenfield, replaces Python v1). +-- See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md for the design. + +-- Verdict schemas — declared at topic-creation time; judge produces output matching. +CREATE TABLE verdict_schemas ( + id VARCHAR(64) NOT NULL PRIMARY KEY, + description TEXT NOT NULL, + shape_json JSON NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Seed v1 schemas. +INSERT INTO verdict_schemas (id, description, shape_json) VALUES + ('binary', 'pro|con|draw with confidence + key reasoning', JSON_OBJECT('decision', 'pro|con|draw', 'confidence', 'number 0..1', 'key_reasoning', 'string')), + ('claim-resolution', 'analyze-intel contested-cluster resolution', JSON_OBJECT('verdict', 'resolved-toward-A|resolved-toward-B|irreducibly-contested', 'winning_claim', 'string', 'dissenting_points', 'array of string', 'confidence', 'number 0..1')), + ('policy-recommendation', 'recommended action with alternatives and risks', JSON_OBJECT('recommended_action', 'string', 'alternatives', 'array of string', 'conditions_for_alternatives', 'array of string', 'risks_noted', 'array of string')), + ('free-form', 'unstructured summary escape hatch', JSON_OBJECT('summary', 'string')); + +-- Topics (议题) — the unit of debate. +CREATE TABLE topics ( + id CHAR(36) NOT NULL PRIMARY KEY, + title VARCHAR(255) NOT NULL, + summary TEXT NOT NULL, + visibility ENUM('public','private') NOT NULL DEFAULT 'private', + verdict_schema_id VARCHAR(64) NOT NULL, + status ENUM('created','signup_open','signup_closed','debating','completed','cancelled') NOT NULL DEFAULT 'created', + -- Lifecycle timestamps (per section 3 of design doc) + signup_open_at TIMESTAMP NOT NULL, + signup_close_at TIMESTAMP NOT NULL, + debate_start_at TIMESTAMP NOT NULL, + debate_end_at TIMESTAMP NOT NULL, + -- Audit + creator_user_id CHAR(36) NOT NULL, + visibility_changed_by CHAR(36) NULL, + visibility_changed_at TIMESTAMP NULL, + cancelled_reason VARCHAR(255) NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_topics_status (status, signup_open_at), + INDEX idx_topics_visibility (visibility, created_at), + CONSTRAINT fk_topics_schema FOREIGN KEY (verdict_schema_id) REFERENCES verdict_schemas(id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Signups: an agent volunteers for one or more camps on a topic. +-- willing_camps is a JSON array of camp names (subset of {pro, con, judge}). +-- (agent_id, topic_id) is unique — re-signup updates willing_camps. +CREATE TABLE signups ( + id CHAR(36) NOT NULL PRIMARY KEY, + topic_id CHAR(36) NOT NULL, + agent_id VARCHAR(64) NOT NULL, + willing_camps JSON NOT NULL, + -- Pre-validation result captured at signup time (plugin verifies the + -- agent has an on_call slot covering the debate window; backend + -- records what the agent told it for audit). + pre_validated BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uq_signups (topic_id, agent_id), + CONSTRAINT fk_signups_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Camps: the post-allocation assignment. One row per (topic, camp) with +-- the locked-in agent. Written by camp-allocation algorithm at +-- signup_close_at; immutable afterwards (no drop-out / replacement in v1). +CREATE TABLE camps ( + id CHAR(36) NOT NULL PRIMARY KEY, + topic_id CHAR(36) NOT NULL, + camp ENUM('pro','con','judge') NOT NULL, + agent_id VARCHAR(64) NOT NULL, + allocated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE KEY uq_camps (topic_id, camp), + INDEX idx_camps_agent (agent_id), + CONSTRAINT fk_camps_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Rounds: chronological partition of arguments. Each topic has N rounds +-- (typically 3-5); round 0 is the opening. Round transitions are driven +-- by the orchestrator on a schedule (or all-participants-posted). +CREATE TABLE rounds ( + id CHAR(36) NOT NULL PRIMARY KEY, + topic_id CHAR(36) NOT NULL, + round_no INT NOT NULL, + opened_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + closed_at TIMESTAMP NULL, + UNIQUE KEY uq_rounds (topic_id, round_no), + CONSTRAINT fk_rounds_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Arguments: an individual contribution within a round by a camp's agent. +-- For pro/con these are claims/rebuttals; for judge these are clarifying +-- questions (judge is silent observer in v1 except for clarifications). +CREATE TABLE arguments ( + id CHAR(36) NOT NULL PRIMARY KEY, + topic_id CHAR(36) NOT NULL, + round_id CHAR(36) NOT NULL, + camp ENUM('pro','con','judge') NOT NULL, + agent_id VARCHAR(64) NOT NULL, + content MEDIUMTEXT NOT NULL, + posted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + INDEX idx_arguments_round (round_id, posted_at), + INDEX idx_arguments_topic (topic_id, posted_at), + CONSTRAINT fk_arguments_round FOREIGN KEY (round_id) REFERENCES rounds(id) ON DELETE CASCADE, + CONSTRAINT fk_arguments_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Verdicts: judge's structured output, one per topic (one verdict per +-- debate). verdict_json shape matches the topic's verdict_schema_id. +CREATE TABLE verdicts ( + id CHAR(36) NOT NULL PRIMARY KEY, + topic_id CHAR(36) NOT NULL UNIQUE, + judge_agent_id VARCHAR(64) NOT NULL, + verdict_json JSON NOT NULL, + rationale TEXT NOT NULL, + -- Token cost trail for accounting (Phase 1: not enforced; Phase N: budget gate) + tokens_input INT NOT NULL DEFAULT 0, + tokens_output INT NOT NULL DEFAULT 0, + produced_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT fk_verdicts_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Agent API keys: provisioned per agent at recruitment time. Stored as +-- sha256(pepper || raw); pepper rotation invalidates all keys. +CREATE TABLE agent_keys ( + agent_id VARCHAR(64) NOT NULL PRIMARY KEY, + key_hash CHAR(64) NOT NULL UNIQUE, + issued_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_used_at TIMESTAMP NULL, + revoked_at TIMESTAMP NULL, + INDEX idx_agent_keys_hash (key_hash) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- System keys: out-of-band credentials for non-agent callers (e.g. the +-- analyze-intel workflow running via a system identity that creates +-- topics on behalf of the analyzing agent). Also stored as hash. +CREATE TABLE system_keys ( + name VARCHAR(64) NOT NULL PRIMARY KEY, + key_hash CHAR(64) NOT NULL UNIQUE, + description TEXT NULL, + issued_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + revoked_at TIMESTAMP NULL, + INDEX idx_system_keys_hash (key_hash) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/internal/httpapi/handlers/health.go b/internal/httpapi/handlers/health.go new file mode 100644 index 0000000..5689542 --- /dev/null +++ b/internal/httpapi/handlers/health.go @@ -0,0 +1,38 @@ +package handlers + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "github.com/jmoiron/sqlx" +) + +type HealthHandler struct { + db *sqlx.DB + version string + startedAt time.Time +} + +func NewHealthHandler(db *sqlx.DB, version string) *HealthHandler { + return &HealthHandler{db: db, version: version, startedAt: time.Now()} +} + +func (h *HealthHandler) Healthz(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second) + defer cancel() + dbOK := h.db.PingContext(ctx) == nil + status := http.StatusOK + if !dbOK { + status = http.StatusServiceUnavailable + } + w.Header().Set("content-type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(map[string]any{ + "ok": dbOK, + "version": h.version, + "uptime_s": int(time.Since(h.startedAt).Seconds()), + "checked_at": time.Now().UTC().Format(time.RFC3339), + }) +} diff --git a/internal/httpapi/handlers/signups.go b/internal/httpapi/handlers/signups.go new file mode 100644 index 0000000..f9b38ba --- /dev/null +++ b/internal/httpapi/handlers/signups.go @@ -0,0 +1,131 @@ +package handlers + +import ( + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" +) + +type SignupsHandler struct { + topics *store.TopicStore + signups *store.SignupStore +} + +func NewSignupsHandler(t *store.TopicStore, s *store.SignupStore) *SignupsHandler { + return &SignupsHandler{topics: t, signups: s} +} + +type signupBody struct { + WillingCamps []models.Camp `json:"willing_camps"` + PreValidated bool `json:"pre_validated"` +} + +// POST /api/topics/{id}/signups +// +// Agent self-enrollment. Only CallerAgent is allowed — browsers can't +// sign up on behalf of an agent (would defeat the on_call pre-check +// that the plugin does before calling this endpoint). +// +// Body: { willing_camps: [pro|con|judge ...], pre_validated: bool } +// +// pre_validated is the agent's plugin's claim that it verified the +// agent has an on_call HF slot covering [debate_start_at, debate_end_at]. +// Backend trusts but logs — Phase N may add server-side verification. +// +// Topic must be in status `signup_open`. Outside that window → 409. +func (h *SignupsHandler) Create(w http.ResponseWriter, r *http.Request) { + caller := auth.FromContext(r.Context()) + if caller.Kind != auth.CallerAgent { + http.Error(w, "signup is agent-only", http.StatusForbidden) + return + } + topicID := chi.URLParam(r, "id") + t, err := h.topics.GetByID(r.Context(), topicID) + if errors.Is(err, store.ErrNotFound) { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + if err != nil { + http.Error(w, "lookup failed", http.StatusInternalServerError) + return + } + if t.Status != models.TopicStatusSignupOpen { + http.Error(w, "signup window not open (status="+string(t.Status)+")", http.StatusConflict) + return + } + now := time.Now() + if now.Before(t.SignupOpenAt) { + http.Error(w, "signup not yet open", http.StatusConflict) + return + } + if now.After(t.SignupCloseAt) { + http.Error(w, "signup closed", http.StatusConflict) + return + } + + var body signupBody + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad body", http.StatusBadRequest) + return + } + if len(body.WillingCamps) == 0 { + http.Error(w, "willing_camps required", http.StatusBadRequest) + return + } + // Dedup the camps so a buggy plugin can't insert duplicates. + camps := dedupCamps(body.WillingCamps) + view, err := h.signups.Upsert(r.Context(), store.UpsertSignupInput{ + TopicID: topicID, + AgentID: caller.ID, + WillingCamps: camps, + PreValidated: body.PreValidated, + }) + if err != nil { + http.Error(w, "upsert failed: "+err.Error(), http.StatusBadRequest) + return + } + writeJSON(w, http.StatusCreated, view) +} + +// GET /api/topics/{id}/signups — list all signups for a topic. +// +// Visible to topic creator + admins + agents. Public anonymous see +// nothing (avoid leaking who-signed-up for private topics). +func (h *SignupsHandler) List(w http.ResponseWriter, r *http.Request) { + caller := auth.FromContext(r.Context()) + if caller.Kind == "" { + http.Error(w, "auth required", http.StatusUnauthorized) + return + } + topicID := chi.URLParam(r, "id") + if _, err := h.topics.GetByID(r.Context(), topicID); err != nil { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + rows, err := h.signups.ListByTopic(r.Context(), topicID) + if err != nil { + http.Error(w, "list failed", http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusOK, map[string]any{"signups": rows, "count": len(rows)}) +} + +func dedupCamps(in []models.Camp) []models.Camp { + seen := map[models.Camp]struct{}{} + out := make([]models.Camp, 0, len(in)) + for _, c := range in { + if _, ok := seen[c]; ok { + continue + } + seen[c] = struct{}{} + out = append(out, c) + } + return out +} diff --git a/internal/httpapi/handlers/topics.go b/internal/httpapi/handlers/topics.go new file mode 100644 index 0000000..4ce7a7e --- /dev/null +++ b/internal/httpapi/handlers/topics.go @@ -0,0 +1,221 @@ +package handlers + +import ( + "encoding/json" + "errors" + "net/http" + "strconv" + "time" + + "github.com/go-chi/chi/v5" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" +) + +type TopicsHandler struct { + store *store.TopicStore +} + +func NewTopicsHandler(s *store.TopicStore) *TopicsHandler { return &TopicsHandler{store: s} } + +// GET /api/topics?status=...&visibility=...&limit=...&offset=... +// +// Visibility filter is enforced at the auth layer: anonymous callers +// only see visibility=public; authenticated users (CallerUser) see all +// they're entitled to (Phase 2 v1: all; Phase 4 may add per-user ACLs). +// Agent callers (CallerAgent) see all — they're acting as system on +// behalf of the platform. +func (h *TopicsHandler) List(w http.ResponseWriter, r *http.Request) { + caller := auth.FromContext(r.Context()) + f := store.ListFilter{ + Status: r.URL.Query().Get("status"), + Visibility: r.URL.Query().Get("visibility"), + } + if v, _ := strconv.Atoi(r.URL.Query().Get("limit")); v > 0 { + f.Limit = v + } + if v, _ := strconv.Atoi(r.URL.Query().Get("offset")); v > 0 { + f.Offset = v + } + + // Anonymous: force visibility=public. + if caller.Kind == "" { + f.Visibility = string(models.VisibilityPublic) + } + + rows, err := h.store.List(r.Context(), f) + if err != nil { + http.Error(w, "list failed: "+err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusOK, map[string]any{"topics": rows, "count": len(rows)}) +} + +// GET /api/topics/{id} +func (h *TopicsHandler) Get(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + if id == "" { + http.Error(w, "missing id", http.StatusBadRequest) + return + } + t, err := h.store.GetByID(r.Context(), id) + if errors.Is(err, store.ErrNotFound) { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + if err != nil { + http.Error(w, "get failed: "+err.Error(), http.StatusInternalServerError) + return + } + // Visibility gate: anonymous can only see public; authenticated see all. + caller := auth.FromContext(r.Context()) + if caller.Kind == "" && t.Visibility != models.VisibilityPublic { + http.Error(w, "not found", http.StatusNotFound) // 404 not 403 — hide existence + return + } + writeJSON(w, http.StatusOK, t) +} + +type createTopicBody struct { + Title string `json:"title"` + Summary string `json:"summary"` + Visibility string `json:"visibility"` // default "private" + VerdictSchemaID string `json:"verdict_schema_id"` // default "free-form" + SignupOpenAt string `json:"signup_open_at"` // RFC3339 + SignupCloseAt string `json:"signup_close_at"` + DebateStartAt string `json:"debate_start_at"` + DebateEndAt string `json:"debate_end_at"` +} + +// POST /api/topics +// +// Allowed callers: agent or authenticated user. Anonymous rejected +// (the route wires the auth-required middleware). +func (h *TopicsHandler) Create(w http.ResponseWriter, r *http.Request) { + caller := auth.FromContext(r.Context()) + if caller.Kind == "" { + http.Error(w, "auth required", http.StatusUnauthorized) + return + } + var body createTopicBody + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad body: "+err.Error(), http.StatusBadRequest) + return + } + if body.Title == "" || body.Summary == "" { + http.Error(w, "title and summary required", http.StatusBadRequest) + return + } + if body.Visibility == "" { + body.Visibility = string(models.VisibilityPrivate) + } + if body.VerdictSchemaID == "" { + body.VerdictSchemaID = "free-form" + } + if err := validateLifecycleTimes(body); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + created, err := h.store.Create(r.Context(), store.CreateTopicInput{ + Title: body.Title, + Summary: body.Summary, + Visibility: models.Visibility(body.Visibility), + VerdictSchemaID: body.VerdictSchemaID, + SignupOpenAt: body.SignupOpenAt, + SignupCloseAt: body.SignupCloseAt, + DebateStartAt: body.DebateStartAt, + DebateEndAt: body.DebateEndAt, + CreatorUserID: caller.ID, + }) + if err != nil { + http.Error(w, "create failed: "+err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusCreated, created) +} + +// validateLifecycleTimes enforces: +// +// signup_open < signup_close <= debate_start < debate_end +// +// All four timestamps must be parsable as RFC3339; failure → 400. +func validateLifecycleTimes(b createTopicBody) error { + type p struct { + name string + raw string + } + parts := []p{ + {"signup_open_at", b.SignupOpenAt}, + {"signup_close_at", b.SignupCloseAt}, + {"debate_start_at", b.DebateStartAt}, + {"debate_end_at", b.DebateEndAt}, + } + parsed := make([]time.Time, 4) + for i, x := range parts { + t, err := time.Parse(time.RFC3339, x.raw) + if err != nil { + return errors.New(x.name + ": must be RFC3339") + } + parsed[i] = t + } + if !parsed[0].Before(parsed[1]) { + return errors.New("signup_open_at must be before signup_close_at") + } + if parsed[1].After(parsed[2]) { + return errors.New("signup_close_at must be <= debate_start_at") + } + if !parsed[2].Before(parsed[3]) { + return errors.New("debate_start_at must be before debate_end_at") + } + return nil +} + +// PUT /api/topics/{id}/visibility — admin-only flip (Phase 2 stub: any +// authenticated user; Phase 4 will check the dialectic-admin role from JWT). +func (h *TopicsHandler) SetVisibility(w http.ResponseWriter, r *http.Request) { + caller := auth.FromContext(r.Context()) + if caller.Kind == "" { + http.Error(w, "auth required", http.StatusUnauthorized) + return + } + if caller.Kind == auth.CallerUser && !hasRole(caller, "dialectic-admin") { + http.Error(w, "dialectic-admin role required", http.StatusForbidden) + return + } + id := chi.URLParam(r, "id") + var body struct { + Visibility string `json:"visibility"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad body", http.StatusBadRequest) + return + } + v := models.Visibility(body.Visibility) + if v != models.VisibilityPublic && v != models.VisibilityPrivate { + http.Error(w, "visibility must be public|private", http.StatusBadRequest) + return + } + t, err := h.store.SetVisibility(r.Context(), id, v, caller.ID) + if err != nil { + http.Error(w, "update failed: "+err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusOK, t) +} + +func hasRole(c auth.Caller, role string) bool { + for _, r := range c.Roles { + if r == role { + return true + } + } + return false +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("content-type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} diff --git a/internal/httpapi/routes.go b/internal/httpapi/routes.go new file mode 100644 index 0000000..e2bc297 --- /dev/null +++ b/internal/httpapi/routes.go @@ -0,0 +1,177 @@ +package httpapi + +import ( + "net/http" + "time" + + "github.com/go-chi/chi/v5" + chimw "github.com/go-chi/chi/v5/middleware" + "github.com/go-chi/cors" + "github.com/jmoiron/sqlx" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi/handlers" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" +) + +// Mount returns the root router with all v2 endpoints wired. Owners of +// individual middleware chains: +// +// - /api/healthz : public (no auth) +// - /api/topics : mixed — list/get optional auth (anon +// sees public only); create requires CallerAgent or CallerUser +// - /api/topics/{id}/signups : agent-only (CallerAgent) +// +// Browser-side OIDC and agent-side bearer middlewares co-exist on the +// same route by being "optional auth" — if either succeeds, Caller is +// attached; otherwise the handler sees anonymous and decides whether +// to 401 or fall through to public behavior. +func Mount(cfg *config.Config, db *sqlx.DB, version string) http.Handler { + r := chi.NewRouter() + + // Boilerplate middleware — these run on every request. + r.Use(chimw.RealIP) + r.Use(chimw.RequestID) + r.Use(chimw.Logger) + r.Use(chimw.Recoverer) + r.Use(chimw.Timeout(30 * time.Second)) + r.Use(cors.Handler(cors.Options{ + AllowedOrigins: cfg.CORSAllowOrigins, + AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, + AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "x-dev-bypass"}, + ExposedHeaders: []string{}, + AllowCredentials: true, + MaxAge: 300, + })) + + // Auth middlewares — composed as "try agent, then user, else pass anonymous". + optionalAuth := optionalAuthChain(db, cfg) + requireAgent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper) // strict bearer + requireAnyAuth := requireAnyAuthChain(db, cfg) + + // Handler instances. + topicStore := store.NewTopicStore(db) + signupStore := store.NewSignupStore(db) + health := handlers.NewHealthHandler(db, version) + topicsH := handlers.NewTopicsHandler(topicStore) + signupsH := handlers.NewSignupsHandler(topicStore, signupStore) + + // Routes. + r.Route("/api", func(r chi.Router) { + r.Get("/healthz", health.Healthz) + + // Topics: list+get optional-auth (visibility-gated by handler); + // create+visibility-flip require any auth. + r.Group(func(r chi.Router) { + r.Use(optionalAuth) + r.Get("/topics", topicsH.List) + r.Get("/topics/{id}", topicsH.Get) + }) + r.Group(func(r chi.Router) { + r.Use(requireAnyAuth) + r.Post("/topics", topicsH.Create) + r.Put("/topics/{id}/visibility", topicsH.SetVisibility) + }) + + // Signups: agent-only. + r.Group(func(r chi.Router) { + r.Use(requireAgent) + r.Post("/topics/{id}/signups", signupsH.Create) + }) + // List signups: any authenticated caller. + r.Group(func(r chi.Router) { + r.Use(requireAnyAuth) + r.Get("/topics/{id}/signups", signupsH.List) + }) + }) + + return r +} + +// optionalAuthChain: if either auth method succeeds, attach Caller; +// otherwise let the request through anonymous. Handlers decide what +// to do with anonymous (typically: serve public subset, hide private). +func optionalAuthChain(db *sqlx.DB, cfg *config.Config) func(http.Handler) http.Handler { + agent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper) + oidc := auth.OIDCBrowser(cfg.IsDev(), cfg.OIDCDevBypassToken) + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Bearer present → try agent path; on success it ServeHTTPs next. + // On failure it 401s, which we want to demote to "anonymous" for + // optional auth. The pattern is: capture the response; if it's + // 401, fall through to OIDC; if OIDC also 401s, finally fall + // through to next (anonymous). + if r.Header.Get("authorization") != "" { + rw := &captureWriter{ResponseWriter: w} + agent(next).ServeHTTP(rw, r) + if rw.status != http.StatusUnauthorized { + return + } + // reset captured state and try anon path (since OIDC + // won't apply if there's no cookie / bypass header) + } + if r.Header.Get("x-dev-bypass") != "" { + rw := &captureWriter{ResponseWriter: w} + oidc(next).ServeHTTP(rw, r) + if rw.status != http.StatusUnauthorized { + return + } + } + // Anonymous — call next with no Caller attached. + next.ServeHTTP(w, r) + }) + } +} + +// requireAnyAuthChain: 401 if neither agent nor user auth succeeds. +func requireAnyAuthChain(db *sqlx.DB, cfg *config.Config) func(http.Handler) http.Handler { + agent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper) + oidc := auth.OIDCBrowser(cfg.IsDev(), cfg.OIDCDevBypassToken) + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("authorization") != "" { + rw := &captureWriter{ResponseWriter: w} + agent(next).ServeHTTP(rw, r) + if rw.status != http.StatusUnauthorized { + return + } + } + oidc(next).ServeHTTP(w, r) + }) + } +} + +// captureWriter records the status so the optional-auth chain can +// distinguish "401 from inner middleware (try next)" from "actual +// response from handler (deliver)". Body bytes are passed through +// when status != 401. +type captureWriter struct { + http.ResponseWriter + status int + wroteHeader bool + suppressing bool +} + +func (c *captureWriter) WriteHeader(s int) { + c.status = s + c.wroteHeader = true + if s == http.StatusUnauthorized { + // don't actually write — we may fall through + c.suppressing = true + return + } + c.ResponseWriter.WriteHeader(s) +} + +func (c *captureWriter) Write(b []byte) (int, error) { + if c.suppressing { + // swallow; caller will fall through to next chain step + return len(b), nil + } + if !c.wroteHeader { + c.ResponseWriter.WriteHeader(http.StatusOK) + c.wroteHeader = true + } + return c.ResponseWriter.Write(b) +} diff --git a/internal/models/signup.go b/internal/models/signup.go new file mode 100644 index 0000000..423f636 --- /dev/null +++ b/internal/models/signup.go @@ -0,0 +1,37 @@ +package models + +import "time" + +type Signup struct { + ID string `db:"id" json:"id"` + TopicID string `db:"topic_id" json:"topic_id"` + AgentID string `db:"agent_id" json:"agent_id"` + WillingCamps []byte `db:"willing_camps" json:"-"` // JSON column; surface as typed via View() + PreValidated bool `db:"pre_validated" json:"pre_validated"` + CreatedAt time.Time `db:"created_at" json:"created_at"` +} + +// SignupView is the JSON-friendly projection that decodes WillingCamps. +type SignupView struct { + ID string `json:"id"` + TopicID string `json:"topic_id"` + AgentID string `json:"agent_id"` + WillingCamps []Camp `json:"willing_camps"` + PreValidated bool `json:"pre_validated"` + CreatedAt time.Time `json:"created_at"` +} + +func (s *Signup) View() (SignupView, error) { + var camps SignupCampsJSON + if err := camps.UnmarshalDB(s.WillingCamps); err != nil { + return SignupView{}, err + } + return SignupView{ + ID: s.ID, + TopicID: s.TopicID, + AgentID: s.AgentID, + WillingCamps: camps, + PreValidated: s.PreValidated, + CreatedAt: s.CreatedAt, + }, nil +} diff --git a/internal/models/topic.go b/internal/models/topic.go new file mode 100644 index 0000000..c99e827 --- /dev/null +++ b/internal/models/topic.go @@ -0,0 +1,78 @@ +package models + +import ( + "encoding/json" + "time" +) + +type Visibility string + +const ( + VisibilityPublic Visibility = "public" + VisibilityPrivate Visibility = "private" +) + +type TopicStatus string + +const ( + TopicStatusCreated TopicStatus = "created" + TopicStatusSignupOpen TopicStatus = "signup_open" + TopicStatusSignupClosed TopicStatus = "signup_closed" + TopicStatusDebating TopicStatus = "debating" + TopicStatusCompleted TopicStatus = "completed" + TopicStatusCancelled TopicStatus = "cancelled" +) + +type Camp string + +const ( + CampPro Camp = "pro" + CampCon Camp = "con" + CampJudge Camp = "judge" +) + +// AllCamps is the canonical iteration order used by the allocation algorithm. +var AllCamps = [3]Camp{CampPro, CampCon, CampJudge} + +type Topic struct { + ID string `db:"id" json:"id"` + Title string `db:"title" json:"title"` + Summary string `db:"summary" json:"summary"` + Visibility Visibility `db:"visibility" json:"visibility"` + VerdictSchemaID string `db:"verdict_schema_id" json:"verdict_schema_id"` + Status TopicStatus `db:"status" json:"status"` + SignupOpenAt time.Time `db:"signup_open_at" json:"signup_open_at"` + SignupCloseAt time.Time `db:"signup_close_at" json:"signup_close_at"` + DebateStartAt time.Time `db:"debate_start_at" json:"debate_start_at"` + DebateEndAt time.Time `db:"debate_end_at" json:"debate_end_at"` + CreatorUserID string `db:"creator_user_id" json:"creator_user_id"` + VisibilityChangedBy *string `db:"visibility_changed_by" json:"visibility_changed_by,omitempty"` + VisibilityChangedAt *time.Time `db:"visibility_changed_at" json:"visibility_changed_at,omitempty"` + CancelledReason *string `db:"cancelled_reason" json:"cancelled_reason,omitempty"` + CreatedAt time.Time `db:"created_at" json:"created_at"` + UpdatedAt time.Time `db:"updated_at" json:"updated_at"` +} + +// IsCampValid returns true iff c is one of pro|con|judge. +func IsCampValid(c Camp) bool { + for _, k := range AllCamps { + if k == c { + return true + } + } + return false +} + +// SignupCampsJSON is a typed wrapper around the JSON-stored willing_camps +// column. We marshal/unmarshal at the boundary so handlers can work with +// the typed slice. +type SignupCampsJSON []Camp + +func (s SignupCampsJSON) Marshal() ([]byte, error) { return json.Marshal(s) } +func (s *SignupCampsJSON) UnmarshalDB(raw []byte) error { + if len(raw) == 0 { + *s = nil + return nil + } + return json.Unmarshal(raw, s) +} diff --git a/internal/store/signup_store.go b/internal/store/signup_store.go new file mode 100644 index 0000000..fbceb65 --- /dev/null +++ b/internal/store/signup_store.go @@ -0,0 +1,95 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + + "github.com/google/uuid" + "github.com/jmoiron/sqlx" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" +) + +type SignupStore struct { + db *sqlx.DB +} + +func NewSignupStore(db *sqlx.DB) *SignupStore { return &SignupStore{db: db} } + +type UpsertSignupInput struct { + TopicID string + AgentID string + WillingCamps []models.Camp + PreValidated bool +} + +// Upsert creates or updates an agent's signup for a topic. Re-signup +// replaces willing_camps (intentional: lets an agent change their mind +// before signup_close_at). +func (s *SignupStore) Upsert(ctx context.Context, in UpsertSignupInput) (*models.SignupView, error) { + if len(in.WillingCamps) == 0 { + return nil, fmt.Errorf("willing_camps must be non-empty") + } + for _, c := range in.WillingCamps { + if !models.IsCampValid(c) { + return nil, fmt.Errorf("invalid camp %q", c) + } + } + raw, err := json.Marshal(in.WillingCamps) + if err != nil { + return nil, err + } + + // Try insert; on duplicate (topic, agent), update. + id := uuid.NewString() + _, err = s.db.ExecContext(ctx, ` + INSERT INTO signups (id, topic_id, agent_id, willing_camps, pre_validated) + VALUES (?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + willing_camps = VALUES(willing_camps), + pre_validated = VALUES(pre_validated)`, + id, in.TopicID, in.AgentID, raw, in.PreValidated) + if err != nil { + return nil, fmt.Errorf("upsert signup: %w", err) + } + return s.GetByPair(ctx, in.TopicID, in.AgentID) +} + +func (s *SignupStore) GetByPair(ctx context.Context, topicID, agentID string) (*models.SignupView, error) { + var row models.Signup + err := s.db.GetContext(ctx, &row, + `SELECT * FROM signups WHERE topic_id = ? AND agent_id = ?`, topicID, agentID) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + v, err := row.View() + if err != nil { + return nil, err + } + return &v, nil +} + +// ListByTopic returns all signups for a topic. Used by the allocation +// algorithm at signup_close_at and by the topic-detail UI. +func (s *SignupStore) ListByTopic(ctx context.Context, topicID string) ([]models.SignupView, error) { + var rows []models.Signup + if err := s.db.SelectContext(ctx, &rows, + `SELECT * FROM signups WHERE topic_id = ? ORDER BY created_at ASC`, topicID); err != nil { + return nil, err + } + out := make([]models.SignupView, 0, len(rows)) + for _, r := range rows { + v, err := r.View() + if err != nil { + return nil, err + } + out = append(out, v) + } + return out, nil +} diff --git a/internal/store/topic_store.go b/internal/store/topic_store.go new file mode 100644 index 0000000..833e191 --- /dev/null +++ b/internal/store/topic_store.go @@ -0,0 +1,106 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + + "github.com/google/uuid" + "github.com/jmoiron/sqlx" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" +) + +var ErrNotFound = errors.New("not found") + +type TopicStore struct { + db *sqlx.DB +} + +func NewTopicStore(db *sqlx.DB) *TopicStore { return &TopicStore{db: db} } + +type CreateTopicInput struct { + Title string + Summary string + Visibility models.Visibility + VerdictSchemaID string + SignupOpenAt string // RFC3339; parsed by SQL + SignupCloseAt string + DebateStartAt string + DebateEndAt string + CreatorUserID string +} + +func (s *TopicStore) Create(ctx context.Context, in CreateTopicInput) (*models.Topic, error) { + id := uuid.NewString() + _, err := s.db.ExecContext(ctx, ` + INSERT INTO topics (id, title, summary, visibility, verdict_schema_id, + signup_open_at, signup_close_at, debate_start_at, debate_end_at, creator_user_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + id, in.Title, in.Summary, in.Visibility, in.VerdictSchemaID, + in.SignupOpenAt, in.SignupCloseAt, in.DebateStartAt, in.DebateEndAt, in.CreatorUserID) + if err != nil { + return nil, fmt.Errorf("insert topic: %w", err) + } + return s.GetByID(ctx, id) +} + +func (s *TopicStore) GetByID(ctx context.Context, id string) (*models.Topic, error) { + var t models.Topic + err := s.db.GetContext(ctx, &t, `SELECT * FROM topics WHERE id = ?`, id) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return &t, nil +} + +type ListFilter struct { + Status string // empty = all + Visibility string // empty = all + Limit int // 0 = default 50 + Offset int +} + +func (s *TopicStore) List(ctx context.Context, f ListFilter) ([]models.Topic, error) { + if f.Limit <= 0 || f.Limit > 200 { + f.Limit = 50 + } + q := "SELECT * FROM topics" + args := []any{} + var clauses []string + if f.Status != "" { + clauses = append(clauses, "status = ?") + args = append(args, f.Status) + } + if f.Visibility != "" { + clauses = append(clauses, "visibility = ?") + args = append(args, f.Visibility) + } + if len(clauses) > 0 { + q += " WHERE " + strings.Join(clauses, " AND ") + } + q += " ORDER BY created_at DESC LIMIT ? OFFSET ?" + args = append(args, f.Limit, f.Offset) + + var rows []models.Topic + if err := s.db.SelectContext(ctx, &rows, q, args...); err != nil { + return nil, err + } + return rows, nil +} + +// SetVisibility flips public/private; records who/when. Returns updated row. +func (s *TopicStore) SetVisibility(ctx context.Context, id string, v models.Visibility, byUserID string) (*models.Topic, error) { + _, err := s.db.ExecContext(ctx, ` + UPDATE topics SET visibility = ?, visibility_changed_by = ?, visibility_changed_at = CURRENT_TIMESTAMP + WHERE id = ?`, v, byUserID, id) + if err != nil { + return nil, err + } + return s.GetByID(ctx, id) +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..12244af --- /dev/null +++ b/main.go @@ -0,0 +1,75 @@ +// Dialectic.Backend.Go — entrypoint. +// +// Greenfield Go rewrite of the Python v1 backend; agent-only debate +// platform per /home/hzhang/arch/DIALECTIC-V2-DESIGN.md. +// +// This file: load config → open db → run migrations → mount routes → +// serve until SIGINT/SIGTERM. Everything else lives in internal/. +package main + +import ( + "context" + "errors" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/db" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi" +) + +// Version is overridden at build time via -ldflags="-X main.Version=...". +var Version = "dev" + +func main() { + log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile) + + cfg, err := config.LoadFromEnv() + if err != nil { + log.Fatalf("config: %v", err) + } + log.Printf("starting dialectic-backend %s mode=%s addr=%s", Version, cfg.Mode, cfg.HTTPAddr) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conn, err := db.Open(ctx, cfg.DSN()) + if err != nil { + log.Fatalf("db open: %v", err) + } + defer conn.Close() + + if err := db.RunMigrations(ctx, conn); err != nil { + log.Fatalf("migrations: %v", err) + } + log.Printf("migrations: ok") + + srv := &http.Server{ + Addr: cfg.HTTPAddr, + Handler: httpapi.Mount(cfg, conn, Version), + ReadHeaderTimeout: 10 * time.Second, + } + + // Graceful shutdown on SIGINT/SIGTERM. + shutdown := make(chan os.Signal, 1) + signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) + go func() { + <-shutdown + log.Printf("shutdown signal received") + ctx2, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := srv.Shutdown(ctx2); err != nil { + log.Printf("http shutdown error: %v", err) + } + }() + + log.Printf("http server listening on %s", cfg.HTTPAddr) + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("http serve: %v", err) + } + log.Printf("bye") +} diff --git a/middleware/__init__.py b/middleware/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/middleware/auth.py b/middleware/auth.py deleted file mode 100644 index e1b0cc4..0000000 --- a/middleware/auth.py +++ /dev/null @@ -1,114 +0,0 @@ -"""Keycloak JWT authentication middleware.""" - -import os -from fastapi import HTTPException, Request -from jose import jwt, JWTError, jwk -from jose.utils import base64url_decode -import httpx - -# Cache JWKS per (host, realm) to avoid fetching on every request -_jwks_cache: dict[str, dict] = {} - - -async def _get_jwks(kc_host: str, realm: str) -> dict: - cache_key = f"{kc_host}/{realm}" - if cache_key in _jwks_cache: - return _jwks_cache[cache_key] - - url = f"{kc_host}/realms/{realm}/protocol/openid-connect/certs" - async with httpx.AsyncClient(timeout=10) as client: - resp = await client.get(url) - if resp.status_code != 200: - raise HTTPException( - status_code=502, - detail=f"无法获取 Keycloak JWKS: HTTP {resp.status_code}", - ) - data = resp.json() - _jwks_cache[cache_key] = data - return data - - -def _find_rsa_key(jwks: dict, token: str) -> dict | None: - """Find the matching RSA key from JWKS for the token's kid.""" - unverified_header = jwt.get_unverified_header(token) - kid = unverified_header.get("kid") - for key in jwks.get("keys", []): - if key.get("kid") == kid: - return key - return None - - -async def verify_token(request: Request, kc_host: str, realm: str) -> dict: - """Extract and verify the Bearer JWT from the Authorization header. - - Returns the decoded payload on success. - Raises HTTPException(401) on missing/invalid token. - """ - auth_header = request.headers.get("Authorization", "") - if not auth_header.startswith("Bearer "): - raise HTTPException(status_code=401, detail="缺少 Authorization Bearer token") - - token = auth_header[7:] - - jwks = await _get_jwks(kc_host, realm) - rsa_key = _find_rsa_key(jwks, token) - if rsa_key is None: - # Clear cache in case keys rotated - _jwks_cache.pop(f"{kc_host}/{realm}", None) - jwks = await _get_jwks(kc_host, realm) - rsa_key = _find_rsa_key(jwks, token) - if rsa_key is None: - raise HTTPException(status_code=401, detail="无法匹配 JWT 签名密钥") - - try: - payload = jwt.decode( - token, - rsa_key, - algorithms=["RS256"], - options={"verify_aud": False}, # Keycloak audience varies by client - ) - return payload - except JWTError as e: - raise HTTPException(status_code=401, detail=f"JWT 验证失败: {e}") - - -async def require_auth(request: Request): - """Verify Bearer JWT for write endpoints. - - Dev mode: passthrough (no auth required). - Prod mode: validates JWT via Keycloak JWKS. - """ - if os.getenv("ENV_MODE", "dev") == "dev": - return None - - from app.services.config_service import ConfigService - config = ConfigService.load() - kc = config.get("keycloak", {}) - if not kc.get("host"): - return None # KC not configured – allow access - - return await verify_token(request, kc["host"], kc.get("realm", "")) - - -async def require_admin(request: Request, config: dict): - """Verify the request carries a valid Keycloak JWT with admin role. - - Raises HTTPException(401/403) on failure. - """ - kc = config.get("keycloak", {}) - kc_host = kc.get("host") - realm = kc.get("realm") - - if not kc_host or not realm: - raise HTTPException( - status_code=503, - detail="Keycloak 未配置,无法进行鉴权", - ) - - payload = await verify_token(request, kc_host, realm) - - roles = payload.get("realm_access", {}).get("roles", []) - if "admin" not in roles: - raise HTTPException(status_code=403, detail="需要 admin 角色") - - return payload diff --git a/middleware/config_guard.py b/middleware/config_guard.py deleted file mode 100644 index fb666d9..0000000 --- a/middleware/config_guard.py +++ /dev/null @@ -1,32 +0,0 @@ -from starlette.middleware.base import BaseHTTPMiddleware -from starlette.responses import JSONResponse - -from services.config_service import ConfigService - -# Paths that are always accessible, even when DB is not configured -_ALLOWED_PREFIXES = ("/api/setup", "/docs", "/openapi", "/redoc") - - -class ConfigGuardMiddleware(BaseHTTPMiddleware): - """Return 503 for all business routes when the database is not configured.""" - - async def dispatch(self, request, call_next): - path = request.url.path - - # Always allow: setup routes, root, docs, OPTIONS (CORS preflight) - if path == "/" or request.method == "OPTIONS": - return await call_next(request) - for prefix in _ALLOWED_PREFIXES: - if path.startswith(prefix): - return await call_next(request) - - if not ConfigService.is_db_configured(): - return JSONResponse( - status_code=503, - content={ - "error_code": "SERVICE_NOT_CONFIGURED", - "detail": "数据库未配置,请先完成系统初始化", - }, - ) - - return await call_next(request) diff --git a/models/__init__.py b/models/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/models/debate.py b/models/debate.py deleted file mode 100644 index cf30ee3..0000000 --- a/models/debate.py +++ /dev/null @@ -1,92 +0,0 @@ -from pydantic import BaseModel -from typing import List, Optional -from enum import Enum -from datetime import datetime - - -class ModelProvider(str, Enum): - OPENAI = "openai" - DEEPSEEK = "deepseek" - QWEN = "qwen" - CLAUDE = "claude" - - -class DebateStance(str, Enum): - PRO = "pro" - CON = "con" - - -class SearchResult(BaseModel): - title: str - url: str - snippet: str - score: Optional[float] = None - - -class SearchEvidence(BaseModel): - query: str - results: List[SearchResult] - mode: str # "auto", "tool", "both" - - -class DebateRound(BaseModel): - round_number: int - speaker: str # Model identifier - stance: DebateStance - content: str - timestamp: datetime - token_count: Optional[int] = None - search_evidence: Optional[SearchEvidence] = None - - -class DebateParticipant(BaseModel): - model_config = {"protected_namespaces": ()} - - model_identifier: str - provider: ModelProvider - stance: DebateStance - api_key: Optional[str] = None - - -class DebateConstraints(BaseModel): - max_rounds: int = 5 - max_tokens_per_turn: int = 500 - max_total_tokens: Optional[int] = None - forbid_repetition: bool = True - must_respond_to_opponent: bool = True - web_search_enabled: bool = False - web_search_mode: str = "auto" # "auto", "tool", "both" - - -class DebateRequest(BaseModel): - topic: str - participants: List[DebateParticipant] - constraints: DebateConstraints - custom_system_prompt: Optional[str] = None - - -class EvidenceReference(BaseModel): - round_number: int - speaker: str - stance: DebateStance - - -class EvidenceEntry(BaseModel): - title: str - url: str - snippet: str - score: Optional[float] = None - references: List[EvidenceReference] - - -class DebateSession(BaseModel): - session_id: str - topic: str - participants: List[DebateParticipant] - constraints: DebateConstraints - rounds: List[DebateRound] - status: str # "active", "completed", "terminated" - created_at: datetime - completed_at: Optional[datetime] = None - summary: Optional[str] = None - evidence_library: List[EvidenceEntry] = [] \ No newline at end of file diff --git a/orchestrator/__init__.py b/orchestrator/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/orchestrator/debate_orchestrator.py b/orchestrator/debate_orchestrator.py deleted file mode 100644 index 61ecb9c..0000000 --- a/orchestrator/debate_orchestrator.py +++ /dev/null @@ -1,358 +0,0 @@ -import asyncio -import uuid -from typing import Dict, List, Optional -from datetime import datetime -from sqlalchemy.orm import Session - -from models.debate import ( - DebateRequest, DebateSession, DebateRound, - DebateStance, DebateParticipant, SearchEvidence, SearchResult, - EvidenceEntry, EvidenceReference -) -from providers.provider_factory import ProviderFactory -from storage.session_manager import SessionManager -from utils.summarizer import summarize_debate -from services.search_service import SearchService -from services.api_key_service import ApiKeyService - - -class DebateOrchestrator: - """ - Orchestrates the debate between multiple language models - """ - - def __init__(self, db: Session): - self.db = db - self.session_manager = SessionManager() - self.provider_factory = ProviderFactory() - - async def create_session(self, debate_request: DebateRequest) -> str: - """ - Create a new debate session - """ - session_id = str(uuid.uuid4()) - - session = DebateSession( - session_id=session_id, - topic=debate_request.topic, - participants=debate_request.participants, - constraints=debate_request.constraints, - rounds=[], - status="active", - created_at=datetime.now() - ) - - await self.session_manager.save_session(self.db, session) - return session_id - - def _get_search_service(self) -> Optional[SearchService]: - """ - Get a SearchService instance if Tavily API key is available. - """ - tavily_key = ApiKeyService.get_api_key(self.db, "tavily") - if tavily_key: - return SearchService(api_key=tavily_key) - return None - - async def run_debate(self, session_id: str) -> DebateSession: - """ - Run the complete debate process - """ - session = await self.session_manager.get_session(self.db, session_id) - if not session: - raise ValueError(f"Session {session_id} not found") - - # Initialize providers for each participant - providers = {} - for participant in session.participants: - provider = self.provider_factory.create_provider( - self.db, - participant.provider, - participant.api_key # This can be None, and the provider will fetch from DB - ) - providers[participant.model_identifier] = provider - - # Initialize search service if web search is enabled - search_service = None - web_search_enabled = session.constraints.web_search_enabled - web_search_mode = session.constraints.web_search_mode - if web_search_enabled: - search_service = self._get_search_service() - if not search_service: - print("Warning: Web search enabled but no Tavily API key found. Disabling search.") - web_search_enabled = False - - # Run the debate rounds - for round_num in range(session.constraints.max_rounds): - if session.status != "active": - break - - # Alternate between participants - current_participant = session.participants[round_num % len(session.participants)] - provider = providers[current_participant.model_identifier] - - # Perform automatic search if enabled - search_evidence = None - if web_search_enabled and web_search_mode in ("auto", "both"): - search_evidence = self._perform_automatic_search( - search_service, session, round_num - ) - - # Prepare context for the current turn (with search results if available) - context = self._prepare_context(session, current_participant.stance, search_evidence) - - # Determine if we should use tool calling for this round - use_tool_calling = ( - web_search_enabled - and web_search_mode in ("tool", "both") - and provider.supports_tools() - ) - - if use_tool_calling: - response, tool_evidence = await self._handle_tool_calls( - provider, current_participant.model_identifier, - context, search_service - ) - # Merge tool-based evidence with auto evidence - if tool_evidence: - if search_evidence: - search_evidence.results.extend(tool_evidence.results) - search_evidence.query += f" | {tool_evidence.query}" - search_evidence.mode = "both" - else: - search_evidence = tool_evidence - else: - response = await provider.generate_response( - model=current_participant.model_identifier, - prompt=context - ) - - # Clean the response to remove any echoed prompt/meta text - response = self._clean_response(response) - - # Create a new round - round_data = DebateRound( - round_number=round_num + 1, - speaker=current_participant.model_identifier, - stance=current_participant.stance, - content=response, - timestamp=datetime.now(), - token_count=len(response.split()), # Approximate token count - search_evidence=search_evidence - ) - - session.rounds.append(round_data) - - # Update evidence library with search results from this round - if round_data.search_evidence: - self._update_evidence_library(session, round_data) - - # Update session in storage - await self.session_manager.update_session(self.db, session) - - # Small delay between rounds to simulate realistic interaction - await asyncio.sleep(1) - - # Generate summary after all rounds are complete - summary = await summarize_debate(session) - session.summary = summary - session.status = "completed" - session.completed_at = datetime.now() - - await self.session_manager.update_session(self.db, session) - return session - - def _perform_automatic_search( - self, search_service: SearchService, session: DebateSession, round_num: int - ) -> Optional[SearchEvidence]: - """ - Perform an automatic web search based on the topic and last opponent argument. - """ - last_opponent_arg = None - if session.rounds: - last_opponent_arg = session.rounds[-1].content - - query = SearchService.generate_search_query(session.topic, last_opponent_arg) - results = search_service.search(query, max_results=3) - - if results: - return SearchEvidence( - query=query, - results=results, - mode="auto" - ) - return None - - async def _handle_tool_calls( - self, provider, model: str, context: str, search_service: SearchService - ) -> tuple: - """ - Handle tool calling flow: send prompt with tools, execute any tool calls, - then re-prompt with results. Max 2 tool call iterations. - Returns (final_response_text, SearchEvidence_or_None). - """ - tools = [SearchService.get_tool_definition()] - all_search_results = [] - all_queries = [] - - text, tool_calls = await provider.generate_response_with_tools( - model=model, - prompt=context, - tools=tools, - max_tokens=500 - ) - - # If no tool calls, return the text response directly - if not tool_calls: - return text, None - - # Process up to 2 rounds of tool calls - for iteration in range(2): - if not tool_calls: - break - - # Execute each tool call - tool_results_text = [] - for tc in tool_calls: - if tc["name"] == "web_search": - query = tc["arguments"].get("query", "") - all_queries.append(query) - results = search_service.search(query, max_results=3) - all_search_results.extend(results) - - # Format results for the model - evidence = SearchEvidence(query=query, results=results, mode="tool") - tool_results_text.append(SearchService.format_results_for_context(evidence)) - - # Re-prompt the model with tool results - augmented_context = context + "\n" + "\n".join(tool_results_text) - augmented_context += "\n请基于以上搜索结果和辩论历史,给出你的论点。" - - text, tool_calls = await provider.generate_response_with_tools( - model=model, - prompt=augmented_context, - tools=tools, - max_tokens=500 - ) - - # Build combined evidence - evidence = None - if all_search_results: - evidence = SearchEvidence( - query=" | ".join(all_queries), - results=all_search_results, - mode="tool" - ) - - # If we still got no text (model keeps calling tools), fall back - if not text: - text = await provider.generate_response(model=model, prompt=context, max_tokens=500) - - return text, evidence - - def _update_evidence_library(self, session: DebateSession, round_data: DebateRound): - """ - Merge search results from a round into the session's evidence library, deduplicating by URL. - """ - ref = EvidenceReference( - round_number=round_data.round_number, - speaker=round_data.speaker, - stance=round_data.stance - ) - - url_index = {entry.url: i for i, entry in enumerate(session.evidence_library)} - - for result in round_data.search_evidence.results: - if result.url in url_index: - entry = session.evidence_library[url_index[result.url]] - # Avoid duplicate references (same round + speaker) - if not any( - r.round_number == ref.round_number and r.speaker == ref.speaker - for r in entry.references - ): - entry.references.append(ref) - else: - new_entry = EvidenceEntry( - title=result.title, - url=result.url, - snippet=result.snippet, - score=result.score, - references=[ref] - ) - session.evidence_library.append(new_entry) - url_index[result.url] = len(session.evidence_library) - 1 - - def _prepare_context( - self, session: DebateSession, current_stance: DebateStance, - search_evidence: Optional[SearchEvidence] = None - ) -> str: - """ - Prepare the context/prompt for the current model turn - """ - # Determine the stance of the current speaker - if current_stance == DebateStance.PRO: - position_desc = "正方(支持方)" - opposing_desc = "反方(反对方)" - else: - position_desc = "反方(反对方)" - opposing_desc = "正方(支持方)" - - # Build the context with previous rounds - context_parts = [ - f"辩论主题: {session.topic}", - f"你的立场: {position_desc}", - "辩论规则:", - "- 必须回应对方上一轮的核心论点", - "- 不得重复自己已提出的观点", - "- 输出长度限制在合理范围内", - "\n历史辩论记录:" - ] - - for round_data in session.rounds: - stance_text = "正方" if round_data.stance == DebateStance.PRO else "反方" - context_parts.append(f"第{round_data.round_number}轮 - {stance_text}: {round_data.content}") - - # Inject search results if available - if search_evidence: - context_parts.append(SearchService.format_results_for_context(search_evidence)) - - context_parts.append(f"\n现在轮到你 ({position_desc}) 发言,请基于以上内容进行回应。注意:直接给出你的论点内容,不要重复上述提示词、辩论规则或历史记录。") - - return "\n".join(context_parts) - - def _clean_response(self, response: str) -> str: - """ - Clean the model response to remove any echoed prompt/meta text - """ - import re - - # Remove common prompt echoes and meta prefixes - patterns_to_remove = [ - r'^第\d+轮\s*[-::]\s*(正方|反方)\s*[::]?\s*', # 第X轮 - 正方/反方: - r'^(正方|反方)\s*[((][^))]*[))]\s*[::]?\s*', # 正方(支持方): - r'^(正方|反方)\s*[::]\s*', # 正方: or 反方: - r'^我的立场\s*[::]\s*', # 我的立场: - r'^回应\s*[::]\s*', # 回应: - r'^辩论发言\s*[::]\s*', # 辩论发言: - ] - - cleaned = response.strip() - for pattern in patterns_to_remove: - cleaned = re.sub(pattern, '', cleaned, flags=re.MULTILINE) - - return cleaned.strip() - - async def get_session_status(self, session_id: str) -> Optional[DebateSession]: - """ - Get the current status of a debate session - """ - return await self.session_manager.get_session(self.db, session_id) - - async def terminate_session(self, session_id: str): - """ - Terminate a debate session prematurely - """ - session = await self.session_manager.get_session(self.db, session_id) - if session: - session.status = "terminated" - await self.session_manager.update_session(self.db, session) diff --git a/providers/__init__.py b/providers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/providers/base_provider.py b/providers/base_provider.py deleted file mode 100644 index 05fcb0c..0000000 --- a/providers/base_provider.py +++ /dev/null @@ -1,73 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Optional, List, Dict, Any, Tuple -from sqlalchemy.orm import Session - - -class LLMProvider(ABC): - """ - Abstract base class for LLM providers - """ - - @abstractmethod - def __init__(self, db: Session, api_key: Optional[str] = None): - """ - Initialize the provider with a database session and optional API key - """ - pass - - @abstractmethod - async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str: - """ - Generate a response from the LLM - """ - pass - - def supports_tools(self) -> bool: - """ - Whether this provider supports function/tool calling. - Override in subclasses that support it. - """ - return False - - async def generate_response_with_tools( - self, - model: str, - prompt: str, - tools: List[Dict[str, Any]], - max_tokens: Optional[int] = None - ) -> Tuple[str, List[Dict[str, Any]]]: - """ - Generate a response with tool definitions available. - Returns (text_content, tool_calls) where tool_calls is a list of - dicts with keys: name, arguments (dict). - If no tools are called, tool_calls is empty and text_content has the response. - Default implementation falls back to regular generation. - """ - text = await self.generate_response(model, prompt, max_tokens) - return text, [] - - -class ProviderFactory: - """ - Factory class to create provider instances - """ - - @staticmethod - def create_provider(db: Session, provider_type: str, api_key: Optional[str] = None): - """ - Create a provider instance based on the type - """ - if provider_type.value == "openai": - from app.providers.openai_provider import OpenAIProvider - return OpenAIProvider(db, api_key) - elif provider_type.value == "claude": - from app.providers.claude_provider import ClaudeProvider - return ClaudeProvider(db, api_key) - elif provider_type.value == "qwen": - from app.providers.qwen_provider import QwenProvider - return QwenProvider(db, api_key) - elif provider_type.value == "deepseek": - from app.providers.deepseek_provider import DeepSeekProvider - return DeepSeekProvider(db, api_key) - else: - raise ValueError(f"Unsupported provider type: {provider_type}") \ No newline at end of file diff --git a/providers/claude_provider.py b/providers/claude_provider.py deleted file mode 100644 index 0795038..0000000 --- a/providers/claude_provider.py +++ /dev/null @@ -1,85 +0,0 @@ -import anthropic -from typing import Optional, List, Dict, Any, Tuple -from sqlalchemy.orm import Session - -from providers.base_provider import LLMProvider -from services.api_key_service import ApiKeyService - -SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。" - - -class ClaudeProvider(LLMProvider): - """ - Anthropic Claude API provider implementation - """ - - def __init__(self, db: Session, api_key: Optional[str] = None): - if not api_key: - api_key = ApiKeyService.get_api_key(db, "claude") - - if api_key: - self.client = anthropic.AsyncAnthropic(api_key=api_key) - else: - raise ValueError("Claude API key not found in database or provided") - - def supports_tools(self) -> bool: - return True - - async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str: - try: - response = await self.client.messages.create( - model=model, - max_tokens=max_tokens or 500, - temperature=0.7, - system=SYSTEM_PROMPT, - messages=[ - {"role": "user", "content": prompt} - ] - ) - return response.content[0].text - except Exception as e: - raise Exception(f"Error calling Claude API: {str(e)}") - - async def generate_response_with_tools( - self, - model: str, - prompt: str, - tools: List[Dict[str, Any]], - max_tokens: Optional[int] = None - ) -> Tuple[str, List[Dict[str, Any]]]: - try: - # Convert OpenAI-format tools to Anthropic format - anthropic_tools = [] - for tool in tools: - func = tool.get("function", tool) - anthropic_tools.append({ - "name": func["name"], - "description": func.get("description", ""), - "input_schema": func.get("parameters", func.get("input_schema", {})) - }) - - response = await self.client.messages.create( - model=model, - max_tokens=max_tokens or 500, - temperature=0.7, - system=SYSTEM_PROMPT, - messages=[ - {"role": "user", "content": prompt} - ], - tools=anthropic_tools - ) - - text_content = "" - tool_calls = [] - for block in response.content: - if block.type == "text": - text_content += block.text - elif block.type == "tool_use": - tool_calls.append({ - "name": block.name, - "arguments": block.input - }) - - return text_content.strip(), tool_calls - except Exception as e: - raise Exception(f"Error calling Claude API with tools: {str(e)}") \ No newline at end of file diff --git a/providers/deepseek_provider.py b/providers/deepseek_provider.py deleted file mode 100644 index dee3972..0000000 --- a/providers/deepseek_provider.py +++ /dev/null @@ -1,64 +0,0 @@ -from typing import Optional -from sqlalchemy.orm import Session -from openai import AsyncOpenAI - -from providers.base_provider import LLMProvider -from services.api_key_service import ApiKeyService - - -class DeepSeekProvider(LLMProvider): - """ - DeepSeek API provider implementation using OpenAI-compatible API - """ - - def __init__(self, db: Session, api_key: Optional[str] = None): - if not api_key: - api_key = ApiKeyService.get_api_key(db, "deepseek") - - if not api_key: - raise ValueError("DeepSeek API key not found in database or provided") - - self.client = AsyncOpenAI( - api_key=api_key, - base_url="https://api.deepseek.com" - ) - - def supports_tools(self) -> bool: - return False - - async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str: - try: - is_reasoner = "reasoner" in model or "r1" in model.lower() - - messages = [{"role": "user", "content": prompt}] - # deepseek-reasoner 不支持 system message,把指令放进 user message - if not is_reasoner: - messages.insert(0, { - "role": "system", - "content": "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。" - }) - - kwargs = { - "model": model, - "messages": messages, - } - # reasoner 模型不支持 max_tokens,使用 max_completion_tokens - if is_reasoner: - kwargs["max_completion_tokens"] = max_tokens or 4096 - else: - kwargs["max_tokens"] = max_tokens or 500 - - response = await self.client.chat.completions.create(**kwargs) - - message = response.choices[0].message - content = message.content or "" - - # deepseek-reasoner 的主要内容可能在 reasoning_content 中 - if not content.strip() and is_reasoner: - reasoning = getattr(message, "reasoning_content", None) - if reasoning: - content = reasoning - - return content.strip() - except Exception as e: - raise Exception(f"Error calling DeepSeek API: {str(e)}") \ No newline at end of file diff --git a/providers/openai_provider.py b/providers/openai_provider.py deleted file mode 100644 index a7c0bb8..0000000 --- a/providers/openai_provider.py +++ /dev/null @@ -1,72 +0,0 @@ -import json -import openai -from typing import Optional, List, Dict, Any, Tuple -from sqlalchemy.orm import Session - -from providers.base_provider import LLMProvider -from services.api_key_service import ApiKeyService - -SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。" - - -class OpenAIProvider(LLMProvider): - """ - OpenAI API provider implementation - """ - - def __init__(self, db: Session, api_key: Optional[str] = None): - if not api_key: - api_key = ApiKeyService.get_api_key(db, "openai") - - if api_key: - self.client = openai.AsyncOpenAI(api_key=api_key) - else: - raise ValueError("OpenAI API key not found in database or provided") - - def supports_tools(self) -> bool: - return True - - async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str: - try: - response = await self.client.chat.completions.create( - model=model, - messages=[ - {"role": "system", "content": SYSTEM_PROMPT}, - {"role": "user", "content": prompt} - ], - max_tokens=max_tokens or 500 - ) - return response.choices[0].message.content.strip() - except Exception as e: - raise Exception(f"Error calling OpenAI API: {str(e)}") - - async def generate_response_with_tools( - self, - model: str, - prompt: str, - tools: List[Dict[str, Any]], - max_tokens: Optional[int] = None - ) -> Tuple[str, List[Dict[str, Any]]]: - try: - response = await self.client.chat.completions.create( - model=model, - messages=[ - {"role": "system", "content": SYSTEM_PROMPT}, - {"role": "user", "content": prompt} - ], - tools=tools, - tool_choice="auto", - max_tokens=max_tokens or 500 - ) - message = response.choices[0].message - text_content = message.content or "" - tool_calls = [] - if message.tool_calls: - for tc in message.tool_calls: - tool_calls.append({ - "name": tc.function.name, - "arguments": json.loads(tc.function.arguments) - }) - return text_content.strip(), tool_calls - except Exception as e: - raise Exception(f"Error calling OpenAI API with tools: {str(e)}") \ No newline at end of file diff --git a/providers/provider_factory.py b/providers/provider_factory.py deleted file mode 100644 index 01aa67c..0000000 --- a/providers/provider_factory.py +++ /dev/null @@ -1,49 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Optional -from sqlalchemy.orm import Session - - -class LLMProvider(ABC): - """ - Abstract base class for LLM providers - """ - - @abstractmethod - def __init__(self, db: Session, api_key: Optional[str] = None): - """ - Initialize the provider with a database session and optional API key - """ - pass - - @abstractmethod - async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str: - """ - Generate a response from the LLM - """ - pass - - -class ProviderFactory: - """ - Factory class to create provider instances - """ - - @staticmethod - def create_provider(db: Session, provider_type: str, api_key: Optional[str] = None): - """ - Create a provider instance based on the type - """ - if provider_type.value == "openai": - from providers.openai_provider import OpenAIProvider - return OpenAIProvider(db, api_key) - elif provider_type.value == "claude": - from providers.claude_provider import ClaudeProvider - return ClaudeProvider(db, api_key) - elif provider_type.value == "qwen": - from providers.qwen_provider import QwenProvider - return QwenProvider(db, api_key) - elif provider_type.value == "deepseek": - from providers.deepseek_provider import DeepSeekProvider - return DeepSeekProvider(db, api_key) - else: - raise ValueError(f"Unsupported provider type: {provider_type}") \ No newline at end of file diff --git a/providers/qwen_provider.py b/providers/qwen_provider.py deleted file mode 100644 index 9b8eb42..0000000 --- a/providers/qwen_provider.py +++ /dev/null @@ -1,75 +0,0 @@ -import json -from typing import Optional, List, Dict, Any, Tuple -from sqlalchemy.orm import Session -from openai import AsyncOpenAI - -from providers.base_provider import LLMProvider -from services.api_key_service import ApiKeyService - -SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。" - - -class QwenProvider(LLMProvider): - """ - Qwen API provider implementation using DashScope OpenAI-compatible API - """ - - def __init__(self, db: Session, api_key: Optional[str] = None): - if not api_key: - api_key = ApiKeyService.get_api_key(db, "qwen") - - if not api_key: - raise ValueError("Qwen API key not found in database or provided") - - self.client = AsyncOpenAI( - api_key=api_key, - base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" - ) - - def supports_tools(self) -> bool: - return True - - async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str: - try: - response = await self.client.chat.completions.create( - model=model, - messages=[ - {"role": "system", "content": SYSTEM_PROMPT}, - {"role": "user", "content": prompt} - ], - max_tokens=max_tokens or 500 - ) - return response.choices[0].message.content.strip() - except Exception as e: - raise Exception(f"Error calling Qwen API: {str(e)}") - - async def generate_response_with_tools( - self, - model: str, - prompt: str, - tools: List[Dict[str, Any]], - max_tokens: Optional[int] = None - ) -> Tuple[str, List[Dict[str, Any]]]: - try: - response = await self.client.chat.completions.create( - model=model, - messages=[ - {"role": "system", "content": SYSTEM_PROMPT}, - {"role": "user", "content": prompt} - ], - tools=tools, - tool_choice="auto", - max_tokens=max_tokens or 500 - ) - message = response.choices[0].message - text_content = message.content or "" - tool_calls = [] - if message.tool_calls: - for tc in message.tool_calls: - tool_calls.append({ - "name": tc.function.name, - "arguments": json.loads(tc.function.arguments) - }) - return text_content.strip(), tool_calls - except Exception as e: - raise Exception(f"Error calling Qwen API with tools: {str(e)}") diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index f4a8f21..0000000 --- a/requirements.txt +++ /dev/null @@ -1,19 +0,0 @@ -fastapi==0.115.0 -uvicorn[standard]==0.32.0 -pydantic==2.9.2 -pydantic-settings==2.6.1 -sqlalchemy==2.0.35 -aiosqlite==0.20.0 -pymysql==1.1.1 -cryptography==43.0.1 -python-multipart==0.0.20 -sse-starlette==2.1.3 -openai==1.52.2 -anthropic==0.40.0 -tiktoken==0.8.0 -python-dotenv==1.0.1 -aiohttp==3.9.0 -httpx==0.27.2 -tavily-python==0.5.0 -pyyaml==6.0.2 -python-jose[cryptography]==3.3.0 diff --git a/services/__init__.py b/services/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/services/api_key_service.py b/services/api_key_service.py deleted file mode 100644 index 4a82b67..0000000 --- a/services/api_key_service.py +++ /dev/null @@ -1,71 +0,0 @@ -from cryptography.fernet import Fernet, InvalidToken -from sqlalchemy.orm import Session -from db_models import ApiKey -import os - -# Initialize the encryption key from environment or generate a new one -ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY", Fernet.generate_key().decode()) -cipher_suite = Fernet(ENCRYPTION_KEY.encode()) - - -class ApiKeyService: - """ - Service for managing API keys in the database - """ - - @staticmethod - def encrypt_api_key(api_key: str) -> str: - """ - Encrypt an API key - """ - encrypted_key = cipher_suite.encrypt(api_key.encode()) - return encrypted_key.decode() - - @staticmethod - def decrypt_api_key(encrypted_api_key: str) -> str: - """ - Decrypt an API key - """ - decrypted_key = cipher_suite.decrypt(encrypted_api_key.encode()) - return decrypted_key.decode() - - @staticmethod - def get_api_key(db: Session, provider: str) -> str: - """ - Retrieve and decrypt an API key for a provider - """ - api_key_record = db.query(ApiKey).filter(ApiKey.provider == provider).first() - if not api_key_record or not api_key_record.api_key_encrypted: - return None - try: - return ApiKeyService.decrypt_api_key(api_key_record.api_key_encrypted) - except InvalidToken: - return None - - @staticmethod - def set_api_key(db: Session, provider: str, api_key: str) -> bool: - """ - Encrypt and store an API key for a provider - """ - encrypted_key = ApiKeyService.encrypt_api_key(api_key) - - # Check if record exists - api_key_record = db.query(ApiKey).filter(ApiKey.provider == provider).first() - - if api_key_record: - # Update existing record - api_key_record.api_key_encrypted = encrypted_key - else: - # Create new record - api_key_record = ApiKey( - provider=provider, - api_key_encrypted=encrypted_key - ) - db.add(api_key_record) - - try: - db.commit() - return True - except Exception: - db.rollback() - return False \ No newline at end of file diff --git a/services/config_service.py b/services/config_service.py deleted file mode 100644 index 74c5e2d..0000000 --- a/services/config_service.py +++ /dev/null @@ -1,100 +0,0 @@ -import os -from pathlib import Path -from urllib.parse import quote_plus - -import yaml -from cryptography.fernet import Fernet, InvalidToken - -CONFIG_PATH = Path(os.getenv("CONFIG_PATH", "/app/config/dialectica.yaml")) - -# Reuse the same encryption key used for API keys -_ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY", "") -_cipher = Fernet(_ENCRYPTION_KEY.encode()) if _ENCRYPTION_KEY else None - -# Fields that should be encrypted in the YAML file -_SECRET_FIELDS = {"password"} - - -def _encrypt(value: str) -> str: - if not _cipher or not value: - return value - return "ENC:" + _cipher.encrypt(value.encode()).decode() - - -def _decrypt(value: str) -> str: - if not _cipher or not isinstance(value, str) or not value.startswith("ENC:"): - return value - try: - return _cipher.decrypt(value[4:].encode()).decode() - except InvalidToken: - return value - - -def _encrypt_secrets(data: dict) -> dict: - """Deep-copy dict, encrypting secret fields.""" - out = {} - for k, v in data.items(): - if isinstance(v, dict): - out[k] = _encrypt_secrets(v) - elif k in _SECRET_FIELDS and isinstance(v, str) and not v.startswith("ENC:"): - out[k] = _encrypt(v) - else: - out[k] = v - return out - - -def _decrypt_secrets(data: dict) -> dict: - """Deep-copy dict, decrypting secret fields.""" - out = {} - for k, v in data.items(): - if isinstance(v, dict): - out[k] = _decrypt_secrets(v) - elif k in _SECRET_FIELDS and isinstance(v, str): - out[k] = _decrypt(v) - else: - out[k] = v - return out - - -class ConfigService: - """Read / write config/dialectica.yaml.""" - - @staticmethod - def load() -> dict: - """Load config, returning decrypted values. Empty dict if file missing.""" - if not CONFIG_PATH.exists(): - return {} - with open(CONFIG_PATH) as f: - raw = yaml.safe_load(f) or {} - return _decrypt_secrets(raw) - - @staticmethod - def save(config: dict): - """Save config, encrypting secret fields.""" - CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) - encrypted = _encrypt_secrets(config) - with open(CONFIG_PATH, "w") as f: - yaml.dump(encrypted, f, default_flow_style=False, allow_unicode=True) - - @staticmethod - def is_db_configured() -> bool: - config = ConfigService.load() - db = config.get("database", {}) - return bool(db.get("host") and db.get("database")) - - @staticmethod - def get_database_url() -> str | None: - config = ConfigService.load() - db = config.get("database", {}) - if not (db.get("host") and db.get("database")): - return None - user = db.get("user", "root") - password = db.get("password", "") - host = db["host"] - port = db.get("port", 3306) - database = db["database"] - return f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}" - - @staticmethod - def is_initialized() -> bool: - return ConfigService.load().get("initialized", False) diff --git a/services/search_service.py b/services/search_service.py deleted file mode 100644 index 2665128..0000000 --- a/services/search_service.py +++ /dev/null @@ -1,104 +0,0 @@ -from typing import List, Optional -from models.debate import SearchResult, SearchEvidence - - -class SearchService: - """ - Tavily web search wrapper for debate research. - """ - - def __init__(self, api_key: str): - from tavily import TavilyClient - self.client = TavilyClient(api_key=api_key) - - def search(self, query: str, max_results: int = 5) -> List[SearchResult]: - """ - Perform a web search using Tavily and return structured results. - """ - try: - response = self.client.search( - query=query, - max_results=max_results, - search_depth="basic" - ) - results = [] - for item in response.get("results", []): - results.append(SearchResult( - title=item.get("title", ""), - url=item.get("url", ""), - snippet=item.get("content", "")[:500], # Truncate to avoid token bloat - score=item.get("score") - )) - return results - except Exception as e: - print(f"Tavily search error: {e}") - return [] - - @staticmethod - def format_results_for_context(evidence: SearchEvidence) -> str: - """ - Format search results into a string suitable for injecting into the LLM context. - """ - if not evidence or not evidence.results: - return "" - lines = [f"\n[网络搜索结果] 搜索词: \"{evidence.query}\""] - for i, r in enumerate(evidence.results, 1): - lines.append(f" {i}. {r.title}") - lines.append(f" {r.snippet}") - lines.append(f" 来源: {r.url}") - lines.append("[搜索结果结束]\n") - return "\n".join(lines) - - @staticmethod - def generate_search_query(topic: str, last_opponent_argument: Optional[str] = None) -> str: - """ - Generate a search query from the debate topic and the opponent's last argument. - """ - if last_opponent_argument: - # Extract key phrases from the opponent's argument (first ~100 chars) - snippet = last_opponent_argument[:100].strip() - return f"{topic} {snippet}" - return topic - - @staticmethod - def get_tool_definition() -> dict: - """ - Return the web_search tool definition for function calling. - """ - return { - "type": "function", - "function": { - "name": "web_search", - "description": "Search the web for current information relevant to the debate topic. Use this to find facts, statistics, or recent news that support your argument.", - "parameters": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "The search query to look up" - } - }, - "required": ["query"] - } - } - } - - @staticmethod - def get_tool_definition_anthropic() -> dict: - """ - Return the web_search tool definition in Anthropic format. - """ - return { - "name": "web_search", - "description": "Search the web for current information relevant to the debate topic. Use this to find facts, statistics, or recent news that support your argument.", - "input_schema": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "The search query to look up" - } - }, - "required": ["query"] - } - } diff --git a/storage/__init__.py b/storage/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/storage/database.py b/storage/database.py deleted file mode 100644 index fe97d0a..0000000 --- a/storage/database.py +++ /dev/null @@ -1,109 +0,0 @@ -from sqlalchemy import Column, Integer, String, DateTime, Text -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker -from datetime import datetime -import json - -from models.debate import DebateSession -from exceptions import ServiceNotConfiguredError -from services.config_service import ConfigService - -Base = declarative_base() - - -class DebateSessionDB(Base): - __tablename__ = "debate_sessions" - - id = Column(Integer, primary_key=True, index=True) - session_id = Column(String(255), unique=True, index=True, nullable=False) - topic = Column(Text, nullable=False) - participants = Column(Text, nullable=False) # JSON string - constraints = Column(Text, nullable=False) # JSON string - rounds = Column(Text, nullable=False) # JSON string - status = Column(String(50), nullable=False) - created_at = Column(DateTime, nullable=False) - completed_at = Column(DateTime, nullable=True) - summary = Column(Text, nullable=True) - evidence_library = Column(Text, nullable=True) # JSON string - - -# --------------------------------------------------------------------------- -# Lazy engine / session factory -# --------------------------------------------------------------------------- -_engine = None -_SessionLocal = None - - -def _get_engine(): - from sqlalchemy import create_engine - global _engine - if _engine is None: - db_url = ConfigService.get_database_url() - if not db_url: - raise ServiceNotConfiguredError("数据库未配置") - _engine = create_engine(db_url) - return _engine - - -def _get_session_factory(): - global _SessionLocal - if _SessionLocal is None: - _SessionLocal = sessionmaker( - autocommit=False, autoflush=False, bind=_get_engine() - ) - return _SessionLocal - - -def init_db(): - """Create all tables if DB is configured; silently skip otherwise.""" - if not ConfigService.is_db_configured(): - print("WARNING: Database not configured, skipping table creation.") - return - try: - from db_models import Base as ApiBase - engine = _get_engine() - Base.metadata.create_all(bind=engine) - ApiBase.metadata.create_all(bind=engine) - except Exception as e: - global _engine, _SessionLocal - print(f"WARNING: Database connection failed, skipping table creation: {e}") - if _engine is not None: - _engine.dispose() - _engine = None - _SessionLocal = None - - -def debate_session_from_db(db_session) -> DebateSession: - """Convert database session to Pydantic model.""" - evidence_library = [] - if db_session.evidence_library: - evidence_library = json.loads(db_session.evidence_library) - - return DebateSession( - session_id=db_session.session_id, - topic=db_session.topic, - participants=json.loads(db_session.participants), - constraints=json.loads(db_session.constraints), - rounds=json.loads(db_session.rounds), - status=db_session.status, - created_at=db_session.created_at, - completed_at=db_session.completed_at, - summary=db_session.summary, - evidence_library=evidence_library - ) - - -def debate_session_to_db(session: DebateSession) -> DebateSessionDB: - """Convert Pydantic model to database model.""" - return DebateSessionDB( - session_id=session.session_id, - topic=session.topic, - participants=json.dumps([p.dict() for p in session.participants]), - constraints=json.dumps(session.constraints.dict()), - rounds=json.dumps([r.dict() for r in session.rounds], default=str), - status=session.status, - created_at=session.created_at, - completed_at=session.completed_at, - summary=session.summary, - evidence_library=json.dumps([e.dict() for e in session.evidence_library], default=str) if session.evidence_library else None - ) diff --git a/storage/session_manager.py b/storage/session_manager.py deleted file mode 100644 index 2f650f2..0000000 --- a/storage/session_manager.py +++ /dev/null @@ -1,67 +0,0 @@ -from sqlalchemy.orm import Session -from datetime import datetime -import json -from typing import Optional - -from models.debate import DebateSession -from storage.database import DebateSessionDB, debate_session_from_db, debate_session_to_db - - -class SessionManager: - """ - Manages debate sessions in storage - """ - - def __init__(self): - pass - - async def save_session(self, db: Session, session: DebateSession): - """ - Save a debate session to the database - """ - db_session = debate_session_to_db(session) - db.add(db_session) - db.commit() - db.refresh(db_session) - - async def get_session(self, db: Session, session_id: str) -> Optional[DebateSession]: - """ - Retrieve a debate session from the database - """ - db_session = db.query(DebateSessionDB).filter(DebateSessionDB.session_id == session_id).first() - if not db_session: - return None - return debate_session_from_db(db_session) - - async def update_session(self, db: Session, session: DebateSession): - """ - Update an existing debate session in the database - """ - db_session = db.query(DebateSessionDB).filter(DebateSessionDB.session_id == session.session_id).first() - if db_session: - db_session.topic = session.topic - db_session.participants = json.dumps([p.dict() for p in session.participants]) - db_session.constraints = json.dumps(session.constraints.dict()) - db_session.rounds = json.dumps([r.dict() for r in session.rounds], default=str) - db_session.status = session.status - db_session.completed_at = session.completed_at - db_session.summary = session.summary - db_session.evidence_library = json.dumps([e.dict() for e in session.evidence_library], default=str) if session.evidence_library else None - - db.commit() - db.refresh(db_session) - - async def list_sessions(self, db: Session): - """ - List all debate sessions - """ - db_sessions = db.query(DebateSessionDB).all() - return [ - { - "session_id": db_session.session_id, - "topic": db_session.topic, - "status": db_session.status, - "created_at": db_session.created_at.isoformat() if db_session.created_at else None - } - for db_session in db_sessions - ] \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/utils/summarizer.py b/utils/summarizer.py deleted file mode 100644 index 7188e83..0000000 --- a/utils/summarizer.py +++ /dev/null @@ -1,39 +0,0 @@ -async def summarize_debate(session): - """ - Generate a summary of the debate session - """ - if not session.rounds: - return "No rounds were completed in this debate." - - # Extract key points from each side - pro_points = [] - con_points = [] - - for round_data in session.rounds: - if round_data.stance.value == "pro": - pro_points.append(round_data.content) - else: - con_points.append(round_data.content) - - # Create a summary - summary_parts = [ - f"辩论主题: {session.topic}", - "", - "正方主要观点:", - ] - - for i, point in enumerate(pro_points, 1): - summary_parts.append(f"{i}. {point[:100]}...") # Truncate for brevity - - summary_parts.append("") - summary_parts.append("反方主要观点:") - - for i, point in enumerate(con_points, 1): - summary_parts.append(f"{i}. {point[:100]}...") # Truncate for brevity - - summary_parts.append("") - summary_parts.append("总结: 本次辩论完成了 {} 轮,双方就 '{}' 主题进行了充分的讨论。".format( - len(session.rounds), session.topic - )) - - return "\n".join(summary_parts) \ No newline at end of file