feat: greenfield Go rewrite (Phase 2A + 2B + 2C core)

Replaces the Python v1 (preserved on archive/python-v1 branch).

Stack: Go 1.23 + chi router + sqlx + MySQL 8. Distroless static
container. 12-factor config from env. Embedded SQL migrations.

Schema (internal/db/migrations/001_init.sql):
- topics: 议题 with 4-timestamp lifecycle (signup_open/close +
  debate_start/end), visibility (default private), status state machine,
  verdict_schema FK
- signups: agent self-enrollment with willing_camps (JSON array of
  pro|con|judge), pre_validated audit flag, (topic,agent) unique
- camps: post-allocation lock (one row per topic+camp) — written by
  Phase 2D allocator
- rounds + arguments: chronological debate transcript
- verdicts: judge structured output, one per topic, with token-cost
  trail for future budgeting
- agent_keys + system_keys: peppered sha256 hashes, never raw
- verdict_schemas: seeded with binary, claim-resolution (for
  analyze-intel), policy-recommendation, free-form

Auth (internal/auth):
- AgentAPIKey: real bearer-token middleware against agent_keys;
  best-effort last_used_at touch on success
- OIDCBrowser: Phase 2 stub. Dev mode accepts x-dev-bypass header
  (constant-time compare); prod 401s with a Phase-4-pending hint.
  Real Keycloak JWKS verification lands with the frontend rewrite.

HTTP API (internal/httpapi):
- /api/healthz — db ping + version + uptime
- GET /api/topics — list with status/visibility/limit/offset filters;
  anonymous callers see public only
- GET /api/topics/{id} — visibility-gated (private → 404 hide)
- POST /api/topics — create with RFC3339 lifecycle validation
  (signup_open < signup_close <= debate_start < debate_end)
- PUT /api/topics/{id}/visibility — dialectic-admin role gate
- POST /api/topics/{id}/signups — agent self-enroll; rejects when
  topic.status != signup_open OR outside signup window; idempotent
  upsert per (topic, agent)
- GET /api/topics/{id}/signups — list (any authed caller)

Auth chains:
- optionalAuth: try bearer → try oidc → fall through anonymous
  (handlers branch on Caller.Kind == ""). Uses captureWriter to demote
  inner 401s to "try next" without leaking response bytes.
- requireAnyAuth: chain that 401s if neither succeeds.
- requireAgent: strict bearer-only (signup POST).

Run: `docker compose -f docker-compose.dev.yml up --build`. Migrations
auto-apply on first connect; idempotent on reboot. README documents
env vars, dev bypass usage, agent-key provisioning SQL, and the
Phase 2D/E/3/4/5 roadmap.

go vet clean, gofmt clean, single 11M static binary.
This commit is contained in:
h z
2026-05-23 11:51:48 +01:00
parent e049b1c4bd
commit e706f3d6ef
51 changed files with 1700 additions and 2324 deletions

5
.gitignore vendored
View File

@@ -1 +1,6 @@
/dialectic-backend
/dist/
/.idea/ /.idea/
/.vscode/
*.swp
.DS_Store

View File

@@ -1,22 +1,19 @@
FROM python:3.13-slim # syntax=docker/dockerfile:1.7
# Dialectic.Backend.Go — multi-stage build (compile static binary, run on distroless).
ENV PYTHONDONTWRITEBYTECODE=1 FROM golang:1.23-bookworm AS build
ENV PYTHONUNBUFFERED=1 WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY . .
ARG VERSION=dev
RUN CGO_ENABLED=0 GOOS=linux go build \
-ldflags="-s -w -X main.Version=${VERSION}" \
-o /out/dialectic-backend .
FROM gcr.io/distroless/static-debian12:nonroot
WORKDIR /app WORKDIR /app
COPY --from=build /out/dialectic-backend /app/dialectic-backend
RUN apt-get update && apt-get install -y --no-install-recommends \ EXPOSE 8090
build-essential gcc \ USER nonroot:nonroot
python3-dev \ ENTRYPOINT ["/app/dialectic-backend"]
libssl-dev libffi-dev \
&& rm -rf /var/lib/apt/lists/*
RUN python -m pip install --upgrade pip setuptools wheel
COPY requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app/
EXPOSE 8000
CMD ["python3", "app.py"]

102
README.md Normal file
View File

@@ -0,0 +1,102 @@
# Dialectic.Backend — v2 (Go)
Greenfield Go rewrite of the Python v1 backend. Agent-native debate
platform per [`/home/hzhang/arch/DIALECTIC-V2-DESIGN.md`](../DIALECTIC-V2-DESIGN.md).
Python v1 history is preserved on branch `archive/python-v1`.
## What's here (Phase 2A + 2B + 2C, 2026-05-23)
| Subsystem | Status |
|-----------|--------|
| HTTP server (`chi` router) | ✅ |
| Config from env (`internal/config`) | ✅ |
| MySQL via `sqlx` + embedded SQL migrations | ✅ |
| Schema: `topics`, `signups`, `camps`, `rounds`, `arguments`, `verdicts`, `agent_keys`, `system_keys`, `verdict_schemas` | ✅ |
| Auth middlewares: agent bearer (real), OIDC browser (Phase 2 stub w/ dev bypass) | ✅ |
| `/api/healthz` | ✅ |
| `/api/topics` list / get / create / set-visibility | ✅ |
| `/api/topics/{id}/signups` list / create (agent self-enroll) | ✅ |
| Orchestration engine (camp allocation, round driver, judge invocation) | ⬜ Phase 2D |
| SSE live transcripts | ⬜ Phase 2D |
| Full OIDC + Keycloak JWKS verification | ⬜ Phase 4 |
| Nginx + CF Origin Cert on server.t3 | ⬜ Phase 2E |
## Layout
```
main.go entrypoint (load → wire → serve)
go.mod
Dockerfile
docker-compose.dev.yml backend + mysql for local iteration
internal/
config/ 12-factor env loader
db/
db.go sqlx + embedded migration runner
migrations/001_init.sql v2 schema, idempotent
models/ entity types (sqlx + json tags)
store/ query layer (per-entity)
auth/ agent api-key + oidc middlewares
httpapi/
routes.go chi router + auth chains
handlers/ per-endpoint handlers
```
## Run locally
```
docker compose -f docker-compose.dev.yml up --build
# backend on http://localhost:8090
curl http://localhost:8090/api/healthz
```
Env vars (see `internal/config/config.go` for the full list):
| Var | Default (dev) | Required in prod |
|-----|---------------|-------------------|
| `ENV_MODE` | `dev` | must be `prod` |
| `HTTP_ADDR` | `0.0.0.0:8090` | — |
| `CORS_ALLOW_ORIGINS` | `*` | concrete list (no `*`) |
| `DB_HOST/PORT/NAME/USER/PASSWORD` | dev defaults | ✓ password required |
| `AGENT_API_KEY_PEPPER` | — | ✓ |
| `OIDC_ISSUER` / `OIDC_CLIENT_ID` | — | ✓ |
| `OIDC_DEV_BYPASS_TOKEN` | unset | ignored in prod |
| `SYSTEM_API_KEY` | unset | populate when announce-channel push lands |
## Dev bypass for browser routes
In `ENV_MODE=dev` with `OIDC_DEV_BYPASS_TOKEN=<token>` set:
```
curl -H "x-dev-bypass: <token>" http://localhost:8090/api/topics
# attached as user 'dev-operator' with role 'dialectic-admin'
```
In `prod`, this header is ignored regardless of value.
## Agent bearer for plugin routes
The OpenClaw plugin (`Dialectic.OpenclawPlugin`, Phase 3) calls with:
```
Authorization: Bearer <raw-agent-api-key>
```
The key is hashed with `AGENT_API_KEY_PEPPER` and matched against
`agent_keys.key_hash`. To provision an agent's key (Phase 3 will add a
proper `hf user create-dialectic-key` CLI; for now, manual SQL):
```sql
INSERT INTO agent_keys (agent_id, key_hash)
VALUES ('manager', SHA2(CONCAT('<pepper>:', '<raw>'), 256));
```
## What's next
- **Phase 2D**: camp allocation algorithm + round driver + judge
invocation. Wired to Fabric announce channel (via system-api-key) +
the Dialectic.OpenclawPlugin's tool for agent argument submission.
- **Phase 2E**: nginx config + CF Origin Cert + deploy to server.t3.
- **Phase 3**: Dialectic.OpenclawPlugin — agent-facing tools.
- **Phase 4**: frontend rewrite (STYLE.md + real Keycloak OIDC + visibility toggle UI).
- **Phase 5**: end-to-end integration with `analyze-intel` workflow.

View File

@@ -1,4 +0,0 @@
from api.debates import router as debates_router
from api.api_keys import router as api_keys_router
from api.models import router as models_router
from api.setup import router as setup_router

View File

@@ -1,113 +0,0 @@
from fastapi import APIRouter, Depends, Form, HTTPException
from middleware.auth import require_auth
import aiohttp
from services.api_key_service import ApiKeyService
from db_models import get_db
router = APIRouter(tags=["api-keys"])
async def validate_api_key(provider: str, api_key: str):
"""
Validate an API key by listing models from the provider.
All providers validated the same way: if we can list models, the key is valid.
"""
try:
if provider == "openai":
import openai
async with openai.AsyncOpenAI(api_key=api_key, timeout=10.0) as client:
await client.models.list()
return True
# Claude, Qwen, DeepSeek: GET their models endpoint via aiohttp
endpoints = {
"claude": {
"url": "https://api.anthropic.com/v1/models",
"headers": {
"x-api-key": api_key,
"anthropic-version": "2023-06-01"
}
},
"qwen": {
"url": "https://dashscope.aliyuncs.com/compatible-mode/v1/models",
"headers": {"Authorization": f"Bearer {api_key}"}
},
"deepseek": {
"url": "https://api.deepseek.com/v1/models",
"headers": {"Authorization": f"Bearer {api_key}"}
}
}
if provider in endpoints:
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
ep = endpoints[provider]
async with session.get(ep["url"], headers=ep["headers"]) as response:
return response.status == 200
# Tavily: validate by calling REST API directly (no tavily package needed)
if provider == "tavily":
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
"https://api.tavily.com/search",
json={"api_key": api_key, "query": "test", "max_results": 1}
) as response:
if response.status == 200:
data = await response.json()
return bool(data.get("results"))
return False
return False
except Exception:
return False
@router.post("/api-keys/{provider}", dependencies=[Depends(require_auth)])
async def set_api_key(provider: str, api_key: str = Form(...), db=Depends(get_db)):
"""
Set API key for a specific provider
"""
try:
success = ApiKeyService.set_api_key(db, provider, api_key)
if success:
return {"message": f"API key for {provider} updated successfully"}
else:
raise HTTPException(status_code=500, detail="Failed to update API key")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api-keys/{provider}")
async def get_api_key(provider: str, db=Depends(get_db)):
"""
Get API key for a specific provider
"""
try:
api_key = ApiKeyService.get_api_key(db, provider)
if api_key:
return {"provider": provider, "api_key": api_key}
else:
raise HTTPException(status_code=404, detail=f"No API key found for {provider}")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/validate-api-key/{provider}", dependencies=[Depends(require_auth)])
async def validate_api_key_endpoint(provider: str, api_key: str = Form(...)):
"""
Validate an API key by making a test request to the provider
This endpoint is used by the frontend to validate API keys without CORS issues
"""
try:
is_valid = await validate_api_key(provider, api_key)
if is_valid:
return {"valid": True, "message": f"Valid {provider} API key"}
else:
return {"valid": False, "message": f"Invalid {provider} API key"}
except Exception as e:
return {"valid": False, "message": str(e)}

View File

@@ -1,158 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException
from middleware.auth import require_auth
from sse_starlette.sse import EventSourceResponse
from typing import Dict, Any
import asyncio
import json
from orchestrator.debate_orchestrator import DebateOrchestrator
from models.debate import DebateRequest
from storage.session_manager import SessionManager
from db_models import get_db
router = APIRouter(tags=["debates"])
@router.post("/debate/create", dependencies=[Depends(require_auth)])
async def create_debate(debate_request: DebateRequest, db=Depends(get_db)) -> Dict[str, Any]:
"""
Create a new debate session with specified parameters
"""
try:
orchestrator = DebateOrchestrator(db)
session_id = await orchestrator.create_session(debate_request)
return {
"session_id": session_id,
"status": "created",
"message": f"Debate session {session_id} created successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/debate/{session_id}")
async def get_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]:
"""
Get the current state of a debate session
"""
try:
orchestrator = DebateOrchestrator(db)
session = await orchestrator.get_session_status(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Debate session {session_id} not found")
return session.dict()
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/debate/{session_id}/start", dependencies=[Depends(require_auth)])
async def start_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]:
"""
Start a debate session and stream the results
"""
try:
orchestrator = DebateOrchestrator(db)
session = await orchestrator.run_debate(session_id)
return {
"session_id": session_id,
"status": session.status,
"message": f"Debate session {session_id} completed"
}
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/debate/{session_id}", dependencies=[Depends(require_auth)])
async def end_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]:
"""
End a debate session prematurely
"""
try:
orchestrator = DebateOrchestrator(db)
await orchestrator.terminate_session(session_id)
return {"session_id": session_id, "status": "terminated"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/debate/{session_id}/stream")
async def stream_debate(session_id: str, db=Depends(get_db)) -> EventSourceResponse:
"""
Stream debate updates in real-time using Server-Sent Events
"""
async def event_generator():
session_manager = SessionManager()
# Check if the session exists
session = await session_manager.get_session(db, session_id)
if not session:
yield {"event": "error", "data": json.dumps({"error": f"Session {session_id} not found"})}
return
# Yield initial state
yield {"event": "update", "data": json.dumps({
"session_id": session_id,
"status": session.status,
"rounds": [round.dict() for round in session.rounds],
"evidence_library": [e.dict() for e in session.evidence_library],
"message": "Debate session initialized"
}, default=str)}
last_round_count = len(session.rounds)
# Poll until debate completes or times out (max 5 min)
for _ in range(150): # 150 × 2s = 300s timeout
await asyncio.sleep(2)
# Reset transaction so we see commits from the run_debate request
db.commit()
updated_session = await session_manager.get_session(db, session_id)
if not updated_session:
break
# Only yield update when rounds actually changed
if len(updated_session.rounds) != last_round_count or updated_session.status != session.status:
last_round_count = len(updated_session.rounds)
session = updated_session
yield {"event": "update", "data": json.dumps({
"session_id": session_id,
"status": updated_session.status,
"rounds": [round.dict() for round in updated_session.rounds],
"evidence_library": [e.dict() for e in updated_session.evidence_library],
"current_round": len(updated_session.rounds)
}, default=str)}
if updated_session.status in ("completed", "terminated"):
yield {"event": "complete", "data": json.dumps({
"session_id": session_id,
"status": updated_session.status,
"summary": updated_session.summary,
"rounds": [round.dict() for round in updated_session.rounds],
"evidence_library": [e.dict() for e in updated_session.evidence_library]
}, default=str)}
break
return EventSourceResponse(event_generator())
@router.get("/sessions")
async def list_sessions(db=Depends(get_db)) -> Dict[str, Any]:
"""
List all debate sessions
"""
try:
session_manager = SessionManager()
sessions = await session_manager.list_sessions(db)
return {"sessions": sessions}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -1,173 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException
import aiohttp
from services.api_key_service import ApiKeyService
from db_models import get_db
from api.api_keys import validate_api_key
router = APIRouter(tags=["models"])
@router.get("/models/{provider}")
async def get_available_models(provider: str, db=Depends(get_db)):
"""
Get available models for a specific provider by fetching from their API.
Falls back to a curated default list if the API key is missing or the request fails.
"""
# Curated defaults for each provider — used as fallback
default_models = {
"openai": [
{"model_identifier": "gpt-4o", "display_name": "gpt-4o"},
{"model_identifier": "gpt-4o-mini", "display_name": "gpt-4o-mini"},
{"model_identifier": "gpt-4-turbo", "display_name": "gpt-4-turbo"},
{"model_identifier": "gpt-4", "display_name": "gpt-4"},
{"model_identifier": "o3-mini", "display_name": "o3-mini"},
{"model_identifier": "gpt-3.5-turbo", "display_name": "gpt-3.5-turbo"}
],
"claude": [
{"model_identifier": "claude-opus-4-5", "display_name": "claude-opus-4-5"},
{"model_identifier": "claude-sonnet-4-5", "display_name": "claude-sonnet-4-5"},
{"model_identifier": "claude-3-5-sonnet-20241022", "display_name": "claude-3-5-sonnet-20241022"},
{"model_identifier": "claude-3-5-haiku-20241022", "display_name": "claude-3-5-haiku-20241022"},
{"model_identifier": "claude-3-opus-20240229", "display_name": "claude-3-opus-20240229"}
],
"qwen": [
{"model_identifier": "qwen3-max", "display_name": "qwen3-max"},
{"model_identifier": "qwen3-plus", "display_name": "qwen3-plus"},
{"model_identifier": "qwen3-flash", "display_name": "qwen3-flash"},
{"model_identifier": "qwen-max", "display_name": "qwen-max"},
{"model_identifier": "qwen-plus", "display_name": "qwen-plus"},
{"model_identifier": "qwen-turbo", "display_name": "qwen-turbo"}
],
"deepseek": [
{"model_identifier": "deepseek-chat", "display_name": "deepseek-chat"},
{"model_identifier": "deepseek-reasoner", "display_name": "deepseek-reasoner"},
{"model_identifier": "deepseek-v3", "display_name": "deepseek-v3"},
{"model_identifier": "deepseek-r1", "display_name": "deepseek-r1"}
]
}
defaults = default_models.get(provider, [])
try:
# Retrieve and decrypt API key
decrypted_key = ApiKeyService.get_api_key(db, provider)
if not decrypted_key:
return {"provider": provider, "models": defaults}
# ---------- OpenAI ----------
if provider == "openai":
import openai
async with openai.AsyncOpenAI(api_key=decrypted_key, timeout=10.0) as client:
response = await client.models.list()
# Keep only chat / reasoning models, sorted newest-first by created timestamp
chat_prefixes = ('gpt-', 'o1', 'o3', 'o4', 'chatgpt')
models = []
seen = set()
for m in sorted(response.data, key=lambda x: x.created, reverse=True):
if m.id not in seen and m.id.startswith(chat_prefixes):
seen.add(m.id)
models.append({"model_identifier": m.id, "display_name": m.id})
if models:
return {"provider": provider, "models": models}
# ---------- Claude ----------
elif provider == "claude":
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
headers = {
"x-api-key": decrypted_key,
"anthropic-version": "2023-06-01"
}
async with session.get("https://api.anthropic.com/v1/models", headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
models = []
for m in data.get("data", []):
model_id = m.get("id", "")
if model_id.startswith("claude-"):
models.append({"model_identifier": model_id, "display_name": model_id})
if models:
return {"provider": provider, "models": models}
else:
print(f"Claude models API returned {resp.status}: {await resp.text()}")
# ---------- Qwen ----------
elif provider == "qwen":
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
headers = {"Authorization": f"Bearer {decrypted_key}"}
async with session.get("https://dashscope.aliyuncs.com/compatible-mode/v1/models", headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
exclude_keywords = [
'tts', 'vl', 'ocr', 'image', 'asr', '-mt-', '-mt',
'math', 'embed', 'rerank', 'coder', 'translate',
's2s', 'deep-search', 'omni', 'gui-'
]
models = []
for m in data.get("data", []):
model_id = m.get("id", "")
if not model_id:
continue
if not (model_id.startswith("qwen") or model_id.startswith("qwq")):
continue
if any(kw in model_id for kw in exclude_keywords):
continue
models.append({"model_identifier": model_id, "display_name": model_id})
if models:
return {"provider": provider, "models": models}
else:
print(f"Qwen models API returned {resp.status}: {await resp.text()}")
# ---------- DeepSeek ----------
elif provider == "deepseek":
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
headers = {"Authorization": f"Bearer {decrypted_key}"}
async with session.get("https://api.deepseek.com/v1/models", headers=headers) as resp:
if resp.status == 200:
data = await resp.json()
models = []
for m in data.get("data", []):
model_id = m.get("id", "")
if model_id.startswith("deepseek"):
models.append({"model_identifier": model_id, "display_name": model_id})
if models:
return {"provider": provider, "models": models}
else:
print(f"DeepSeek models API returned {resp.status}: {await resp.text()}")
# API fetch succeeded but returned empty list, or unknown provider — use defaults
return {"provider": provider, "models": defaults}
except Exception as e:
print(f"Error fetching models for {provider}: {e}")
import traceback
traceback.print_exc()
return {"provider": provider, "models": defaults}
@router.get("/providers")
async def get_available_providers(db=Depends(get_db)):
"""
Get all providers that have valid API keys set
"""
try:
available_providers = []
for provider in ("openai", "claude", "qwen", "deepseek", "tavily"):
decrypted_key = ApiKeyService.get_api_key(db, provider)
if not decrypted_key:
continue
is_valid = await validate_api_key(provider, decrypted_key)
if is_valid:
available_providers.append({
"provider": provider,
"has_valid_key": True
})
return {"providers": available_providers}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -1,192 +0,0 @@
import os
from typing import Optional
from urllib.parse import quote_plus
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from services.config_service import ConfigService
router = APIRouter(prefix="/api/setup", tags=["setup"])
PASSWORD_PLACEHOLDER = "********"
# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
class DatabaseConfig(BaseModel):
host: str
port: int = 3306
user: str = "root"
password: str = ""
database: str = "dialectica"
class KeycloakConfig(BaseModel):
host: str = ""
realm: str = ""
client_id: str = ""
class TlsConfig(BaseModel):
cert_path: str = ""
key_path: str = ""
force_https: bool = False
class FullConfig(BaseModel):
database: Optional[DatabaseConfig] = None
keycloak: Optional[KeycloakConfig] = None
tls: Optional[TlsConfig] = None
# ---------------------------------------------------------------------------
# Access control dependency
# ---------------------------------------------------------------------------
async def setup_guard(request: Request):
"""Three-phase access control for setup routes.
1. Not initialized → only localhost allowed
2. ENV_MODE=dev → open
3. ENV_MODE=prod → Keycloak admin JWT required
"""
config = ConfigService.load()
initialized = config.get("initialized", False)
env_mode = os.getenv("ENV_MODE", "dev")
if env_mode == "dev":
return # dev mode: no auth needed, even before initialisation
if not initialized:
# prod + not initialised: only localhost may configure
client_ip = request.client.host
if client_ip not in ("127.0.0.1", "::1"):
raise HTTPException(
status_code=403,
detail="初次设置仅允许从本机访问",
)
return
# prod → delegate to Keycloak middleware (Phase 3)
from app.middleware.auth import require_admin
await require_admin(request, config)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@router.get("/status")
async def setup_status():
"""Return current system initialisation state, including KC info for OIDC."""
config = ConfigService.load()
env_mode = os.getenv("ENV_MODE", "dev")
result = {
"initialized": config.get("initialized", False),
"env_mode": env_mode,
"db_configured": ConfigService.is_db_configured(),
}
# Include Keycloak info so the frontend can build OIDC config
kc = config.get("keycloak", {})
if env_mode == "prod" and kc.get("host") and kc.get("realm"):
result["keycloak"] = {
"authority": f"{kc['host']}/realms/{kc['realm']}",
"client_id": kc.get("client_id", ""),
}
return result
@router.get("/config", dependencies=[Depends(setup_guard)])
async def get_config():
"""Return full config with passwords replaced by placeholder."""
config = ConfigService.load()
if "database" in config and config["database"].get("password"):
config["database"]["password"] = PASSWORD_PLACEHOLDER
return config
@router.put("/config", dependencies=[Depends(setup_guard)])
async def update_config(payload: FullConfig):
"""Merge submitted config sections into the YAML file."""
config = ConfigService.load()
if payload.database is not None:
db_dict = payload.database.model_dump()
# If password is the placeholder, keep the existing real password
if db_dict.get("password") == PASSWORD_PLACEHOLDER:
db_dict["password"] = config.get("database", {}).get("password", "")
config["database"] = db_dict
if payload.keycloak is not None:
config["keycloak"] = payload.keycloak.model_dump()
if payload.tls is not None:
config["tls"] = payload.tls.model_dump()
ConfigService.save(config)
return {"message": "配置已保存"}
@router.post("/test-db", dependencies=[Depends(setup_guard)])
async def test_db_connection(db_config: DatabaseConfig):
"""Test a database connection with the provided parameters (no save)."""
from sqlalchemy import create_engine, text
password = db_config.password
# If password is the placeholder, use the real password from config
if password == PASSWORD_PLACEHOLDER:
password = ConfigService.load().get("database", {}).get("password", "")
url = (
f"mysql+pymysql://{quote_plus(db_config.user)}:{quote_plus(password)}"
f"@{db_config.host}:{db_config.port}/{db_config.database}"
)
try:
engine = create_engine(url, pool_pre_ping=True)
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
engine.dispose()
return {"success": True, "message": "数据库连接成功"}
except Exception as e:
return {"success": False, "message": str(e)}
@router.post("/test-keycloak", dependencies=[Depends(setup_guard)])
async def test_keycloak(kc_config: KeycloakConfig):
"""Test Keycloak connectivity by fetching the OIDC discovery document."""
import httpx
well_known = (
f"{kc_config.host}/realms/{kc_config.realm}"
f"/.well-known/openid-configuration"
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(well_known)
if resp.status_code == 200:
return {"success": True, "message": "Keycloak 连通正常"}
return {"success": False, "message": f"HTTP {resp.status_code}"}
except Exception as e:
return {"success": False, "message": str(e)}
@router.post("/initialize", dependencies=[Depends(setup_guard)])
async def initialize():
"""Mark system as initialised and reload DB connection."""
config = ConfigService.load()
if not ConfigService.is_db_configured():
raise HTTPException(status_code=400, detail="请先配置数据库连接")
# Reload DB engine so business routes can start working
from app.db_models import reload_db_connection
from app.storage.database import init_db
reload_db_connection()
init_db()
config["initialized"] = True
ConfigService.save(config)
return {"message": "系统初始化完成", "initialized": True}

60
app.py
View File

@@ -1,60 +0,0 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import JSONResponse
import uvicorn
from storage.database import init_db
from exceptions import ServiceNotConfiguredError
from middleware.config_guard import ConfigGuardMiddleware
from api import debates_router, api_keys_router, models_router, setup_router
app = FastAPI(
title="Dialectica - Multi-Model Debate Framework",
description="A framework for structured debates between multiple language models",
version="0.1.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allow all origins for development
allow_credentials=True,
allow_methods=["*"], # Allow all methods
allow_headers=["*"], # Allow all headers
# Add exposed headers to allow frontend to access response headers
expose_headers=["Access-Control-Allow-Origin", "Access-Control-Allow-Credentials"]
)
# Config guard: return 503 for business routes when DB not configured
app.add_middleware(ConfigGuardMiddleware)
@app.exception_handler(ServiceNotConfiguredError)
async def not_configured_handler(request, exc):
return JSONResponse(
status_code=503,
content={"error_code": "SERVICE_NOT_CONFIGURED", "detail": str(exc)},
)
@app.on_event("startup")
def startup_event():
"""Initialize database on startup (skipped if not configured)."""
init_db()
# Register routers
app.include_router(debates_router)
app.include_router(api_keys_router)
app.include_router(models_router)
app.include_router(setup_router)
@app.get("/")
async def root():
return {"message": "Welcome to Dialectica - Multi-Model Debate Framework"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -1,79 +0,0 @@
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
from exceptions import ServiceNotConfiguredError
from services.config_service import ConfigService
Base = declarative_base()
class ApiKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True, index=True)
provider = Column(String(50), unique=True, index=True, nullable=False)
api_key_encrypted = Column(Text, nullable=False) # Encrypted API key
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow)
class ModelConfig(Base):
__tablename__ = "model_configs"
id = Column(Integer, primary_key=True, index=True)
provider = Column(String(50), nullable=False)
model_name = Column(String(100), nullable=False)
display_name = Column(String(100)) # Optional display name
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow)
__table_args__ = {'mysql_charset': 'utf8mb4'}
# ---------------------------------------------------------------------------
# Lazy engine / session factory — only created when first requested
# ---------------------------------------------------------------------------
_engine = None
_SessionLocal = None
def _get_engine():
from sqlalchemy import create_engine
global _engine
if _engine is None:
db_url = ConfigService.get_database_url()
if not db_url:
raise ServiceNotConfiguredError("数据库未配置")
_engine = create_engine(db_url)
return _engine
def _get_session_factory():
global _SessionLocal
if _SessionLocal is None:
_SessionLocal = sessionmaker(
autocommit=False, autoflush=False, bind=_get_engine()
)
return _SessionLocal
def reload_db_connection():
"""Dispose current engine and reset so next call rebuilds from config."""
global _engine, _SessionLocal
if _engine is not None:
_engine.dispose()
_engine = None
_SessionLocal = None
def get_db():
"""FastAPI dependency that yields a DB session."""
session_factory = _get_session_factory()
db = session_factory()
try:
yield db
finally:
db.close()

45
docker-compose.dev.yml Normal file
View File

@@ -0,0 +1,45 @@
# Dev docker-compose: backend + MySQL only, exposed on localhost.
# Frontend / nginx are in the sibling top-level Dialectic repo's compose.
# For end-to-end dev: run that compose; for backend-only iteration, this.
services:
backend:
build:
context: .
args:
VERSION: dev-local
environment:
ENV_MODE: dev
HTTP_ADDR: 0.0.0.0:8090
CORS_ALLOW_ORIGINS: "*"
DB_HOST: mysql
DB_PORT: "3306"
DB_NAME: dialectic
DB_USER: dialectic
DB_PASSWORD: dialectic
AGENT_API_KEY_PEPPER: dev-pepper
OIDC_DEV_BYPASS_TOKEN: dev-bypass-token
ports: ["8090:8090"]
depends_on:
mysql:
condition: service_healthy
restart: unless-stopped
mysql:
image: mysql:8.4
environment:
MYSQL_ROOT_PASSWORD: rootpassword
MYSQL_DATABASE: dialectic
MYSQL_USER: dialectic
MYSQL_PASSWORD: dialectic
ports: ["3306:3306"]
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-pdialectic"]
interval: 5s
timeout: 3s
retries: 20
volumes:
- dialectic_mysql_data:/var/lib/mysql
volumes:
dialectic_mysql_data:

View File

@@ -1,3 +0,0 @@
class ServiceNotConfiguredError(Exception):
"""Raised when a required service (database, etc.) is not configured."""
pass

13
go.mod Normal file
View File

@@ -0,0 +1,13 @@
module git.hangman-lab.top/hzhang/Dialectic.Backend
go 1.23
require (
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/cors v1.2.1
github.com/go-sql-driver/mysql v1.8.1
github.com/google/uuid v1.6.0
github.com/jmoiron/sqlx v1.4.0
)
require filippo.io/edwards25519 v1.1.0 // indirect

16
go.sum Normal file
View File

@@ -0,0 +1,16 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=

144
internal/auth/auth.go Normal file
View File

@@ -0,0 +1,144 @@
// Package auth holds the two middlewares Dialectic v2 uses:
//
// - AgentAPIKey: validates `Authorization: Bearer <raw>` against
// the `agent_keys` table (hashed with the configured pepper).
// Used by Dialectic.OpenclawPlugin → backend calls.
//
// - OIDCBrowser: validates a Keycloak-issued JWT in the
// `dialectic_session` cookie. Used by the React frontend.
// Phase 2C ships a stub that accepts a dev-mode bypass token; the
// real JWKS verification + claim mapping lands with Phase 4.
//
// Both middlewares attach a typed Caller to the request context so
// downstream handlers can read identity uniformly.
package auth
import (
"context"
"crypto/sha256"
"crypto/subtle"
"database/sql"
"encoding/hex"
"errors"
"net/http"
"strings"
"github.com/jmoiron/sqlx"
)
type CallerKind string
const (
CallerAgent CallerKind = "agent"
CallerUser CallerKind = "user"
CallerSystem CallerKind = "system"
)
type Caller struct {
Kind CallerKind
ID string // agentId for CallerAgent; userId for CallerUser; key-name for CallerSystem
Roles []string // populated for CallerUser (from JWT claims); empty otherwise
}
type ctxKey struct{}
func WithCaller(ctx context.Context, c Caller) context.Context {
return context.WithValue(ctx, ctxKey{}, c)
}
// FromContext returns the caller attached by an auth middleware. The
// zero Caller (Kind == "") indicates an unauthenticated request reached
// a public route.
func FromContext(ctx context.Context) Caller {
c, _ := ctx.Value(ctxKey{}).(Caller)
return c
}
// HashKey peppers + sha256-hashes a raw API key. Constant pepper; same
// raw key always produces the same hash so lookups can equal-match on
// the key_hash column.
func HashKey(pepper, raw string) string {
h := sha256.Sum256([]byte(pepper + ":" + raw))
return hex.EncodeToString(h[:])
}
// AgentAPIKey middleware: extracts Bearer token, looks up agent_keys,
// 401 on miss. Updates last_used_at lazily (best-effort; failure here
// doesn't block the request).
func AgentAPIKey(db *sqlx.DB, pepper string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
raw := bearerToken(r)
if raw == "" {
http.Error(w, "missing bearer token", http.StatusUnauthorized)
return
}
hash := HashKey(pepper, raw)
var agentID string
err := db.GetContext(r.Context(), &agentID,
`SELECT agent_id FROM agent_keys WHERE key_hash = ? AND revoked_at IS NULL`, hash)
if errors.Is(err, sql.ErrNoRows) {
http.Error(w, "invalid agent key", http.StatusUnauthorized)
return
}
if err != nil {
http.Error(w, "auth lookup failed", http.StatusInternalServerError)
return
}
go func(h string) {
// best-effort touch — independent ctx so it survives
// even if the request was cancelled mid-handler.
_, _ = db.Exec(
`UPDATE agent_keys SET last_used_at = CURRENT_TIMESTAMP WHERE key_hash = ?`, h)
}(hash)
ctx := WithCaller(r.Context(), Caller{Kind: CallerAgent, ID: agentID})
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
// OIDCBrowser middleware (Phase 2C stub):
// - Dev mode + `x-dev-bypass: <token>` header → admit as a fake user.
// - Otherwise: 401 with a hint pointing to the (not-yet-wired)
// Keycloak redirect path. The real JWKS-verifying middleware lands
// when the frontend is wired up; until then, browser callers can
// only reach the API via the dev bypass.
func OIDCBrowser(devMode bool, devBypassToken string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if devMode && devBypassToken != "" {
if subtleEqual(r.Header.Get("x-dev-bypass"), devBypassToken) {
ctx := WithCaller(r.Context(), Caller{
Kind: CallerUser,
ID: "dev-operator",
Roles: []string{"dialectic-admin"},
})
next.ServeHTTP(w, r.WithContext(ctx))
return
}
}
// Production path goes through Keycloak — Phase 4.
http.Error(w, "oidc login required (Phase 4: not yet wired)", http.StatusUnauthorized)
})
}
}
func bearerToken(r *http.Request) string {
h := r.Header.Get("authorization")
const prefix = "Bearer "
if strings.HasPrefix(h, prefix) {
return strings.TrimSpace(h[len(prefix):])
}
if strings.HasPrefix(h, "bearer ") {
return strings.TrimSpace(h[len("bearer "):])
}
return ""
}
func subtleEqual(a, b string) bool {
if len(a) != len(b) {
return false
}
return subtle.ConstantTimeCompare([]byte(a), []byte(b)) == 1
}

141
internal/config/config.go Normal file
View File

@@ -0,0 +1,141 @@
// Package config loads runtime configuration from environment variables.
//
// Conventions:
// - 12-factor: every config knob is an env var; no config files.
// - Sensible dev defaults for local docker-compose; prod sets via env.
// - Sensitive values (DB password, system api key) are *required* in
// prod; LoadFromEnv() fails fast if absent and ENV_MODE != "dev".
package config
import (
"fmt"
"os"
"strings"
)
type Config struct {
// "dev" | "prod". Dev relaxes required-field checks and enables a
// dev-mode auth bypass token. Prod requires every sensitive field.
Mode string
// HTTP server bind. e.g. "0.0.0.0:8090".
HTTPAddr string
// CORS allowed origins (comma-separated; "*" allowed only in dev).
CORSAllowOrigins []string
// MySQL DSN parts.
DBHost string
DBPort string
DBName string
DBUser string
DBPassword string
// Auth.
//
// SystemAPIKey: Phase-1 system key for posting to announce channels
// in Fabric. Mirrored here so Dialectic backend itself can post topic
// announcements via Fabric's POST /channels/:id/messages with
// x-fabric-system-key header.
//
// AgentAPIKeyPepper: HMAC pepper for hashing agent API keys at rest
// (we store sha256(pepper || raw) not the raw key). Rotating the
// pepper invalidates all keys — that's intentional, an emergency
// kill switch.
//
// OIDCDevBypassToken: dev-mode only. If set AND Mode == "dev", a
// browser request with header `x-dev-bypass: <token>` bypasses OIDC
// and is treated as user "dev-operator" with role "dialectic-admin".
// Prod ignores this even if set.
SystemAPIKey string
AgentAPIKeyPepper string
OIDCDevBypassToken string
// OIDC issuer URL (Keycloak realm endpoint). e.g.
// https://auth.hangman-lab.top/realms/hangman-lab
// Phase 2C ships this as configured-but-not-verified; Phase 4 wires
// real JWKS validation.
OIDCIssuer string
OIDCClientID string
}
func LoadFromEnv() (*Config, error) {
c := &Config{
Mode: getenv("ENV_MODE", "dev"),
HTTPAddr: getenv("HTTP_ADDR", "0.0.0.0:8090"),
CORSAllowOrigins: splitCSV(getenv("CORS_ALLOW_ORIGINS", "*")),
DBHost: getenv("DB_HOST", "127.0.0.1"),
DBPort: getenv("DB_PORT", "3306"),
DBName: getenv("DB_NAME", "dialectic"),
DBUser: getenv("DB_USER", "dialectic"),
DBPassword: os.Getenv("DB_PASSWORD"),
SystemAPIKey: os.Getenv("SYSTEM_API_KEY"),
AgentAPIKeyPepper: os.Getenv("AGENT_API_KEY_PEPPER"),
OIDCDevBypassToken: os.Getenv("OIDC_DEV_BYPASS_TOKEN"),
OIDCIssuer: os.Getenv("OIDC_ISSUER"),
OIDCClientID: os.Getenv("OIDC_CLIENT_ID"),
}
if c.Mode != "dev" && c.Mode != "prod" {
return nil, fmt.Errorf("ENV_MODE must be dev|prod, got %q", c.Mode)
}
if c.Mode == "prod" {
var missing []string
if c.DBPassword == "" {
missing = append(missing, "DB_PASSWORD")
}
if c.AgentAPIKeyPepper == "" {
missing = append(missing, "AGENT_API_KEY_PEPPER")
}
if c.OIDCIssuer == "" {
missing = append(missing, "OIDC_ISSUER")
}
if c.OIDCClientID == "" {
missing = append(missing, "OIDC_CLIENT_ID")
}
if len(missing) > 0 {
return nil, fmt.Errorf("prod mode requires env: %s", strings.Join(missing, ", "))
}
// In prod, "*" CORS is never accepted.
for _, o := range c.CORSAllowOrigins {
if o == "*" {
return nil, fmt.Errorf("prod mode forbids CORS_ALLOW_ORIGINS='*'")
}
}
}
return c, nil
}
func (c *Config) IsDev() bool { return c.Mode == "dev" }
func (c *Config) DSN() string {
// MySQL DSN: user:pass@tcp(host:port)/dbname?params
return fmt.Sprintf(
"%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4&collation=utf8mb4_unicode_ci",
c.DBUser, c.DBPassword, c.DBHost, c.DBPort, c.DBName,
)
}
func getenv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
func splitCSV(s string) []string {
if s == "" {
return nil
}
parts := strings.Split(s, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
out = append(out, p)
}
}
return out
}

119
internal/db/db.go Normal file
View File

@@ -0,0 +1,119 @@
// Package db wraps sqlx and runs embedded SQL migrations on startup.
//
// Migrations are flat files in migrations/, named NNN_*.sql. They run in
// lexical order. Each is executed in its own transaction; a missing
// schema_migrations row indicates "not yet applied". This is a
// deliberately simple migration runner — for this project's size + team
// size, pulling in golang-migrate or atlas adds complexity without
// payback. If migration count grows past ~20, revisit.
package db
import (
"context"
"database/sql"
"embed"
"fmt"
"sort"
"strings"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
//go:embed migrations/*.sql
var migrationsFS embed.FS
func Open(ctx context.Context, dsn string) (*sqlx.DB, error) {
d, err := sqlx.ConnectContext(ctx, "mysql", dsn)
if err != nil {
return nil, fmt.Errorf("connect mysql: %w", err)
}
d.SetMaxOpenConns(25)
d.SetMaxIdleConns(5)
d.SetConnMaxLifetime(5 * time.Minute)
return d, nil
}
// RunMigrations applies any migrations that aren't yet present in the
// schema_migrations table. Idempotent — safe to call on every startup.
func RunMigrations(ctx context.Context, d *sqlx.DB) error {
// Bootstrap the tracker table itself.
if _, err := d.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS schema_migrations (
name VARCHAR(255) PRIMARY KEY,
applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`); err != nil {
return fmt.Errorf("ensure schema_migrations: %w", err)
}
entries, err := migrationsFS.ReadDir("migrations")
if err != nil {
return fmt.Errorf("list migrations: %w", err)
}
var files []string
for _, e := range entries {
if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") {
files = append(files, e.Name())
}
}
sort.Strings(files)
for _, name := range files {
var found string
err := d.GetContext(ctx, &found, `SELECT name FROM schema_migrations WHERE name = ?`, name)
if err == nil {
continue // already applied
}
if err != sql.ErrNoRows {
return fmt.Errorf("check migration %s: %w", name, err)
}
content, err := migrationsFS.ReadFile("migrations/" + name)
if err != nil {
return fmt.Errorf("read migration %s: %w", name, err)
}
// MySQL doesn't support multi-statement in a single Exec by default
// — split on ';' boundaries and run each individually. Comments are
// passed through (server-side parser handles).
statements := splitSQL(string(content))
tx, err := d.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("tx for %s: %w", name, err)
}
for _, stmt := range statements {
stmt = strings.TrimSpace(stmt)
if stmt == "" {
continue
}
if _, err := tx.ExecContext(ctx, stmt); err != nil {
_ = tx.Rollback()
return fmt.Errorf("apply %s [statement: %q]: %w", name, firstLine(stmt), err)
}
}
if _, err := tx.ExecContext(ctx, `INSERT INTO schema_migrations(name) VALUES (?)`, name); err != nil {
_ = tx.Rollback()
return fmt.Errorf("record %s: %w", name, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit %s: %w", name, err)
}
}
return nil
}
func splitSQL(s string) []string {
// Crude but adequate for our migrations (no string-literal semicolons).
// If we ever need to embed semicolons inside strings, switch to a
// proper SQL tokenizer.
return strings.Split(s, ";")
}
func firstLine(s string) string {
if i := strings.IndexByte(s, '\n'); i >= 0 {
return s[:i]
}
return s
}

View File

@@ -0,0 +1,141 @@
-- 001_init.sql — Dialectic v2 schema (greenfield, replaces Python v1).
-- See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md for the design.
-- Verdict schemas — declared at topic-creation time; judge produces output matching.
CREATE TABLE verdict_schemas (
id VARCHAR(64) NOT NULL PRIMARY KEY,
description TEXT NOT NULL,
shape_json JSON NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Seed v1 schemas.
INSERT INTO verdict_schemas (id, description, shape_json) VALUES
('binary', 'pro|con|draw with confidence + key reasoning', JSON_OBJECT('decision', 'pro|con|draw', 'confidence', 'number 0..1', 'key_reasoning', 'string')),
('claim-resolution', 'analyze-intel contested-cluster resolution', JSON_OBJECT('verdict', 'resolved-toward-A|resolved-toward-B|irreducibly-contested', 'winning_claim', 'string', 'dissenting_points', 'array of string', 'confidence', 'number 0..1')),
('policy-recommendation', 'recommended action with alternatives and risks', JSON_OBJECT('recommended_action', 'string', 'alternatives', 'array of string', 'conditions_for_alternatives', 'array of string', 'risks_noted', 'array of string')),
('free-form', 'unstructured summary escape hatch', JSON_OBJECT('summary', 'string'));
-- Topics (议题) — the unit of debate.
CREATE TABLE topics (
id CHAR(36) NOT NULL PRIMARY KEY,
title VARCHAR(255) NOT NULL,
summary TEXT NOT NULL,
visibility ENUM('public','private') NOT NULL DEFAULT 'private',
verdict_schema_id VARCHAR(64) NOT NULL,
status ENUM('created','signup_open','signup_closed','debating','completed','cancelled') NOT NULL DEFAULT 'created',
-- Lifecycle timestamps (per section 3 of design doc)
signup_open_at TIMESTAMP NOT NULL,
signup_close_at TIMESTAMP NOT NULL,
debate_start_at TIMESTAMP NOT NULL,
debate_end_at TIMESTAMP NOT NULL,
-- Audit
creator_user_id CHAR(36) NOT NULL,
visibility_changed_by CHAR(36) NULL,
visibility_changed_at TIMESTAMP NULL,
cancelled_reason VARCHAR(255) NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_topics_status (status, signup_open_at),
INDEX idx_topics_visibility (visibility, created_at),
CONSTRAINT fk_topics_schema FOREIGN KEY (verdict_schema_id) REFERENCES verdict_schemas(id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Signups: an agent volunteers for one or more camps on a topic.
-- willing_camps is a JSON array of camp names (subset of {pro, con, judge}).
-- (agent_id, topic_id) is unique — re-signup updates willing_camps.
CREATE TABLE signups (
id CHAR(36) NOT NULL PRIMARY KEY,
topic_id CHAR(36) NOT NULL,
agent_id VARCHAR(64) NOT NULL,
willing_camps JSON NOT NULL,
-- Pre-validation result captured at signup time (plugin verifies the
-- agent has an on_call slot covering the debate window; backend
-- records what the agent told it for audit).
pre_validated BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uq_signups (topic_id, agent_id),
CONSTRAINT fk_signups_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Camps: the post-allocation assignment. One row per (topic, camp) with
-- the locked-in agent. Written by camp-allocation algorithm at
-- signup_close_at; immutable afterwards (no drop-out / replacement in v1).
CREATE TABLE camps (
id CHAR(36) NOT NULL PRIMARY KEY,
topic_id CHAR(36) NOT NULL,
camp ENUM('pro','con','judge') NOT NULL,
agent_id VARCHAR(64) NOT NULL,
allocated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uq_camps (topic_id, camp),
INDEX idx_camps_agent (agent_id),
CONSTRAINT fk_camps_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Rounds: chronological partition of arguments. Each topic has N rounds
-- (typically 3-5); round 0 is the opening. Round transitions are driven
-- by the orchestrator on a schedule (or all-participants-posted).
CREATE TABLE rounds (
id CHAR(36) NOT NULL PRIMARY KEY,
topic_id CHAR(36) NOT NULL,
round_no INT NOT NULL,
opened_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
closed_at TIMESTAMP NULL,
UNIQUE KEY uq_rounds (topic_id, round_no),
CONSTRAINT fk_rounds_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Arguments: an individual contribution within a round by a camp's agent.
-- For pro/con these are claims/rebuttals; for judge these are clarifying
-- questions (judge is silent observer in v1 except for clarifications).
CREATE TABLE arguments (
id CHAR(36) NOT NULL PRIMARY KEY,
topic_id CHAR(36) NOT NULL,
round_id CHAR(36) NOT NULL,
camp ENUM('pro','con','judge') NOT NULL,
agent_id VARCHAR(64) NOT NULL,
content MEDIUMTEXT NOT NULL,
posted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_arguments_round (round_id, posted_at),
INDEX idx_arguments_topic (topic_id, posted_at),
CONSTRAINT fk_arguments_round FOREIGN KEY (round_id) REFERENCES rounds(id) ON DELETE CASCADE,
CONSTRAINT fk_arguments_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Verdicts: judge's structured output, one per topic (one verdict per
-- debate). verdict_json shape matches the topic's verdict_schema_id.
CREATE TABLE verdicts (
id CHAR(36) NOT NULL PRIMARY KEY,
topic_id CHAR(36) NOT NULL UNIQUE,
judge_agent_id VARCHAR(64) NOT NULL,
verdict_json JSON NOT NULL,
rationale TEXT NOT NULL,
-- Token cost trail for accounting (Phase 1: not enforced; Phase N: budget gate)
tokens_input INT NOT NULL DEFAULT 0,
tokens_output INT NOT NULL DEFAULT 0,
produced_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT fk_verdicts_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Agent API keys: provisioned per agent at recruitment time. Stored as
-- sha256(pepper || raw); pepper rotation invalidates all keys.
CREATE TABLE agent_keys (
agent_id VARCHAR(64) NOT NULL PRIMARY KEY,
key_hash CHAR(64) NOT NULL UNIQUE,
issued_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP NULL,
revoked_at TIMESTAMP NULL,
INDEX idx_agent_keys_hash (key_hash)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- System keys: out-of-band credentials for non-agent callers (e.g. the
-- analyze-intel workflow running via a system identity that creates
-- topics on behalf of the analyzing agent). Also stored as hash.
CREATE TABLE system_keys (
name VARCHAR(64) NOT NULL PRIMARY KEY,
key_hash CHAR(64) NOT NULL UNIQUE,
description TEXT NULL,
issued_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
revoked_at TIMESTAMP NULL,
INDEX idx_system_keys_hash (key_hash)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View File

@@ -0,0 +1,38 @@
package handlers
import (
"context"
"encoding/json"
"net/http"
"time"
"github.com/jmoiron/sqlx"
)
type HealthHandler struct {
db *sqlx.DB
version string
startedAt time.Time
}
func NewHealthHandler(db *sqlx.DB, version string) *HealthHandler {
return &HealthHandler{db: db, version: version, startedAt: time.Now()}
}
func (h *HealthHandler) Healthz(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
defer cancel()
dbOK := h.db.PingContext(ctx) == nil
status := http.StatusOK
if !dbOK {
status = http.StatusServiceUnavailable
}
w.Header().Set("content-type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(map[string]any{
"ok": dbOK,
"version": h.version,
"uptime_s": int(time.Since(h.startedAt).Seconds()),
"checked_at": time.Now().UTC().Format(time.RFC3339),
})
}

View File

@@ -0,0 +1,131 @@
package handlers
import (
"encoding/json"
"errors"
"net/http"
"time"
"github.com/go-chi/chi/v5"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
)
type SignupsHandler struct {
topics *store.TopicStore
signups *store.SignupStore
}
func NewSignupsHandler(t *store.TopicStore, s *store.SignupStore) *SignupsHandler {
return &SignupsHandler{topics: t, signups: s}
}
type signupBody struct {
WillingCamps []models.Camp `json:"willing_camps"`
PreValidated bool `json:"pre_validated"`
}
// POST /api/topics/{id}/signups
//
// Agent self-enrollment. Only CallerAgent is allowed — browsers can't
// sign up on behalf of an agent (would defeat the on_call pre-check
// that the plugin does before calling this endpoint).
//
// Body: { willing_camps: [pro|con|judge ...], pre_validated: bool }
//
// pre_validated is the agent's plugin's claim that it verified the
// agent has an on_call HF slot covering [debate_start_at, debate_end_at].
// Backend trusts but logs — Phase N may add server-side verification.
//
// Topic must be in status `signup_open`. Outside that window → 409.
func (h *SignupsHandler) Create(w http.ResponseWriter, r *http.Request) {
caller := auth.FromContext(r.Context())
if caller.Kind != auth.CallerAgent {
http.Error(w, "signup is agent-only", http.StatusForbidden)
return
}
topicID := chi.URLParam(r, "id")
t, err := h.topics.GetByID(r.Context(), topicID)
if errors.Is(err, store.ErrNotFound) {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
if err != nil {
http.Error(w, "lookup failed", http.StatusInternalServerError)
return
}
if t.Status != models.TopicStatusSignupOpen {
http.Error(w, "signup window not open (status="+string(t.Status)+")", http.StatusConflict)
return
}
now := time.Now()
if now.Before(t.SignupOpenAt) {
http.Error(w, "signup not yet open", http.StatusConflict)
return
}
if now.After(t.SignupCloseAt) {
http.Error(w, "signup closed", http.StatusConflict)
return
}
var body signupBody
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad body", http.StatusBadRequest)
return
}
if len(body.WillingCamps) == 0 {
http.Error(w, "willing_camps required", http.StatusBadRequest)
return
}
// Dedup the camps so a buggy plugin can't insert duplicates.
camps := dedupCamps(body.WillingCamps)
view, err := h.signups.Upsert(r.Context(), store.UpsertSignupInput{
TopicID: topicID,
AgentID: caller.ID,
WillingCamps: camps,
PreValidated: body.PreValidated,
})
if err != nil {
http.Error(w, "upsert failed: "+err.Error(), http.StatusBadRequest)
return
}
writeJSON(w, http.StatusCreated, view)
}
// GET /api/topics/{id}/signups — list all signups for a topic.
//
// Visible to topic creator + admins + agents. Public anonymous see
// nothing (avoid leaking who-signed-up for private topics).
func (h *SignupsHandler) List(w http.ResponseWriter, r *http.Request) {
caller := auth.FromContext(r.Context())
if caller.Kind == "" {
http.Error(w, "auth required", http.StatusUnauthorized)
return
}
topicID := chi.URLParam(r, "id")
if _, err := h.topics.GetByID(r.Context(), topicID); err != nil {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
rows, err := h.signups.ListByTopic(r.Context(), topicID)
if err != nil {
http.Error(w, "list failed", http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, map[string]any{"signups": rows, "count": len(rows)})
}
func dedupCamps(in []models.Camp) []models.Camp {
seen := map[models.Camp]struct{}{}
out := make([]models.Camp, 0, len(in))
for _, c := range in {
if _, ok := seen[c]; ok {
continue
}
seen[c] = struct{}{}
out = append(out, c)
}
return out
}

View File

@@ -0,0 +1,221 @@
package handlers
import (
"encoding/json"
"errors"
"net/http"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
)
type TopicsHandler struct {
store *store.TopicStore
}
func NewTopicsHandler(s *store.TopicStore) *TopicsHandler { return &TopicsHandler{store: s} }
// GET /api/topics?status=...&visibility=...&limit=...&offset=...
//
// Visibility filter is enforced at the auth layer: anonymous callers
// only see visibility=public; authenticated users (CallerUser) see all
// they're entitled to (Phase 2 v1: all; Phase 4 may add per-user ACLs).
// Agent callers (CallerAgent) see all — they're acting as system on
// behalf of the platform.
func (h *TopicsHandler) List(w http.ResponseWriter, r *http.Request) {
caller := auth.FromContext(r.Context())
f := store.ListFilter{
Status: r.URL.Query().Get("status"),
Visibility: r.URL.Query().Get("visibility"),
}
if v, _ := strconv.Atoi(r.URL.Query().Get("limit")); v > 0 {
f.Limit = v
}
if v, _ := strconv.Atoi(r.URL.Query().Get("offset")); v > 0 {
f.Offset = v
}
// Anonymous: force visibility=public.
if caller.Kind == "" {
f.Visibility = string(models.VisibilityPublic)
}
rows, err := h.store.List(r.Context(), f)
if err != nil {
http.Error(w, "list failed: "+err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, map[string]any{"topics": rows, "count": len(rows)})
}
// GET /api/topics/{id}
func (h *TopicsHandler) Get(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if id == "" {
http.Error(w, "missing id", http.StatusBadRequest)
return
}
t, err := h.store.GetByID(r.Context(), id)
if errors.Is(err, store.ErrNotFound) {
http.Error(w, "topic not found", http.StatusNotFound)
return
}
if err != nil {
http.Error(w, "get failed: "+err.Error(), http.StatusInternalServerError)
return
}
// Visibility gate: anonymous can only see public; authenticated see all.
caller := auth.FromContext(r.Context())
if caller.Kind == "" && t.Visibility != models.VisibilityPublic {
http.Error(w, "not found", http.StatusNotFound) // 404 not 403 — hide existence
return
}
writeJSON(w, http.StatusOK, t)
}
type createTopicBody struct {
Title string `json:"title"`
Summary string `json:"summary"`
Visibility string `json:"visibility"` // default "private"
VerdictSchemaID string `json:"verdict_schema_id"` // default "free-form"
SignupOpenAt string `json:"signup_open_at"` // RFC3339
SignupCloseAt string `json:"signup_close_at"`
DebateStartAt string `json:"debate_start_at"`
DebateEndAt string `json:"debate_end_at"`
}
// POST /api/topics
//
// Allowed callers: agent or authenticated user. Anonymous rejected
// (the route wires the auth-required middleware).
func (h *TopicsHandler) Create(w http.ResponseWriter, r *http.Request) {
caller := auth.FromContext(r.Context())
if caller.Kind == "" {
http.Error(w, "auth required", http.StatusUnauthorized)
return
}
var body createTopicBody
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad body: "+err.Error(), http.StatusBadRequest)
return
}
if body.Title == "" || body.Summary == "" {
http.Error(w, "title and summary required", http.StatusBadRequest)
return
}
if body.Visibility == "" {
body.Visibility = string(models.VisibilityPrivate)
}
if body.VerdictSchemaID == "" {
body.VerdictSchemaID = "free-form"
}
if err := validateLifecycleTimes(body); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
created, err := h.store.Create(r.Context(), store.CreateTopicInput{
Title: body.Title,
Summary: body.Summary,
Visibility: models.Visibility(body.Visibility),
VerdictSchemaID: body.VerdictSchemaID,
SignupOpenAt: body.SignupOpenAt,
SignupCloseAt: body.SignupCloseAt,
DebateStartAt: body.DebateStartAt,
DebateEndAt: body.DebateEndAt,
CreatorUserID: caller.ID,
})
if err != nil {
http.Error(w, "create failed: "+err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusCreated, created)
}
// validateLifecycleTimes enforces:
//
// signup_open < signup_close <= debate_start < debate_end
//
// All four timestamps must be parsable as RFC3339; failure → 400.
func validateLifecycleTimes(b createTopicBody) error {
type p struct {
name string
raw string
}
parts := []p{
{"signup_open_at", b.SignupOpenAt},
{"signup_close_at", b.SignupCloseAt},
{"debate_start_at", b.DebateStartAt},
{"debate_end_at", b.DebateEndAt},
}
parsed := make([]time.Time, 4)
for i, x := range parts {
t, err := time.Parse(time.RFC3339, x.raw)
if err != nil {
return errors.New(x.name + ": must be RFC3339")
}
parsed[i] = t
}
if !parsed[0].Before(parsed[1]) {
return errors.New("signup_open_at must be before signup_close_at")
}
if parsed[1].After(parsed[2]) {
return errors.New("signup_close_at must be <= debate_start_at")
}
if !parsed[2].Before(parsed[3]) {
return errors.New("debate_start_at must be before debate_end_at")
}
return nil
}
// PUT /api/topics/{id}/visibility — admin-only flip (Phase 2 stub: any
// authenticated user; Phase 4 will check the dialectic-admin role from JWT).
func (h *TopicsHandler) SetVisibility(w http.ResponseWriter, r *http.Request) {
caller := auth.FromContext(r.Context())
if caller.Kind == "" {
http.Error(w, "auth required", http.StatusUnauthorized)
return
}
if caller.Kind == auth.CallerUser && !hasRole(caller, "dialectic-admin") {
http.Error(w, "dialectic-admin role required", http.StatusForbidden)
return
}
id := chi.URLParam(r, "id")
var body struct {
Visibility string `json:"visibility"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "bad body", http.StatusBadRequest)
return
}
v := models.Visibility(body.Visibility)
if v != models.VisibilityPublic && v != models.VisibilityPrivate {
http.Error(w, "visibility must be public|private", http.StatusBadRequest)
return
}
t, err := h.store.SetVisibility(r.Context(), id, v, caller.ID)
if err != nil {
http.Error(w, "update failed: "+err.Error(), http.StatusInternalServerError)
return
}
writeJSON(w, http.StatusOK, t)
}
func hasRole(c auth.Caller, role string) bool {
for _, r := range c.Roles {
if r == role {
return true
}
}
return false
}
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("content-type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(v)
}

177
internal/httpapi/routes.go Normal file
View File

@@ -0,0 +1,177 @@
package httpapi
import (
"net/http"
"time"
"github.com/go-chi/chi/v5"
chimw "github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
"github.com/jmoiron/sqlx"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi/handlers"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
)
// Mount returns the root router with all v2 endpoints wired. Owners of
// individual middleware chains:
//
// - /api/healthz : public (no auth)
// - /api/topics : mixed — list/get optional auth (anon
// sees public only); create requires CallerAgent or CallerUser
// - /api/topics/{id}/signups : agent-only (CallerAgent)
//
// Browser-side OIDC and agent-side bearer middlewares co-exist on the
// same route by being "optional auth" — if either succeeds, Caller is
// attached; otherwise the handler sees anonymous and decides whether
// to 401 or fall through to public behavior.
func Mount(cfg *config.Config, db *sqlx.DB, version string) http.Handler {
r := chi.NewRouter()
// Boilerplate middleware — these run on every request.
r.Use(chimw.RealIP)
r.Use(chimw.RequestID)
r.Use(chimw.Logger)
r.Use(chimw.Recoverer)
r.Use(chimw.Timeout(30 * time.Second))
r.Use(cors.Handler(cors.Options{
AllowedOrigins: cfg.CORSAllowOrigins,
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "x-dev-bypass"},
ExposedHeaders: []string{},
AllowCredentials: true,
MaxAge: 300,
}))
// Auth middlewares — composed as "try agent, then user, else pass anonymous".
optionalAuth := optionalAuthChain(db, cfg)
requireAgent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper) // strict bearer
requireAnyAuth := requireAnyAuthChain(db, cfg)
// Handler instances.
topicStore := store.NewTopicStore(db)
signupStore := store.NewSignupStore(db)
health := handlers.NewHealthHandler(db, version)
topicsH := handlers.NewTopicsHandler(topicStore)
signupsH := handlers.NewSignupsHandler(topicStore, signupStore)
// Routes.
r.Route("/api", func(r chi.Router) {
r.Get("/healthz", health.Healthz)
// Topics: list+get optional-auth (visibility-gated by handler);
// create+visibility-flip require any auth.
r.Group(func(r chi.Router) {
r.Use(optionalAuth)
r.Get("/topics", topicsH.List)
r.Get("/topics/{id}", topicsH.Get)
})
r.Group(func(r chi.Router) {
r.Use(requireAnyAuth)
r.Post("/topics", topicsH.Create)
r.Put("/topics/{id}/visibility", topicsH.SetVisibility)
})
// Signups: agent-only.
r.Group(func(r chi.Router) {
r.Use(requireAgent)
r.Post("/topics/{id}/signups", signupsH.Create)
})
// List signups: any authenticated caller.
r.Group(func(r chi.Router) {
r.Use(requireAnyAuth)
r.Get("/topics/{id}/signups", signupsH.List)
})
})
return r
}
// optionalAuthChain: if either auth method succeeds, attach Caller;
// otherwise let the request through anonymous. Handlers decide what
// to do with anonymous (typically: serve public subset, hide private).
func optionalAuthChain(db *sqlx.DB, cfg *config.Config) func(http.Handler) http.Handler {
agent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper)
oidc := auth.OIDCBrowser(cfg.IsDev(), cfg.OIDCDevBypassToken)
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Bearer present → try agent path; on success it ServeHTTPs next.
// On failure it 401s, which we want to demote to "anonymous" for
// optional auth. The pattern is: capture the response; if it's
// 401, fall through to OIDC; if OIDC also 401s, finally fall
// through to next (anonymous).
if r.Header.Get("authorization") != "" {
rw := &captureWriter{ResponseWriter: w}
agent(next).ServeHTTP(rw, r)
if rw.status != http.StatusUnauthorized {
return
}
// reset captured state and try anon path (since OIDC
// won't apply if there's no cookie / bypass header)
}
if r.Header.Get("x-dev-bypass") != "" {
rw := &captureWriter{ResponseWriter: w}
oidc(next).ServeHTTP(rw, r)
if rw.status != http.StatusUnauthorized {
return
}
}
// Anonymous — call next with no Caller attached.
next.ServeHTTP(w, r)
})
}
}
// requireAnyAuthChain: 401 if neither agent nor user auth succeeds.
func requireAnyAuthChain(db *sqlx.DB, cfg *config.Config) func(http.Handler) http.Handler {
agent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper)
oidc := auth.OIDCBrowser(cfg.IsDev(), cfg.OIDCDevBypassToken)
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("authorization") != "" {
rw := &captureWriter{ResponseWriter: w}
agent(next).ServeHTTP(rw, r)
if rw.status != http.StatusUnauthorized {
return
}
}
oidc(next).ServeHTTP(w, r)
})
}
}
// captureWriter records the status so the optional-auth chain can
// distinguish "401 from inner middleware (try next)" from "actual
// response from handler (deliver)". Body bytes are passed through
// when status != 401.
type captureWriter struct {
http.ResponseWriter
status int
wroteHeader bool
suppressing bool
}
func (c *captureWriter) WriteHeader(s int) {
c.status = s
c.wroteHeader = true
if s == http.StatusUnauthorized {
// don't actually write — we may fall through
c.suppressing = true
return
}
c.ResponseWriter.WriteHeader(s)
}
func (c *captureWriter) Write(b []byte) (int, error) {
if c.suppressing {
// swallow; caller will fall through to next chain step
return len(b), nil
}
if !c.wroteHeader {
c.ResponseWriter.WriteHeader(http.StatusOK)
c.wroteHeader = true
}
return c.ResponseWriter.Write(b)
}

37
internal/models/signup.go Normal file
View File

@@ -0,0 +1,37 @@
package models
import "time"
type Signup struct {
ID string `db:"id" json:"id"`
TopicID string `db:"topic_id" json:"topic_id"`
AgentID string `db:"agent_id" json:"agent_id"`
WillingCamps []byte `db:"willing_camps" json:"-"` // JSON column; surface as typed via View()
PreValidated bool `db:"pre_validated" json:"pre_validated"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
}
// SignupView is the JSON-friendly projection that decodes WillingCamps.
type SignupView struct {
ID string `json:"id"`
TopicID string `json:"topic_id"`
AgentID string `json:"agent_id"`
WillingCamps []Camp `json:"willing_camps"`
PreValidated bool `json:"pre_validated"`
CreatedAt time.Time `json:"created_at"`
}
func (s *Signup) View() (SignupView, error) {
var camps SignupCampsJSON
if err := camps.UnmarshalDB(s.WillingCamps); err != nil {
return SignupView{}, err
}
return SignupView{
ID: s.ID,
TopicID: s.TopicID,
AgentID: s.AgentID,
WillingCamps: camps,
PreValidated: s.PreValidated,
CreatedAt: s.CreatedAt,
}, nil
}

78
internal/models/topic.go Normal file
View File

@@ -0,0 +1,78 @@
package models
import (
"encoding/json"
"time"
)
type Visibility string
const (
VisibilityPublic Visibility = "public"
VisibilityPrivate Visibility = "private"
)
type TopicStatus string
const (
TopicStatusCreated TopicStatus = "created"
TopicStatusSignupOpen TopicStatus = "signup_open"
TopicStatusSignupClosed TopicStatus = "signup_closed"
TopicStatusDebating TopicStatus = "debating"
TopicStatusCompleted TopicStatus = "completed"
TopicStatusCancelled TopicStatus = "cancelled"
)
type Camp string
const (
CampPro Camp = "pro"
CampCon Camp = "con"
CampJudge Camp = "judge"
)
// AllCamps is the canonical iteration order used by the allocation algorithm.
var AllCamps = [3]Camp{CampPro, CampCon, CampJudge}
type Topic struct {
ID string `db:"id" json:"id"`
Title string `db:"title" json:"title"`
Summary string `db:"summary" json:"summary"`
Visibility Visibility `db:"visibility" json:"visibility"`
VerdictSchemaID string `db:"verdict_schema_id" json:"verdict_schema_id"`
Status TopicStatus `db:"status" json:"status"`
SignupOpenAt time.Time `db:"signup_open_at" json:"signup_open_at"`
SignupCloseAt time.Time `db:"signup_close_at" json:"signup_close_at"`
DebateStartAt time.Time `db:"debate_start_at" json:"debate_start_at"`
DebateEndAt time.Time `db:"debate_end_at" json:"debate_end_at"`
CreatorUserID string `db:"creator_user_id" json:"creator_user_id"`
VisibilityChangedBy *string `db:"visibility_changed_by" json:"visibility_changed_by,omitempty"`
VisibilityChangedAt *time.Time `db:"visibility_changed_at" json:"visibility_changed_at,omitempty"`
CancelledReason *string `db:"cancelled_reason" json:"cancelled_reason,omitempty"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
}
// IsCampValid returns true iff c is one of pro|con|judge.
func IsCampValid(c Camp) bool {
for _, k := range AllCamps {
if k == c {
return true
}
}
return false
}
// SignupCampsJSON is a typed wrapper around the JSON-stored willing_camps
// column. We marshal/unmarshal at the boundary so handlers can work with
// the typed slice.
type SignupCampsJSON []Camp
func (s SignupCampsJSON) Marshal() ([]byte, error) { return json.Marshal(s) }
func (s *SignupCampsJSON) UnmarshalDB(raw []byte) error {
if len(raw) == 0 {
*s = nil
return nil
}
return json.Unmarshal(raw, s)
}

View File

@@ -0,0 +1,95 @@
package store
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
)
type SignupStore struct {
db *sqlx.DB
}
func NewSignupStore(db *sqlx.DB) *SignupStore { return &SignupStore{db: db} }
type UpsertSignupInput struct {
TopicID string
AgentID string
WillingCamps []models.Camp
PreValidated bool
}
// Upsert creates or updates an agent's signup for a topic. Re-signup
// replaces willing_camps (intentional: lets an agent change their mind
// before signup_close_at).
func (s *SignupStore) Upsert(ctx context.Context, in UpsertSignupInput) (*models.SignupView, error) {
if len(in.WillingCamps) == 0 {
return nil, fmt.Errorf("willing_camps must be non-empty")
}
for _, c := range in.WillingCamps {
if !models.IsCampValid(c) {
return nil, fmt.Errorf("invalid camp %q", c)
}
}
raw, err := json.Marshal(in.WillingCamps)
if err != nil {
return nil, err
}
// Try insert; on duplicate (topic, agent), update.
id := uuid.NewString()
_, err = s.db.ExecContext(ctx, `
INSERT INTO signups (id, topic_id, agent_id, willing_camps, pre_validated)
VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
willing_camps = VALUES(willing_camps),
pre_validated = VALUES(pre_validated)`,
id, in.TopicID, in.AgentID, raw, in.PreValidated)
if err != nil {
return nil, fmt.Errorf("upsert signup: %w", err)
}
return s.GetByPair(ctx, in.TopicID, in.AgentID)
}
func (s *SignupStore) GetByPair(ctx context.Context, topicID, agentID string) (*models.SignupView, error) {
var row models.Signup
err := s.db.GetContext(ctx, &row,
`SELECT * FROM signups WHERE topic_id = ? AND agent_id = ?`, topicID, agentID)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
if err != nil {
return nil, err
}
v, err := row.View()
if err != nil {
return nil, err
}
return &v, nil
}
// ListByTopic returns all signups for a topic. Used by the allocation
// algorithm at signup_close_at and by the topic-detail UI.
func (s *SignupStore) ListByTopic(ctx context.Context, topicID string) ([]models.SignupView, error) {
var rows []models.Signup
if err := s.db.SelectContext(ctx, &rows,
`SELECT * FROM signups WHERE topic_id = ? ORDER BY created_at ASC`, topicID); err != nil {
return nil, err
}
out := make([]models.SignupView, 0, len(rows))
for _, r := range rows {
v, err := r.View()
if err != nil {
return nil, err
}
out = append(out, v)
}
return out, nil
}

View File

@@ -0,0 +1,106 @@
package store
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
)
var ErrNotFound = errors.New("not found")
type TopicStore struct {
db *sqlx.DB
}
func NewTopicStore(db *sqlx.DB) *TopicStore { return &TopicStore{db: db} }
type CreateTopicInput struct {
Title string
Summary string
Visibility models.Visibility
VerdictSchemaID string
SignupOpenAt string // RFC3339; parsed by SQL
SignupCloseAt string
DebateStartAt string
DebateEndAt string
CreatorUserID string
}
func (s *TopicStore) Create(ctx context.Context, in CreateTopicInput) (*models.Topic, error) {
id := uuid.NewString()
_, err := s.db.ExecContext(ctx, `
INSERT INTO topics (id, title, summary, visibility, verdict_schema_id,
signup_open_at, signup_close_at, debate_start_at, debate_end_at, creator_user_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
id, in.Title, in.Summary, in.Visibility, in.VerdictSchemaID,
in.SignupOpenAt, in.SignupCloseAt, in.DebateStartAt, in.DebateEndAt, in.CreatorUserID)
if err != nil {
return nil, fmt.Errorf("insert topic: %w", err)
}
return s.GetByID(ctx, id)
}
func (s *TopicStore) GetByID(ctx context.Context, id string) (*models.Topic, error) {
var t models.Topic
err := s.db.GetContext(ctx, &t, `SELECT * FROM topics WHERE id = ?`, id)
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNotFound
}
if err != nil {
return nil, err
}
return &t, nil
}
type ListFilter struct {
Status string // empty = all
Visibility string // empty = all
Limit int // 0 = default 50
Offset int
}
func (s *TopicStore) List(ctx context.Context, f ListFilter) ([]models.Topic, error) {
if f.Limit <= 0 || f.Limit > 200 {
f.Limit = 50
}
q := "SELECT * FROM topics"
args := []any{}
var clauses []string
if f.Status != "" {
clauses = append(clauses, "status = ?")
args = append(args, f.Status)
}
if f.Visibility != "" {
clauses = append(clauses, "visibility = ?")
args = append(args, f.Visibility)
}
if len(clauses) > 0 {
q += " WHERE " + strings.Join(clauses, " AND ")
}
q += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
args = append(args, f.Limit, f.Offset)
var rows []models.Topic
if err := s.db.SelectContext(ctx, &rows, q, args...); err != nil {
return nil, err
}
return rows, nil
}
// SetVisibility flips public/private; records who/when. Returns updated row.
func (s *TopicStore) SetVisibility(ctx context.Context, id string, v models.Visibility, byUserID string) (*models.Topic, error) {
_, err := s.db.ExecContext(ctx, `
UPDATE topics SET visibility = ?, visibility_changed_by = ?, visibility_changed_at = CURRENT_TIMESTAMP
WHERE id = ?`, v, byUserID, id)
if err != nil {
return nil, err
}
return s.GetByID(ctx, id)
}

75
main.go Normal file
View File

@@ -0,0 +1,75 @@
// Dialectic.Backend.Go — entrypoint.
//
// Greenfield Go rewrite of the Python v1 backend; agent-only debate
// platform per /home/hzhang/arch/DIALECTIC-V2-DESIGN.md.
//
// This file: load config → open db → run migrations → mount routes →
// serve until SIGINT/SIGTERM. Everything else lives in internal/.
package main
import (
"context"
"errors"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/db"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi"
)
// Version is overridden at build time via -ldflags="-X main.Version=...".
var Version = "dev"
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)
cfg, err := config.LoadFromEnv()
if err != nil {
log.Fatalf("config: %v", err)
}
log.Printf("starting dialectic-backend %s mode=%s addr=%s", Version, cfg.Mode, cfg.HTTPAddr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := db.Open(ctx, cfg.DSN())
if err != nil {
log.Fatalf("db open: %v", err)
}
defer conn.Close()
if err := db.RunMigrations(ctx, conn); err != nil {
log.Fatalf("migrations: %v", err)
}
log.Printf("migrations: ok")
srv := &http.Server{
Addr: cfg.HTTPAddr,
Handler: httpapi.Mount(cfg, conn, Version),
ReadHeaderTimeout: 10 * time.Second,
}
// Graceful shutdown on SIGINT/SIGTERM.
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
go func() {
<-shutdown
log.Printf("shutdown signal received")
ctx2, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := srv.Shutdown(ctx2); err != nil {
log.Printf("http shutdown error: %v", err)
}
}()
log.Printf("http server listening on %s", cfg.HTTPAddr)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("http serve: %v", err)
}
log.Printf("bye")
}

View File

View File

@@ -1,114 +0,0 @@
"""Keycloak JWT authentication middleware."""
import os
from fastapi import HTTPException, Request
from jose import jwt, JWTError, jwk
from jose.utils import base64url_decode
import httpx
# Cache JWKS per (host, realm) to avoid fetching on every request
_jwks_cache: dict[str, dict] = {}
async def _get_jwks(kc_host: str, realm: str) -> dict:
cache_key = f"{kc_host}/{realm}"
if cache_key in _jwks_cache:
return _jwks_cache[cache_key]
url = f"{kc_host}/realms/{realm}/protocol/openid-connect/certs"
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
if resp.status_code != 200:
raise HTTPException(
status_code=502,
detail=f"无法获取 Keycloak JWKS: HTTP {resp.status_code}",
)
data = resp.json()
_jwks_cache[cache_key] = data
return data
def _find_rsa_key(jwks: dict, token: str) -> dict | None:
"""Find the matching RSA key from JWKS for the token's kid."""
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
for key in jwks.get("keys", []):
if key.get("kid") == kid:
return key
return None
async def verify_token(request: Request, kc_host: str, realm: str) -> dict:
"""Extract and verify the Bearer JWT from the Authorization header.
Returns the decoded payload on success.
Raises HTTPException(401) on missing/invalid token.
"""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
raise HTTPException(status_code=401, detail="缺少 Authorization Bearer token")
token = auth_header[7:]
jwks = await _get_jwks(kc_host, realm)
rsa_key = _find_rsa_key(jwks, token)
if rsa_key is None:
# Clear cache in case keys rotated
_jwks_cache.pop(f"{kc_host}/{realm}", None)
jwks = await _get_jwks(kc_host, realm)
rsa_key = _find_rsa_key(jwks, token)
if rsa_key is None:
raise HTTPException(status_code=401, detail="无法匹配 JWT 签名密钥")
try:
payload = jwt.decode(
token,
rsa_key,
algorithms=["RS256"],
options={"verify_aud": False}, # Keycloak audience varies by client
)
return payload
except JWTError as e:
raise HTTPException(status_code=401, detail=f"JWT 验证失败: {e}")
async def require_auth(request: Request):
"""Verify Bearer JWT for write endpoints.
Dev mode: passthrough (no auth required).
Prod mode: validates JWT via Keycloak JWKS.
"""
if os.getenv("ENV_MODE", "dev") == "dev":
return None
from app.services.config_service import ConfigService
config = ConfigService.load()
kc = config.get("keycloak", {})
if not kc.get("host"):
return None # KC not configured allow access
return await verify_token(request, kc["host"], kc.get("realm", ""))
async def require_admin(request: Request, config: dict):
"""Verify the request carries a valid Keycloak JWT with admin role.
Raises HTTPException(401/403) on failure.
"""
kc = config.get("keycloak", {})
kc_host = kc.get("host")
realm = kc.get("realm")
if not kc_host or not realm:
raise HTTPException(
status_code=503,
detail="Keycloak 未配置,无法进行鉴权",
)
payload = await verify_token(request, kc_host, realm)
roles = payload.get("realm_access", {}).get("roles", [])
if "admin" not in roles:
raise HTTPException(status_code=403, detail="需要 admin 角色")
return payload

View File

@@ -1,32 +0,0 @@
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from services.config_service import ConfigService
# Paths that are always accessible, even when DB is not configured
_ALLOWED_PREFIXES = ("/api/setup", "/docs", "/openapi", "/redoc")
class ConfigGuardMiddleware(BaseHTTPMiddleware):
"""Return 503 for all business routes when the database is not configured."""
async def dispatch(self, request, call_next):
path = request.url.path
# Always allow: setup routes, root, docs, OPTIONS (CORS preflight)
if path == "/" or request.method == "OPTIONS":
return await call_next(request)
for prefix in _ALLOWED_PREFIXES:
if path.startswith(prefix):
return await call_next(request)
if not ConfigService.is_db_configured():
return JSONResponse(
status_code=503,
content={
"error_code": "SERVICE_NOT_CONFIGURED",
"detail": "数据库未配置,请先完成系统初始化",
},
)
return await call_next(request)

View File

View File

@@ -1,92 +0,0 @@
from pydantic import BaseModel
from typing import List, Optional
from enum import Enum
from datetime import datetime
class ModelProvider(str, Enum):
OPENAI = "openai"
DEEPSEEK = "deepseek"
QWEN = "qwen"
CLAUDE = "claude"
class DebateStance(str, Enum):
PRO = "pro"
CON = "con"
class SearchResult(BaseModel):
title: str
url: str
snippet: str
score: Optional[float] = None
class SearchEvidence(BaseModel):
query: str
results: List[SearchResult]
mode: str # "auto", "tool", "both"
class DebateRound(BaseModel):
round_number: int
speaker: str # Model identifier
stance: DebateStance
content: str
timestamp: datetime
token_count: Optional[int] = None
search_evidence: Optional[SearchEvidence] = None
class DebateParticipant(BaseModel):
model_config = {"protected_namespaces": ()}
model_identifier: str
provider: ModelProvider
stance: DebateStance
api_key: Optional[str] = None
class DebateConstraints(BaseModel):
max_rounds: int = 5
max_tokens_per_turn: int = 500
max_total_tokens: Optional[int] = None
forbid_repetition: bool = True
must_respond_to_opponent: bool = True
web_search_enabled: bool = False
web_search_mode: str = "auto" # "auto", "tool", "both"
class DebateRequest(BaseModel):
topic: str
participants: List[DebateParticipant]
constraints: DebateConstraints
custom_system_prompt: Optional[str] = None
class EvidenceReference(BaseModel):
round_number: int
speaker: str
stance: DebateStance
class EvidenceEntry(BaseModel):
title: str
url: str
snippet: str
score: Optional[float] = None
references: List[EvidenceReference]
class DebateSession(BaseModel):
session_id: str
topic: str
participants: List[DebateParticipant]
constraints: DebateConstraints
rounds: List[DebateRound]
status: str # "active", "completed", "terminated"
created_at: datetime
completed_at: Optional[datetime] = None
summary: Optional[str] = None
evidence_library: List[EvidenceEntry] = []

View File

@@ -1,358 +0,0 @@
import asyncio
import uuid
from typing import Dict, List, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from models.debate import (
DebateRequest, DebateSession, DebateRound,
DebateStance, DebateParticipant, SearchEvidence, SearchResult,
EvidenceEntry, EvidenceReference
)
from providers.provider_factory import ProviderFactory
from storage.session_manager import SessionManager
from utils.summarizer import summarize_debate
from services.search_service import SearchService
from services.api_key_service import ApiKeyService
class DebateOrchestrator:
"""
Orchestrates the debate between multiple language models
"""
def __init__(self, db: Session):
self.db = db
self.session_manager = SessionManager()
self.provider_factory = ProviderFactory()
async def create_session(self, debate_request: DebateRequest) -> str:
"""
Create a new debate session
"""
session_id = str(uuid.uuid4())
session = DebateSession(
session_id=session_id,
topic=debate_request.topic,
participants=debate_request.participants,
constraints=debate_request.constraints,
rounds=[],
status="active",
created_at=datetime.now()
)
await self.session_manager.save_session(self.db, session)
return session_id
def _get_search_service(self) -> Optional[SearchService]:
"""
Get a SearchService instance if Tavily API key is available.
"""
tavily_key = ApiKeyService.get_api_key(self.db, "tavily")
if tavily_key:
return SearchService(api_key=tavily_key)
return None
async def run_debate(self, session_id: str) -> DebateSession:
"""
Run the complete debate process
"""
session = await self.session_manager.get_session(self.db, session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
# Initialize providers for each participant
providers = {}
for participant in session.participants:
provider = self.provider_factory.create_provider(
self.db,
participant.provider,
participant.api_key # This can be None, and the provider will fetch from DB
)
providers[participant.model_identifier] = provider
# Initialize search service if web search is enabled
search_service = None
web_search_enabled = session.constraints.web_search_enabled
web_search_mode = session.constraints.web_search_mode
if web_search_enabled:
search_service = self._get_search_service()
if not search_service:
print("Warning: Web search enabled but no Tavily API key found. Disabling search.")
web_search_enabled = False
# Run the debate rounds
for round_num in range(session.constraints.max_rounds):
if session.status != "active":
break
# Alternate between participants
current_participant = session.participants[round_num % len(session.participants)]
provider = providers[current_participant.model_identifier]
# Perform automatic search if enabled
search_evidence = None
if web_search_enabled and web_search_mode in ("auto", "both"):
search_evidence = self._perform_automatic_search(
search_service, session, round_num
)
# Prepare context for the current turn (with search results if available)
context = self._prepare_context(session, current_participant.stance, search_evidence)
# Determine if we should use tool calling for this round
use_tool_calling = (
web_search_enabled
and web_search_mode in ("tool", "both")
and provider.supports_tools()
)
if use_tool_calling:
response, tool_evidence = await self._handle_tool_calls(
provider, current_participant.model_identifier,
context, search_service
)
# Merge tool-based evidence with auto evidence
if tool_evidence:
if search_evidence:
search_evidence.results.extend(tool_evidence.results)
search_evidence.query += f" | {tool_evidence.query}"
search_evidence.mode = "both"
else:
search_evidence = tool_evidence
else:
response = await provider.generate_response(
model=current_participant.model_identifier,
prompt=context
)
# Clean the response to remove any echoed prompt/meta text
response = self._clean_response(response)
# Create a new round
round_data = DebateRound(
round_number=round_num + 1,
speaker=current_participant.model_identifier,
stance=current_participant.stance,
content=response,
timestamp=datetime.now(),
token_count=len(response.split()), # Approximate token count
search_evidence=search_evidence
)
session.rounds.append(round_data)
# Update evidence library with search results from this round
if round_data.search_evidence:
self._update_evidence_library(session, round_data)
# Update session in storage
await self.session_manager.update_session(self.db, session)
# Small delay between rounds to simulate realistic interaction
await asyncio.sleep(1)
# Generate summary after all rounds are complete
summary = await summarize_debate(session)
session.summary = summary
session.status = "completed"
session.completed_at = datetime.now()
await self.session_manager.update_session(self.db, session)
return session
def _perform_automatic_search(
self, search_service: SearchService, session: DebateSession, round_num: int
) -> Optional[SearchEvidence]:
"""
Perform an automatic web search based on the topic and last opponent argument.
"""
last_opponent_arg = None
if session.rounds:
last_opponent_arg = session.rounds[-1].content
query = SearchService.generate_search_query(session.topic, last_opponent_arg)
results = search_service.search(query, max_results=3)
if results:
return SearchEvidence(
query=query,
results=results,
mode="auto"
)
return None
async def _handle_tool_calls(
self, provider, model: str, context: str, search_service: SearchService
) -> tuple:
"""
Handle tool calling flow: send prompt with tools, execute any tool calls,
then re-prompt with results. Max 2 tool call iterations.
Returns (final_response_text, SearchEvidence_or_None).
"""
tools = [SearchService.get_tool_definition()]
all_search_results = []
all_queries = []
text, tool_calls = await provider.generate_response_with_tools(
model=model,
prompt=context,
tools=tools,
max_tokens=500
)
# If no tool calls, return the text response directly
if not tool_calls:
return text, None
# Process up to 2 rounds of tool calls
for iteration in range(2):
if not tool_calls:
break
# Execute each tool call
tool_results_text = []
for tc in tool_calls:
if tc["name"] == "web_search":
query = tc["arguments"].get("query", "")
all_queries.append(query)
results = search_service.search(query, max_results=3)
all_search_results.extend(results)
# Format results for the model
evidence = SearchEvidence(query=query, results=results, mode="tool")
tool_results_text.append(SearchService.format_results_for_context(evidence))
# Re-prompt the model with tool results
augmented_context = context + "\n" + "\n".join(tool_results_text)
augmented_context += "\n请基于以上搜索结果和辩论历史,给出你的论点。"
text, tool_calls = await provider.generate_response_with_tools(
model=model,
prompt=augmented_context,
tools=tools,
max_tokens=500
)
# Build combined evidence
evidence = None
if all_search_results:
evidence = SearchEvidence(
query=" | ".join(all_queries),
results=all_search_results,
mode="tool"
)
# If we still got no text (model keeps calling tools), fall back
if not text:
text = await provider.generate_response(model=model, prompt=context, max_tokens=500)
return text, evidence
def _update_evidence_library(self, session: DebateSession, round_data: DebateRound):
"""
Merge search results from a round into the session's evidence library, deduplicating by URL.
"""
ref = EvidenceReference(
round_number=round_data.round_number,
speaker=round_data.speaker,
stance=round_data.stance
)
url_index = {entry.url: i for i, entry in enumerate(session.evidence_library)}
for result in round_data.search_evidence.results:
if result.url in url_index:
entry = session.evidence_library[url_index[result.url]]
# Avoid duplicate references (same round + speaker)
if not any(
r.round_number == ref.round_number and r.speaker == ref.speaker
for r in entry.references
):
entry.references.append(ref)
else:
new_entry = EvidenceEntry(
title=result.title,
url=result.url,
snippet=result.snippet,
score=result.score,
references=[ref]
)
session.evidence_library.append(new_entry)
url_index[result.url] = len(session.evidence_library) - 1
def _prepare_context(
self, session: DebateSession, current_stance: DebateStance,
search_evidence: Optional[SearchEvidence] = None
) -> str:
"""
Prepare the context/prompt for the current model turn
"""
# Determine the stance of the current speaker
if current_stance == DebateStance.PRO:
position_desc = "正方(支持方)"
opposing_desc = "反方(反对方)"
else:
position_desc = "反方(反对方)"
opposing_desc = "正方(支持方)"
# Build the context with previous rounds
context_parts = [
f"辩论主题: {session.topic}",
f"你的立场: {position_desc}",
"辩论规则:",
"- 必须回应对方上一轮的核心论点",
"- 不得重复自己已提出的观点",
"- 输出长度限制在合理范围内",
"\n历史辩论记录:"
]
for round_data in session.rounds:
stance_text = "正方" if round_data.stance == DebateStance.PRO else "反方"
context_parts.append(f"{round_data.round_number}轮 - {stance_text}: {round_data.content}")
# Inject search results if available
if search_evidence:
context_parts.append(SearchService.format_results_for_context(search_evidence))
context_parts.append(f"\n现在轮到你 ({position_desc}) 发言,请基于以上内容进行回应。注意:直接给出你的论点内容,不要重复上述提示词、辩论规则或历史记录。")
return "\n".join(context_parts)
def _clean_response(self, response: str) -> str:
"""
Clean the model response to remove any echoed prompt/meta text
"""
import re
# Remove common prompt echoes and meta prefixes
patterns_to_remove = [
r'^第\d+轮\s*[-:]\s*(正方|反方)\s*[:]?\s*', # 第X轮 - 正方/反方:
r'^(正方|反方)\s*[(][^)]*[)]\s*[:]?\s*', # 正方(支持方):
r'^(正方|反方)\s*[:]\s*', # 正方: or 反方:
r'^我的立场\s*[:]\s*', # 我的立场:
r'^回应\s*[:]\s*', # 回应:
r'^辩论发言\s*[:]\s*', # 辩论发言:
]
cleaned = response.strip()
for pattern in patterns_to_remove:
cleaned = re.sub(pattern, '', cleaned, flags=re.MULTILINE)
return cleaned.strip()
async def get_session_status(self, session_id: str) -> Optional[DebateSession]:
"""
Get the current status of a debate session
"""
return await self.session_manager.get_session(self.db, session_id)
async def terminate_session(self, session_id: str):
"""
Terminate a debate session prematurely
"""
session = await self.session_manager.get_session(self.db, session_id)
if session:
session.status = "terminated"
await self.session_manager.update_session(self.db, session)

View File

View File

@@ -1,73 +0,0 @@
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.orm import Session
class LLMProvider(ABC):
"""
Abstract base class for LLM providers
"""
@abstractmethod
def __init__(self, db: Session, api_key: Optional[str] = None):
"""
Initialize the provider with a database session and optional API key
"""
pass
@abstractmethod
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
"""
Generate a response from the LLM
"""
pass
def supports_tools(self) -> bool:
"""
Whether this provider supports function/tool calling.
Override in subclasses that support it.
"""
return False
async def generate_response_with_tools(
self,
model: str,
prompt: str,
tools: List[Dict[str, Any]],
max_tokens: Optional[int] = None
) -> Tuple[str, List[Dict[str, Any]]]:
"""
Generate a response with tool definitions available.
Returns (text_content, tool_calls) where tool_calls is a list of
dicts with keys: name, arguments (dict).
If no tools are called, tool_calls is empty and text_content has the response.
Default implementation falls back to regular generation.
"""
text = await self.generate_response(model, prompt, max_tokens)
return text, []
class ProviderFactory:
"""
Factory class to create provider instances
"""
@staticmethod
def create_provider(db: Session, provider_type: str, api_key: Optional[str] = None):
"""
Create a provider instance based on the type
"""
if provider_type.value == "openai":
from app.providers.openai_provider import OpenAIProvider
return OpenAIProvider(db, api_key)
elif provider_type.value == "claude":
from app.providers.claude_provider import ClaudeProvider
return ClaudeProvider(db, api_key)
elif provider_type.value == "qwen":
from app.providers.qwen_provider import QwenProvider
return QwenProvider(db, api_key)
elif provider_type.value == "deepseek":
from app.providers.deepseek_provider import DeepSeekProvider
return DeepSeekProvider(db, api_key)
else:
raise ValueError(f"Unsupported provider type: {provider_type}")

View File

@@ -1,85 +0,0 @@
import anthropic
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.orm import Session
from providers.base_provider import LLMProvider
from services.api_key_service import ApiKeyService
SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。"
class ClaudeProvider(LLMProvider):
"""
Anthropic Claude API provider implementation
"""
def __init__(self, db: Session, api_key: Optional[str] = None):
if not api_key:
api_key = ApiKeyService.get_api_key(db, "claude")
if api_key:
self.client = anthropic.AsyncAnthropic(api_key=api_key)
else:
raise ValueError("Claude API key not found in database or provided")
def supports_tools(self) -> bool:
return True
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
try:
response = await self.client.messages.create(
model=model,
max_tokens=max_tokens or 500,
temperature=0.7,
system=SYSTEM_PROMPT,
messages=[
{"role": "user", "content": prompt}
]
)
return response.content[0].text
except Exception as e:
raise Exception(f"Error calling Claude API: {str(e)}")
async def generate_response_with_tools(
self,
model: str,
prompt: str,
tools: List[Dict[str, Any]],
max_tokens: Optional[int] = None
) -> Tuple[str, List[Dict[str, Any]]]:
try:
# Convert OpenAI-format tools to Anthropic format
anthropic_tools = []
for tool in tools:
func = tool.get("function", tool)
anthropic_tools.append({
"name": func["name"],
"description": func.get("description", ""),
"input_schema": func.get("parameters", func.get("input_schema", {}))
})
response = await self.client.messages.create(
model=model,
max_tokens=max_tokens or 500,
temperature=0.7,
system=SYSTEM_PROMPT,
messages=[
{"role": "user", "content": prompt}
],
tools=anthropic_tools
)
text_content = ""
tool_calls = []
for block in response.content:
if block.type == "text":
text_content += block.text
elif block.type == "tool_use":
tool_calls.append({
"name": block.name,
"arguments": block.input
})
return text_content.strip(), tool_calls
except Exception as e:
raise Exception(f"Error calling Claude API with tools: {str(e)}")

View File

@@ -1,64 +0,0 @@
from typing import Optional
from sqlalchemy.orm import Session
from openai import AsyncOpenAI
from providers.base_provider import LLMProvider
from services.api_key_service import ApiKeyService
class DeepSeekProvider(LLMProvider):
"""
DeepSeek API provider implementation using OpenAI-compatible API
"""
def __init__(self, db: Session, api_key: Optional[str] = None):
if not api_key:
api_key = ApiKeyService.get_api_key(db, "deepseek")
if not api_key:
raise ValueError("DeepSeek API key not found in database or provided")
self.client = AsyncOpenAI(
api_key=api_key,
base_url="https://api.deepseek.com"
)
def supports_tools(self) -> bool:
return False
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
try:
is_reasoner = "reasoner" in model or "r1" in model.lower()
messages = [{"role": "user", "content": prompt}]
# deepseek-reasoner 不支持 system message把指令放进 user message
if not is_reasoner:
messages.insert(0, {
"role": "system",
"content": "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。"
})
kwargs = {
"model": model,
"messages": messages,
}
# reasoner 模型不支持 max_tokens使用 max_completion_tokens
if is_reasoner:
kwargs["max_completion_tokens"] = max_tokens or 4096
else:
kwargs["max_tokens"] = max_tokens or 500
response = await self.client.chat.completions.create(**kwargs)
message = response.choices[0].message
content = message.content or ""
# deepseek-reasoner 的主要内容可能在 reasoning_content 中
if not content.strip() and is_reasoner:
reasoning = getattr(message, "reasoning_content", None)
if reasoning:
content = reasoning
return content.strip()
except Exception as e:
raise Exception(f"Error calling DeepSeek API: {str(e)}")

View File

@@ -1,72 +0,0 @@
import json
import openai
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.orm import Session
from providers.base_provider import LLMProvider
from services.api_key_service import ApiKeyService
SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。"
class OpenAIProvider(LLMProvider):
"""
OpenAI API provider implementation
"""
def __init__(self, db: Session, api_key: Optional[str] = None):
if not api_key:
api_key = ApiKeyService.get_api_key(db, "openai")
if api_key:
self.client = openai.AsyncOpenAI(api_key=api_key)
else:
raise ValueError("OpenAI API key not found in database or provided")
def supports_tools(self) -> bool:
return True
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
try:
response = await self.client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
max_tokens=max_tokens or 500
)
return response.choices[0].message.content.strip()
except Exception as e:
raise Exception(f"Error calling OpenAI API: {str(e)}")
async def generate_response_with_tools(
self,
model: str,
prompt: str,
tools: List[Dict[str, Any]],
max_tokens: Optional[int] = None
) -> Tuple[str, List[Dict[str, Any]]]:
try:
response = await self.client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
tools=tools,
tool_choice="auto",
max_tokens=max_tokens or 500
)
message = response.choices[0].message
text_content = message.content or ""
tool_calls = []
if message.tool_calls:
for tc in message.tool_calls:
tool_calls.append({
"name": tc.function.name,
"arguments": json.loads(tc.function.arguments)
})
return text_content.strip(), tool_calls
except Exception as e:
raise Exception(f"Error calling OpenAI API with tools: {str(e)}")

View File

@@ -1,49 +0,0 @@
from abc import ABC, abstractmethod
from typing import Optional
from sqlalchemy.orm import Session
class LLMProvider(ABC):
"""
Abstract base class for LLM providers
"""
@abstractmethod
def __init__(self, db: Session, api_key: Optional[str] = None):
"""
Initialize the provider with a database session and optional API key
"""
pass
@abstractmethod
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
"""
Generate a response from the LLM
"""
pass
class ProviderFactory:
"""
Factory class to create provider instances
"""
@staticmethod
def create_provider(db: Session, provider_type: str, api_key: Optional[str] = None):
"""
Create a provider instance based on the type
"""
if provider_type.value == "openai":
from providers.openai_provider import OpenAIProvider
return OpenAIProvider(db, api_key)
elif provider_type.value == "claude":
from providers.claude_provider import ClaudeProvider
return ClaudeProvider(db, api_key)
elif provider_type.value == "qwen":
from providers.qwen_provider import QwenProvider
return QwenProvider(db, api_key)
elif provider_type.value == "deepseek":
from providers.deepseek_provider import DeepSeekProvider
return DeepSeekProvider(db, api_key)
else:
raise ValueError(f"Unsupported provider type: {provider_type}")

View File

@@ -1,75 +0,0 @@
import json
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.orm import Session
from openai import AsyncOpenAI
from providers.base_provider import LLMProvider
from services.api_key_service import ApiKeyService
SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。"
class QwenProvider(LLMProvider):
"""
Qwen API provider implementation using DashScope OpenAI-compatible API
"""
def __init__(self, db: Session, api_key: Optional[str] = None):
if not api_key:
api_key = ApiKeyService.get_api_key(db, "qwen")
if not api_key:
raise ValueError("Qwen API key not found in database or provided")
self.client = AsyncOpenAI(
api_key=api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
def supports_tools(self) -> bool:
return True
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
try:
response = await self.client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
max_tokens=max_tokens or 500
)
return response.choices[0].message.content.strip()
except Exception as e:
raise Exception(f"Error calling Qwen API: {str(e)}")
async def generate_response_with_tools(
self,
model: str,
prompt: str,
tools: List[Dict[str, Any]],
max_tokens: Optional[int] = None
) -> Tuple[str, List[Dict[str, Any]]]:
try:
response = await self.client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt}
],
tools=tools,
tool_choice="auto",
max_tokens=max_tokens or 500
)
message = response.choices[0].message
text_content = message.content or ""
tool_calls = []
if message.tool_calls:
for tc in message.tool_calls:
tool_calls.append({
"name": tc.function.name,
"arguments": json.loads(tc.function.arguments)
})
return text_content.strip(), tool_calls
except Exception as e:
raise Exception(f"Error calling Qwen API with tools: {str(e)}")

View File

@@ -1,19 +0,0 @@
fastapi==0.115.0
uvicorn[standard]==0.32.0
pydantic==2.9.2
pydantic-settings==2.6.1
sqlalchemy==2.0.35
aiosqlite==0.20.0
pymysql==1.1.1
cryptography==43.0.1
python-multipart==0.0.20
sse-starlette==2.1.3
openai==1.52.2
anthropic==0.40.0
tiktoken==0.8.0
python-dotenv==1.0.1
aiohttp==3.9.0
httpx==0.27.2
tavily-python==0.5.0
pyyaml==6.0.2
python-jose[cryptography]==3.3.0

View File

View File

@@ -1,71 +0,0 @@
from cryptography.fernet import Fernet, InvalidToken
from sqlalchemy.orm import Session
from db_models import ApiKey
import os
# Initialize the encryption key from environment or generate a new one
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY", Fernet.generate_key().decode())
cipher_suite = Fernet(ENCRYPTION_KEY.encode())
class ApiKeyService:
"""
Service for managing API keys in the database
"""
@staticmethod
def encrypt_api_key(api_key: str) -> str:
"""
Encrypt an API key
"""
encrypted_key = cipher_suite.encrypt(api_key.encode())
return encrypted_key.decode()
@staticmethod
def decrypt_api_key(encrypted_api_key: str) -> str:
"""
Decrypt an API key
"""
decrypted_key = cipher_suite.decrypt(encrypted_api_key.encode())
return decrypted_key.decode()
@staticmethod
def get_api_key(db: Session, provider: str) -> str:
"""
Retrieve and decrypt an API key for a provider
"""
api_key_record = db.query(ApiKey).filter(ApiKey.provider == provider).first()
if not api_key_record or not api_key_record.api_key_encrypted:
return None
try:
return ApiKeyService.decrypt_api_key(api_key_record.api_key_encrypted)
except InvalidToken:
return None
@staticmethod
def set_api_key(db: Session, provider: str, api_key: str) -> bool:
"""
Encrypt and store an API key for a provider
"""
encrypted_key = ApiKeyService.encrypt_api_key(api_key)
# Check if record exists
api_key_record = db.query(ApiKey).filter(ApiKey.provider == provider).first()
if api_key_record:
# Update existing record
api_key_record.api_key_encrypted = encrypted_key
else:
# Create new record
api_key_record = ApiKey(
provider=provider,
api_key_encrypted=encrypted_key
)
db.add(api_key_record)
try:
db.commit()
return True
except Exception:
db.rollback()
return False

View File

@@ -1,100 +0,0 @@
import os
from pathlib import Path
from urllib.parse import quote_plus
import yaml
from cryptography.fernet import Fernet, InvalidToken
CONFIG_PATH = Path(os.getenv("CONFIG_PATH", "/app/config/dialectica.yaml"))
# Reuse the same encryption key used for API keys
_ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY", "")
_cipher = Fernet(_ENCRYPTION_KEY.encode()) if _ENCRYPTION_KEY else None
# Fields that should be encrypted in the YAML file
_SECRET_FIELDS = {"password"}
def _encrypt(value: str) -> str:
if not _cipher or not value:
return value
return "ENC:" + _cipher.encrypt(value.encode()).decode()
def _decrypt(value: str) -> str:
if not _cipher or not isinstance(value, str) or not value.startswith("ENC:"):
return value
try:
return _cipher.decrypt(value[4:].encode()).decode()
except InvalidToken:
return value
def _encrypt_secrets(data: dict) -> dict:
"""Deep-copy dict, encrypting secret fields."""
out = {}
for k, v in data.items():
if isinstance(v, dict):
out[k] = _encrypt_secrets(v)
elif k in _SECRET_FIELDS and isinstance(v, str) and not v.startswith("ENC:"):
out[k] = _encrypt(v)
else:
out[k] = v
return out
def _decrypt_secrets(data: dict) -> dict:
"""Deep-copy dict, decrypting secret fields."""
out = {}
for k, v in data.items():
if isinstance(v, dict):
out[k] = _decrypt_secrets(v)
elif k in _SECRET_FIELDS and isinstance(v, str):
out[k] = _decrypt(v)
else:
out[k] = v
return out
class ConfigService:
"""Read / write config/dialectica.yaml."""
@staticmethod
def load() -> dict:
"""Load config, returning decrypted values. Empty dict if file missing."""
if not CONFIG_PATH.exists():
return {}
with open(CONFIG_PATH) as f:
raw = yaml.safe_load(f) or {}
return _decrypt_secrets(raw)
@staticmethod
def save(config: dict):
"""Save config, encrypting secret fields."""
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
encrypted = _encrypt_secrets(config)
with open(CONFIG_PATH, "w") as f:
yaml.dump(encrypted, f, default_flow_style=False, allow_unicode=True)
@staticmethod
def is_db_configured() -> bool:
config = ConfigService.load()
db = config.get("database", {})
return bool(db.get("host") and db.get("database"))
@staticmethod
def get_database_url() -> str | None:
config = ConfigService.load()
db = config.get("database", {})
if not (db.get("host") and db.get("database")):
return None
user = db.get("user", "root")
password = db.get("password", "")
host = db["host"]
port = db.get("port", 3306)
database = db["database"]
return f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"
@staticmethod
def is_initialized() -> bool:
return ConfigService.load().get("initialized", False)

View File

@@ -1,104 +0,0 @@
from typing import List, Optional
from models.debate import SearchResult, SearchEvidence
class SearchService:
"""
Tavily web search wrapper for debate research.
"""
def __init__(self, api_key: str):
from tavily import TavilyClient
self.client = TavilyClient(api_key=api_key)
def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
"""
Perform a web search using Tavily and return structured results.
"""
try:
response = self.client.search(
query=query,
max_results=max_results,
search_depth="basic"
)
results = []
for item in response.get("results", []):
results.append(SearchResult(
title=item.get("title", ""),
url=item.get("url", ""),
snippet=item.get("content", "")[:500], # Truncate to avoid token bloat
score=item.get("score")
))
return results
except Exception as e:
print(f"Tavily search error: {e}")
return []
@staticmethod
def format_results_for_context(evidence: SearchEvidence) -> str:
"""
Format search results into a string suitable for injecting into the LLM context.
"""
if not evidence or not evidence.results:
return ""
lines = [f"\n[网络搜索结果] 搜索词: \"{evidence.query}\""]
for i, r in enumerate(evidence.results, 1):
lines.append(f" {i}. {r.title}")
lines.append(f" {r.snippet}")
lines.append(f" 来源: {r.url}")
lines.append("[搜索结果结束]\n")
return "\n".join(lines)
@staticmethod
def generate_search_query(topic: str, last_opponent_argument: Optional[str] = None) -> str:
"""
Generate a search query from the debate topic and the opponent's last argument.
"""
if last_opponent_argument:
# Extract key phrases from the opponent's argument (first ~100 chars)
snippet = last_opponent_argument[:100].strip()
return f"{topic} {snippet}"
return topic
@staticmethod
def get_tool_definition() -> dict:
"""
Return the web_search tool definition for function calling.
"""
return {
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web for current information relevant to the debate topic. Use this to find facts, statistics, or recent news that support your argument.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query to look up"
}
},
"required": ["query"]
}
}
}
@staticmethod
def get_tool_definition_anthropic() -> dict:
"""
Return the web_search tool definition in Anthropic format.
"""
return {
"name": "web_search",
"description": "Search the web for current information relevant to the debate topic. Use this to find facts, statistics, or recent news that support your argument.",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query to look up"
}
},
"required": ["query"]
}
}

View File

View File

@@ -1,109 +0,0 @@
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
from models.debate import DebateSession
from exceptions import ServiceNotConfiguredError
from services.config_service import ConfigService
Base = declarative_base()
class DebateSessionDB(Base):
__tablename__ = "debate_sessions"
id = Column(Integer, primary_key=True, index=True)
session_id = Column(String(255), unique=True, index=True, nullable=False)
topic = Column(Text, nullable=False)
participants = Column(Text, nullable=False) # JSON string
constraints = Column(Text, nullable=False) # JSON string
rounds = Column(Text, nullable=False) # JSON string
status = Column(String(50), nullable=False)
created_at = Column(DateTime, nullable=False)
completed_at = Column(DateTime, nullable=True)
summary = Column(Text, nullable=True)
evidence_library = Column(Text, nullable=True) # JSON string
# ---------------------------------------------------------------------------
# Lazy engine / session factory
# ---------------------------------------------------------------------------
_engine = None
_SessionLocal = None
def _get_engine():
from sqlalchemy import create_engine
global _engine
if _engine is None:
db_url = ConfigService.get_database_url()
if not db_url:
raise ServiceNotConfiguredError("数据库未配置")
_engine = create_engine(db_url)
return _engine
def _get_session_factory():
global _SessionLocal
if _SessionLocal is None:
_SessionLocal = sessionmaker(
autocommit=False, autoflush=False, bind=_get_engine()
)
return _SessionLocal
def init_db():
"""Create all tables if DB is configured; silently skip otherwise."""
if not ConfigService.is_db_configured():
print("WARNING: Database not configured, skipping table creation.")
return
try:
from db_models import Base as ApiBase
engine = _get_engine()
Base.metadata.create_all(bind=engine)
ApiBase.metadata.create_all(bind=engine)
except Exception as e:
global _engine, _SessionLocal
print(f"WARNING: Database connection failed, skipping table creation: {e}")
if _engine is not None:
_engine.dispose()
_engine = None
_SessionLocal = None
def debate_session_from_db(db_session) -> DebateSession:
"""Convert database session to Pydantic model."""
evidence_library = []
if db_session.evidence_library:
evidence_library = json.loads(db_session.evidence_library)
return DebateSession(
session_id=db_session.session_id,
topic=db_session.topic,
participants=json.loads(db_session.participants),
constraints=json.loads(db_session.constraints),
rounds=json.loads(db_session.rounds),
status=db_session.status,
created_at=db_session.created_at,
completed_at=db_session.completed_at,
summary=db_session.summary,
evidence_library=evidence_library
)
def debate_session_to_db(session: DebateSession) -> DebateSessionDB:
"""Convert Pydantic model to database model."""
return DebateSessionDB(
session_id=session.session_id,
topic=session.topic,
participants=json.dumps([p.dict() for p in session.participants]),
constraints=json.dumps(session.constraints.dict()),
rounds=json.dumps([r.dict() for r in session.rounds], default=str),
status=session.status,
created_at=session.created_at,
completed_at=session.completed_at,
summary=session.summary,
evidence_library=json.dumps([e.dict() for e in session.evidence_library], default=str) if session.evidence_library else None
)

View File

@@ -1,67 +0,0 @@
from sqlalchemy.orm import Session
from datetime import datetime
import json
from typing import Optional
from models.debate import DebateSession
from storage.database import DebateSessionDB, debate_session_from_db, debate_session_to_db
class SessionManager:
"""
Manages debate sessions in storage
"""
def __init__(self):
pass
async def save_session(self, db: Session, session: DebateSession):
"""
Save a debate session to the database
"""
db_session = debate_session_to_db(session)
db.add(db_session)
db.commit()
db.refresh(db_session)
async def get_session(self, db: Session, session_id: str) -> Optional[DebateSession]:
"""
Retrieve a debate session from the database
"""
db_session = db.query(DebateSessionDB).filter(DebateSessionDB.session_id == session_id).first()
if not db_session:
return None
return debate_session_from_db(db_session)
async def update_session(self, db: Session, session: DebateSession):
"""
Update an existing debate session in the database
"""
db_session = db.query(DebateSessionDB).filter(DebateSessionDB.session_id == session.session_id).first()
if db_session:
db_session.topic = session.topic
db_session.participants = json.dumps([p.dict() for p in session.participants])
db_session.constraints = json.dumps(session.constraints.dict())
db_session.rounds = json.dumps([r.dict() for r in session.rounds], default=str)
db_session.status = session.status
db_session.completed_at = session.completed_at
db_session.summary = session.summary
db_session.evidence_library = json.dumps([e.dict() for e in session.evidence_library], default=str) if session.evidence_library else None
db.commit()
db.refresh(db_session)
async def list_sessions(self, db: Session):
"""
List all debate sessions
"""
db_sessions = db.query(DebateSessionDB).all()
return [
{
"session_id": db_session.session_id,
"topic": db_session.topic,
"status": db_session.status,
"created_at": db_session.created_at.isoformat() if db_session.created_at else None
}
for db_session in db_sessions
]

View File

View File

@@ -1,39 +0,0 @@
async def summarize_debate(session):
"""
Generate a summary of the debate session
"""
if not session.rounds:
return "No rounds were completed in this debate."
# Extract key points from each side
pro_points = []
con_points = []
for round_data in session.rounds:
if round_data.stance.value == "pro":
pro_points.append(round_data.content)
else:
con_points.append(round_data.content)
# Create a summary
summary_parts = [
f"辩论主题: {session.topic}",
"",
"正方主要观点:",
]
for i, point in enumerate(pro_points, 1):
summary_parts.append(f"{i}. {point[:100]}...") # Truncate for brevity
summary_parts.append("")
summary_parts.append("反方主要观点:")
for i, point in enumerate(con_points, 1):
summary_parts.append(f"{i}. {point[:100]}...") # Truncate for brevity
summary_parts.append("")
summary_parts.append("总结: 本次辩论完成了 {} 轮,双方就 '{}' 主题进行了充分的讨论。".format(
len(session.rounds), session.topic
))
return "\n".join(summary_parts)