Compare commits
9 Commits
archive/py
...
0b16b52ee7
| Author | SHA1 | Date | |
|---|---|---|---|
| 0b16b52ee7 | |||
| 5cf4302d50 | |||
| 22d9fb7ed5 | |||
| a43ff2de62 | |||
| b2a0cac460 | |||
| 15bb942d9b | |||
| 03b89a547c | |||
| 57a1fa1b33 | |||
| e706f3d6ef |
5
.gitignore
vendored
5
.gitignore
vendored
@@ -1 +1,6 @@
|
||||
/dialectic-backend
|
||||
/dist/
|
||||
/.idea/
|
||||
/.vscode/
|
||||
*.swp
|
||||
.DS_Store
|
||||
|
||||
35
Dockerfile
35
Dockerfile
@@ -1,22 +1,19 @@
|
||||
FROM python:3.13-slim
|
||||
# syntax=docker/dockerfile:1.7
|
||||
# Dialectic.Backend.Go — multi-stage build (compile static binary, run on distroless).
|
||||
|
||||
ENV PYTHONDONTWRITEBYTECODE=1
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
FROM golang:1.23-bookworm AS build
|
||||
WORKDIR /src
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
COPY . .
|
||||
ARG VERSION=dev
|
||||
RUN CGO_ENABLED=0 GOOS=linux go build \
|
||||
-ldflags="-s -w -X main.Version=${VERSION}" \
|
||||
-o /out/dialectic-backend .
|
||||
|
||||
FROM gcr.io/distroless/static-debian12:nonroot
|
||||
WORKDIR /app
|
||||
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
build-essential gcc \
|
||||
python3-dev \
|
||||
libssl-dev libffi-dev \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
RUN python -m pip install --upgrade pip setuptools wheel
|
||||
|
||||
COPY requirements.txt ./requirements.txt
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY . /app/
|
||||
|
||||
EXPOSE 8000
|
||||
CMD ["python3", "app.py"]
|
||||
COPY --from=build /out/dialectic-backend /app/dialectic-backend
|
||||
EXPOSE 8090
|
||||
USER nonroot:nonroot
|
||||
ENTRYPOINT ["/app/dialectic-backend"]
|
||||
|
||||
102
README.md
Normal file
102
README.md
Normal file
@@ -0,0 +1,102 @@
|
||||
# Dialectic.Backend — v2 (Go)
|
||||
|
||||
Greenfield Go rewrite of the Python v1 backend. Agent-native debate
|
||||
platform per [`/home/hzhang/arch/DIALECTIC-V2-DESIGN.md`](../DIALECTIC-V2-DESIGN.md).
|
||||
|
||||
Python v1 history is preserved on branch `archive/python-v1`.
|
||||
|
||||
## What's here (Phase 2A + 2B + 2C, 2026-05-23)
|
||||
|
||||
| Subsystem | Status |
|
||||
|-----------|--------|
|
||||
| HTTP server (`chi` router) | ✅ |
|
||||
| Config from env (`internal/config`) | ✅ |
|
||||
| MySQL via `sqlx` + embedded SQL migrations | ✅ |
|
||||
| Schema: `topics`, `signups`, `camps`, `rounds`, `arguments`, `verdicts`, `agent_keys`, `system_keys`, `verdict_schemas` | ✅ |
|
||||
| Auth middlewares: agent bearer (real), OIDC browser (Phase 2 stub w/ dev bypass) | ✅ |
|
||||
| `/api/healthz` | ✅ |
|
||||
| `/api/topics` list / get / create / set-visibility | ✅ |
|
||||
| `/api/topics/{id}/signups` list / create (agent self-enroll) | ✅ |
|
||||
| Orchestration engine (camp allocation, round driver, judge invocation) | ⬜ Phase 2D |
|
||||
| SSE live transcripts | ⬜ Phase 2D |
|
||||
| Full OIDC + Keycloak JWKS verification | ⬜ Phase 4 |
|
||||
| Nginx + CF Origin Cert on server.t3 | ⬜ Phase 2E |
|
||||
|
||||
## Layout
|
||||
|
||||
```
|
||||
main.go entrypoint (load → wire → serve)
|
||||
go.mod
|
||||
Dockerfile
|
||||
docker-compose.dev.yml backend + mysql for local iteration
|
||||
internal/
|
||||
config/ 12-factor env loader
|
||||
db/
|
||||
db.go sqlx + embedded migration runner
|
||||
migrations/001_init.sql v2 schema, idempotent
|
||||
models/ entity types (sqlx + json tags)
|
||||
store/ query layer (per-entity)
|
||||
auth/ agent api-key + oidc middlewares
|
||||
httpapi/
|
||||
routes.go chi router + auth chains
|
||||
handlers/ per-endpoint handlers
|
||||
```
|
||||
|
||||
## Run locally
|
||||
|
||||
```
|
||||
docker compose -f docker-compose.dev.yml up --build
|
||||
# backend on http://localhost:8090
|
||||
curl http://localhost:8090/api/healthz
|
||||
```
|
||||
|
||||
Env vars (see `internal/config/config.go` for the full list):
|
||||
|
||||
| Var | Default (dev) | Required in prod |
|
||||
|-----|---------------|-------------------|
|
||||
| `ENV_MODE` | `dev` | must be `prod` |
|
||||
| `HTTP_ADDR` | `0.0.0.0:8090` | — |
|
||||
| `CORS_ALLOW_ORIGINS` | `*` | concrete list (no `*`) |
|
||||
| `DB_HOST/PORT/NAME/USER/PASSWORD` | dev defaults | ✓ password required |
|
||||
| `AGENT_API_KEY_PEPPER` | — | ✓ |
|
||||
| `OIDC_ISSUER` / `OIDC_CLIENT_ID` | — | ✓ |
|
||||
| `OIDC_DEV_BYPASS_TOKEN` | unset | ignored in prod |
|
||||
| `SYSTEM_API_KEY` | unset | populate when announce-channel push lands |
|
||||
|
||||
## Dev bypass for browser routes
|
||||
|
||||
In `ENV_MODE=dev` with `OIDC_DEV_BYPASS_TOKEN=<token>` set:
|
||||
|
||||
```
|
||||
curl -H "x-dev-bypass: <token>" http://localhost:8090/api/topics
|
||||
# attached as user 'dev-operator' with role 'dialectic-admin'
|
||||
```
|
||||
|
||||
In `prod`, this header is ignored regardless of value.
|
||||
|
||||
## Agent bearer for plugin routes
|
||||
|
||||
The OpenClaw plugin (`Dialectic.OpenclawPlugin`, Phase 3) calls with:
|
||||
|
||||
```
|
||||
Authorization: Bearer <raw-agent-api-key>
|
||||
```
|
||||
|
||||
The key is hashed with `AGENT_API_KEY_PEPPER` and matched against
|
||||
`agent_keys.key_hash`. To provision an agent's key (Phase 3 will add a
|
||||
proper `hf user create-dialectic-key` CLI; for now, manual SQL):
|
||||
|
||||
```sql
|
||||
INSERT INTO agent_keys (agent_id, key_hash)
|
||||
VALUES ('manager', SHA2(CONCAT('<pepper>:', '<raw>'), 256));
|
||||
```
|
||||
|
||||
## What's next
|
||||
|
||||
- **Phase 2D**: camp allocation algorithm + round driver + judge
|
||||
invocation. Wired to Fabric announce channel (via system-api-key) +
|
||||
the Dialectic.OpenclawPlugin's tool for agent argument submission.
|
||||
- **Phase 2E**: nginx config + CF Origin Cert + deploy to server.t3.
|
||||
- **Phase 3**: Dialectic.OpenclawPlugin — agent-facing tools.
|
||||
- **Phase 4**: frontend rewrite (STYLE.md + real Keycloak OIDC + visibility toggle UI).
|
||||
- **Phase 5**: end-to-end integration with `analyze-intel` workflow.
|
||||
@@ -1,4 +0,0 @@
|
||||
from api.debates import router as debates_router
|
||||
from api.api_keys import router as api_keys_router
|
||||
from api.models import router as models_router
|
||||
from api.setup import router as setup_router
|
||||
113
api/api_keys.py
113
api/api_keys.py
@@ -1,113 +0,0 @@
|
||||
from fastapi import APIRouter, Depends, Form, HTTPException
|
||||
|
||||
from middleware.auth import require_auth
|
||||
import aiohttp
|
||||
|
||||
from services.api_key_service import ApiKeyService
|
||||
from db_models import get_db
|
||||
|
||||
router = APIRouter(tags=["api-keys"])
|
||||
|
||||
|
||||
async def validate_api_key(provider: str, api_key: str):
|
||||
"""
|
||||
Validate an API key by listing models from the provider.
|
||||
All providers validated the same way: if we can list models, the key is valid.
|
||||
"""
|
||||
try:
|
||||
if provider == "openai":
|
||||
import openai
|
||||
async with openai.AsyncOpenAI(api_key=api_key, timeout=10.0) as client:
|
||||
await client.models.list()
|
||||
return True
|
||||
|
||||
# Claude, Qwen, DeepSeek: GET their models endpoint via aiohttp
|
||||
endpoints = {
|
||||
"claude": {
|
||||
"url": "https://api.anthropic.com/v1/models",
|
||||
"headers": {
|
||||
"x-api-key": api_key,
|
||||
"anthropic-version": "2023-06-01"
|
||||
}
|
||||
},
|
||||
"qwen": {
|
||||
"url": "https://dashscope.aliyuncs.com/compatible-mode/v1/models",
|
||||
"headers": {"Authorization": f"Bearer {api_key}"}
|
||||
},
|
||||
"deepseek": {
|
||||
"url": "https://api.deepseek.com/v1/models",
|
||||
"headers": {"Authorization": f"Bearer {api_key}"}
|
||||
}
|
||||
}
|
||||
|
||||
if provider in endpoints:
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
ep = endpoints[provider]
|
||||
async with session.get(ep["url"], headers=ep["headers"]) as response:
|
||||
return response.status == 200
|
||||
|
||||
# Tavily: validate by calling REST API directly (no tavily package needed)
|
||||
if provider == "tavily":
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
async with session.post(
|
||||
"https://api.tavily.com/search",
|
||||
json={"api_key": api_key, "query": "test", "max_results": 1}
|
||||
) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
return bool(data.get("results"))
|
||||
return False
|
||||
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
@router.post("/api-keys/{provider}", dependencies=[Depends(require_auth)])
|
||||
async def set_api_key(provider: str, api_key: str = Form(...), db=Depends(get_db)):
|
||||
"""
|
||||
Set API key for a specific provider
|
||||
"""
|
||||
try:
|
||||
success = ApiKeyService.set_api_key(db, provider, api_key)
|
||||
if success:
|
||||
return {"message": f"API key for {provider} updated successfully"}
|
||||
else:
|
||||
raise HTTPException(status_code=500, detail="Failed to update API key")
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.get("/api-keys/{provider}")
|
||||
async def get_api_key(provider: str, db=Depends(get_db)):
|
||||
"""
|
||||
Get API key for a specific provider
|
||||
"""
|
||||
try:
|
||||
api_key = ApiKeyService.get_api_key(db, provider)
|
||||
if api_key:
|
||||
return {"provider": provider, "api_key": api_key}
|
||||
else:
|
||||
raise HTTPException(status_code=404, detail=f"No API key found for {provider}")
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.post("/validate-api-key/{provider}", dependencies=[Depends(require_auth)])
|
||||
async def validate_api_key_endpoint(provider: str, api_key: str = Form(...)):
|
||||
"""
|
||||
Validate an API key by making a test request to the provider
|
||||
This endpoint is used by the frontend to validate API keys without CORS issues
|
||||
"""
|
||||
try:
|
||||
is_valid = await validate_api_key(provider, api_key)
|
||||
if is_valid:
|
||||
return {"valid": True, "message": f"Valid {provider} API key"}
|
||||
else:
|
||||
return {"valid": False, "message": f"Invalid {provider} API key"}
|
||||
except Exception as e:
|
||||
return {"valid": False, "message": str(e)}
|
||||
158
api/debates.py
158
api/debates.py
@@ -1,158 +0,0 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
|
||||
from middleware.auth import require_auth
|
||||
from sse_starlette.sse import EventSourceResponse
|
||||
from typing import Dict, Any
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from orchestrator.debate_orchestrator import DebateOrchestrator
|
||||
from models.debate import DebateRequest
|
||||
from storage.session_manager import SessionManager
|
||||
from db_models import get_db
|
||||
|
||||
router = APIRouter(tags=["debates"])
|
||||
|
||||
|
||||
@router.post("/debate/create", dependencies=[Depends(require_auth)])
|
||||
async def create_debate(debate_request: DebateRequest, db=Depends(get_db)) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a new debate session with specified parameters
|
||||
"""
|
||||
try:
|
||||
orchestrator = DebateOrchestrator(db)
|
||||
session_id = await orchestrator.create_session(debate_request)
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"status": "created",
|
||||
"message": f"Debate session {session_id} created successfully"
|
||||
}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.get("/debate/{session_id}")
|
||||
async def get_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]:
|
||||
"""
|
||||
Get the current state of a debate session
|
||||
"""
|
||||
try:
|
||||
orchestrator = DebateOrchestrator(db)
|
||||
session = await orchestrator.get_session_status(session_id)
|
||||
|
||||
if not session:
|
||||
raise HTTPException(status_code=404, detail=f"Debate session {session_id} not found")
|
||||
|
||||
return session.dict()
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.post("/debate/{session_id}/start", dependencies=[Depends(require_auth)])
|
||||
async def start_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]:
|
||||
"""
|
||||
Start a debate session and stream the results
|
||||
"""
|
||||
try:
|
||||
orchestrator = DebateOrchestrator(db)
|
||||
session = await orchestrator.run_debate(session_id)
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"status": session.status,
|
||||
"message": f"Debate session {session_id} completed"
|
||||
}
|
||||
except Exception as e:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.delete("/debate/{session_id}", dependencies=[Depends(require_auth)])
|
||||
async def end_debate(session_id: str, db=Depends(get_db)) -> Dict[str, Any]:
|
||||
"""
|
||||
End a debate session prematurely
|
||||
"""
|
||||
try:
|
||||
orchestrator = DebateOrchestrator(db)
|
||||
await orchestrator.terminate_session(session_id)
|
||||
|
||||
return {"session_id": session_id, "status": "terminated"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@router.get("/debate/{session_id}/stream")
|
||||
async def stream_debate(session_id: str, db=Depends(get_db)) -> EventSourceResponse:
|
||||
"""
|
||||
Stream debate updates in real-time using Server-Sent Events
|
||||
"""
|
||||
async def event_generator():
|
||||
session_manager = SessionManager()
|
||||
|
||||
# Check if the session exists
|
||||
session = await session_manager.get_session(db, session_id)
|
||||
if not session:
|
||||
yield {"event": "error", "data": json.dumps({"error": f"Session {session_id} not found"})}
|
||||
return
|
||||
|
||||
# Yield initial state
|
||||
yield {"event": "update", "data": json.dumps({
|
||||
"session_id": session_id,
|
||||
"status": session.status,
|
||||
"rounds": [round.dict() for round in session.rounds],
|
||||
"evidence_library": [e.dict() for e in session.evidence_library],
|
||||
"message": "Debate session initialized"
|
||||
}, default=str)}
|
||||
|
||||
last_round_count = len(session.rounds)
|
||||
# Poll until debate completes or times out (max 5 min)
|
||||
for _ in range(150): # 150 × 2s = 300s timeout
|
||||
await asyncio.sleep(2)
|
||||
|
||||
# Reset transaction so we see commits from the run_debate request
|
||||
db.commit()
|
||||
|
||||
updated_session = await session_manager.get_session(db, session_id)
|
||||
if not updated_session:
|
||||
break
|
||||
|
||||
# Only yield update when rounds actually changed
|
||||
if len(updated_session.rounds) != last_round_count or updated_session.status != session.status:
|
||||
last_round_count = len(updated_session.rounds)
|
||||
session = updated_session
|
||||
yield {"event": "update", "data": json.dumps({
|
||||
"session_id": session_id,
|
||||
"status": updated_session.status,
|
||||
"rounds": [round.dict() for round in updated_session.rounds],
|
||||
"evidence_library": [e.dict() for e in updated_session.evidence_library],
|
||||
"current_round": len(updated_session.rounds)
|
||||
}, default=str)}
|
||||
|
||||
if updated_session.status in ("completed", "terminated"):
|
||||
yield {"event": "complete", "data": json.dumps({
|
||||
"session_id": session_id,
|
||||
"status": updated_session.status,
|
||||
"summary": updated_session.summary,
|
||||
"rounds": [round.dict() for round in updated_session.rounds],
|
||||
"evidence_library": [e.dict() for e in updated_session.evidence_library]
|
||||
}, default=str)}
|
||||
break
|
||||
|
||||
return EventSourceResponse(event_generator())
|
||||
|
||||
|
||||
@router.get("/sessions")
|
||||
async def list_sessions(db=Depends(get_db)) -> Dict[str, Any]:
|
||||
"""
|
||||
List all debate sessions
|
||||
"""
|
||||
try:
|
||||
session_manager = SessionManager()
|
||||
sessions = await session_manager.list_sessions(db)
|
||||
return {"sessions": sessions}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
173
api/models.py
173
api/models.py
@@ -1,173 +0,0 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
import aiohttp
|
||||
|
||||
from services.api_key_service import ApiKeyService
|
||||
from db_models import get_db
|
||||
from api.api_keys import validate_api_key
|
||||
|
||||
router = APIRouter(tags=["models"])
|
||||
|
||||
|
||||
@router.get("/models/{provider}")
|
||||
async def get_available_models(provider: str, db=Depends(get_db)):
|
||||
"""
|
||||
Get available models for a specific provider by fetching from their API.
|
||||
Falls back to a curated default list if the API key is missing or the request fails.
|
||||
"""
|
||||
# Curated defaults for each provider — used as fallback
|
||||
default_models = {
|
||||
"openai": [
|
||||
{"model_identifier": "gpt-4o", "display_name": "gpt-4o"},
|
||||
{"model_identifier": "gpt-4o-mini", "display_name": "gpt-4o-mini"},
|
||||
{"model_identifier": "gpt-4-turbo", "display_name": "gpt-4-turbo"},
|
||||
{"model_identifier": "gpt-4", "display_name": "gpt-4"},
|
||||
{"model_identifier": "o3-mini", "display_name": "o3-mini"},
|
||||
{"model_identifier": "gpt-3.5-turbo", "display_name": "gpt-3.5-turbo"}
|
||||
],
|
||||
"claude": [
|
||||
{"model_identifier": "claude-opus-4-5", "display_name": "claude-opus-4-5"},
|
||||
{"model_identifier": "claude-sonnet-4-5", "display_name": "claude-sonnet-4-5"},
|
||||
{"model_identifier": "claude-3-5-sonnet-20241022", "display_name": "claude-3-5-sonnet-20241022"},
|
||||
{"model_identifier": "claude-3-5-haiku-20241022", "display_name": "claude-3-5-haiku-20241022"},
|
||||
{"model_identifier": "claude-3-opus-20240229", "display_name": "claude-3-opus-20240229"}
|
||||
],
|
||||
"qwen": [
|
||||
{"model_identifier": "qwen3-max", "display_name": "qwen3-max"},
|
||||
{"model_identifier": "qwen3-plus", "display_name": "qwen3-plus"},
|
||||
{"model_identifier": "qwen3-flash", "display_name": "qwen3-flash"},
|
||||
{"model_identifier": "qwen-max", "display_name": "qwen-max"},
|
||||
{"model_identifier": "qwen-plus", "display_name": "qwen-plus"},
|
||||
{"model_identifier": "qwen-turbo", "display_name": "qwen-turbo"}
|
||||
],
|
||||
"deepseek": [
|
||||
{"model_identifier": "deepseek-chat", "display_name": "deepseek-chat"},
|
||||
{"model_identifier": "deepseek-reasoner", "display_name": "deepseek-reasoner"},
|
||||
{"model_identifier": "deepseek-v3", "display_name": "deepseek-v3"},
|
||||
{"model_identifier": "deepseek-r1", "display_name": "deepseek-r1"}
|
||||
]
|
||||
}
|
||||
|
||||
defaults = default_models.get(provider, [])
|
||||
|
||||
try:
|
||||
# Retrieve and decrypt API key
|
||||
decrypted_key = ApiKeyService.get_api_key(db, provider)
|
||||
if not decrypted_key:
|
||||
return {"provider": provider, "models": defaults}
|
||||
|
||||
# ---------- OpenAI ----------
|
||||
if provider == "openai":
|
||||
import openai
|
||||
async with openai.AsyncOpenAI(api_key=decrypted_key, timeout=10.0) as client:
|
||||
response = await client.models.list()
|
||||
|
||||
# Keep only chat / reasoning models, sorted newest-first by created timestamp
|
||||
chat_prefixes = ('gpt-', 'o1', 'o3', 'o4', 'chatgpt')
|
||||
models = []
|
||||
seen = set()
|
||||
for m in sorted(response.data, key=lambda x: x.created, reverse=True):
|
||||
if m.id not in seen and m.id.startswith(chat_prefixes):
|
||||
seen.add(m.id)
|
||||
models.append({"model_identifier": m.id, "display_name": m.id})
|
||||
|
||||
if models:
|
||||
return {"provider": provider, "models": models}
|
||||
|
||||
# ---------- Claude ----------
|
||||
elif provider == "claude":
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
headers = {
|
||||
"x-api-key": decrypted_key,
|
||||
"anthropic-version": "2023-06-01"
|
||||
}
|
||||
async with session.get("https://api.anthropic.com/v1/models", headers=headers) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.json()
|
||||
models = []
|
||||
for m in data.get("data", []):
|
||||
model_id = m.get("id", "")
|
||||
if model_id.startswith("claude-"):
|
||||
models.append({"model_identifier": model_id, "display_name": model_id})
|
||||
if models:
|
||||
return {"provider": provider, "models": models}
|
||||
else:
|
||||
print(f"Claude models API returned {resp.status}: {await resp.text()}")
|
||||
|
||||
# ---------- Qwen ----------
|
||||
elif provider == "qwen":
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
headers = {"Authorization": f"Bearer {decrypted_key}"}
|
||||
async with session.get("https://dashscope.aliyuncs.com/compatible-mode/v1/models", headers=headers) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.json()
|
||||
exclude_keywords = [
|
||||
'tts', 'vl', 'ocr', 'image', 'asr', '-mt-', '-mt',
|
||||
'math', 'embed', 'rerank', 'coder', 'translate',
|
||||
's2s', 'deep-search', 'omni', 'gui-'
|
||||
]
|
||||
models = []
|
||||
for m in data.get("data", []):
|
||||
model_id = m.get("id", "")
|
||||
if not model_id:
|
||||
continue
|
||||
if not (model_id.startswith("qwen") or model_id.startswith("qwq")):
|
||||
continue
|
||||
if any(kw in model_id for kw in exclude_keywords):
|
||||
continue
|
||||
models.append({"model_identifier": model_id, "display_name": model_id})
|
||||
if models:
|
||||
return {"provider": provider, "models": models}
|
||||
else:
|
||||
print(f"Qwen models API returned {resp.status}: {await resp.text()}")
|
||||
|
||||
# ---------- DeepSeek ----------
|
||||
elif provider == "deepseek":
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
headers = {"Authorization": f"Bearer {decrypted_key}"}
|
||||
async with session.get("https://api.deepseek.com/v1/models", headers=headers) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.json()
|
||||
models = []
|
||||
for m in data.get("data", []):
|
||||
model_id = m.get("id", "")
|
||||
if model_id.startswith("deepseek"):
|
||||
models.append({"model_identifier": model_id, "display_name": model_id})
|
||||
if models:
|
||||
return {"provider": provider, "models": models}
|
||||
else:
|
||||
print(f"DeepSeek models API returned {resp.status}: {await resp.text()}")
|
||||
|
||||
# API fetch succeeded but returned empty list, or unknown provider — use defaults
|
||||
return {"provider": provider, "models": defaults}
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error fetching models for {provider}: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return {"provider": provider, "models": defaults}
|
||||
|
||||
|
||||
@router.get("/providers")
|
||||
async def get_available_providers(db=Depends(get_db)):
|
||||
"""
|
||||
Get all providers that have valid API keys set
|
||||
"""
|
||||
try:
|
||||
available_providers = []
|
||||
for provider in ("openai", "claude", "qwen", "deepseek", "tavily"):
|
||||
decrypted_key = ApiKeyService.get_api_key(db, provider)
|
||||
if not decrypted_key:
|
||||
continue
|
||||
is_valid = await validate_api_key(provider, decrypted_key)
|
||||
if is_valid:
|
||||
available_providers.append({
|
||||
"provider": provider,
|
||||
"has_valid_key": True
|
||||
})
|
||||
|
||||
return {"providers": available_providers}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
192
api/setup.py
192
api/setup.py
@@ -1,192 +0,0 @@
|
||||
import os
|
||||
from typing import Optional
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request
|
||||
from pydantic import BaseModel
|
||||
|
||||
from services.config_service import ConfigService
|
||||
|
||||
router = APIRouter(prefix="/api/setup", tags=["setup"])
|
||||
|
||||
PASSWORD_PLACEHOLDER = "********"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Request / response models
|
||||
# ---------------------------------------------------------------------------
|
||||
class DatabaseConfig(BaseModel):
|
||||
host: str
|
||||
port: int = 3306
|
||||
user: str = "root"
|
||||
password: str = ""
|
||||
database: str = "dialectica"
|
||||
|
||||
|
||||
class KeycloakConfig(BaseModel):
|
||||
host: str = ""
|
||||
realm: str = ""
|
||||
client_id: str = ""
|
||||
|
||||
|
||||
class TlsConfig(BaseModel):
|
||||
cert_path: str = ""
|
||||
key_path: str = ""
|
||||
force_https: bool = False
|
||||
|
||||
|
||||
class FullConfig(BaseModel):
|
||||
database: Optional[DatabaseConfig] = None
|
||||
keycloak: Optional[KeycloakConfig] = None
|
||||
tls: Optional[TlsConfig] = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Access control dependency
|
||||
# ---------------------------------------------------------------------------
|
||||
async def setup_guard(request: Request):
|
||||
"""Three-phase access control for setup routes.
|
||||
|
||||
1. Not initialized → only localhost allowed
|
||||
2. ENV_MODE=dev → open
|
||||
3. ENV_MODE=prod → Keycloak admin JWT required
|
||||
"""
|
||||
config = ConfigService.load()
|
||||
initialized = config.get("initialized", False)
|
||||
env_mode = os.getenv("ENV_MODE", "dev")
|
||||
|
||||
if env_mode == "dev":
|
||||
return # dev mode: no auth needed, even before initialisation
|
||||
|
||||
if not initialized:
|
||||
# prod + not initialised: only localhost may configure
|
||||
client_ip = request.client.host
|
||||
if client_ip not in ("127.0.0.1", "::1"):
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="初次设置仅允许从本机访问",
|
||||
)
|
||||
return
|
||||
|
||||
# prod → delegate to Keycloak middleware (Phase 3)
|
||||
from app.middleware.auth import require_admin
|
||||
await require_admin(request, config)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routes
|
||||
# ---------------------------------------------------------------------------
|
||||
@router.get("/status")
|
||||
async def setup_status():
|
||||
"""Return current system initialisation state, including KC info for OIDC."""
|
||||
config = ConfigService.load()
|
||||
env_mode = os.getenv("ENV_MODE", "dev")
|
||||
|
||||
result = {
|
||||
"initialized": config.get("initialized", False),
|
||||
"env_mode": env_mode,
|
||||
"db_configured": ConfigService.is_db_configured(),
|
||||
}
|
||||
|
||||
# Include Keycloak info so the frontend can build OIDC config
|
||||
kc = config.get("keycloak", {})
|
||||
if env_mode == "prod" and kc.get("host") and kc.get("realm"):
|
||||
result["keycloak"] = {
|
||||
"authority": f"{kc['host']}/realms/{kc['realm']}",
|
||||
"client_id": kc.get("client_id", ""),
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@router.get("/config", dependencies=[Depends(setup_guard)])
|
||||
async def get_config():
|
||||
"""Return full config with passwords replaced by placeholder."""
|
||||
config = ConfigService.load()
|
||||
if "database" in config and config["database"].get("password"):
|
||||
config["database"]["password"] = PASSWORD_PLACEHOLDER
|
||||
return config
|
||||
|
||||
|
||||
@router.put("/config", dependencies=[Depends(setup_guard)])
|
||||
async def update_config(payload: FullConfig):
|
||||
"""Merge submitted config sections into the YAML file."""
|
||||
config = ConfigService.load()
|
||||
|
||||
if payload.database is not None:
|
||||
db_dict = payload.database.model_dump()
|
||||
# If password is the placeholder, keep the existing real password
|
||||
if db_dict.get("password") == PASSWORD_PLACEHOLDER:
|
||||
db_dict["password"] = config.get("database", {}).get("password", "")
|
||||
config["database"] = db_dict
|
||||
if payload.keycloak is not None:
|
||||
config["keycloak"] = payload.keycloak.model_dump()
|
||||
if payload.tls is not None:
|
||||
config["tls"] = payload.tls.model_dump()
|
||||
|
||||
ConfigService.save(config)
|
||||
return {"message": "配置已保存"}
|
||||
|
||||
|
||||
@router.post("/test-db", dependencies=[Depends(setup_guard)])
|
||||
async def test_db_connection(db_config: DatabaseConfig):
|
||||
"""Test a database connection with the provided parameters (no save)."""
|
||||
from sqlalchemy import create_engine, text
|
||||
|
||||
password = db_config.password
|
||||
# If password is the placeholder, use the real password from config
|
||||
if password == PASSWORD_PLACEHOLDER:
|
||||
password = ConfigService.load().get("database", {}).get("password", "")
|
||||
|
||||
url = (
|
||||
f"mysql+pymysql://{quote_plus(db_config.user)}:{quote_plus(password)}"
|
||||
f"@{db_config.host}:{db_config.port}/{db_config.database}"
|
||||
)
|
||||
try:
|
||||
engine = create_engine(url, pool_pre_ping=True)
|
||||
with engine.connect() as conn:
|
||||
conn.execute(text("SELECT 1"))
|
||||
engine.dispose()
|
||||
return {"success": True, "message": "数据库连接成功"}
|
||||
except Exception as e:
|
||||
return {"success": False, "message": str(e)}
|
||||
|
||||
|
||||
@router.post("/test-keycloak", dependencies=[Depends(setup_guard)])
|
||||
async def test_keycloak(kc_config: KeycloakConfig):
|
||||
"""Test Keycloak connectivity by fetching the OIDC discovery document."""
|
||||
import httpx
|
||||
|
||||
well_known = (
|
||||
f"{kc_config.host}/realms/{kc_config.realm}"
|
||||
f"/.well-known/openid-configuration"
|
||||
)
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.get(well_known)
|
||||
if resp.status_code == 200:
|
||||
return {"success": True, "message": "Keycloak 连通正常"}
|
||||
return {"success": False, "message": f"HTTP {resp.status_code}"}
|
||||
except Exception as e:
|
||||
return {"success": False, "message": str(e)}
|
||||
|
||||
|
||||
@router.post("/initialize", dependencies=[Depends(setup_guard)])
|
||||
async def initialize():
|
||||
"""Mark system as initialised and reload DB connection."""
|
||||
config = ConfigService.load()
|
||||
|
||||
if not ConfigService.is_db_configured():
|
||||
raise HTTPException(status_code=400, detail="请先配置数据库连接")
|
||||
|
||||
# Reload DB engine so business routes can start working
|
||||
from app.db_models import reload_db_connection
|
||||
from app.storage.database import init_db
|
||||
|
||||
reload_db_connection()
|
||||
init_db()
|
||||
|
||||
config["initialized"] = True
|
||||
ConfigService.save(config)
|
||||
|
||||
return {"message": "系统初始化完成", "initialized": True}
|
||||
60
app.py
60
app.py
@@ -1,60 +0,0 @@
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from starlette.responses import JSONResponse
|
||||
import uvicorn
|
||||
|
||||
from storage.database import init_db
|
||||
from exceptions import ServiceNotConfiguredError
|
||||
from middleware.config_guard import ConfigGuardMiddleware
|
||||
from api import debates_router, api_keys_router, models_router, setup_router
|
||||
|
||||
app = FastAPI(
|
||||
title="Dialectica - Multi-Model Debate Framework",
|
||||
description="A framework for structured debates between multiple language models",
|
||||
version="0.1.0"
|
||||
)
|
||||
|
||||
# Add CORS middleware
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"], # Allow all origins for development
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"], # Allow all methods
|
||||
allow_headers=["*"], # Allow all headers
|
||||
# Add exposed headers to allow frontend to access response headers
|
||||
expose_headers=["Access-Control-Allow-Origin", "Access-Control-Allow-Credentials"]
|
||||
)
|
||||
|
||||
# Config guard: return 503 for business routes when DB not configured
|
||||
app.add_middleware(ConfigGuardMiddleware)
|
||||
|
||||
|
||||
@app.exception_handler(ServiceNotConfiguredError)
|
||||
async def not_configured_handler(request, exc):
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={"error_code": "SERVICE_NOT_CONFIGURED", "detail": str(exc)},
|
||||
)
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
def startup_event():
|
||||
"""Initialize database on startup (skipped if not configured)."""
|
||||
init_db()
|
||||
|
||||
|
||||
# Register routers
|
||||
app.include_router(debates_router)
|
||||
app.include_router(api_keys_router)
|
||||
app.include_router(models_router)
|
||||
app.include_router(setup_router)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
return {"message": "Welcome to Dialectica - Multi-Model Debate Framework"}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from datetime import datetime
|
||||
|
||||
from exceptions import ServiceNotConfiguredError
|
||||
from services.config_service import ConfigService
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class ApiKey(Base):
|
||||
__tablename__ = "api_keys"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
provider = Column(String(50), unique=True, index=True, nullable=False)
|
||||
api_key_encrypted = Column(Text, nullable=False) # Encrypted API key
|
||||
created_at = Column(DateTime, default=datetime.utcnow)
|
||||
updated_at = Column(DateTime, default=datetime.utcnow)
|
||||
|
||||
|
||||
class ModelConfig(Base):
|
||||
__tablename__ = "model_configs"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
provider = Column(String(50), nullable=False)
|
||||
model_name = Column(String(100), nullable=False)
|
||||
display_name = Column(String(100)) # Optional display name
|
||||
is_active = Column(Boolean, default=True)
|
||||
created_at = Column(DateTime, default=datetime.utcnow)
|
||||
updated_at = Column(DateTime, default=datetime.utcnow)
|
||||
|
||||
__table_args__ = {'mysql_charset': 'utf8mb4'}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lazy engine / session factory — only created when first requested
|
||||
# ---------------------------------------------------------------------------
|
||||
_engine = None
|
||||
_SessionLocal = None
|
||||
|
||||
|
||||
def _get_engine():
|
||||
from sqlalchemy import create_engine
|
||||
global _engine
|
||||
if _engine is None:
|
||||
db_url = ConfigService.get_database_url()
|
||||
if not db_url:
|
||||
raise ServiceNotConfiguredError("数据库未配置")
|
||||
_engine = create_engine(db_url)
|
||||
return _engine
|
||||
|
||||
|
||||
def _get_session_factory():
|
||||
global _SessionLocal
|
||||
if _SessionLocal is None:
|
||||
_SessionLocal = sessionmaker(
|
||||
autocommit=False, autoflush=False, bind=_get_engine()
|
||||
)
|
||||
return _SessionLocal
|
||||
|
||||
|
||||
def reload_db_connection():
|
||||
"""Dispose current engine and reset so next call rebuilds from config."""
|
||||
global _engine, _SessionLocal
|
||||
if _engine is not None:
|
||||
_engine.dispose()
|
||||
_engine = None
|
||||
_SessionLocal = None
|
||||
|
||||
|
||||
def get_db():
|
||||
"""FastAPI dependency that yields a DB session."""
|
||||
session_factory = _get_session_factory()
|
||||
db = session_factory()
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
45
docker-compose.dev.yml
Normal file
45
docker-compose.dev.yml
Normal file
@@ -0,0 +1,45 @@
|
||||
# Dev docker-compose: backend + MySQL only, exposed on localhost.
|
||||
# Frontend / nginx are in the sibling top-level Dialectic repo's compose.
|
||||
# For end-to-end dev: run that compose; for backend-only iteration, this.
|
||||
|
||||
services:
|
||||
backend:
|
||||
build:
|
||||
context: .
|
||||
args:
|
||||
VERSION: dev-local
|
||||
environment:
|
||||
ENV_MODE: dev
|
||||
HTTP_ADDR: 0.0.0.0:8090
|
||||
CORS_ALLOW_ORIGINS: "*"
|
||||
DB_HOST: mysql
|
||||
DB_PORT: "3306"
|
||||
DB_NAME: dialectic
|
||||
DB_USER: dialectic
|
||||
DB_PASSWORD: dialectic
|
||||
AGENT_API_KEY_PEPPER: dev-pepper
|
||||
OIDC_DEV_BYPASS_TOKEN: dev-bypass-token
|
||||
ports: ["8090:8090"]
|
||||
depends_on:
|
||||
mysql:
|
||||
condition: service_healthy
|
||||
restart: unless-stopped
|
||||
|
||||
mysql:
|
||||
image: mysql:8.4
|
||||
environment:
|
||||
MYSQL_ROOT_PASSWORD: rootpassword
|
||||
MYSQL_DATABASE: dialectic
|
||||
MYSQL_USER: dialectic
|
||||
MYSQL_PASSWORD: dialectic
|
||||
ports: ["3306:3306"]
|
||||
healthcheck:
|
||||
test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-pdialectic"]
|
||||
interval: 5s
|
||||
timeout: 3s
|
||||
retries: 20
|
||||
volumes:
|
||||
- dialectic_mysql_data:/var/lib/mysql
|
||||
|
||||
volumes:
|
||||
dialectic_mysql_data:
|
||||
@@ -1,3 +0,0 @@
|
||||
class ServiceNotConfiguredError(Exception):
|
||||
"""Raised when a required service (database, etc.) is not configured."""
|
||||
pass
|
||||
13
go.mod
Normal file
13
go.mod
Normal file
@@ -0,0 +1,13 @@
|
||||
module git.hangman-lab.top/hzhang/Dialectic.Backend
|
||||
|
||||
go 1.23
|
||||
|
||||
require (
|
||||
github.com/go-chi/chi/v5 v5.1.0
|
||||
github.com/go-chi/cors v1.2.1
|
||||
github.com/go-sql-driver/mysql v1.8.1
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/jmoiron/sqlx v1.4.0
|
||||
)
|
||||
|
||||
require filippo.io/edwards25519 v1.1.0 // indirect
|
||||
16
go.sum
Normal file
16
go.sum
Normal file
@@ -0,0 +1,16 @@
|
||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
|
||||
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
|
||||
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
|
||||
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
|
||||
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
|
||||
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
|
||||
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
|
||||
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
144
internal/auth/auth.go
Normal file
144
internal/auth/auth.go
Normal file
@@ -0,0 +1,144 @@
|
||||
// Package auth holds the two middlewares Dialectic v2 uses:
|
||||
//
|
||||
// - AgentAPIKey: validates `Authorization: Bearer <raw>` against
|
||||
// the `agent_keys` table (hashed with the configured pepper).
|
||||
// Used by Dialectic.OpenclawPlugin → backend calls.
|
||||
//
|
||||
// - OIDCBrowser: validates a Keycloak-issued JWT in the
|
||||
// `dialectic_session` cookie. Used by the React frontend.
|
||||
// Phase 2C ships a stub that accepts a dev-mode bypass token; the
|
||||
// real JWKS verification + claim mapping lands with Phase 4.
|
||||
//
|
||||
// Both middlewares attach a typed Caller to the request context so
|
||||
// downstream handlers can read identity uniformly.
|
||||
package auth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"crypto/subtle"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type CallerKind string
|
||||
|
||||
const (
|
||||
CallerAgent CallerKind = "agent"
|
||||
CallerUser CallerKind = "user"
|
||||
CallerSystem CallerKind = "system"
|
||||
)
|
||||
|
||||
type Caller struct {
|
||||
Kind CallerKind
|
||||
ID string // agentId for CallerAgent; userId for CallerUser; key-name for CallerSystem
|
||||
Roles []string // populated for CallerUser (from JWT claims); empty otherwise
|
||||
}
|
||||
|
||||
type ctxKey struct{}
|
||||
|
||||
func WithCaller(ctx context.Context, c Caller) context.Context {
|
||||
return context.WithValue(ctx, ctxKey{}, c)
|
||||
}
|
||||
|
||||
// FromContext returns the caller attached by an auth middleware. The
|
||||
// zero Caller (Kind == "") indicates an unauthenticated request reached
|
||||
// a public route.
|
||||
func FromContext(ctx context.Context) Caller {
|
||||
c, _ := ctx.Value(ctxKey{}).(Caller)
|
||||
return c
|
||||
}
|
||||
|
||||
// HashKey peppers + sha256-hashes a raw API key. Constant pepper; same
|
||||
// raw key always produces the same hash so lookups can equal-match on
|
||||
// the key_hash column.
|
||||
func HashKey(pepper, raw string) string {
|
||||
h := sha256.Sum256([]byte(pepper + ":" + raw))
|
||||
return hex.EncodeToString(h[:])
|
||||
}
|
||||
|
||||
// AgentAPIKey middleware: extracts Bearer token, looks up agent_keys,
|
||||
// 401 on miss. Updates last_used_at lazily (best-effort; failure here
|
||||
// doesn't block the request).
|
||||
func AgentAPIKey(db *sqlx.DB, pepper string) func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
raw := bearerToken(r)
|
||||
if raw == "" {
|
||||
http.Error(w, "missing bearer token", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
hash := HashKey(pepper, raw)
|
||||
var agentID string
|
||||
err := db.GetContext(r.Context(), &agentID,
|
||||
`SELECT agent_id FROM agent_keys WHERE key_hash = ? AND revoked_at IS NULL`, hash)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
http.Error(w, "invalid agent key", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "auth lookup failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
go func(h string) {
|
||||
// best-effort touch — independent ctx so it survives
|
||||
// even if the request was cancelled mid-handler.
|
||||
_, _ = db.Exec(
|
||||
`UPDATE agent_keys SET last_used_at = CURRENT_TIMESTAMP WHERE key_hash = ?`, h)
|
||||
}(hash)
|
||||
|
||||
ctx := WithCaller(r.Context(), Caller{Kind: CallerAgent, ID: agentID})
|
||||
next.ServeHTTP(w, r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// OIDCBrowser middleware (Phase 2C stub):
|
||||
// - Dev mode + `x-dev-bypass: <token>` header → admit as a fake user.
|
||||
// - Otherwise: 401 with a hint pointing to the (not-yet-wired)
|
||||
// Keycloak redirect path. The real JWKS-verifying middleware lands
|
||||
// when the frontend is wired up; until then, browser callers can
|
||||
// only reach the API via the dev bypass.
|
||||
func OIDCBrowser(devMode bool, devBypassToken string) func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if devMode && devBypassToken != "" {
|
||||
if subtleEqual(r.Header.Get("x-dev-bypass"), devBypassToken) {
|
||||
ctx := WithCaller(r.Context(), Caller{
|
||||
Kind: CallerUser,
|
||||
ID: "dev-operator",
|
||||
Roles: []string{"dialectic-admin"},
|
||||
})
|
||||
next.ServeHTTP(w, r.WithContext(ctx))
|
||||
return
|
||||
}
|
||||
}
|
||||
// Production path goes through Keycloak — Phase 4.
|
||||
http.Error(w, "oidc login required (Phase 4: not yet wired)", http.StatusUnauthorized)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func bearerToken(r *http.Request) string {
|
||||
h := r.Header.Get("authorization")
|
||||
const prefix = "Bearer "
|
||||
if strings.HasPrefix(h, prefix) {
|
||||
return strings.TrimSpace(h[len(prefix):])
|
||||
}
|
||||
if strings.HasPrefix(h, "bearer ") {
|
||||
return strings.TrimSpace(h[len("bearer "):])
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func subtleEqual(a, b string) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
return subtle.ConstantTimeCompare([]byte(a), []byte(b)) == 1
|
||||
}
|
||||
167
internal/config/config.go
Normal file
167
internal/config/config.go
Normal file
@@ -0,0 +1,167 @@
|
||||
// Package config loads runtime configuration from environment variables.
|
||||
//
|
||||
// Conventions:
|
||||
// - 12-factor: every config knob is an env var; no config files.
|
||||
// - Sensible dev defaults for local docker-compose; prod sets via env.
|
||||
// - Sensitive values (DB password, system api key) are *required* in
|
||||
// prod; LoadFromEnv() fails fast if absent and ENV_MODE != "dev".
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// "dev" | "prod". Dev relaxes required-field checks and enables a
|
||||
// dev-mode auth bypass token. Prod requires every sensitive field.
|
||||
Mode string
|
||||
|
||||
// HTTP server bind. e.g. "0.0.0.0:8090".
|
||||
HTTPAddr string
|
||||
|
||||
// CORS allowed origins (comma-separated; "*" allowed only in dev).
|
||||
CORSAllowOrigins []string
|
||||
|
||||
// MySQL DSN parts.
|
||||
DBHost string
|
||||
DBPort string
|
||||
DBName string
|
||||
DBUser string
|
||||
DBPassword string
|
||||
|
||||
// Auth.
|
||||
//
|
||||
// SystemAPIKey: Phase-1 system key for posting to announce channels
|
||||
// in Fabric. Mirrored here so Dialectic backend itself can post topic
|
||||
// announcements via Fabric's POST /channels/:id/messages with
|
||||
// x-fabric-system-key header.
|
||||
//
|
||||
// AgentAPIKeyPepper: HMAC pepper for hashing agent API keys at rest
|
||||
// (we store sha256(pepper || raw) not the raw key). Rotating the
|
||||
// pepper invalidates all keys — that's intentional, an emergency
|
||||
// kill switch.
|
||||
//
|
||||
// OIDCDevBypassToken: dev-mode only. If set AND Mode == "dev", a
|
||||
// browser request with header `x-dev-bypass: <token>` bypasses OIDC
|
||||
// and is treated as user "dev-operator" with role "dialectic-admin".
|
||||
// Prod ignores this even if set.
|
||||
SystemAPIKey string
|
||||
AgentAPIKeyPepper string
|
||||
OIDCDevBypassToken string
|
||||
|
||||
// DialecticAdminAPIKey gates POST /api/admin/agent-keys (raw key
|
||||
// minting). Held on the operator side only — kept on the openclaw
|
||||
// host at /root/.openclaw/system-secrets/dialectic-admin-key for
|
||||
// `dialectic-ctrl` script to read. Empty in env → admin endpoint
|
||||
// fully closed.
|
||||
DialecticAdminAPIKey string
|
||||
|
||||
// OIDC issuer URL (Keycloak realm endpoint). e.g.
|
||||
// https://auth.hangman-lab.top/realms/hangman-lab
|
||||
// Phase 2C ships this as configured-but-not-verified; Phase 4 wires
|
||||
// real JWKS validation.
|
||||
OIDCIssuer string
|
||||
OIDCClientID string
|
||||
|
||||
// (Removed Aug 2026: all Fabric coupling — FabricSystemAPIKey,
|
||||
// FabricGuildBaseURL, FabricAnnounceChannelID, FabricBotBearerToken.
|
||||
// Backend no longer broadcasts lifecycle events to Fabric. The
|
||||
// proposing agent posts a single recruitment fabric-send-message
|
||||
// after creating a topic; downstream agents book HF on_call slots
|
||||
// covering the debate window via `hf calendar schedule` and HF
|
||||
// wakes them naturally. The backend stays a pure data + state-
|
||||
// machine service and doesn't know about Fabric.)
|
||||
|
||||
// Orchestrator tick interval. 0 / unset → default 15s.
|
||||
OrchestratorTickInterval time.Duration
|
||||
}
|
||||
|
||||
func LoadFromEnv() (*Config, error) {
|
||||
c := &Config{
|
||||
Mode: getenv("ENV_MODE", "dev"),
|
||||
HTTPAddr: getenv("HTTP_ADDR", "0.0.0.0:8090"),
|
||||
CORSAllowOrigins: splitCSV(getenv("CORS_ALLOW_ORIGINS", "*")),
|
||||
DBHost: getenv("DB_HOST", "127.0.0.1"),
|
||||
DBPort: getenv("DB_PORT", "3306"),
|
||||
DBName: getenv("DB_NAME", "dialectic"),
|
||||
DBUser: getenv("DB_USER", "dialectic"),
|
||||
DBPassword: os.Getenv("DB_PASSWORD"),
|
||||
SystemAPIKey: os.Getenv("SYSTEM_API_KEY"),
|
||||
AgentAPIKeyPepper: os.Getenv("AGENT_API_KEY_PEPPER"),
|
||||
OIDCDevBypassToken: os.Getenv("OIDC_DEV_BYPASS_TOKEN"),
|
||||
DialecticAdminAPIKey: os.Getenv("DIALECTIC_ADMIN_API_KEY"),
|
||||
OIDCIssuer: os.Getenv("OIDC_ISSUER"),
|
||||
OIDCClientID: os.Getenv("OIDC_CLIENT_ID"),
|
||||
}
|
||||
if d := os.Getenv("ORCHESTRATOR_TICK_INTERVAL"); d != "" {
|
||||
if parsed, err := time.ParseDuration(d); err == nil {
|
||||
c.OrchestratorTickInterval = parsed
|
||||
}
|
||||
}
|
||||
|
||||
if c.Mode != "dev" && c.Mode != "prod" {
|
||||
return nil, fmt.Errorf("ENV_MODE must be dev|prod, got %q", c.Mode)
|
||||
}
|
||||
|
||||
if c.Mode == "prod" {
|
||||
var missing []string
|
||||
if c.DBPassword == "" {
|
||||
missing = append(missing, "DB_PASSWORD")
|
||||
}
|
||||
if c.AgentAPIKeyPepper == "" {
|
||||
missing = append(missing, "AGENT_API_KEY_PEPPER")
|
||||
}
|
||||
if c.OIDCIssuer == "" {
|
||||
missing = append(missing, "OIDC_ISSUER")
|
||||
}
|
||||
if c.OIDCClientID == "" {
|
||||
missing = append(missing, "OIDC_CLIENT_ID")
|
||||
}
|
||||
if len(missing) > 0 {
|
||||
return nil, fmt.Errorf("prod mode requires env: %s", strings.Join(missing, ", "))
|
||||
}
|
||||
// In prod, "*" CORS is never accepted.
|
||||
for _, o := range c.CORSAllowOrigins {
|
||||
if o == "*" {
|
||||
return nil, fmt.Errorf("prod mode forbids CORS_ALLOW_ORIGINS='*'")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *Config) IsDev() bool { return c.Mode == "dev" }
|
||||
|
||||
func (c *Config) DSN() string {
|
||||
// MySQL DSN: user:pass@tcp(host:port)/dbname?params
|
||||
return fmt.Sprintf(
|
||||
"%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4&collation=utf8mb4_unicode_ci",
|
||||
c.DBUser, c.DBPassword, c.DBHost, c.DBPort, c.DBName,
|
||||
)
|
||||
}
|
||||
|
||||
func getenv(key, fallback string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
}
|
||||
return fallback
|
||||
}
|
||||
|
||||
func splitCSV(s string) []string {
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
parts := strings.Split(s, ",")
|
||||
out := make([]string, 0, len(parts))
|
||||
for _, p := range parts {
|
||||
p = strings.TrimSpace(p)
|
||||
if p != "" {
|
||||
out = append(out, p)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
142
internal/db/db.go
Normal file
142
internal/db/db.go
Normal file
@@ -0,0 +1,142 @@
|
||||
// Package db wraps sqlx and runs embedded SQL migrations on startup.
|
||||
//
|
||||
// Migrations are flat files in migrations/, named NNN_*.sql. They run in
|
||||
// lexical order. Each is executed in its own transaction; a missing
|
||||
// schema_migrations row indicates "not yet applied". This is a
|
||||
// deliberately simple migration runner — for this project's size + team
|
||||
// size, pulling in golang-migrate or atlas adds complexity without
|
||||
// payback. If migration count grows past ~20, revisit.
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"embed"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
//go:embed migrations/*.sql
|
||||
var migrationsFS embed.FS
|
||||
|
||||
func Open(ctx context.Context, dsn string) (*sqlx.DB, error) {
|
||||
d, err := sqlx.ConnectContext(ctx, "mysql", dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect mysql: %w", err)
|
||||
}
|
||||
d.SetMaxOpenConns(25)
|
||||
d.SetMaxIdleConns(5)
|
||||
d.SetConnMaxLifetime(5 * time.Minute)
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// RunMigrations applies any migrations that aren't yet present in the
|
||||
// schema_migrations table. Idempotent — safe to call on every startup.
|
||||
func RunMigrations(ctx context.Context, d *sqlx.DB) error {
|
||||
// Bootstrap the tracker table itself.
|
||||
if _, err := d.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
name VARCHAR(255) PRIMARY KEY,
|
||||
applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4`); err != nil {
|
||||
return fmt.Errorf("ensure schema_migrations: %w", err)
|
||||
}
|
||||
|
||||
entries, err := migrationsFS.ReadDir("migrations")
|
||||
if err != nil {
|
||||
return fmt.Errorf("list migrations: %w", err)
|
||||
}
|
||||
var files []string
|
||||
for _, e := range entries {
|
||||
if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") {
|
||||
files = append(files, e.Name())
|
||||
}
|
||||
}
|
||||
sort.Strings(files)
|
||||
|
||||
for _, name := range files {
|
||||
var found string
|
||||
err := d.GetContext(ctx, &found, `SELECT name FROM schema_migrations WHERE name = ?`, name)
|
||||
if err == nil {
|
||||
continue // already applied
|
||||
}
|
||||
if err != sql.ErrNoRows {
|
||||
return fmt.Errorf("check migration %s: %w", name, err)
|
||||
}
|
||||
|
||||
content, err := migrationsFS.ReadFile("migrations/" + name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read migration %s: %w", name, err)
|
||||
}
|
||||
|
||||
// MySQL doesn't support multi-statement in a single Exec by default
|
||||
// — split on ';' boundaries and run each individually. Comments are
|
||||
// passed through (server-side parser handles).
|
||||
statements := splitSQL(string(content))
|
||||
|
||||
tx, err := d.BeginTxx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("tx for %s: %w", name, err)
|
||||
}
|
||||
for _, stmt := range statements {
|
||||
stmt = strings.TrimSpace(stmt)
|
||||
if stmt == "" {
|
||||
continue
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, stmt); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return fmt.Errorf("apply %s [statement: %q]: %w", name, firstLine(stmt), err)
|
||||
}
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, `INSERT INTO schema_migrations(name) VALUES (?)`, name); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return fmt.Errorf("record %s: %w", name, err)
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return fmt.Errorf("commit %s: %w", name, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func splitSQL(s string) []string {
|
||||
// Comment-aware splitter: skip `;` inside `-- ...` line comments
|
||||
// and `/* ... */` block comments. Doesn't handle string-literal
|
||||
// semicolons (we don't put any) — if we ever need that, swap in a
|
||||
// real SQL tokenizer.
|
||||
var b strings.Builder
|
||||
i := 0
|
||||
for i < len(s) {
|
||||
if i+1 < len(s) && s[i] == '-' && s[i+1] == '-' {
|
||||
// single-line comment — strip through end of line
|
||||
for i < len(s) && s[i] != '\n' {
|
||||
i++
|
||||
}
|
||||
continue
|
||||
}
|
||||
if i+1 < len(s) && s[i] == '/' && s[i+1] == '*' {
|
||||
// block comment — strip through `*/`
|
||||
i += 2
|
||||
for i+1 < len(s) && !(s[i] == '*' && s[i+1] == '/') {
|
||||
i++
|
||||
}
|
||||
i += 2
|
||||
continue
|
||||
}
|
||||
b.WriteByte(s[i])
|
||||
i++
|
||||
}
|
||||
return strings.Split(b.String(), ";")
|
||||
}
|
||||
|
||||
func firstLine(s string) string {
|
||||
if i := strings.IndexByte(s, '\n'); i >= 0 {
|
||||
return s[:i]
|
||||
}
|
||||
return s
|
||||
}
|
||||
141
internal/db/migrations/001_init.sql
Normal file
141
internal/db/migrations/001_init.sql
Normal file
@@ -0,0 +1,141 @@
|
||||
-- 001_init.sql — Dialectic v2 schema (greenfield, replaces Python v1).
|
||||
-- See /home/hzhang/arch/DIALECTIC-V2-DESIGN.md for the design.
|
||||
|
||||
-- Verdict schemas — declared at topic-creation time, judge produces output matching.
|
||||
CREATE TABLE verdict_schemas (
|
||||
id VARCHAR(64) NOT NULL PRIMARY KEY,
|
||||
description TEXT NOT NULL,
|
||||
shape_json JSON NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- Seed v1 schemas.
|
||||
INSERT INTO verdict_schemas (id, description, shape_json) VALUES
|
||||
('binary', 'pro|con|draw with confidence + key reasoning', JSON_OBJECT('decision', 'pro|con|draw', 'confidence', 'number 0..1', 'key_reasoning', 'string')),
|
||||
('claim-resolution', 'analyze-intel contested-cluster resolution', JSON_OBJECT('verdict', 'resolved-toward-A|resolved-toward-B|irreducibly-contested', 'winning_claim', 'string', 'dissenting_points', 'array of string', 'confidence', 'number 0..1')),
|
||||
('policy-recommendation', 'recommended action with alternatives and risks', JSON_OBJECT('recommended_action', 'string', 'alternatives', 'array of string', 'conditions_for_alternatives', 'array of string', 'risks_noted', 'array of string')),
|
||||
('free-form', 'unstructured summary escape hatch', JSON_OBJECT('summary', 'string'));
|
||||
|
||||
-- Topics (议题) — the unit of debate.
|
||||
CREATE TABLE topics (
|
||||
id CHAR(36) NOT NULL PRIMARY KEY,
|
||||
title VARCHAR(255) NOT NULL,
|
||||
summary TEXT NOT NULL,
|
||||
visibility ENUM('public','private') NOT NULL DEFAULT 'private',
|
||||
verdict_schema_id VARCHAR(64) NOT NULL,
|
||||
status ENUM('created','signup_open','signup_closed','debating','completed','cancelled') NOT NULL DEFAULT 'created',
|
||||
-- Lifecycle timestamps (per section 3 of design doc)
|
||||
signup_open_at TIMESTAMP NOT NULL,
|
||||
signup_close_at TIMESTAMP NOT NULL,
|
||||
debate_start_at TIMESTAMP NOT NULL,
|
||||
debate_end_at TIMESTAMP NOT NULL,
|
||||
-- Audit
|
||||
creator_user_id CHAR(36) NOT NULL,
|
||||
visibility_changed_by CHAR(36) NULL,
|
||||
visibility_changed_at TIMESTAMP NULL,
|
||||
cancelled_reason VARCHAR(255) NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||
INDEX idx_topics_status (status, signup_open_at),
|
||||
INDEX idx_topics_visibility (visibility, created_at),
|
||||
CONSTRAINT fk_topics_schema FOREIGN KEY (verdict_schema_id) REFERENCES verdict_schemas(id)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- Signups: an agent volunteers for one or more camps on a topic.
|
||||
-- willing_camps is a JSON array of camp names (subset of {pro, con, judge}).
|
||||
-- (agent_id, topic_id) is unique — re-signup updates willing_camps.
|
||||
CREATE TABLE signups (
|
||||
id CHAR(36) NOT NULL PRIMARY KEY,
|
||||
topic_id CHAR(36) NOT NULL,
|
||||
agent_id VARCHAR(64) NOT NULL,
|
||||
willing_camps JSON NOT NULL,
|
||||
-- Pre-validation result captured at signup time (plugin verifies the
|
||||
-- agent has an on_call slot covering the debate window; backend
|
||||
-- records what the agent told it for audit).
|
||||
pre_validated BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE KEY uq_signups (topic_id, agent_id),
|
||||
CONSTRAINT fk_signups_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- Camps: the post-allocation assignment. One row per (topic, camp) with
|
||||
-- the locked-in agent. Written by camp-allocation algorithm at
|
||||
-- signup_close_at — immutable afterwards (no drop-out / replacement in v1).
|
||||
CREATE TABLE camps (
|
||||
id CHAR(36) NOT NULL PRIMARY KEY,
|
||||
topic_id CHAR(36) NOT NULL,
|
||||
camp ENUM('pro','con','judge') NOT NULL,
|
||||
agent_id VARCHAR(64) NOT NULL,
|
||||
allocated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE KEY uq_camps (topic_id, camp),
|
||||
INDEX idx_camps_agent (agent_id),
|
||||
CONSTRAINT fk_camps_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- Rounds: chronological partition of arguments. Each topic has N rounds
|
||||
-- (typically 3-5), round 0 is the opening. Round transitions are driven
|
||||
-- by the orchestrator on a schedule (or all-participants-posted).
|
||||
CREATE TABLE rounds (
|
||||
id CHAR(36) NOT NULL PRIMARY KEY,
|
||||
topic_id CHAR(36) NOT NULL,
|
||||
round_no INT NOT NULL,
|
||||
opened_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
closed_at TIMESTAMP NULL,
|
||||
UNIQUE KEY uq_rounds (topic_id, round_no),
|
||||
CONSTRAINT fk_rounds_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- Arguments: an individual contribution within a round by a camp's agent.
|
||||
-- For pro/con these are claims/rebuttals — for judge these are clarifying
|
||||
-- questions (judge is silent observer in v1 except for clarifications).
|
||||
CREATE TABLE arguments (
|
||||
id CHAR(36) NOT NULL PRIMARY KEY,
|
||||
topic_id CHAR(36) NOT NULL,
|
||||
round_id CHAR(36) NOT NULL,
|
||||
camp ENUM('pro','con','judge') NOT NULL,
|
||||
agent_id VARCHAR(64) NOT NULL,
|
||||
content MEDIUMTEXT NOT NULL,
|
||||
posted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
INDEX idx_arguments_round (round_id, posted_at),
|
||||
INDEX idx_arguments_topic (topic_id, posted_at),
|
||||
CONSTRAINT fk_arguments_round FOREIGN KEY (round_id) REFERENCES rounds(id) ON DELETE CASCADE,
|
||||
CONSTRAINT fk_arguments_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- Verdicts: judge's structured output, one per topic (one verdict per
|
||||
-- debate). verdict_json shape matches the topic's verdict_schema_id.
|
||||
CREATE TABLE verdicts (
|
||||
id CHAR(36) NOT NULL PRIMARY KEY,
|
||||
topic_id CHAR(36) NOT NULL UNIQUE,
|
||||
judge_agent_id VARCHAR(64) NOT NULL,
|
||||
verdict_json JSON NOT NULL,
|
||||
rationale TEXT NOT NULL,
|
||||
-- Token cost trail for accounting (Phase 1: not enforced; Phase N: budget gate)
|
||||
tokens_input INT NOT NULL DEFAULT 0,
|
||||
tokens_output INT NOT NULL DEFAULT 0,
|
||||
produced_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT fk_verdicts_topic FOREIGN KEY (topic_id) REFERENCES topics(id) ON DELETE CASCADE
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- Agent API keys: provisioned per agent at recruitment time. Stored as
|
||||
-- sha256(pepper || raw) — pepper rotation invalidates all keys.
|
||||
CREATE TABLE agent_keys (
|
||||
agent_id VARCHAR(64) NOT NULL PRIMARY KEY,
|
||||
key_hash CHAR(64) NOT NULL UNIQUE,
|
||||
issued_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
last_used_at TIMESTAMP NULL,
|
||||
revoked_at TIMESTAMP NULL,
|
||||
INDEX idx_agent_keys_hash (key_hash)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- System keys: out-of-band credentials for non-agent callers (e.g. the
|
||||
-- analyze-intel workflow running via a system identity that creates
|
||||
-- topics on behalf of the analyzing agent). Also stored as hash.
|
||||
CREATE TABLE system_keys (
|
||||
name VARCHAR(64) NOT NULL PRIMARY KEY,
|
||||
key_hash CHAR(64) NOT NULL UNIQUE,
|
||||
description TEXT NULL,
|
||||
issued_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
revoked_at TIMESTAMP NULL,
|
||||
INDEX idx_system_keys_hash (key_hash)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
10
internal/db/migrations/002_topic_announce_target.sql
Normal file
10
internal/db/migrations/002_topic_announce_target.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
-- 002_topic_announce_target.sql — move announce-channel routing from
|
||||
-- backend env to per-topic config. Topic creator picks which Fabric
|
||||
-- guild + announce channel to broadcast lifecycle events to (a single
|
||||
-- backend deployment can serve topics broadcasting to multiple guilds/
|
||||
-- channels). Both fields are nullable; null means "do not broadcast
|
||||
-- for this topic" (intentional opt-out).
|
||||
|
||||
ALTER TABLE topics
|
||||
ADD COLUMN announce_guild_base_url VARCHAR(255) NULL AFTER cancelled_reason,
|
||||
ADD COLUMN announce_channel_id VARCHAR(64) NULL AFTER announce_guild_base_url;
|
||||
10
internal/db/migrations/003_drop_topic_announce_target.sql
Normal file
10
internal/db/migrations/003_drop_topic_announce_target.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
-- Drop per-topic announce target columns. The backend no longer
|
||||
-- broadcasts lifecycle events to Fabric; the proposing agent posts a
|
||||
-- single recruitment fabric-send-message after topic creation, and
|
||||
-- downstream agents book HF on_call slots covering the debate window
|
||||
-- via `hf calendar schedule` so HF wakes them naturally.
|
||||
--
|
||||
-- Counterpart of 002_topic_announce_target.sql (now obsolete).
|
||||
ALTER TABLE topics
|
||||
DROP COLUMN announce_guild_base_url,
|
||||
DROP COLUMN announce_channel_id;
|
||||
247
internal/httpapi/handlers/admin.go
Normal file
247
internal/httpapi/handlers/admin.go
Normal file
@@ -0,0 +1,247 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/subtle"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
|
||||
)
|
||||
|
||||
type AdminHandler struct {
|
||||
db *sqlx.DB
|
||||
pepper string
|
||||
adminAPIKey string
|
||||
}
|
||||
|
||||
func NewAdminHandler(db *sqlx.DB, pepper string, adminAPIKey string) *AdminHandler {
|
||||
return &AdminHandler{db: db, pepper: pepper, adminAPIKey: adminAPIKey}
|
||||
}
|
||||
|
||||
type provisionAgentKeyBody struct {
|
||||
AgentID string `json:"agent_id"`
|
||||
Force bool `json:"force"`
|
||||
}
|
||||
|
||||
// POST /api/admin/agent-keys
|
||||
//
|
||||
// System-auth (header `x-dialectic-admin-key` matching env
|
||||
// `DIALECTIC_ADMIN_API_KEY`). Generates a fresh 32-byte hex random
|
||||
// raw key, stores its peppered-sha256 hash in `agent_keys`, and
|
||||
// returns the raw key in the response. This is the ONLY time the raw
|
||||
// key is exposed — caller must capture it and place in the target
|
||||
// agent's secret-mgr.
|
||||
//
|
||||
// On duplicate (agent_id already has a key): 409 unless `force: true`,
|
||||
// in which case the old hash is replaced.
|
||||
//
|
||||
// Caller pattern: `dialectic-ctrl create-key` (run via proxy-pcexec
|
||||
// with proxy-for set to the agent being onboarded — see
|
||||
// `skills/dialectic-hangman-lab/scripts/dialectic-ctrl`).
|
||||
func (h *AdminHandler) ProvisionAgentKey(w http.ResponseWriter, r *http.Request) {
|
||||
if !h.checkAdminAuth(r) {
|
||||
http.Error(w, "admin auth required (x-dialectic-admin-key)", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if h.pepper == "" {
|
||||
http.Error(w, "server misconfigured: AGENT_API_KEY_PEPPER unset", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var body provisionAgentKeyBody
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if body.AgentID == "" {
|
||||
http.Error(w, "agent_id required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
rawKey, err := randomHexKey(32)
|
||||
if err != nil {
|
||||
http.Error(w, "rng failed: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
hash := auth.HashKey(h.pepper, rawKey)
|
||||
|
||||
_, err = h.db.ExecContext(r.Context(),
|
||||
`INSERT INTO agent_keys (agent_id, key_hash) VALUES (?, ?)`,
|
||||
body.AgentID, hash)
|
||||
if err != nil {
|
||||
var mysqlErr *mysql.MySQLError
|
||||
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
|
||||
// duplicate primary key
|
||||
if !body.Force {
|
||||
http.Error(w, fmt.Sprintf(
|
||||
"agent %q already has a dialectic api key; pass force:true to rotate",
|
||||
body.AgentID), http.StatusConflict)
|
||||
return
|
||||
}
|
||||
// rotate: replace the hash
|
||||
if _, err := h.db.ExecContext(r.Context(),
|
||||
`UPDATE agent_keys SET key_hash = ?, last_used_at = NULL, revoked_at = NULL WHERE agent_id = ?`,
|
||||
hash, body.AgentID); err != nil {
|
||||
http.Error(w, "rotate failed: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
http.Error(w, "insert failed: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, map[string]any{
|
||||
"agent_id": body.AgentID,
|
||||
"api_key": rawKey, // raw — caller must store in agent's secret-mgr; not returned again
|
||||
"rotated": body.Force,
|
||||
})
|
||||
}
|
||||
|
||||
func (h *AdminHandler) checkAdminAuth(r *http.Request) bool {
|
||||
if h.adminAPIKey == "" {
|
||||
return false // unset env = no admin caller is valid
|
||||
}
|
||||
got := r.Header.Get("x-dialectic-admin-key")
|
||||
if got == "" {
|
||||
return false
|
||||
}
|
||||
return subtle.ConstantTimeCompare([]byte(got), []byte(h.adminAPIKey)) == 1
|
||||
}
|
||||
|
||||
func randomHexKey(byteLen int) (string, error) {
|
||||
b := make([]byte, byteLen)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hex.EncodeToString(b), nil
|
||||
}
|
||||
|
||||
// GET /api/admin/agents/{id}
|
||||
//
|
||||
// Same x-dialectic-admin-key gate as ProvisionAgentKey. Returns a
|
||||
// rolled-up activity summary for one agent: whether a dialectic key is
|
||||
// provisioned (and when last used), per-action counts (signups /
|
||||
// arguments / verdicts), and the 20 most-recent topics the agent
|
||||
// touched in any role. Used by the frontend AgentActivity page for
|
||||
// operator diagnostics ("did sherlock get a key?", "how much is mirror
|
||||
// participating?").
|
||||
//
|
||||
// Joins are wide but capped at 20 recent topics; total query cost is
|
||||
// bounded by index scans on (agent_id, posted_at/created_at). At
|
||||
// current row counts (low thousands) returns in <50ms.
|
||||
type recentTopic struct {
|
||||
TopicID string `db:"topic_id" json:"topic_id"`
|
||||
Title string `db:"title" json:"title"`
|
||||
Status string `db:"status" json:"status"`
|
||||
Role string `db:"role" json:"role"`
|
||||
LastActionAt time.Time `db:"last_action_at" json:"last_action_at"`
|
||||
}
|
||||
|
||||
func (h *AdminHandler) GetAgentSummary(w http.ResponseWriter, r *http.Request) {
|
||||
if !h.checkAdminAuth(r) {
|
||||
http.Error(w, "admin auth required (x-dialectic-admin-key)", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
agentID := chi.URLParam(r, "id")
|
||||
if agentID == "" {
|
||||
http.Error(w, "agent_id required in path", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
|
||||
// agent_keys row (may be missing — agent never provisioned)
|
||||
var lastUsedAt sql.NullTime
|
||||
var keyProvisioned bool
|
||||
if err := h.db.QueryRowxContext(ctx,
|
||||
`SELECT last_used_at FROM agent_keys WHERE agent_id = ?`, agentID,
|
||||
).Scan(&lastUsedAt); err != nil {
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
http.Error(w, "agent_keys lookup: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
keyProvisioned = false
|
||||
} else {
|
||||
keyProvisioned = true
|
||||
}
|
||||
|
||||
// Counts (3 small queries; could be one UNION but readability wins
|
||||
// at this scale).
|
||||
count := func(q string) (int, error) {
|
||||
var n int
|
||||
err := h.db.GetContext(ctx, &n, q, agentID)
|
||||
return n, err
|
||||
}
|
||||
signups, err := count(`SELECT COUNT(*) FROM signups WHERE agent_id = ?`)
|
||||
if err != nil {
|
||||
http.Error(w, "signups count: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
args, err := count(`SELECT COUNT(*) FROM arguments WHERE agent_id = ?`)
|
||||
if err != nil {
|
||||
http.Error(w, "arguments count: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
verdicts, err := count(`SELECT COUNT(*) FROM verdicts WHERE judge_agent_id = ?`)
|
||||
if err != nil {
|
||||
http.Error(w, "verdicts count: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Recent topics: union 3 sources, dedup by topic_id keeping the
|
||||
// latest action_at + most-specific role. Allocator camps win over
|
||||
// raw signups (an allocated agent's role is meaningful; a signup
|
||||
// without allocation is just "volunteer"). Verdicts row is judge.
|
||||
var recent []recentTopic
|
||||
if err := h.db.SelectContext(ctx, &recent,
|
||||
`SELECT topic_id, title, status, role, MAX(last_action_at) AS last_action_at FROM (
|
||||
SELECT s.topic_id, t.title, t.status, 'volunteer' AS role, s.created_at AS last_action_at
|
||||
FROM signups s JOIN topics t ON t.id = s.topic_id
|
||||
WHERE s.agent_id = ?
|
||||
UNION ALL
|
||||
SELECT c.topic_id, t.title, t.status, c.camp AS role, c.allocated_at AS last_action_at
|
||||
FROM camps c JOIN topics t ON t.id = c.topic_id
|
||||
WHERE c.agent_id = ?
|
||||
UNION ALL
|
||||
SELECT a.topic_id, t.title, t.status, a.camp AS role, a.posted_at AS last_action_at
|
||||
FROM arguments a JOIN topics t ON t.id = a.topic_id
|
||||
WHERE a.agent_id = ?
|
||||
UNION ALL
|
||||
SELECT v.topic_id, t.title, t.status, 'judge' AS role, v.produced_at AS last_action_at
|
||||
FROM verdicts v JOIN topics t ON t.id = v.topic_id
|
||||
WHERE v.judge_agent_id = ?
|
||||
) AS u
|
||||
GROUP BY topic_id, title, status, role
|
||||
ORDER BY last_action_at DESC
|
||||
LIMIT 20`,
|
||||
agentID, agentID, agentID, agentID,
|
||||
); err != nil {
|
||||
http.Error(w, "recent topics: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
resp := map[string]any{
|
||||
"agent_id": agentID,
|
||||
"key_provisioned": keyProvisioned,
|
||||
"signups_count": signups,
|
||||
"arguments_count": args,
|
||||
"verdicts_count": verdicts,
|
||||
"recent_topics": recent,
|
||||
}
|
||||
if lastUsedAt.Valid {
|
||||
resp["last_used_at"] = lastUsedAt.Time
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
133
internal/httpapi/handlers/arguments.go
Normal file
133
internal/httpapi/handlers/arguments.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
|
||||
type ArgumentsHandler struct {
|
||||
topics *store.TopicStore
|
||||
camps *store.CampStore
|
||||
rounds *store.RoundStore
|
||||
arguments *store.ArgumentStore
|
||||
}
|
||||
|
||||
func NewArgumentsHandler(
|
||||
t *store.TopicStore,
|
||||
c *store.CampStore,
|
||||
r *store.RoundStore,
|
||||
a *store.ArgumentStore,
|
||||
) *ArgumentsHandler {
|
||||
return &ArgumentsHandler{topics: t, camps: c, rounds: r, arguments: a}
|
||||
}
|
||||
|
||||
type postArgumentBody struct {
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
// POST /api/topics/{id}/arguments
|
||||
//
|
||||
// Agent-only. Caller must be allocated to one of the topic's camps;
|
||||
// rejected otherwise. Topic must be `debating` (status state machine
|
||||
// enforces; argument outside that window is meaningless). Content is
|
||||
// stored as-is (no markdown rendering server-side; frontend renders).
|
||||
//
|
||||
// Round: argument is attached to the LATEST open round. Round-advance
|
||||
// policy is the orchestrator's call (Phase 2D ships with manual/single
|
||||
// round 0; round bumping logic comes when the rule is decided).
|
||||
func (h *ArgumentsHandler) Post(w http.ResponseWriter, r *http.Request) {
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind != auth.CallerAgent {
|
||||
http.Error(w, "argument posting is agent-only", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
topicID := chi.URLParam(r, "id")
|
||||
|
||||
topic, err := h.topics.GetByID(r.Context(), topicID)
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
http.Error(w, "topic not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if topic.Status != models.TopicStatusDebating {
|
||||
http.Error(w, "topic not in debate window (status="+string(topic.Status)+")", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
|
||||
camp, err := h.camps.AgentCampInTopic(r.Context(), topicID, caller.ID)
|
||||
if err != nil {
|
||||
http.Error(w, "you are not allocated to any camp on this topic", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
round, err := h.rounds.Latest(r.Context(), topicID)
|
||||
if err != nil {
|
||||
http.Error(w, "no open round (orchestrator hasn't opened round 0 yet?)", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
|
||||
var body postArgumentBody
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if body.Content == "" {
|
||||
http.Error(w, "content required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
const maxContent = 32_000 // arbitrary upper bound; arguments shouldn't be book-length
|
||||
if len(body.Content) > maxContent {
|
||||
http.Error(w, "content too long", http.StatusRequestEntityTooLarge)
|
||||
return
|
||||
}
|
||||
|
||||
arg, err := h.arguments.Post(r.Context(), store.PostArgumentInput{
|
||||
TopicID: topicID,
|
||||
RoundID: round.ID,
|
||||
Camp: camp,
|
||||
AgentID: caller.ID,
|
||||
Content: body.Content,
|
||||
})
|
||||
if err != nil {
|
||||
http.Error(w, "post failed: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, arg)
|
||||
}
|
||||
|
||||
// GET /api/topics/{id}/arguments — full transcript in posted order.
|
||||
// Visibility: anonymous can read for public topics; private requires
|
||||
// any-auth (enforced upstream by middleware composition).
|
||||
func (h *ArgumentsHandler) List(w http.ResponseWriter, r *http.Request) {
|
||||
topicID := chi.URLParam(r, "id")
|
||||
topic, err := h.topics.GetByID(r.Context(), topicID)
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
http.Error(w, "topic not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind == "" && topic.Visibility != models.VisibilityPublic {
|
||||
http.Error(w, "not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
rows, err := h.arguments.ListByTopic(r.Context(), topicID)
|
||||
if err != nil {
|
||||
http.Error(w, "list failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"arguments": rows, "count": len(rows)})
|
||||
}
|
||||
38
internal/httpapi/handlers/health.go
Normal file
38
internal/httpapi/handlers/health.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type HealthHandler struct {
|
||||
db *sqlx.DB
|
||||
version string
|
||||
startedAt time.Time
|
||||
}
|
||||
|
||||
func NewHealthHandler(db *sqlx.DB, version string) *HealthHandler {
|
||||
return &HealthHandler{db: db, version: version, startedAt: time.Now()}
|
||||
}
|
||||
|
||||
func (h *HealthHandler) Healthz(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
|
||||
defer cancel()
|
||||
dbOK := h.db.PingContext(ctx) == nil
|
||||
status := http.StatusOK
|
||||
if !dbOK {
|
||||
status = http.StatusServiceUnavailable
|
||||
}
|
||||
w.Header().Set("content-type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"ok": dbOK,
|
||||
"version": h.version,
|
||||
"uptime_s": int(time.Since(h.startedAt).Seconds()),
|
||||
"checked_at": time.Now().UTC().Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
131
internal/httpapi/handlers/signups.go
Normal file
131
internal/httpapi/handlers/signups.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
|
||||
type SignupsHandler struct {
|
||||
topics *store.TopicStore
|
||||
signups *store.SignupStore
|
||||
}
|
||||
|
||||
func NewSignupsHandler(t *store.TopicStore, s *store.SignupStore) *SignupsHandler {
|
||||
return &SignupsHandler{topics: t, signups: s}
|
||||
}
|
||||
|
||||
type signupBody struct {
|
||||
WillingCamps []models.Camp `json:"willing_camps"`
|
||||
PreValidated bool `json:"pre_validated"`
|
||||
}
|
||||
|
||||
// POST /api/topics/{id}/signups
|
||||
//
|
||||
// Agent self-enrollment. Only CallerAgent is allowed — browsers can't
|
||||
// sign up on behalf of an agent (would defeat the on_call pre-check
|
||||
// that the plugin does before calling this endpoint).
|
||||
//
|
||||
// Body: { willing_camps: [pro|con|judge ...], pre_validated: bool }
|
||||
//
|
||||
// pre_validated is the agent's plugin's claim that it verified the
|
||||
// agent has an on_call HF slot covering [debate_start_at, debate_end_at].
|
||||
// Backend trusts but logs — Phase N may add server-side verification.
|
||||
//
|
||||
// Topic must be in status `signup_open`. Outside that window → 409.
|
||||
func (h *SignupsHandler) Create(w http.ResponseWriter, r *http.Request) {
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind != auth.CallerAgent {
|
||||
http.Error(w, "signup is agent-only", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
topicID := chi.URLParam(r, "id")
|
||||
t, err := h.topics.GetByID(r.Context(), topicID)
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
http.Error(w, "topic not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if t.Status != models.TopicStatusSignupOpen {
|
||||
http.Error(w, "signup window not open (status="+string(t.Status)+")", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
if now.Before(t.SignupOpenAt) {
|
||||
http.Error(w, "signup not yet open", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
if now.After(t.SignupCloseAt) {
|
||||
http.Error(w, "signup closed", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
|
||||
var body signupBody
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if len(body.WillingCamps) == 0 {
|
||||
http.Error(w, "willing_camps required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// Dedup the camps so a buggy plugin can't insert duplicates.
|
||||
camps := dedupCamps(body.WillingCamps)
|
||||
view, err := h.signups.Upsert(r.Context(), store.UpsertSignupInput{
|
||||
TopicID: topicID,
|
||||
AgentID: caller.ID,
|
||||
WillingCamps: camps,
|
||||
PreValidated: body.PreValidated,
|
||||
})
|
||||
if err != nil {
|
||||
http.Error(w, "upsert failed: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, view)
|
||||
}
|
||||
|
||||
// GET /api/topics/{id}/signups — list all signups for a topic.
|
||||
//
|
||||
// Visible to topic creator + admins + agents. Public anonymous see
|
||||
// nothing (avoid leaking who-signed-up for private topics).
|
||||
func (h *SignupsHandler) List(w http.ResponseWriter, r *http.Request) {
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind == "" {
|
||||
http.Error(w, "auth required", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
topicID := chi.URLParam(r, "id")
|
||||
if _, err := h.topics.GetByID(r.Context(), topicID); err != nil {
|
||||
http.Error(w, "topic not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
rows, err := h.signups.ListByTopic(r.Context(), topicID)
|
||||
if err != nil {
|
||||
http.Error(w, "list failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"signups": rows, "count": len(rows)})
|
||||
}
|
||||
|
||||
func dedupCamps(in []models.Camp) []models.Camp {
|
||||
seen := map[models.Camp]struct{}{}
|
||||
out := make([]models.Camp, 0, len(in))
|
||||
for _, c := range in {
|
||||
if _, ok := seen[c]; ok {
|
||||
continue
|
||||
}
|
||||
seen[c] = struct{}{}
|
||||
out = append(out, c)
|
||||
}
|
||||
return out
|
||||
}
|
||||
247
internal/httpapi/handlers/topics.go
Normal file
247
internal/httpapi/handlers/topics.go
Normal file
@@ -0,0 +1,247 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
|
||||
type TopicsHandler struct {
|
||||
store *store.TopicStore
|
||||
camps *store.CampStore
|
||||
}
|
||||
|
||||
func NewTopicsHandler(s *store.TopicStore, c *store.CampStore) *TopicsHandler {
|
||||
return &TopicsHandler{store: s, camps: c}
|
||||
}
|
||||
|
||||
// GET /api/topics?status=...&visibility=...&limit=...&offset=...
|
||||
//
|
||||
// Visibility filter is enforced at the auth layer: anonymous callers
|
||||
// only see visibility=public; authenticated users (CallerUser) see all
|
||||
// they're entitled to (Phase 2 v1: all; Phase 4 may add per-user ACLs).
|
||||
// Agent callers (CallerAgent) see all — they're acting as system on
|
||||
// behalf of the platform.
|
||||
func (h *TopicsHandler) List(w http.ResponseWriter, r *http.Request) {
|
||||
caller := auth.FromContext(r.Context())
|
||||
f := store.ListFilter{
|
||||
Status: r.URL.Query().Get("status"),
|
||||
Visibility: r.URL.Query().Get("visibility"),
|
||||
}
|
||||
if v, _ := strconv.Atoi(r.URL.Query().Get("limit")); v > 0 {
|
||||
f.Limit = v
|
||||
}
|
||||
if v, _ := strconv.Atoi(r.URL.Query().Get("offset")); v > 0 {
|
||||
f.Offset = v
|
||||
}
|
||||
|
||||
// Anonymous: force visibility=public.
|
||||
if caller.Kind == "" {
|
||||
f.Visibility = string(models.VisibilityPublic)
|
||||
}
|
||||
|
||||
rows, err := h.store.List(r.Context(), f)
|
||||
if err != nil {
|
||||
http.Error(w, "list failed: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"topics": rows, "count": len(rows)})
|
||||
}
|
||||
|
||||
// GET /api/topics/{id}
|
||||
func (h *TopicsHandler) Get(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
if id == "" {
|
||||
http.Error(w, "missing id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
t, err := h.store.GetByID(r.Context(), id)
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
http.Error(w, "topic not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "get failed: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
// Visibility gate: anonymous can only see public; authenticated see all.
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind == "" && t.Visibility != models.VisibilityPublic {
|
||||
http.Error(w, "not found", http.StatusNotFound) // 404 not 403 — hide existence
|
||||
return
|
||||
}
|
||||
// Enrich with camps so an agent can locate their own allocation in one
|
||||
// round-trip. Camps are 0 rows pre-signup_close, 3 rows after — small
|
||||
// enough that inlining costs nothing. Arguments are deliberately NOT
|
||||
// inlined (potentially large; agents who need the transcript should
|
||||
// hit GET /api/topics/{id}/arguments via dialectic_list_arguments).
|
||||
//
|
||||
// Backward-compatible: existing callers reading the original Topic
|
||||
// fields keep working; new callers can read `camps` alongside.
|
||||
camps, cErr := h.camps.ListByTopic(r.Context(), id)
|
||||
if cErr != nil {
|
||||
camps = nil // best-effort; metadata still useful
|
||||
}
|
||||
// Marshal Topic into a map and add `camps` as a sibling field rather
|
||||
// than wrapping under "topic" — that would break every existing
|
||||
// consumer that reads e.g. response.title / response.status directly.
|
||||
buf, _ := json.Marshal(t)
|
||||
out := map[string]any{}
|
||||
_ = json.Unmarshal(buf, &out)
|
||||
out["camps"] = camps
|
||||
writeJSON(w, http.StatusOK, out)
|
||||
}
|
||||
|
||||
type createTopicBody struct {
|
||||
Title string `json:"title"`
|
||||
Summary string `json:"summary"`
|
||||
Visibility string `json:"visibility"` // default "private"
|
||||
VerdictSchemaID string `json:"verdict_schema_id"` // default "free-form"
|
||||
SignupOpenAt string `json:"signup_open_at"` // RFC3339
|
||||
SignupCloseAt string `json:"signup_close_at"`
|
||||
DebateStartAt string `json:"debate_start_at"`
|
||||
DebateEndAt string `json:"debate_end_at"`
|
||||
}
|
||||
|
||||
// POST /api/topics
|
||||
//
|
||||
// Allowed callers: agent or authenticated user. Anonymous rejected
|
||||
// (the route wires the auth-required middleware).
|
||||
func (h *TopicsHandler) Create(w http.ResponseWriter, r *http.Request) {
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind == "" {
|
||||
http.Error(w, "auth required", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
var body createTopicBody
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad body: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if body.Title == "" || body.Summary == "" {
|
||||
http.Error(w, "title and summary required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if body.Visibility == "" {
|
||||
body.Visibility = string(models.VisibilityPrivate)
|
||||
}
|
||||
if body.VerdictSchemaID == "" {
|
||||
body.VerdictSchemaID = "free-form"
|
||||
}
|
||||
parsed, err := validateLifecycleTimes(body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
created, err := h.store.Create(r.Context(), store.CreateTopicInput{
|
||||
Title: body.Title,
|
||||
Summary: body.Summary,
|
||||
Visibility: models.Visibility(body.Visibility),
|
||||
VerdictSchemaID: body.VerdictSchemaID,
|
||||
SignupOpenAt: parsed[0],
|
||||
SignupCloseAt: parsed[1],
|
||||
DebateStartAt: parsed[2],
|
||||
DebateEndAt: parsed[3],
|
||||
CreatorUserID: caller.ID,
|
||||
})
|
||||
if err != nil {
|
||||
http.Error(w, "create failed: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusCreated, created)
|
||||
}
|
||||
|
||||
// validateLifecycleTimes enforces:
|
||||
//
|
||||
// signup_open < signup_close <= debate_start < debate_end
|
||||
//
|
||||
// All four timestamps must be parsable as RFC3339; failure → 400.
|
||||
// Returns the parsed times in order [signup_open, signup_close, debate_start, debate_end]
|
||||
// so the caller can pass typed values to the store (MySQL TIMESTAMP doesn't
|
||||
// accept ISO8601 strings directly; the driver handles time.Time properly).
|
||||
func validateLifecycleTimes(b createTopicBody) ([4]time.Time, error) {
|
||||
type p struct {
|
||||
name string
|
||||
raw string
|
||||
}
|
||||
parts := []p{
|
||||
{"signup_open_at", b.SignupOpenAt},
|
||||
{"signup_close_at", b.SignupCloseAt},
|
||||
{"debate_start_at", b.DebateStartAt},
|
||||
{"debate_end_at", b.DebateEndAt},
|
||||
}
|
||||
var parsed [4]time.Time
|
||||
for i, x := range parts {
|
||||
t, err := time.Parse(time.RFC3339, x.raw)
|
||||
if err != nil {
|
||||
return parsed, errors.New(x.name + ": must be RFC3339")
|
||||
}
|
||||
parsed[i] = t.UTC()
|
||||
}
|
||||
if !parsed[0].Before(parsed[1]) {
|
||||
return parsed, errors.New("signup_open_at must be before signup_close_at")
|
||||
}
|
||||
if parsed[1].After(parsed[2]) {
|
||||
return parsed, errors.New("signup_close_at must be <= debate_start_at")
|
||||
}
|
||||
if !parsed[2].Before(parsed[3]) {
|
||||
return parsed, errors.New("debate_start_at must be before debate_end_at")
|
||||
}
|
||||
return parsed, nil
|
||||
}
|
||||
|
||||
// PUT /api/topics/{id}/visibility — admin-only flip (Phase 2 stub: any
|
||||
// authenticated user; Phase 4 will check the dialectic-admin role from JWT).
|
||||
func (h *TopicsHandler) SetVisibility(w http.ResponseWriter, r *http.Request) {
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind == "" {
|
||||
http.Error(w, "auth required", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if caller.Kind == auth.CallerUser && !hasRole(caller, "dialectic-admin") {
|
||||
http.Error(w, "dialectic-admin role required", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
id := chi.URLParam(r, "id")
|
||||
var body struct {
|
||||
Visibility string `json:"visibility"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
v := models.Visibility(body.Visibility)
|
||||
if v != models.VisibilityPublic && v != models.VisibilityPrivate {
|
||||
http.Error(w, "visibility must be public|private", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
t, err := h.store.SetVisibility(r.Context(), id, v, caller.ID)
|
||||
if err != nil {
|
||||
http.Error(w, "update failed: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, t)
|
||||
}
|
||||
|
||||
func hasRole(c auth.Caller, role string) bool {
|
||||
for _, r := range c.Roles {
|
||||
if r == role {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func writeJSON(w http.ResponseWriter, status int, v any) {
|
||||
w.Header().Set("content-type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
_ = json.NewEncoder(w).Encode(v)
|
||||
}
|
||||
164
internal/httpapi/handlers/verdict.go
Normal file
164
internal/httpapi/handlers/verdict.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
|
||||
type VerdictHandler struct {
|
||||
topics *store.TopicStore
|
||||
camps *store.CampStore
|
||||
verdicts *store.VerdictStore
|
||||
}
|
||||
|
||||
func NewVerdictHandler(t *store.TopicStore, c *store.CampStore, v *store.VerdictStore) *VerdictHandler {
|
||||
return &VerdictHandler{topics: t, camps: c, verdicts: v}
|
||||
}
|
||||
|
||||
type submitVerdictBody struct {
|
||||
Verdict json.RawMessage `json:"verdict"` // shape matches topic.verdict_schema_id
|
||||
Rationale string `json:"rationale"`
|
||||
TokensInput int `json:"tokens_input"`
|
||||
TokensOutput int `json:"tokens_output"`
|
||||
}
|
||||
|
||||
// POST /api/topics/{id}/verdict
|
||||
//
|
||||
// Judge-only. Caller must be allocated to the judge camp. Topic must be
|
||||
// in `debating` status AND past `debate_end_at` (the ticker doesn't
|
||||
// flip to `judging` in v1, see ticker.go note — the gate enforces the
|
||||
// time crossing instead).
|
||||
//
|
||||
// Schema validation (Phase 2D): shallow — confirm verdict is valid JSON
|
||||
// and not empty. Real schema-shape validation lands when we wire the
|
||||
// verdict_schemas.shape_json against a JSON-schema validator.
|
||||
func (h *VerdictHandler) Submit(w http.ResponseWriter, r *http.Request) {
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind != auth.CallerAgent {
|
||||
http.Error(w, "verdict submission is agent-only", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
topicID := chi.URLParam(r, "id")
|
||||
topic, err := h.topics.GetByID(r.Context(), topicID)
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
http.Error(w, "topic not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if topic.Status != models.TopicStatusDebating {
|
||||
http.Error(w, "topic not in debate state (status="+string(topic.Status)+")", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
if time.Now().Before(topic.DebateEndAt) {
|
||||
http.Error(w, "debate window still open; verdict premature", http.StatusConflict)
|
||||
return
|
||||
}
|
||||
camp, err := h.camps.AgentCampInTopic(r.Context(), topicID, caller.ID)
|
||||
if err != nil || camp != models.CampJudge {
|
||||
http.Error(w, "only the judge can submit a verdict", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
var body submitVerdictBody
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
http.Error(w, "bad body", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if len(body.Verdict) == 0 || string(body.Verdict) == "null" {
|
||||
http.Error(w, "verdict required (non-empty JSON object matching schema)", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// Sanity: ensure it parses as a JSON object/value.
|
||||
var probe any
|
||||
if err := json.Unmarshal(body.Verdict, &probe); err != nil {
|
||||
http.Error(w, "verdict must be valid JSON", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if body.Rationale == "" {
|
||||
http.Error(w, "rationale required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
verdict, err := h.verdicts.Submit(r.Context(), store.SubmitVerdictInput{
|
||||
TopicID: topicID,
|
||||
JudgeAgentID: caller.ID,
|
||||
VerdictJSON: body.Verdict,
|
||||
Rationale: body.Rationale,
|
||||
TokensInput: body.TokensInput,
|
||||
TokensOutput: body.TokensOutput,
|
||||
})
|
||||
if err != nil {
|
||||
// Most likely cause: unique-key conflict (already submitted).
|
||||
http.Error(w, "submit failed: "+err.Error(), http.StatusConflict)
|
||||
return
|
||||
}
|
||||
|
||||
// Transition topic to completed. Best-effort; if it fails, the
|
||||
// verdict row exists and the ticker will retry on next scan
|
||||
// (well — once we add that transition; v1 leaves it to a manual
|
||||
// flip via SQL or a follow-up endpoint).
|
||||
if _, err := h.topics.SetStatus(r.Context(), topicID, models.TopicStatusCompleted); err != nil {
|
||||
// non-fatal: log via response header (caller can spot-check)
|
||||
w.Header().Set("x-warn", "verdict saved but status update failed: "+err.Error())
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, map[string]any{
|
||||
"id": verdict.ID,
|
||||
"topic_id": verdict.TopicID,
|
||||
"judge_agent_id": verdict.JudgeAgentID,
|
||||
"verdict": json.RawMessage(verdict.VerdictJSON),
|
||||
"rationale": verdict.Rationale,
|
||||
"tokens_input": verdict.TokensInput,
|
||||
"tokens_output": verdict.TokensOutput,
|
||||
"produced_at": verdict.ProducedAt,
|
||||
})
|
||||
}
|
||||
|
||||
// GET /api/topics/{id}/verdict — fetch the published verdict (404 if
|
||||
// not yet produced). Visibility-gated like other read endpoints.
|
||||
func (h *VerdictHandler) Get(w http.ResponseWriter, r *http.Request) {
|
||||
topicID := chi.URLParam(r, "id")
|
||||
topic, err := h.topics.GetByID(r.Context(), topicID)
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
http.Error(w, "topic not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
caller := auth.FromContext(r.Context())
|
||||
if caller.Kind == "" && topic.Visibility != models.VisibilityPublic {
|
||||
http.Error(w, "not found", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
verdict, err := h.verdicts.GetByTopic(r.Context(), topicID)
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
http.Error(w, "verdict not yet produced", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "lookup failed", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{
|
||||
"id": verdict.ID,
|
||||
"topic_id": verdict.TopicID,
|
||||
"judge_agent_id": verdict.JudgeAgentID,
|
||||
"verdict": json.RawMessage(verdict.VerdictJSON),
|
||||
"rationale": verdict.Rationale,
|
||||
"produced_at": verdict.ProducedAt,
|
||||
})
|
||||
}
|
||||
196
internal/httpapi/routes.go
Normal file
196
internal/httpapi/routes.go
Normal file
@@ -0,0 +1,196 @@
|
||||
package httpapi
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
chimw "github.com/go-chi/chi/v5/middleware"
|
||||
"github.com/go-chi/cors"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi/handlers"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
|
||||
// Mount returns the root router with all v2 endpoints wired. Owners of
|
||||
// individual middleware chains:
|
||||
//
|
||||
// - /api/healthz : public (no auth)
|
||||
// - /api/topics : mixed — list/get optional auth (anon
|
||||
// sees public only); create requires CallerAgent or CallerUser
|
||||
// - /api/topics/{id}/signups : agent-only (CallerAgent)
|
||||
//
|
||||
// Browser-side OIDC and agent-side bearer middlewares co-exist on the
|
||||
// same route by being "optional auth" — if either succeeds, Caller is
|
||||
// attached; otherwise the handler sees anonymous and decides whether
|
||||
// to 401 or fall through to public behavior.
|
||||
func Mount(cfg *config.Config, db *sqlx.DB, version string) http.Handler {
|
||||
r := chi.NewRouter()
|
||||
|
||||
// Boilerplate middleware — these run on every request.
|
||||
r.Use(chimw.RealIP)
|
||||
r.Use(chimw.RequestID)
|
||||
r.Use(chimw.Logger)
|
||||
r.Use(chimw.Recoverer)
|
||||
r.Use(chimw.Timeout(30 * time.Second))
|
||||
r.Use(cors.Handler(cors.Options{
|
||||
AllowedOrigins: cfg.CORSAllowOrigins,
|
||||
AllowedMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"},
|
||||
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "x-dev-bypass"},
|
||||
ExposedHeaders: []string{},
|
||||
AllowCredentials: true,
|
||||
MaxAge: 300,
|
||||
}))
|
||||
|
||||
// Auth middlewares — composed as "try agent, then user, else pass anonymous".
|
||||
optionalAuth := optionalAuthChain(db, cfg)
|
||||
requireAgent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper) // strict bearer
|
||||
requireAnyAuth := requireAnyAuthChain(db, cfg)
|
||||
|
||||
// Handler instances.
|
||||
topicStore := store.NewTopicStore(db)
|
||||
signupStore := store.NewSignupStore(db)
|
||||
campStore := store.NewCampStore(db)
|
||||
roundStore := store.NewRoundStore(db)
|
||||
argStore := store.NewArgumentStore(db)
|
||||
verdictStore := store.NewVerdictStore(db)
|
||||
|
||||
health := handlers.NewHealthHandler(db, version)
|
||||
topicsH := handlers.NewTopicsHandler(topicStore, campStore)
|
||||
signupsH := handlers.NewSignupsHandler(topicStore, signupStore)
|
||||
argsH := handlers.NewArgumentsHandler(topicStore, campStore, roundStore, argStore)
|
||||
verdictH := handlers.NewVerdictHandler(topicStore, campStore, verdictStore)
|
||||
adminH := handlers.NewAdminHandler(db, cfg.AgentAPIKeyPepper, cfg.DialecticAdminAPIKey)
|
||||
|
||||
// Routes.
|
||||
r.Route("/api", func(r chi.Router) {
|
||||
r.Get("/healthz", health.Healthz)
|
||||
|
||||
// Topics: list+get optional-auth (visibility-gated by handler);
|
||||
// create+visibility-flip require any auth.
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(optionalAuth)
|
||||
r.Get("/topics", topicsH.List)
|
||||
r.Get("/topics/{id}", topicsH.Get)
|
||||
r.Get("/topics/{id}/arguments", argsH.List)
|
||||
r.Get("/topics/{id}/verdict", verdictH.Get)
|
||||
})
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(requireAnyAuth)
|
||||
r.Post("/topics", topicsH.Create)
|
||||
r.Put("/topics/{id}/visibility", topicsH.SetVisibility)
|
||||
})
|
||||
|
||||
// Signups, arguments, verdict POST: agent-only.
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(requireAgent)
|
||||
r.Post("/topics/{id}/signups", signupsH.Create)
|
||||
r.Post("/topics/{id}/arguments", argsH.Post)
|
||||
r.Post("/topics/{id}/verdict", verdictH.Submit)
|
||||
})
|
||||
// List signups: any authenticated caller.
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(requireAnyAuth)
|
||||
r.Get("/topics/{id}/signups", signupsH.List)
|
||||
})
|
||||
|
||||
// Admin: provision an agent api key + per-agent activity summary.
|
||||
// Auth is its own header (x-dialectic-admin-key against env
|
||||
// DIALECTIC_ADMIN_API_KEY), not bearer — admin lifecycle is
|
||||
// separate from agent identity.
|
||||
r.Post("/admin/agent-keys", adminH.ProvisionAgentKey)
|
||||
r.Get("/admin/agents/{id}", adminH.GetAgentSummary)
|
||||
})
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
// optionalAuthChain: if either auth method succeeds, attach Caller;
|
||||
// otherwise let the request through anonymous. Handlers decide what
|
||||
// to do with anonymous (typically: serve public subset, hide private).
|
||||
func optionalAuthChain(db *sqlx.DB, cfg *config.Config) func(http.Handler) http.Handler {
|
||||
agent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper)
|
||||
oidc := auth.OIDCBrowser(cfg.IsDev(), cfg.OIDCDevBypassToken)
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Bearer present → try agent path; on success it ServeHTTPs next.
|
||||
// On failure it 401s, which we want to demote to "anonymous" for
|
||||
// optional auth. The pattern is: capture the response; if it's
|
||||
// 401, fall through to OIDC; if OIDC also 401s, finally fall
|
||||
// through to next (anonymous).
|
||||
if r.Header.Get("authorization") != "" {
|
||||
rw := &captureWriter{ResponseWriter: w}
|
||||
agent(next).ServeHTTP(rw, r)
|
||||
if rw.status != http.StatusUnauthorized {
|
||||
return
|
||||
}
|
||||
// reset captured state and try anon path (since OIDC
|
||||
// won't apply if there's no cookie / bypass header)
|
||||
}
|
||||
if r.Header.Get("x-dev-bypass") != "" {
|
||||
rw := &captureWriter{ResponseWriter: w}
|
||||
oidc(next).ServeHTTP(rw, r)
|
||||
if rw.status != http.StatusUnauthorized {
|
||||
return
|
||||
}
|
||||
}
|
||||
// Anonymous — call next with no Caller attached.
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// requireAnyAuthChain: 401 if neither agent nor user auth succeeds.
|
||||
func requireAnyAuthChain(db *sqlx.DB, cfg *config.Config) func(http.Handler) http.Handler {
|
||||
agent := auth.AgentAPIKey(db, cfg.AgentAPIKeyPepper)
|
||||
oidc := auth.OIDCBrowser(cfg.IsDev(), cfg.OIDCDevBypassToken)
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("authorization") != "" {
|
||||
rw := &captureWriter{ResponseWriter: w}
|
||||
agent(next).ServeHTTP(rw, r)
|
||||
if rw.status != http.StatusUnauthorized {
|
||||
return
|
||||
}
|
||||
}
|
||||
oidc(next).ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// captureWriter records the status so the optional-auth chain can
|
||||
// distinguish "401 from inner middleware (try next)" from "actual
|
||||
// response from handler (deliver)". Body bytes are passed through
|
||||
// when status != 401.
|
||||
type captureWriter struct {
|
||||
http.ResponseWriter
|
||||
status int
|
||||
wroteHeader bool
|
||||
suppressing bool
|
||||
}
|
||||
|
||||
func (c *captureWriter) WriteHeader(s int) {
|
||||
c.status = s
|
||||
c.wroteHeader = true
|
||||
if s == http.StatusUnauthorized {
|
||||
// don't actually write — we may fall through
|
||||
c.suppressing = true
|
||||
return
|
||||
}
|
||||
c.ResponseWriter.WriteHeader(s)
|
||||
}
|
||||
|
||||
func (c *captureWriter) Write(b []byte) (int, error) {
|
||||
if c.suppressing {
|
||||
// swallow; caller will fall through to next chain step
|
||||
return len(b), nil
|
||||
}
|
||||
if !c.wroteHeader {
|
||||
c.ResponseWriter.WriteHeader(http.StatusOK)
|
||||
c.wroteHeader = true
|
||||
}
|
||||
return c.ResponseWriter.Write(b)
|
||||
}
|
||||
37
internal/models/signup.go
Normal file
37
internal/models/signup.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package models
|
||||
|
||||
import "time"
|
||||
|
||||
type Signup struct {
|
||||
ID string `db:"id" json:"id"`
|
||||
TopicID string `db:"topic_id" json:"topic_id"`
|
||||
AgentID string `db:"agent_id" json:"agent_id"`
|
||||
WillingCamps []byte `db:"willing_camps" json:"-"` // JSON column; surface as typed via View()
|
||||
PreValidated bool `db:"pre_validated" json:"pre_validated"`
|
||||
CreatedAt time.Time `db:"created_at" json:"created_at"`
|
||||
}
|
||||
|
||||
// SignupView is the JSON-friendly projection that decodes WillingCamps.
|
||||
type SignupView struct {
|
||||
ID string `json:"id"`
|
||||
TopicID string `json:"topic_id"`
|
||||
AgentID string `json:"agent_id"`
|
||||
WillingCamps []Camp `json:"willing_camps"`
|
||||
PreValidated bool `json:"pre_validated"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
func (s *Signup) View() (SignupView, error) {
|
||||
var camps SignupCampsJSON
|
||||
if err := camps.UnmarshalDB(s.WillingCamps); err != nil {
|
||||
return SignupView{}, err
|
||||
}
|
||||
return SignupView{
|
||||
ID: s.ID,
|
||||
TopicID: s.TopicID,
|
||||
AgentID: s.AgentID,
|
||||
WillingCamps: camps,
|
||||
PreValidated: s.PreValidated,
|
||||
CreatedAt: s.CreatedAt,
|
||||
}, nil
|
||||
}
|
||||
78
internal/models/topic.go
Normal file
78
internal/models/topic.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Visibility string
|
||||
|
||||
const (
|
||||
VisibilityPublic Visibility = "public"
|
||||
VisibilityPrivate Visibility = "private"
|
||||
)
|
||||
|
||||
type TopicStatus string
|
||||
|
||||
const (
|
||||
TopicStatusCreated TopicStatus = "created"
|
||||
TopicStatusSignupOpen TopicStatus = "signup_open"
|
||||
TopicStatusSignupClosed TopicStatus = "signup_closed"
|
||||
TopicStatusDebating TopicStatus = "debating"
|
||||
TopicStatusCompleted TopicStatus = "completed"
|
||||
TopicStatusCancelled TopicStatus = "cancelled"
|
||||
)
|
||||
|
||||
type Camp string
|
||||
|
||||
const (
|
||||
CampPro Camp = "pro"
|
||||
CampCon Camp = "con"
|
||||
CampJudge Camp = "judge"
|
||||
)
|
||||
|
||||
// AllCamps is the canonical iteration order used by the allocation algorithm.
|
||||
var AllCamps = [3]Camp{CampPro, CampCon, CampJudge}
|
||||
|
||||
type Topic struct {
|
||||
ID string `db:"id" json:"id"`
|
||||
Title string `db:"title" json:"title"`
|
||||
Summary string `db:"summary" json:"summary"`
|
||||
Visibility Visibility `db:"visibility" json:"visibility"`
|
||||
VerdictSchemaID string `db:"verdict_schema_id" json:"verdict_schema_id"`
|
||||
Status TopicStatus `db:"status" json:"status"`
|
||||
SignupOpenAt time.Time `db:"signup_open_at" json:"signup_open_at"`
|
||||
SignupCloseAt time.Time `db:"signup_close_at" json:"signup_close_at"`
|
||||
DebateStartAt time.Time `db:"debate_start_at" json:"debate_start_at"`
|
||||
DebateEndAt time.Time `db:"debate_end_at" json:"debate_end_at"`
|
||||
CreatorUserID string `db:"creator_user_id" json:"creator_user_id"`
|
||||
VisibilityChangedBy *string `db:"visibility_changed_by" json:"visibility_changed_by,omitempty"`
|
||||
VisibilityChangedAt *time.Time `db:"visibility_changed_at" json:"visibility_changed_at,omitempty"`
|
||||
CancelledReason *string `db:"cancelled_reason" json:"cancelled_reason,omitempty"`
|
||||
CreatedAt time.Time `db:"created_at" json:"created_at"`
|
||||
UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
|
||||
}
|
||||
|
||||
// IsCampValid returns true iff c is one of pro|con|judge.
|
||||
func IsCampValid(c Camp) bool {
|
||||
for _, k := range AllCamps {
|
||||
if k == c {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// SignupCampsJSON is a typed wrapper around the JSON-stored willing_camps
|
||||
// column. We marshal/unmarshal at the boundary so handlers can work with
|
||||
// the typed slice.
|
||||
type SignupCampsJSON []Camp
|
||||
|
||||
func (s SignupCampsJSON) Marshal() ([]byte, error) { return json.Marshal(s) }
|
||||
func (s *SignupCampsJSON) UnmarshalDB(raw []byte) error {
|
||||
if len(raw) == 0 {
|
||||
*s = nil
|
||||
return nil
|
||||
}
|
||||
return json.Unmarshal(raw, s)
|
||||
}
|
||||
116
internal/orchestrator/allocator.go
Normal file
116
internal/orchestrator/allocator.go
Normal file
@@ -0,0 +1,116 @@
|
||||
// Package orchestrator owns the topic lifecycle state machine:
|
||||
// signup-window allocator, round driver, judge invocation, Fabric
|
||||
// announce broadcaster. All long-running coordination logic lives
|
||||
// here so handlers stay thin.
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
)
|
||||
|
||||
// AllocateResult is the outcome of running the camp allocator on a
|
||||
// topic's signup pool. Either Allocation is set (one agent per camp,
|
||||
// no duplicates) or CancelReason is set (signup pool insufficient
|
||||
// after backfill).
|
||||
type AllocateResult struct {
|
||||
Allocation map[models.Camp]string // camp → agentId
|
||||
CancelReason string // non-empty when allocation could not complete
|
||||
}
|
||||
|
||||
// Allocate runs the 3-camp self-enrollment allocation algorithm
|
||||
// agreed on in the 2026-05-23 design session.
|
||||
//
|
||||
// for each camp in [pro, con, judge]:
|
||||
// if any signup has that camp in willing_camps AND that agent
|
||||
// isn't already locked → random pick, lock.
|
||||
// if camps still unfilled AND remaining unallocated signups >= unfilled:
|
||||
// random pick from remaining to fill, one each.
|
||||
// if any camp still unfilled (i.e. signup pool < 3 effective):
|
||||
// return CancelReason; creator re-times.
|
||||
//
|
||||
// Invariants: same agent never lands in two camps; allocation order
|
||||
// respects [pro, con, judge] so the test seed produces deterministic
|
||||
// results when rng is seeded.
|
||||
//
|
||||
// `rng` is injected so tests can supply a deterministic source. Pass
|
||||
// `rand.New(rand.NewSource(time.Now().UnixNano()))` in prod.
|
||||
func Allocate(signups []models.SignupView, rng *rand.Rand) AllocateResult {
|
||||
allocated := make(map[models.Camp]string, 3)
|
||||
used := make(map[string]struct{}, len(signups))
|
||||
|
||||
// Pass 1 — fill each camp from its volunteers.
|
||||
for _, camp := range models.AllCamps {
|
||||
candidates := make([]string, 0)
|
||||
for _, s := range signups {
|
||||
if _, taken := used[s.AgentID]; taken {
|
||||
continue
|
||||
}
|
||||
for _, w := range s.WillingCamps {
|
||||
if w == camp {
|
||||
candidates = append(candidates, s.AgentID)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(candidates) == 0 {
|
||||
continue
|
||||
}
|
||||
pick := candidates[rng.Intn(len(candidates))]
|
||||
allocated[camp] = pick
|
||||
used[pick] = struct{}{}
|
||||
}
|
||||
|
||||
// Pass 2 — backfill unfilled camps from any remaining signup.
|
||||
if len(allocated) < 3 {
|
||||
remaining := make([]string, 0)
|
||||
for _, s := range signups {
|
||||
if _, taken := used[s.AgentID]; taken {
|
||||
continue
|
||||
}
|
||||
remaining = append(remaining, s.AgentID)
|
||||
}
|
||||
// We can only fill if we have enough remaining for every still-empty camp.
|
||||
unfilled := make([]models.Camp, 0, 3)
|
||||
for _, c := range models.AllCamps {
|
||||
if _, ok := allocated[c]; !ok {
|
||||
unfilled = append(unfilled, c)
|
||||
}
|
||||
}
|
||||
if len(remaining) >= len(unfilled) {
|
||||
// Shuffle remaining, then take one per unfilled camp in order.
|
||||
rng.Shuffle(len(remaining), func(i, j int) {
|
||||
remaining[i], remaining[j] = remaining[j], remaining[i]
|
||||
})
|
||||
for i, c := range unfilled {
|
||||
allocated[c] = remaining[i]
|
||||
used[remaining[i]] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verdict — all 3 filled or we cancel.
|
||||
if len(allocated) < 3 {
|
||||
filled := len(allocated)
|
||||
return AllocateResult{
|
||||
CancelReason: cancelReason(filled, len(signups)),
|
||||
}
|
||||
}
|
||||
return AllocateResult{Allocation: allocated}
|
||||
}
|
||||
|
||||
func cancelReason(filled, totalSignups int) string {
|
||||
switch {
|
||||
case totalSignups == 0:
|
||||
return "no signups received"
|
||||
case totalSignups < 3:
|
||||
return "insufficient signups: need at least 3 distinct agents across pro/con/judge"
|
||||
default:
|
||||
// Edge case: pool >= 3 but allocator still couldn't fill — e.g.
|
||||
// every signup volunteered for the same one camp and the same
|
||||
// person was somehow used (shouldn't happen with current rules,
|
||||
// but make the message honest).
|
||||
return "allocation infeasible: signup distribution does not cover all 3 camps"
|
||||
}
|
||||
}
|
||||
137
internal/orchestrator/allocator_test.go
Normal file
137
internal/orchestrator/allocator_test.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
)
|
||||
|
||||
func sig(agentID string, camps ...models.Camp) models.SignupView {
|
||||
return models.SignupView{AgentID: agentID, WillingCamps: camps}
|
||||
}
|
||||
|
||||
// Helper: assert no duplicate agents across the 3 camps.
|
||||
func assertDistinct(t *testing.T, alloc map[models.Camp]string) {
|
||||
t.Helper()
|
||||
seen := map[string]models.Camp{}
|
||||
for c, a := range alloc {
|
||||
if prev, ok := seen[a]; ok {
|
||||
t.Fatalf("agent %q allocated to both %s and %s", a, prev, c)
|
||||
}
|
||||
seen[a] = c
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocate_EmptyPoolCancels(t *testing.T) {
|
||||
r := Allocate(nil, rand.New(rand.NewSource(1)))
|
||||
if r.CancelReason == "" {
|
||||
t.Fatal("expected cancel reason, got allocation")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocate_TwoSignupsCancels(t *testing.T) {
|
||||
r := Allocate([]models.SignupView{
|
||||
sig("a", models.CampPro),
|
||||
sig("b", models.CampCon),
|
||||
}, rand.New(rand.NewSource(1)))
|
||||
if r.CancelReason == "" {
|
||||
t.Fatalf("expected cancel reason (pool<3), got %v", r.Allocation)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocate_OneVolunteerPerCampFills(t *testing.T) {
|
||||
signups := []models.SignupView{
|
||||
sig("a", models.CampPro),
|
||||
sig("b", models.CampCon),
|
||||
sig("c", models.CampJudge),
|
||||
}
|
||||
r := Allocate(signups, rand.New(rand.NewSource(1)))
|
||||
if r.CancelReason != "" {
|
||||
t.Fatalf("unexpected cancel: %s", r.CancelReason)
|
||||
}
|
||||
if r.Allocation[models.CampPro] != "a" || r.Allocation[models.CampCon] != "b" || r.Allocation[models.CampJudge] != "c" {
|
||||
t.Fatalf("wrong allocation: %v", r.Allocation)
|
||||
}
|
||||
assertDistinct(t, r.Allocation)
|
||||
}
|
||||
|
||||
func TestAllocate_AgentMultiVolunteerPicksOnlyOnce(t *testing.T) {
|
||||
// 'a' volunteers for all 3 camps. Should only be allocated to one
|
||||
// (pro, since it's first in iteration order); other camps need
|
||||
// other volunteers or get filled via backfill.
|
||||
signups := []models.SignupView{
|
||||
sig("a", models.CampPro, models.CampCon, models.CampJudge),
|
||||
sig("b", models.CampCon),
|
||||
sig("c", models.CampJudge),
|
||||
}
|
||||
r := Allocate(signups, rand.New(rand.NewSource(1)))
|
||||
if r.CancelReason != "" {
|
||||
t.Fatalf("unexpected cancel: %s", r.CancelReason)
|
||||
}
|
||||
if r.Allocation[models.CampPro] != "a" {
|
||||
t.Fatalf("expected 'a' in pro, got %v", r.Allocation)
|
||||
}
|
||||
if r.Allocation[models.CampCon] != "b" {
|
||||
t.Fatalf("expected 'b' in con, got %v", r.Allocation)
|
||||
}
|
||||
if r.Allocation[models.CampJudge] != "c" {
|
||||
t.Fatalf("expected 'c' in judge, got %v", r.Allocation)
|
||||
}
|
||||
assertDistinct(t, r.Allocation)
|
||||
}
|
||||
|
||||
func TestAllocate_BackfillFromUnallocated(t *testing.T) {
|
||||
// pro has 2 volunteers ('a','c'), con has 1 ('b'), judge has 0.
|
||||
// Allocator picks one of {a,c} for pro, then b for con, then
|
||||
// backfills judge from whichever of {a,c} is unallocated.
|
||||
signups := []models.SignupView{
|
||||
sig("a", models.CampPro),
|
||||
sig("b", models.CampCon),
|
||||
sig("c", models.CampPro),
|
||||
}
|
||||
r := Allocate(signups, rand.New(rand.NewSource(1)))
|
||||
if r.CancelReason != "" {
|
||||
t.Fatalf("unexpected cancel: %s; alloc=%v", r.CancelReason, r.Allocation)
|
||||
}
|
||||
assertDistinct(t, r.Allocation)
|
||||
if len(r.Allocation) != 3 {
|
||||
t.Fatalf("expected all 3 camps filled; got %d (%v)", len(r.Allocation), r.Allocation)
|
||||
}
|
||||
// Con must be 'b' (only volunteer); pro and judge must be {a, c} in some order.
|
||||
if r.Allocation[models.CampCon] != "b" {
|
||||
t.Fatalf("expected con=b, got %v", r.Allocation)
|
||||
}
|
||||
pro := r.Allocation[models.CampPro]
|
||||
judge := r.Allocation[models.CampJudge]
|
||||
if !(pro == "a" && judge == "c") && !(pro == "c" && judge == "a") {
|
||||
t.Fatalf("expected pro/judge to be {a,c} permutation, got pro=%s judge=%s", pro, judge)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocate_BackfillInsufficientCancels(t *testing.T) {
|
||||
// pro filled by 'a'; con filled by 'b'; judge has no volunteer
|
||||
// AND no remaining unallocated signups → cancel.
|
||||
signups := []models.SignupView{
|
||||
sig("a", models.CampPro),
|
||||
sig("b", models.CampCon),
|
||||
}
|
||||
r := Allocate(signups, rand.New(rand.NewSource(1)))
|
||||
if r.CancelReason == "" {
|
||||
t.Fatalf("expected cancel; got allocation %v", r.Allocation)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocate_LargePoolDistinctness(t *testing.T) {
|
||||
// Many signups, all willing for all camps. Allocation should pick 3
|
||||
// distinct agents, randomly.
|
||||
signups := []models.SignupView{}
|
||||
for i := 0; i < 20; i++ {
|
||||
signups = append(signups, sig(string(rune('a'+i)), models.CampPro, models.CampCon, models.CampJudge))
|
||||
}
|
||||
r := Allocate(signups, rand.New(rand.NewSource(42)))
|
||||
if r.CancelReason != "" {
|
||||
t.Fatalf("unexpected cancel: %s", r.CancelReason)
|
||||
}
|
||||
assertDistinct(t, r.Allocation)
|
||||
}
|
||||
212
internal/orchestrator/ticker.go
Normal file
212
internal/orchestrator/ticker.go
Normal file
@@ -0,0 +1,212 @@
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
|
||||
// Ticker drives the topic state machine. Every TickInterval it scans
|
||||
// for topics with timestamps that have crossed a transition boundary
|
||||
// and applies the transition atomically per topic.
|
||||
//
|
||||
// State transitions handled by the ticker:
|
||||
//
|
||||
// created → signup_open (when now >= signup_open_at)
|
||||
// signup_open → signup_closed (when now >= signup_close_at, allocator succeeded)
|
||||
// → cancelled (allocator returned CancelReason)
|
||||
// signup_closed → debating (when now >= debate_start_at; opens round 0)
|
||||
//
|
||||
// NOT handled by the ticker (driven elsewhere):
|
||||
//
|
||||
// debating → completed driven by POST /api/topics/{id}/verdict
|
||||
// (judge submits; handler flips status).
|
||||
// The "judging" sub-state is implicit:
|
||||
// status==debating AND now>=debate_end_at.
|
||||
//
|
||||
// Per-topic transitions use SELECT FOR UPDATE so concurrent ticker
|
||||
// instances (or future replicas) don't double-fire.
|
||||
//
|
||||
// Lifecycle broadcasting moved out-of-backend (Aug 2026): the proposing
|
||||
// agent posts a single recruitment fabric-send-message after creating a
|
||||
// topic; downstream agents book HF on_call slots covering the debate
|
||||
// window via `hf calendar schedule`, and HF wakes them naturally. The
|
||||
// backend stays a pure data + state-machine service and doesn't know
|
||||
// about Fabric.
|
||||
type Ticker struct {
|
||||
db *sqlx.DB
|
||||
topics *store.TopicStore
|
||||
signups *store.SignupStore
|
||||
camps *store.CampStore
|
||||
rounds *store.RoundStore
|
||||
interval time.Duration
|
||||
rng *rand.Rand
|
||||
}
|
||||
|
||||
func NewTicker(
|
||||
db *sqlx.DB,
|
||||
topics *store.TopicStore,
|
||||
signups *store.SignupStore,
|
||||
camps *store.CampStore,
|
||||
rounds *store.RoundStore,
|
||||
interval time.Duration,
|
||||
) *Ticker {
|
||||
if interval <= 0 {
|
||||
interval = 15 * time.Second
|
||||
}
|
||||
return &Ticker{
|
||||
db: db,
|
||||
topics: topics,
|
||||
signups: signups,
|
||||
camps: camps,
|
||||
rounds: rounds,
|
||||
interval: interval,
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
}
|
||||
|
||||
// Run blocks until ctx is cancelled. Caller goroutines it.
|
||||
func (t *Ticker) Run(ctx context.Context) {
|
||||
log.Printf("orchestrator: ticker started (interval=%s)", t.interval)
|
||||
tk := time.NewTicker(t.interval)
|
||||
defer tk.Stop()
|
||||
// First tick immediately so startup is responsive — don't wait
|
||||
// 15s for the first scan.
|
||||
t.tickOnce(ctx)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("orchestrator: ticker stopping")
|
||||
return
|
||||
case <-tk.C:
|
||||
t.tickOnce(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
now := time.Now()
|
||||
|
||||
// 1. created → signup_open
|
||||
if err := t.transitionByStatus(ctx, now,
|
||||
models.TopicStatusCreated, "signup_open_at",
|
||||
func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`UPDATE topics SET status = ? WHERE id = ?`,
|
||||
models.TopicStatusSignupOpen, topicID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.Printf("orchestrator: created→signup_open scan: %v", err)
|
||||
}
|
||||
|
||||
// 2. signup_open → signup_closed | cancelled
|
||||
if err := t.transitionByStatus(ctx, now,
|
||||
models.TopicStatusSignupOpen, "signup_close_at",
|
||||
func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
|
||||
signups, err := t.signups.ListByTopic(ctx, topicID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res := Allocate(signups, t.rng)
|
||||
if res.CancelReason != "" {
|
||||
_, err := tx.ExecContext(ctx,
|
||||
`UPDATE topics SET status = ?, cancelled_reason = ? WHERE id = ?`,
|
||||
models.TopicStatusCancelled, res.CancelReason, topicID)
|
||||
log.Printf("orchestrator: topic %s cancelled at signup_close: %s",
|
||||
topicID, res.CancelReason)
|
||||
return err
|
||||
}
|
||||
if err := t.camps.WriteAllocation(ctx, tx, topicID, res.Allocation); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = tx.ExecContext(ctx,
|
||||
`UPDATE topics SET status = ? WHERE id = ?`,
|
||||
models.TopicStatusSignupClosed, topicID)
|
||||
log.Printf("orchestrator: topic %s allocated pro=%s con=%s judge=%s",
|
||||
topicID,
|
||||
res.Allocation[models.CampPro], res.Allocation[models.CampCon], res.Allocation[models.CampJudge])
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Printf("orchestrator: signup_open→signup_closed scan: %v", err)
|
||||
}
|
||||
|
||||
// 3. signup_closed → debating (opens round 0)
|
||||
if err := t.transitionByStatus(ctx, now,
|
||||
models.TopicStatusSignupClosed, "debate_start_at",
|
||||
func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`UPDATE topics SET status = ? WHERE id = ?`,
|
||||
models.TopicStatusDebating, topicID); err != nil {
|
||||
return err
|
||||
}
|
||||
// Round 0 inserted within the tx — if commit fails we don't
|
||||
// leak a half-state.
|
||||
_, err := tx.ExecContext(ctx,
|
||||
`INSERT INTO rounds (id, topic_id, round_no) VALUES (UUID(), ?, 0)`,
|
||||
topicID)
|
||||
log.Printf("orchestrator: topic %s entered debating; round 0 opened", topicID)
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Printf("orchestrator: signup_closed→debating scan: %v", err)
|
||||
}
|
||||
|
||||
// Note: there's no explicit `debating → judging` transition in v1.
|
||||
// The verdict handler enforces "status==debating AND now>=debate_end_at"
|
||||
// as its preconditions; that's equivalent to a "judging" gate without
|
||||
// adding a new enum value.
|
||||
}
|
||||
|
||||
// transitionByStatus is the shared "scan + per-row tx + apply" pattern.
|
||||
// Picks all topics in `currentStatus` whose `dueColumn` <= now, opens a
|
||||
// tx with SELECT FOR UPDATE, re-checks status (someone else may have
|
||||
// already moved it), calls apply, commits. Errors per topic logged.
|
||||
func (t *Ticker) transitionByStatus(ctx context.Context, now time.Time,
|
||||
currentStatus models.TopicStatus, dueColumn string,
|
||||
apply func(context.Context, *sqlx.Tx, string) error) error {
|
||||
|
||||
// Pull candidate IDs first (no lock); we lock per row inside the loop.
|
||||
var ids []string
|
||||
q := "SELECT id FROM topics WHERE status = ? AND " + dueColumn + " <= ? LIMIT 50"
|
||||
if err := t.db.SelectContext(ctx, &ids, q, currentStatus, now); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, id := range ids {
|
||||
if err := t.applyOne(ctx, id, currentStatus, apply); err != nil {
|
||||
log.Printf("orchestrator: apply topic=%s: %v", id, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Ticker) applyOne(ctx context.Context, topicID string,
|
||||
expected models.TopicStatus,
|
||||
apply func(context.Context, *sqlx.Tx, string) error) error {
|
||||
|
||||
tx, err := t.db.BeginTxx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }() // safe no-op after commit
|
||||
|
||||
var actual models.TopicStatus
|
||||
if err := tx.GetContext(ctx, &actual,
|
||||
`SELECT status FROM topics WHERE id = ? FOR UPDATE`, topicID); err != nil {
|
||||
return err
|
||||
}
|
||||
if actual != expected {
|
||||
// Already transitioned by some other process — skip.
|
||||
return nil
|
||||
}
|
||||
if err := apply(ctx, tx, topicID); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
62
internal/store/argument_store.go
Normal file
62
internal/store/argument_store.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
)
|
||||
|
||||
type Argument struct {
|
||||
ID string `db:"id" json:"id"`
|
||||
TopicID string `db:"topic_id" json:"topic_id"`
|
||||
RoundID string `db:"round_id" json:"round_id"`
|
||||
Camp models.Camp `db:"camp" json:"camp"`
|
||||
AgentID string `db:"agent_id" json:"agent_id"`
|
||||
Content string `db:"content" json:"content"`
|
||||
PostedAt time.Time `db:"posted_at" json:"posted_at"`
|
||||
}
|
||||
|
||||
type ArgumentStore struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewArgumentStore(db *sqlx.DB) *ArgumentStore { return &ArgumentStore{db: db} }
|
||||
|
||||
type PostArgumentInput struct {
|
||||
TopicID string
|
||||
RoundID string
|
||||
Camp models.Camp
|
||||
AgentID string
|
||||
Content string
|
||||
}
|
||||
|
||||
func (s *ArgumentStore) Post(ctx context.Context, in PostArgumentInput) (*Argument, error) {
|
||||
id := uuid.NewString()
|
||||
if _, err := s.db.ExecContext(ctx,
|
||||
`INSERT INTO arguments (id, topic_id, round_id, camp, agent_id, content)
|
||||
VALUES (?, ?, ?, ?, ?, ?)`,
|
||||
id, in.TopicID, in.RoundID, in.Camp, in.AgentID, in.Content); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var a Argument
|
||||
if err := s.db.GetContext(ctx, &a, `SELECT * FROM arguments WHERE id = ?`, id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &a, nil
|
||||
}
|
||||
|
||||
// ListByTopic returns the full transcript in posted order. Used by the
|
||||
// judge agent at end-of-debate to write the verdict, by the frontend
|
||||
// to render, and by observer agents querying via plugin.
|
||||
func (s *ArgumentStore) ListByTopic(ctx context.Context, topicID string) ([]Argument, error) {
|
||||
var rows []Argument
|
||||
if err := s.db.SelectContext(ctx, &rows,
|
||||
`SELECT * FROM arguments WHERE topic_id = ? ORDER BY posted_at ASC, id ASC`, topicID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
66
internal/store/camp_store.go
Normal file
66
internal/store/camp_store.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
)
|
||||
|
||||
type Camp struct {
|
||||
ID string `db:"id" json:"id"`
|
||||
TopicID string `db:"topic_id" json:"topic_id"`
|
||||
Camp models.Camp `db:"camp" json:"camp"`
|
||||
AgentID string `db:"agent_id" json:"agent_id"`
|
||||
AllocatedAt time.Time `db:"allocated_at" json:"allocated_at"`
|
||||
}
|
||||
|
||||
type CampStore struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewCampStore(db *sqlx.DB) *CampStore { return &CampStore{db: db} }
|
||||
|
||||
// WriteAllocation inserts all 3 camp rows for a topic atomically. Must
|
||||
// be called within a tx the orchestrator owns (so signup_close transition
|
||||
// + camps insert + status update are all-or-nothing). Receives an open
|
||||
// *sqlx.Tx, returns nothing on success.
|
||||
func (s *CampStore) WriteAllocation(ctx context.Context, tx *sqlx.Tx, topicID string, alloc map[models.Camp]string) error {
|
||||
for _, c := range models.AllCamps {
|
||||
agentID, ok := alloc[c]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`INSERT INTO camps (id, topic_id, camp, agent_id) VALUES (?, ?, ?, ?)`,
|
||||
uuid.NewString(), topicID, c, agentID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *CampStore) ListByTopic(ctx context.Context, topicID string) ([]Camp, error) {
|
||||
var rows []Camp
|
||||
if err := s.db.SelectContext(ctx, &rows,
|
||||
`SELECT * FROM camps WHERE topic_id = ? ORDER BY allocated_at ASC`, topicID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// AgentCampInTopic returns the camp `agentID` was allocated to, or empty
|
||||
// if the agent isn't in any camp on this topic. Used by argument/verdict
|
||||
// handlers to enforce "only camp members can post".
|
||||
func (s *CampStore) AgentCampInTopic(ctx context.Context, topicID, agentID string) (models.Camp, error) {
|
||||
var camp models.Camp
|
||||
err := s.db.GetContext(ctx, &camp,
|
||||
`SELECT camp FROM camps WHERE topic_id = ? AND agent_id = ? LIMIT 1`, topicID, agentID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return camp, nil
|
||||
}
|
||||
65
internal/store/round_store.go
Normal file
65
internal/store/round_store.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type Round struct {
|
||||
ID string `db:"id" json:"id"`
|
||||
TopicID string `db:"topic_id" json:"topic_id"`
|
||||
RoundNo int `db:"round_no" json:"round_no"`
|
||||
OpenedAt time.Time `db:"opened_at" json:"opened_at"`
|
||||
ClosedAt *time.Time `db:"closed_at" json:"closed_at,omitempty"`
|
||||
}
|
||||
|
||||
type RoundStore struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewRoundStore(db *sqlx.DB) *RoundStore { return &RoundStore{db: db} }
|
||||
|
||||
// Open creates the round-0 row for a topic entering `debating`. Subsequent
|
||||
// rounds (1, 2, ...) are inserted by the round driver as the debate
|
||||
// advances; we leave round-bumping logic outside the store so the policy
|
||||
// (time-based? all-participants-posted?) can evolve without DB churn.
|
||||
func (s *RoundStore) Open(ctx context.Context, topicID string, roundNo int) (*Round, error) {
|
||||
id := uuid.NewString()
|
||||
if _, err := s.db.ExecContext(ctx,
|
||||
`INSERT INTO rounds (id, topic_id, round_no) VALUES (?, ?, ?)`,
|
||||
id, topicID, roundNo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var r Round
|
||||
if err := s.db.GetContext(ctx, &r, `SELECT * FROM rounds WHERE id = ?`, id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
func (s *RoundStore) Latest(ctx context.Context, topicID string) (*Round, error) {
|
||||
var r Round
|
||||
err := s.db.GetContext(ctx, &r,
|
||||
`SELECT * FROM rounds WHERE topic_id = ? ORDER BY round_no DESC LIMIT 1`, topicID)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
func (s *RoundStore) ListByTopic(ctx context.Context, topicID string) ([]Round, error) {
|
||||
var rows []Round
|
||||
if err := s.db.SelectContext(ctx, &rows,
|
||||
`SELECT * FROM rounds WHERE topic_id = ? ORDER BY round_no ASC`, topicID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
95
internal/store/signup_store.go
Normal file
95
internal/store/signup_store.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
)
|
||||
|
||||
type SignupStore struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewSignupStore(db *sqlx.DB) *SignupStore { return &SignupStore{db: db} }
|
||||
|
||||
type UpsertSignupInput struct {
|
||||
TopicID string
|
||||
AgentID string
|
||||
WillingCamps []models.Camp
|
||||
PreValidated bool
|
||||
}
|
||||
|
||||
// Upsert creates or updates an agent's signup for a topic. Re-signup
|
||||
// replaces willing_camps (intentional: lets an agent change their mind
|
||||
// before signup_close_at).
|
||||
func (s *SignupStore) Upsert(ctx context.Context, in UpsertSignupInput) (*models.SignupView, error) {
|
||||
if len(in.WillingCamps) == 0 {
|
||||
return nil, fmt.Errorf("willing_camps must be non-empty")
|
||||
}
|
||||
for _, c := range in.WillingCamps {
|
||||
if !models.IsCampValid(c) {
|
||||
return nil, fmt.Errorf("invalid camp %q", c)
|
||||
}
|
||||
}
|
||||
raw, err := json.Marshal(in.WillingCamps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Try insert; on duplicate (topic, agent), update.
|
||||
id := uuid.NewString()
|
||||
_, err = s.db.ExecContext(ctx, `
|
||||
INSERT INTO signups (id, topic_id, agent_id, willing_camps, pre_validated)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
willing_camps = VALUES(willing_camps),
|
||||
pre_validated = VALUES(pre_validated)`,
|
||||
id, in.TopicID, in.AgentID, raw, in.PreValidated)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("upsert signup: %w", err)
|
||||
}
|
||||
return s.GetByPair(ctx, in.TopicID, in.AgentID)
|
||||
}
|
||||
|
||||
func (s *SignupStore) GetByPair(ctx context.Context, topicID, agentID string) (*models.SignupView, error) {
|
||||
var row models.Signup
|
||||
err := s.db.GetContext(ctx, &row,
|
||||
`SELECT * FROM signups WHERE topic_id = ? AND agent_id = ?`, topicID, agentID)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
v, err := row.View()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &v, nil
|
||||
}
|
||||
|
||||
// ListByTopic returns all signups for a topic. Used by the allocation
|
||||
// algorithm at signup_close_at and by the topic-detail UI.
|
||||
func (s *SignupStore) ListByTopic(ctx context.Context, topicID string) ([]models.SignupView, error) {
|
||||
var rows []models.Signup
|
||||
if err := s.db.SelectContext(ctx, &rows,
|
||||
`SELECT * FROM signups WHERE topic_id = ? ORDER BY created_at ASC`, topicID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := make([]models.SignupView, 0, len(rows))
|
||||
for _, r := range rows {
|
||||
v, err := r.View()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, v)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
120
internal/store/topic_store.go
Normal file
120
internal/store/topic_store.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
)
|
||||
|
||||
var ErrNotFound = errors.New("not found")
|
||||
|
||||
type TopicStore struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewTopicStore(db *sqlx.DB) *TopicStore { return &TopicStore{db: db} }
|
||||
|
||||
type CreateTopicInput struct {
|
||||
Title string
|
||||
Summary string
|
||||
Visibility models.Visibility
|
||||
VerdictSchemaID string
|
||||
SignupOpenAt time.Time
|
||||
SignupCloseAt time.Time
|
||||
DebateStartAt time.Time
|
||||
DebateEndAt time.Time
|
||||
CreatorUserID string
|
||||
}
|
||||
|
||||
func (s *TopicStore) Create(ctx context.Context, in CreateTopicInput) (*models.Topic, error) {
|
||||
id := uuid.NewString()
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
INSERT INTO topics (id, title, summary, visibility, verdict_schema_id,
|
||||
signup_open_at, signup_close_at, debate_start_at, debate_end_at,
|
||||
creator_user_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
id, in.Title, in.Summary, in.Visibility, in.VerdictSchemaID,
|
||||
in.SignupOpenAt, in.SignupCloseAt, in.DebateStartAt, in.DebateEndAt,
|
||||
in.CreatorUserID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("insert topic: %w", err)
|
||||
}
|
||||
return s.GetByID(ctx, id)
|
||||
}
|
||||
|
||||
func (s *TopicStore) GetByID(ctx context.Context, id string) (*models.Topic, error) {
|
||||
var t models.Topic
|
||||
err := s.db.GetContext(ctx, &t, `SELECT * FROM topics WHERE id = ?`, id)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &t, nil
|
||||
}
|
||||
|
||||
type ListFilter struct {
|
||||
Status string // empty = all
|
||||
Visibility string // empty = all
|
||||
Limit int // 0 = default 50
|
||||
Offset int
|
||||
}
|
||||
|
||||
func (s *TopicStore) List(ctx context.Context, f ListFilter) ([]models.Topic, error) {
|
||||
if f.Limit <= 0 || f.Limit > 200 {
|
||||
f.Limit = 50
|
||||
}
|
||||
q := "SELECT * FROM topics"
|
||||
args := []any{}
|
||||
var clauses []string
|
||||
if f.Status != "" {
|
||||
clauses = append(clauses, "status = ?")
|
||||
args = append(args, f.Status)
|
||||
}
|
||||
if f.Visibility != "" {
|
||||
clauses = append(clauses, "visibility = ?")
|
||||
args = append(args, f.Visibility)
|
||||
}
|
||||
if len(clauses) > 0 {
|
||||
q += " WHERE " + strings.Join(clauses, " AND ")
|
||||
}
|
||||
q += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
|
||||
args = append(args, f.Limit, f.Offset)
|
||||
|
||||
var rows []models.Topic
|
||||
if err := s.db.SelectContext(ctx, &rows, q, args...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// SetStatus is a low-level status update. Most transitions go through
|
||||
// the orchestrator's tx-wrapped paths; this is for the verdict handler
|
||||
// (debating → completed on successful judge submission) and admin tools.
|
||||
func (s *TopicStore) SetStatus(ctx context.Context, id string, status models.TopicStatus) (*models.Topic, error) {
|
||||
if _, err := s.db.ExecContext(ctx,
|
||||
`UPDATE topics SET status = ? WHERE id = ?`, status, id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.GetByID(ctx, id)
|
||||
}
|
||||
|
||||
// SetVisibility flips public/private; records who/when. Returns updated row.
|
||||
func (s *TopicStore) SetVisibility(ctx context.Context, id string, v models.Visibility, byUserID string) (*models.Topic, error) {
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
UPDATE topics SET visibility = ?, visibility_changed_by = ?, visibility_changed_at = CURRENT_TIMESTAMP
|
||||
WHERE id = ?`, v, byUserID, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.GetByID(ctx, id)
|
||||
}
|
||||
63
internal/store/verdict_store.go
Normal file
63
internal/store/verdict_store.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type Verdict struct {
|
||||
ID string `db:"id" json:"id"`
|
||||
TopicID string `db:"topic_id" json:"topic_id"`
|
||||
JudgeAgentID string `db:"judge_agent_id" json:"judge_agent_id"`
|
||||
VerdictJSON []byte `db:"verdict_json" json:"-"` // surface raw via Render
|
||||
Rationale string `db:"rationale" json:"rationale"`
|
||||
TokensInput int `db:"tokens_input" json:"tokens_input"`
|
||||
TokensOutput int `db:"tokens_output" json:"tokens_output"`
|
||||
ProducedAt time.Time `db:"produced_at" json:"produced_at"`
|
||||
}
|
||||
|
||||
type VerdictStore struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewVerdictStore(db *sqlx.DB) *VerdictStore { return &VerdictStore{db: db} }
|
||||
|
||||
type SubmitVerdictInput struct {
|
||||
TopicID string
|
||||
JudgeAgentID string
|
||||
VerdictJSON []byte
|
||||
Rationale string
|
||||
TokensInput int
|
||||
TokensOutput int
|
||||
}
|
||||
|
||||
// Submit writes the (one-and-only) verdict for a topic. Unique constraint
|
||||
// on topic_id means a second submission returns a duplicate-key error;
|
||||
// caller surfaces that as 409.
|
||||
func (s *VerdictStore) Submit(ctx context.Context, in SubmitVerdictInput) (*Verdict, error) {
|
||||
id := uuid.NewString()
|
||||
if _, err := s.db.ExecContext(ctx,
|
||||
`INSERT INTO verdicts (id, topic_id, judge_agent_id, verdict_json, rationale, tokens_input, tokens_output)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
id, in.TopicID, in.JudgeAgentID, in.VerdictJSON, in.Rationale, in.TokensInput, in.TokensOutput); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.GetByTopic(ctx, in.TopicID)
|
||||
}
|
||||
|
||||
func (s *VerdictStore) GetByTopic(ctx context.Context, topicID string) (*Verdict, error) {
|
||||
var v Verdict
|
||||
err := s.db.GetContext(ctx, &v, `SELECT * FROM verdicts WHERE topic_id = ?`, topicID)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &v, nil
|
||||
}
|
||||
88
main.go
Normal file
88
main.go
Normal file
@@ -0,0 +1,88 @@
|
||||
// Dialectic.Backend.Go — entrypoint.
|
||||
//
|
||||
// Greenfield Go rewrite of the Python v1 backend; agent-only debate
|
||||
// platform per /home/hzhang/arch/DIALECTIC-V2-DESIGN.md.
|
||||
//
|
||||
// This file: load config → open db → run migrations → mount routes →
|
||||
// serve until SIGINT/SIGTERM. Everything else lives in internal/.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/db"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/orchestrator"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
|
||||
// Version is overridden at build time via -ldflags="-X main.Version=...".
|
||||
var Version = "dev"
|
||||
|
||||
func main() {
|
||||
log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)
|
||||
|
||||
cfg, err := config.LoadFromEnv()
|
||||
if err != nil {
|
||||
log.Fatalf("config: %v", err)
|
||||
}
|
||||
log.Printf("starting dialectic-backend %s mode=%s addr=%s", Version, cfg.Mode, cfg.HTTPAddr)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
conn, err := db.Open(ctx, cfg.DSN())
|
||||
if err != nil {
|
||||
log.Fatalf("db open: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
if err := db.RunMigrations(ctx, conn); err != nil {
|
||||
log.Fatalf("migrations: %v", err)
|
||||
}
|
||||
log.Printf("migrations: ok")
|
||||
|
||||
// Wire orchestrator + start the ticker. Backend no longer broadcasts
|
||||
// to Fabric — proposers post a single recruitment fabric-send-message,
|
||||
// downstream agents book HF on_call slots to be woken at debate time.
|
||||
topicStore := store.NewTopicStore(conn)
|
||||
signupStore := store.NewSignupStore(conn)
|
||||
campStore := store.NewCampStore(conn)
|
||||
roundStore := store.NewRoundStore(conn)
|
||||
ticker := orchestrator.NewTicker(conn, topicStore, signupStore, campStore, roundStore,
|
||||
cfg.OrchestratorTickInterval)
|
||||
go ticker.Run(ctx)
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: cfg.HTTPAddr,
|
||||
Handler: httpapi.Mount(cfg, conn, Version),
|
||||
ReadHeaderTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
// Graceful shutdown on SIGINT/SIGTERM.
|
||||
shutdown := make(chan os.Signal, 1)
|
||||
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-shutdown
|
||||
log.Printf("shutdown signal received")
|
||||
ctx2, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := srv.Shutdown(ctx2); err != nil {
|
||||
log.Printf("http shutdown error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf("http server listening on %s", cfg.HTTPAddr)
|
||||
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Fatalf("http serve: %v", err)
|
||||
}
|
||||
log.Printf("bye")
|
||||
}
|
||||
@@ -1,114 +0,0 @@
|
||||
"""Keycloak JWT authentication middleware."""
|
||||
|
||||
import os
|
||||
from fastapi import HTTPException, Request
|
||||
from jose import jwt, JWTError, jwk
|
||||
from jose.utils import base64url_decode
|
||||
import httpx
|
||||
|
||||
# Cache JWKS per (host, realm) to avoid fetching on every request
|
||||
_jwks_cache: dict[str, dict] = {}
|
||||
|
||||
|
||||
async def _get_jwks(kc_host: str, realm: str) -> dict:
|
||||
cache_key = f"{kc_host}/{realm}"
|
||||
if cache_key in _jwks_cache:
|
||||
return _jwks_cache[cache_key]
|
||||
|
||||
url = f"{kc_host}/realms/{realm}/protocol/openid-connect/certs"
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.get(url)
|
||||
if resp.status_code != 200:
|
||||
raise HTTPException(
|
||||
status_code=502,
|
||||
detail=f"无法获取 Keycloak JWKS: HTTP {resp.status_code}",
|
||||
)
|
||||
data = resp.json()
|
||||
_jwks_cache[cache_key] = data
|
||||
return data
|
||||
|
||||
|
||||
def _find_rsa_key(jwks: dict, token: str) -> dict | None:
|
||||
"""Find the matching RSA key from JWKS for the token's kid."""
|
||||
unverified_header = jwt.get_unverified_header(token)
|
||||
kid = unverified_header.get("kid")
|
||||
for key in jwks.get("keys", []):
|
||||
if key.get("kid") == kid:
|
||||
return key
|
||||
return None
|
||||
|
||||
|
||||
async def verify_token(request: Request, kc_host: str, realm: str) -> dict:
|
||||
"""Extract and verify the Bearer JWT from the Authorization header.
|
||||
|
||||
Returns the decoded payload on success.
|
||||
Raises HTTPException(401) on missing/invalid token.
|
||||
"""
|
||||
auth_header = request.headers.get("Authorization", "")
|
||||
if not auth_header.startswith("Bearer "):
|
||||
raise HTTPException(status_code=401, detail="缺少 Authorization Bearer token")
|
||||
|
||||
token = auth_header[7:]
|
||||
|
||||
jwks = await _get_jwks(kc_host, realm)
|
||||
rsa_key = _find_rsa_key(jwks, token)
|
||||
if rsa_key is None:
|
||||
# Clear cache in case keys rotated
|
||||
_jwks_cache.pop(f"{kc_host}/{realm}", None)
|
||||
jwks = await _get_jwks(kc_host, realm)
|
||||
rsa_key = _find_rsa_key(jwks, token)
|
||||
if rsa_key is None:
|
||||
raise HTTPException(status_code=401, detail="无法匹配 JWT 签名密钥")
|
||||
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
rsa_key,
|
||||
algorithms=["RS256"],
|
||||
options={"verify_aud": False}, # Keycloak audience varies by client
|
||||
)
|
||||
return payload
|
||||
except JWTError as e:
|
||||
raise HTTPException(status_code=401, detail=f"JWT 验证失败: {e}")
|
||||
|
||||
|
||||
async def require_auth(request: Request):
|
||||
"""Verify Bearer JWT for write endpoints.
|
||||
|
||||
Dev mode: passthrough (no auth required).
|
||||
Prod mode: validates JWT via Keycloak JWKS.
|
||||
"""
|
||||
if os.getenv("ENV_MODE", "dev") == "dev":
|
||||
return None
|
||||
|
||||
from app.services.config_service import ConfigService
|
||||
config = ConfigService.load()
|
||||
kc = config.get("keycloak", {})
|
||||
if not kc.get("host"):
|
||||
return None # KC not configured – allow access
|
||||
|
||||
return await verify_token(request, kc["host"], kc.get("realm", ""))
|
||||
|
||||
|
||||
async def require_admin(request: Request, config: dict):
|
||||
"""Verify the request carries a valid Keycloak JWT with admin role.
|
||||
|
||||
Raises HTTPException(401/403) on failure.
|
||||
"""
|
||||
kc = config.get("keycloak", {})
|
||||
kc_host = kc.get("host")
|
||||
realm = kc.get("realm")
|
||||
|
||||
if not kc_host or not realm:
|
||||
raise HTTPException(
|
||||
status_code=503,
|
||||
detail="Keycloak 未配置,无法进行鉴权",
|
||||
)
|
||||
|
||||
payload = await verify_token(request, kc_host, realm)
|
||||
|
||||
roles = payload.get("realm_access", {}).get("roles", [])
|
||||
if "admin" not in roles:
|
||||
raise HTTPException(status_code=403, detail="需要 admin 角色")
|
||||
|
||||
return payload
|
||||
@@ -1,32 +0,0 @@
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.responses import JSONResponse
|
||||
|
||||
from services.config_service import ConfigService
|
||||
|
||||
# Paths that are always accessible, even when DB is not configured
|
||||
_ALLOWED_PREFIXES = ("/api/setup", "/docs", "/openapi", "/redoc")
|
||||
|
||||
|
||||
class ConfigGuardMiddleware(BaseHTTPMiddleware):
|
||||
"""Return 503 for all business routes when the database is not configured."""
|
||||
|
||||
async def dispatch(self, request, call_next):
|
||||
path = request.url.path
|
||||
|
||||
# Always allow: setup routes, root, docs, OPTIONS (CORS preflight)
|
||||
if path == "/" or request.method == "OPTIONS":
|
||||
return await call_next(request)
|
||||
for prefix in _ALLOWED_PREFIXES:
|
||||
if path.startswith(prefix):
|
||||
return await call_next(request)
|
||||
|
||||
if not ConfigService.is_db_configured():
|
||||
return JSONResponse(
|
||||
status_code=503,
|
||||
content={
|
||||
"error_code": "SERVICE_NOT_CONFIGURED",
|
||||
"detail": "数据库未配置,请先完成系统初始化",
|
||||
},
|
||||
)
|
||||
|
||||
return await call_next(request)
|
||||
@@ -1,92 +0,0 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional
|
||||
from enum import Enum
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class ModelProvider(str, Enum):
|
||||
OPENAI = "openai"
|
||||
DEEPSEEK = "deepseek"
|
||||
QWEN = "qwen"
|
||||
CLAUDE = "claude"
|
||||
|
||||
|
||||
class DebateStance(str, Enum):
|
||||
PRO = "pro"
|
||||
CON = "con"
|
||||
|
||||
|
||||
class SearchResult(BaseModel):
|
||||
title: str
|
||||
url: str
|
||||
snippet: str
|
||||
score: Optional[float] = None
|
||||
|
||||
|
||||
class SearchEvidence(BaseModel):
|
||||
query: str
|
||||
results: List[SearchResult]
|
||||
mode: str # "auto", "tool", "both"
|
||||
|
||||
|
||||
class DebateRound(BaseModel):
|
||||
round_number: int
|
||||
speaker: str # Model identifier
|
||||
stance: DebateStance
|
||||
content: str
|
||||
timestamp: datetime
|
||||
token_count: Optional[int] = None
|
||||
search_evidence: Optional[SearchEvidence] = None
|
||||
|
||||
|
||||
class DebateParticipant(BaseModel):
|
||||
model_config = {"protected_namespaces": ()}
|
||||
|
||||
model_identifier: str
|
||||
provider: ModelProvider
|
||||
stance: DebateStance
|
||||
api_key: Optional[str] = None
|
||||
|
||||
|
||||
class DebateConstraints(BaseModel):
|
||||
max_rounds: int = 5
|
||||
max_tokens_per_turn: int = 500
|
||||
max_total_tokens: Optional[int] = None
|
||||
forbid_repetition: bool = True
|
||||
must_respond_to_opponent: bool = True
|
||||
web_search_enabled: bool = False
|
||||
web_search_mode: str = "auto" # "auto", "tool", "both"
|
||||
|
||||
|
||||
class DebateRequest(BaseModel):
|
||||
topic: str
|
||||
participants: List[DebateParticipant]
|
||||
constraints: DebateConstraints
|
||||
custom_system_prompt: Optional[str] = None
|
||||
|
||||
|
||||
class EvidenceReference(BaseModel):
|
||||
round_number: int
|
||||
speaker: str
|
||||
stance: DebateStance
|
||||
|
||||
|
||||
class EvidenceEntry(BaseModel):
|
||||
title: str
|
||||
url: str
|
||||
snippet: str
|
||||
score: Optional[float] = None
|
||||
references: List[EvidenceReference]
|
||||
|
||||
|
||||
class DebateSession(BaseModel):
|
||||
session_id: str
|
||||
topic: str
|
||||
participants: List[DebateParticipant]
|
||||
constraints: DebateConstraints
|
||||
rounds: List[DebateRound]
|
||||
status: str # "active", "completed", "terminated"
|
||||
created_at: datetime
|
||||
completed_at: Optional[datetime] = None
|
||||
summary: Optional[str] = None
|
||||
evidence_library: List[EvidenceEntry] = []
|
||||
@@ -1,358 +0,0 @@
|
||||
import asyncio
|
||||
import uuid
|
||||
from typing import Dict, List, Optional
|
||||
from datetime import datetime
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from models.debate import (
|
||||
DebateRequest, DebateSession, DebateRound,
|
||||
DebateStance, DebateParticipant, SearchEvidence, SearchResult,
|
||||
EvidenceEntry, EvidenceReference
|
||||
)
|
||||
from providers.provider_factory import ProviderFactory
|
||||
from storage.session_manager import SessionManager
|
||||
from utils.summarizer import summarize_debate
|
||||
from services.search_service import SearchService
|
||||
from services.api_key_service import ApiKeyService
|
||||
|
||||
|
||||
class DebateOrchestrator:
|
||||
"""
|
||||
Orchestrates the debate between multiple language models
|
||||
"""
|
||||
|
||||
def __init__(self, db: Session):
|
||||
self.db = db
|
||||
self.session_manager = SessionManager()
|
||||
self.provider_factory = ProviderFactory()
|
||||
|
||||
async def create_session(self, debate_request: DebateRequest) -> str:
|
||||
"""
|
||||
Create a new debate session
|
||||
"""
|
||||
session_id = str(uuid.uuid4())
|
||||
|
||||
session = DebateSession(
|
||||
session_id=session_id,
|
||||
topic=debate_request.topic,
|
||||
participants=debate_request.participants,
|
||||
constraints=debate_request.constraints,
|
||||
rounds=[],
|
||||
status="active",
|
||||
created_at=datetime.now()
|
||||
)
|
||||
|
||||
await self.session_manager.save_session(self.db, session)
|
||||
return session_id
|
||||
|
||||
def _get_search_service(self) -> Optional[SearchService]:
|
||||
"""
|
||||
Get a SearchService instance if Tavily API key is available.
|
||||
"""
|
||||
tavily_key = ApiKeyService.get_api_key(self.db, "tavily")
|
||||
if tavily_key:
|
||||
return SearchService(api_key=tavily_key)
|
||||
return None
|
||||
|
||||
async def run_debate(self, session_id: str) -> DebateSession:
|
||||
"""
|
||||
Run the complete debate process
|
||||
"""
|
||||
session = await self.session_manager.get_session(self.db, session_id)
|
||||
if not session:
|
||||
raise ValueError(f"Session {session_id} not found")
|
||||
|
||||
# Initialize providers for each participant
|
||||
providers = {}
|
||||
for participant in session.participants:
|
||||
provider = self.provider_factory.create_provider(
|
||||
self.db,
|
||||
participant.provider,
|
||||
participant.api_key # This can be None, and the provider will fetch from DB
|
||||
)
|
||||
providers[participant.model_identifier] = provider
|
||||
|
||||
# Initialize search service if web search is enabled
|
||||
search_service = None
|
||||
web_search_enabled = session.constraints.web_search_enabled
|
||||
web_search_mode = session.constraints.web_search_mode
|
||||
if web_search_enabled:
|
||||
search_service = self._get_search_service()
|
||||
if not search_service:
|
||||
print("Warning: Web search enabled but no Tavily API key found. Disabling search.")
|
||||
web_search_enabled = False
|
||||
|
||||
# Run the debate rounds
|
||||
for round_num in range(session.constraints.max_rounds):
|
||||
if session.status != "active":
|
||||
break
|
||||
|
||||
# Alternate between participants
|
||||
current_participant = session.participants[round_num % len(session.participants)]
|
||||
provider = providers[current_participant.model_identifier]
|
||||
|
||||
# Perform automatic search if enabled
|
||||
search_evidence = None
|
||||
if web_search_enabled and web_search_mode in ("auto", "both"):
|
||||
search_evidence = self._perform_automatic_search(
|
||||
search_service, session, round_num
|
||||
)
|
||||
|
||||
# Prepare context for the current turn (with search results if available)
|
||||
context = self._prepare_context(session, current_participant.stance, search_evidence)
|
||||
|
||||
# Determine if we should use tool calling for this round
|
||||
use_tool_calling = (
|
||||
web_search_enabled
|
||||
and web_search_mode in ("tool", "both")
|
||||
and provider.supports_tools()
|
||||
)
|
||||
|
||||
if use_tool_calling:
|
||||
response, tool_evidence = await self._handle_tool_calls(
|
||||
provider, current_participant.model_identifier,
|
||||
context, search_service
|
||||
)
|
||||
# Merge tool-based evidence with auto evidence
|
||||
if tool_evidence:
|
||||
if search_evidence:
|
||||
search_evidence.results.extend(tool_evidence.results)
|
||||
search_evidence.query += f" | {tool_evidence.query}"
|
||||
search_evidence.mode = "both"
|
||||
else:
|
||||
search_evidence = tool_evidence
|
||||
else:
|
||||
response = await provider.generate_response(
|
||||
model=current_participant.model_identifier,
|
||||
prompt=context
|
||||
)
|
||||
|
||||
# Clean the response to remove any echoed prompt/meta text
|
||||
response = self._clean_response(response)
|
||||
|
||||
# Create a new round
|
||||
round_data = DebateRound(
|
||||
round_number=round_num + 1,
|
||||
speaker=current_participant.model_identifier,
|
||||
stance=current_participant.stance,
|
||||
content=response,
|
||||
timestamp=datetime.now(),
|
||||
token_count=len(response.split()), # Approximate token count
|
||||
search_evidence=search_evidence
|
||||
)
|
||||
|
||||
session.rounds.append(round_data)
|
||||
|
||||
# Update evidence library with search results from this round
|
||||
if round_data.search_evidence:
|
||||
self._update_evidence_library(session, round_data)
|
||||
|
||||
# Update session in storage
|
||||
await self.session_manager.update_session(self.db, session)
|
||||
|
||||
# Small delay between rounds to simulate realistic interaction
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Generate summary after all rounds are complete
|
||||
summary = await summarize_debate(session)
|
||||
session.summary = summary
|
||||
session.status = "completed"
|
||||
session.completed_at = datetime.now()
|
||||
|
||||
await self.session_manager.update_session(self.db, session)
|
||||
return session
|
||||
|
||||
def _perform_automatic_search(
|
||||
self, search_service: SearchService, session: DebateSession, round_num: int
|
||||
) -> Optional[SearchEvidence]:
|
||||
"""
|
||||
Perform an automatic web search based on the topic and last opponent argument.
|
||||
"""
|
||||
last_opponent_arg = None
|
||||
if session.rounds:
|
||||
last_opponent_arg = session.rounds[-1].content
|
||||
|
||||
query = SearchService.generate_search_query(session.topic, last_opponent_arg)
|
||||
results = search_service.search(query, max_results=3)
|
||||
|
||||
if results:
|
||||
return SearchEvidence(
|
||||
query=query,
|
||||
results=results,
|
||||
mode="auto"
|
||||
)
|
||||
return None
|
||||
|
||||
async def _handle_tool_calls(
|
||||
self, provider, model: str, context: str, search_service: SearchService
|
||||
) -> tuple:
|
||||
"""
|
||||
Handle tool calling flow: send prompt with tools, execute any tool calls,
|
||||
then re-prompt with results. Max 2 tool call iterations.
|
||||
Returns (final_response_text, SearchEvidence_or_None).
|
||||
"""
|
||||
tools = [SearchService.get_tool_definition()]
|
||||
all_search_results = []
|
||||
all_queries = []
|
||||
|
||||
text, tool_calls = await provider.generate_response_with_tools(
|
||||
model=model,
|
||||
prompt=context,
|
||||
tools=tools,
|
||||
max_tokens=500
|
||||
)
|
||||
|
||||
# If no tool calls, return the text response directly
|
||||
if not tool_calls:
|
||||
return text, None
|
||||
|
||||
# Process up to 2 rounds of tool calls
|
||||
for iteration in range(2):
|
||||
if not tool_calls:
|
||||
break
|
||||
|
||||
# Execute each tool call
|
||||
tool_results_text = []
|
||||
for tc in tool_calls:
|
||||
if tc["name"] == "web_search":
|
||||
query = tc["arguments"].get("query", "")
|
||||
all_queries.append(query)
|
||||
results = search_service.search(query, max_results=3)
|
||||
all_search_results.extend(results)
|
||||
|
||||
# Format results for the model
|
||||
evidence = SearchEvidence(query=query, results=results, mode="tool")
|
||||
tool_results_text.append(SearchService.format_results_for_context(evidence))
|
||||
|
||||
# Re-prompt the model with tool results
|
||||
augmented_context = context + "\n" + "\n".join(tool_results_text)
|
||||
augmented_context += "\n请基于以上搜索结果和辩论历史,给出你的论点。"
|
||||
|
||||
text, tool_calls = await provider.generate_response_with_tools(
|
||||
model=model,
|
||||
prompt=augmented_context,
|
||||
tools=tools,
|
||||
max_tokens=500
|
||||
)
|
||||
|
||||
# Build combined evidence
|
||||
evidence = None
|
||||
if all_search_results:
|
||||
evidence = SearchEvidence(
|
||||
query=" | ".join(all_queries),
|
||||
results=all_search_results,
|
||||
mode="tool"
|
||||
)
|
||||
|
||||
# If we still got no text (model keeps calling tools), fall back
|
||||
if not text:
|
||||
text = await provider.generate_response(model=model, prompt=context, max_tokens=500)
|
||||
|
||||
return text, evidence
|
||||
|
||||
def _update_evidence_library(self, session: DebateSession, round_data: DebateRound):
|
||||
"""
|
||||
Merge search results from a round into the session's evidence library, deduplicating by URL.
|
||||
"""
|
||||
ref = EvidenceReference(
|
||||
round_number=round_data.round_number,
|
||||
speaker=round_data.speaker,
|
||||
stance=round_data.stance
|
||||
)
|
||||
|
||||
url_index = {entry.url: i for i, entry in enumerate(session.evidence_library)}
|
||||
|
||||
for result in round_data.search_evidence.results:
|
||||
if result.url in url_index:
|
||||
entry = session.evidence_library[url_index[result.url]]
|
||||
# Avoid duplicate references (same round + speaker)
|
||||
if not any(
|
||||
r.round_number == ref.round_number and r.speaker == ref.speaker
|
||||
for r in entry.references
|
||||
):
|
||||
entry.references.append(ref)
|
||||
else:
|
||||
new_entry = EvidenceEntry(
|
||||
title=result.title,
|
||||
url=result.url,
|
||||
snippet=result.snippet,
|
||||
score=result.score,
|
||||
references=[ref]
|
||||
)
|
||||
session.evidence_library.append(new_entry)
|
||||
url_index[result.url] = len(session.evidence_library) - 1
|
||||
|
||||
def _prepare_context(
|
||||
self, session: DebateSession, current_stance: DebateStance,
|
||||
search_evidence: Optional[SearchEvidence] = None
|
||||
) -> str:
|
||||
"""
|
||||
Prepare the context/prompt for the current model turn
|
||||
"""
|
||||
# Determine the stance of the current speaker
|
||||
if current_stance == DebateStance.PRO:
|
||||
position_desc = "正方(支持方)"
|
||||
opposing_desc = "反方(反对方)"
|
||||
else:
|
||||
position_desc = "反方(反对方)"
|
||||
opposing_desc = "正方(支持方)"
|
||||
|
||||
# Build the context with previous rounds
|
||||
context_parts = [
|
||||
f"辩论主题: {session.topic}",
|
||||
f"你的立场: {position_desc}",
|
||||
"辩论规则:",
|
||||
"- 必须回应对方上一轮的核心论点",
|
||||
"- 不得重复自己已提出的观点",
|
||||
"- 输出长度限制在合理范围内",
|
||||
"\n历史辩论记录:"
|
||||
]
|
||||
|
||||
for round_data in session.rounds:
|
||||
stance_text = "正方" if round_data.stance == DebateStance.PRO else "反方"
|
||||
context_parts.append(f"第{round_data.round_number}轮 - {stance_text}: {round_data.content}")
|
||||
|
||||
# Inject search results if available
|
||||
if search_evidence:
|
||||
context_parts.append(SearchService.format_results_for_context(search_evidence))
|
||||
|
||||
context_parts.append(f"\n现在轮到你 ({position_desc}) 发言,请基于以上内容进行回应。注意:直接给出你的论点内容,不要重复上述提示词、辩论规则或历史记录。")
|
||||
|
||||
return "\n".join(context_parts)
|
||||
|
||||
def _clean_response(self, response: str) -> str:
|
||||
"""
|
||||
Clean the model response to remove any echoed prompt/meta text
|
||||
"""
|
||||
import re
|
||||
|
||||
# Remove common prompt echoes and meta prefixes
|
||||
patterns_to_remove = [
|
||||
r'^第\d+轮\s*[-::]\s*(正方|反方)\s*[::]?\s*', # 第X轮 - 正方/反方:
|
||||
r'^(正方|反方)\s*[((][^))]*[))]\s*[::]?\s*', # 正方(支持方):
|
||||
r'^(正方|反方)\s*[::]\s*', # 正方: or 反方:
|
||||
r'^我的立场\s*[::]\s*', # 我的立场:
|
||||
r'^回应\s*[::]\s*', # 回应:
|
||||
r'^辩论发言\s*[::]\s*', # 辩论发言:
|
||||
]
|
||||
|
||||
cleaned = response.strip()
|
||||
for pattern in patterns_to_remove:
|
||||
cleaned = re.sub(pattern, '', cleaned, flags=re.MULTILINE)
|
||||
|
||||
return cleaned.strip()
|
||||
|
||||
async def get_session_status(self, session_id: str) -> Optional[DebateSession]:
|
||||
"""
|
||||
Get the current status of a debate session
|
||||
"""
|
||||
return await self.session_manager.get_session(self.db, session_id)
|
||||
|
||||
async def terminate_session(self, session_id: str):
|
||||
"""
|
||||
Terminate a debate session prematurely
|
||||
"""
|
||||
session = await self.session_manager.get_session(self.db, session_id)
|
||||
if session:
|
||||
session.status = "terminated"
|
||||
await self.session_manager.update_session(self.db, session)
|
||||
@@ -1,73 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
class LLMProvider(ABC):
|
||||
"""
|
||||
Abstract base class for LLM providers
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, db: Session, api_key: Optional[str] = None):
|
||||
"""
|
||||
Initialize the provider with a database session and optional API key
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
|
||||
"""
|
||||
Generate a response from the LLM
|
||||
"""
|
||||
pass
|
||||
|
||||
def supports_tools(self) -> bool:
|
||||
"""
|
||||
Whether this provider supports function/tool calling.
|
||||
Override in subclasses that support it.
|
||||
"""
|
||||
return False
|
||||
|
||||
async def generate_response_with_tools(
|
||||
self,
|
||||
model: str,
|
||||
prompt: str,
|
||||
tools: List[Dict[str, Any]],
|
||||
max_tokens: Optional[int] = None
|
||||
) -> Tuple[str, List[Dict[str, Any]]]:
|
||||
"""
|
||||
Generate a response with tool definitions available.
|
||||
Returns (text_content, tool_calls) where tool_calls is a list of
|
||||
dicts with keys: name, arguments (dict).
|
||||
If no tools are called, tool_calls is empty and text_content has the response.
|
||||
Default implementation falls back to regular generation.
|
||||
"""
|
||||
text = await self.generate_response(model, prompt, max_tokens)
|
||||
return text, []
|
||||
|
||||
|
||||
class ProviderFactory:
|
||||
"""
|
||||
Factory class to create provider instances
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def create_provider(db: Session, provider_type: str, api_key: Optional[str] = None):
|
||||
"""
|
||||
Create a provider instance based on the type
|
||||
"""
|
||||
if provider_type.value == "openai":
|
||||
from app.providers.openai_provider import OpenAIProvider
|
||||
return OpenAIProvider(db, api_key)
|
||||
elif provider_type.value == "claude":
|
||||
from app.providers.claude_provider import ClaudeProvider
|
||||
return ClaudeProvider(db, api_key)
|
||||
elif provider_type.value == "qwen":
|
||||
from app.providers.qwen_provider import QwenProvider
|
||||
return QwenProvider(db, api_key)
|
||||
elif provider_type.value == "deepseek":
|
||||
from app.providers.deepseek_provider import DeepSeekProvider
|
||||
return DeepSeekProvider(db, api_key)
|
||||
else:
|
||||
raise ValueError(f"Unsupported provider type: {provider_type}")
|
||||
@@ -1,85 +0,0 @@
|
||||
import anthropic
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from providers.base_provider import LLMProvider
|
||||
from services.api_key_service import ApiKeyService
|
||||
|
||||
SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。"
|
||||
|
||||
|
||||
class ClaudeProvider(LLMProvider):
|
||||
"""
|
||||
Anthropic Claude API provider implementation
|
||||
"""
|
||||
|
||||
def __init__(self, db: Session, api_key: Optional[str] = None):
|
||||
if not api_key:
|
||||
api_key = ApiKeyService.get_api_key(db, "claude")
|
||||
|
||||
if api_key:
|
||||
self.client = anthropic.AsyncAnthropic(api_key=api_key)
|
||||
else:
|
||||
raise ValueError("Claude API key not found in database or provided")
|
||||
|
||||
def supports_tools(self) -> bool:
|
||||
return True
|
||||
|
||||
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
|
||||
try:
|
||||
response = await self.client.messages.create(
|
||||
model=model,
|
||||
max_tokens=max_tokens or 500,
|
||||
temperature=0.7,
|
||||
system=SYSTEM_PROMPT,
|
||||
messages=[
|
||||
{"role": "user", "content": prompt}
|
||||
]
|
||||
)
|
||||
return response.content[0].text
|
||||
except Exception as e:
|
||||
raise Exception(f"Error calling Claude API: {str(e)}")
|
||||
|
||||
async def generate_response_with_tools(
|
||||
self,
|
||||
model: str,
|
||||
prompt: str,
|
||||
tools: List[Dict[str, Any]],
|
||||
max_tokens: Optional[int] = None
|
||||
) -> Tuple[str, List[Dict[str, Any]]]:
|
||||
try:
|
||||
# Convert OpenAI-format tools to Anthropic format
|
||||
anthropic_tools = []
|
||||
for tool in tools:
|
||||
func = tool.get("function", tool)
|
||||
anthropic_tools.append({
|
||||
"name": func["name"],
|
||||
"description": func.get("description", ""),
|
||||
"input_schema": func.get("parameters", func.get("input_schema", {}))
|
||||
})
|
||||
|
||||
response = await self.client.messages.create(
|
||||
model=model,
|
||||
max_tokens=max_tokens or 500,
|
||||
temperature=0.7,
|
||||
system=SYSTEM_PROMPT,
|
||||
messages=[
|
||||
{"role": "user", "content": prompt}
|
||||
],
|
||||
tools=anthropic_tools
|
||||
)
|
||||
|
||||
text_content = ""
|
||||
tool_calls = []
|
||||
for block in response.content:
|
||||
if block.type == "text":
|
||||
text_content += block.text
|
||||
elif block.type == "tool_use":
|
||||
tool_calls.append({
|
||||
"name": block.name,
|
||||
"arguments": block.input
|
||||
})
|
||||
|
||||
return text_content.strip(), tool_calls
|
||||
except Exception as e:
|
||||
raise Exception(f"Error calling Claude API with tools: {str(e)}")
|
||||
@@ -1,64 +0,0 @@
|
||||
from typing import Optional
|
||||
from sqlalchemy.orm import Session
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
from providers.base_provider import LLMProvider
|
||||
from services.api_key_service import ApiKeyService
|
||||
|
||||
|
||||
class DeepSeekProvider(LLMProvider):
|
||||
"""
|
||||
DeepSeek API provider implementation using OpenAI-compatible API
|
||||
"""
|
||||
|
||||
def __init__(self, db: Session, api_key: Optional[str] = None):
|
||||
if not api_key:
|
||||
api_key = ApiKeyService.get_api_key(db, "deepseek")
|
||||
|
||||
if not api_key:
|
||||
raise ValueError("DeepSeek API key not found in database or provided")
|
||||
|
||||
self.client = AsyncOpenAI(
|
||||
api_key=api_key,
|
||||
base_url="https://api.deepseek.com"
|
||||
)
|
||||
|
||||
def supports_tools(self) -> bool:
|
||||
return False
|
||||
|
||||
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
|
||||
try:
|
||||
is_reasoner = "reasoner" in model or "r1" in model.lower()
|
||||
|
||||
messages = [{"role": "user", "content": prompt}]
|
||||
# deepseek-reasoner 不支持 system message,把指令放进 user message
|
||||
if not is_reasoner:
|
||||
messages.insert(0, {
|
||||
"role": "system",
|
||||
"content": "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。"
|
||||
})
|
||||
|
||||
kwargs = {
|
||||
"model": model,
|
||||
"messages": messages,
|
||||
}
|
||||
# reasoner 模型不支持 max_tokens,使用 max_completion_tokens
|
||||
if is_reasoner:
|
||||
kwargs["max_completion_tokens"] = max_tokens or 4096
|
||||
else:
|
||||
kwargs["max_tokens"] = max_tokens or 500
|
||||
|
||||
response = await self.client.chat.completions.create(**kwargs)
|
||||
|
||||
message = response.choices[0].message
|
||||
content = message.content or ""
|
||||
|
||||
# deepseek-reasoner 的主要内容可能在 reasoning_content 中
|
||||
if not content.strip() and is_reasoner:
|
||||
reasoning = getattr(message, "reasoning_content", None)
|
||||
if reasoning:
|
||||
content = reasoning
|
||||
|
||||
return content.strip()
|
||||
except Exception as e:
|
||||
raise Exception(f"Error calling DeepSeek API: {str(e)}")
|
||||
@@ -1,72 +0,0 @@
|
||||
import json
|
||||
import openai
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from providers.base_provider import LLMProvider
|
||||
from services.api_key_service import ApiKeyService
|
||||
|
||||
SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。"
|
||||
|
||||
|
||||
class OpenAIProvider(LLMProvider):
|
||||
"""
|
||||
OpenAI API provider implementation
|
||||
"""
|
||||
|
||||
def __init__(self, db: Session, api_key: Optional[str] = None):
|
||||
if not api_key:
|
||||
api_key = ApiKeyService.get_api_key(db, "openai")
|
||||
|
||||
if api_key:
|
||||
self.client = openai.AsyncOpenAI(api_key=api_key)
|
||||
else:
|
||||
raise ValueError("OpenAI API key not found in database or provided")
|
||||
|
||||
def supports_tools(self) -> bool:
|
||||
return True
|
||||
|
||||
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
|
||||
try:
|
||||
response = await self.client.chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": prompt}
|
||||
],
|
||||
max_tokens=max_tokens or 500
|
||||
)
|
||||
return response.choices[0].message.content.strip()
|
||||
except Exception as e:
|
||||
raise Exception(f"Error calling OpenAI API: {str(e)}")
|
||||
|
||||
async def generate_response_with_tools(
|
||||
self,
|
||||
model: str,
|
||||
prompt: str,
|
||||
tools: List[Dict[str, Any]],
|
||||
max_tokens: Optional[int] = None
|
||||
) -> Tuple[str, List[Dict[str, Any]]]:
|
||||
try:
|
||||
response = await self.client.chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": prompt}
|
||||
],
|
||||
tools=tools,
|
||||
tool_choice="auto",
|
||||
max_tokens=max_tokens or 500
|
||||
)
|
||||
message = response.choices[0].message
|
||||
text_content = message.content or ""
|
||||
tool_calls = []
|
||||
if message.tool_calls:
|
||||
for tc in message.tool_calls:
|
||||
tool_calls.append({
|
||||
"name": tc.function.name,
|
||||
"arguments": json.loads(tc.function.arguments)
|
||||
})
|
||||
return text_content.strip(), tool_calls
|
||||
except Exception as e:
|
||||
raise Exception(f"Error calling OpenAI API with tools: {str(e)}")
|
||||
@@ -1,49 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
class LLMProvider(ABC):
|
||||
"""
|
||||
Abstract base class for LLM providers
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, db: Session, api_key: Optional[str] = None):
|
||||
"""
|
||||
Initialize the provider with a database session and optional API key
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
|
||||
"""
|
||||
Generate a response from the LLM
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ProviderFactory:
|
||||
"""
|
||||
Factory class to create provider instances
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def create_provider(db: Session, provider_type: str, api_key: Optional[str] = None):
|
||||
"""
|
||||
Create a provider instance based on the type
|
||||
"""
|
||||
if provider_type.value == "openai":
|
||||
from providers.openai_provider import OpenAIProvider
|
||||
return OpenAIProvider(db, api_key)
|
||||
elif provider_type.value == "claude":
|
||||
from providers.claude_provider import ClaudeProvider
|
||||
return ClaudeProvider(db, api_key)
|
||||
elif provider_type.value == "qwen":
|
||||
from providers.qwen_provider import QwenProvider
|
||||
return QwenProvider(db, api_key)
|
||||
elif provider_type.value == "deepseek":
|
||||
from providers.deepseek_provider import DeepSeekProvider
|
||||
return DeepSeekProvider(db, api_key)
|
||||
else:
|
||||
raise ValueError(f"Unsupported provider type: {provider_type}")
|
||||
@@ -1,75 +0,0 @@
|
||||
import json
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
from sqlalchemy.orm import Session
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
from providers.base_provider import LLMProvider
|
||||
from services.api_key_service import ApiKeyService
|
||||
|
||||
SYSTEM_PROMPT = "你正在参与一场结构化辩论。请按照用户消息中的规则进行辩论,直接给出你的论点,不要重复提示词或历史记录。"
|
||||
|
||||
|
||||
class QwenProvider(LLMProvider):
|
||||
"""
|
||||
Qwen API provider implementation using DashScope OpenAI-compatible API
|
||||
"""
|
||||
|
||||
def __init__(self, db: Session, api_key: Optional[str] = None):
|
||||
if not api_key:
|
||||
api_key = ApiKeyService.get_api_key(db, "qwen")
|
||||
|
||||
if not api_key:
|
||||
raise ValueError("Qwen API key not found in database or provided")
|
||||
|
||||
self.client = AsyncOpenAI(
|
||||
api_key=api_key,
|
||||
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
|
||||
)
|
||||
|
||||
def supports_tools(self) -> bool:
|
||||
return True
|
||||
|
||||
async def generate_response(self, model: str, prompt: str, max_tokens: Optional[int] = None) -> str:
|
||||
try:
|
||||
response = await self.client.chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": prompt}
|
||||
],
|
||||
max_tokens=max_tokens or 500
|
||||
)
|
||||
return response.choices[0].message.content.strip()
|
||||
except Exception as e:
|
||||
raise Exception(f"Error calling Qwen API: {str(e)}")
|
||||
|
||||
async def generate_response_with_tools(
|
||||
self,
|
||||
model: str,
|
||||
prompt: str,
|
||||
tools: List[Dict[str, Any]],
|
||||
max_tokens: Optional[int] = None
|
||||
) -> Tuple[str, List[Dict[str, Any]]]:
|
||||
try:
|
||||
response = await self.client.chat.completions.create(
|
||||
model=model,
|
||||
messages=[
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": prompt}
|
||||
],
|
||||
tools=tools,
|
||||
tool_choice="auto",
|
||||
max_tokens=max_tokens or 500
|
||||
)
|
||||
message = response.choices[0].message
|
||||
text_content = message.content or ""
|
||||
tool_calls = []
|
||||
if message.tool_calls:
|
||||
for tc in message.tool_calls:
|
||||
tool_calls.append({
|
||||
"name": tc.function.name,
|
||||
"arguments": json.loads(tc.function.arguments)
|
||||
})
|
||||
return text_content.strip(), tool_calls
|
||||
except Exception as e:
|
||||
raise Exception(f"Error calling Qwen API with tools: {str(e)}")
|
||||
@@ -1,19 +0,0 @@
|
||||
fastapi==0.115.0
|
||||
uvicorn[standard]==0.32.0
|
||||
pydantic==2.9.2
|
||||
pydantic-settings==2.6.1
|
||||
sqlalchemy==2.0.35
|
||||
aiosqlite==0.20.0
|
||||
pymysql==1.1.1
|
||||
cryptography==43.0.1
|
||||
python-multipart==0.0.20
|
||||
sse-starlette==2.1.3
|
||||
openai==1.52.2
|
||||
anthropic==0.40.0
|
||||
tiktoken==0.8.0
|
||||
python-dotenv==1.0.1
|
||||
aiohttp==3.9.0
|
||||
httpx==0.27.2
|
||||
tavily-python==0.5.0
|
||||
pyyaml==6.0.2
|
||||
python-jose[cryptography]==3.3.0
|
||||
@@ -1,71 +0,0 @@
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
from sqlalchemy.orm import Session
|
||||
from db_models import ApiKey
|
||||
import os
|
||||
|
||||
# Initialize the encryption key from environment or generate a new one
|
||||
ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY", Fernet.generate_key().decode())
|
||||
cipher_suite = Fernet(ENCRYPTION_KEY.encode())
|
||||
|
||||
|
||||
class ApiKeyService:
|
||||
"""
|
||||
Service for managing API keys in the database
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def encrypt_api_key(api_key: str) -> str:
|
||||
"""
|
||||
Encrypt an API key
|
||||
"""
|
||||
encrypted_key = cipher_suite.encrypt(api_key.encode())
|
||||
return encrypted_key.decode()
|
||||
|
||||
@staticmethod
|
||||
def decrypt_api_key(encrypted_api_key: str) -> str:
|
||||
"""
|
||||
Decrypt an API key
|
||||
"""
|
||||
decrypted_key = cipher_suite.decrypt(encrypted_api_key.encode())
|
||||
return decrypted_key.decode()
|
||||
|
||||
@staticmethod
|
||||
def get_api_key(db: Session, provider: str) -> str:
|
||||
"""
|
||||
Retrieve and decrypt an API key for a provider
|
||||
"""
|
||||
api_key_record = db.query(ApiKey).filter(ApiKey.provider == provider).first()
|
||||
if not api_key_record or not api_key_record.api_key_encrypted:
|
||||
return None
|
||||
try:
|
||||
return ApiKeyService.decrypt_api_key(api_key_record.api_key_encrypted)
|
||||
except InvalidToken:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def set_api_key(db: Session, provider: str, api_key: str) -> bool:
|
||||
"""
|
||||
Encrypt and store an API key for a provider
|
||||
"""
|
||||
encrypted_key = ApiKeyService.encrypt_api_key(api_key)
|
||||
|
||||
# Check if record exists
|
||||
api_key_record = db.query(ApiKey).filter(ApiKey.provider == provider).first()
|
||||
|
||||
if api_key_record:
|
||||
# Update existing record
|
||||
api_key_record.api_key_encrypted = encrypted_key
|
||||
else:
|
||||
# Create new record
|
||||
api_key_record = ApiKey(
|
||||
provider=provider,
|
||||
api_key_encrypted=encrypted_key
|
||||
)
|
||||
db.add(api_key_record)
|
||||
|
||||
try:
|
||||
db.commit()
|
||||
return True
|
||||
except Exception:
|
||||
db.rollback()
|
||||
return False
|
||||
@@ -1,100 +0,0 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
import yaml
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
|
||||
CONFIG_PATH = Path(os.getenv("CONFIG_PATH", "/app/config/dialectica.yaml"))
|
||||
|
||||
# Reuse the same encryption key used for API keys
|
||||
_ENCRYPTION_KEY = os.getenv("ENCRYPTION_KEY", "")
|
||||
_cipher = Fernet(_ENCRYPTION_KEY.encode()) if _ENCRYPTION_KEY else None
|
||||
|
||||
# Fields that should be encrypted in the YAML file
|
||||
_SECRET_FIELDS = {"password"}
|
||||
|
||||
|
||||
def _encrypt(value: str) -> str:
|
||||
if not _cipher or not value:
|
||||
return value
|
||||
return "ENC:" + _cipher.encrypt(value.encode()).decode()
|
||||
|
||||
|
||||
def _decrypt(value: str) -> str:
|
||||
if not _cipher or not isinstance(value, str) or not value.startswith("ENC:"):
|
||||
return value
|
||||
try:
|
||||
return _cipher.decrypt(value[4:].encode()).decode()
|
||||
except InvalidToken:
|
||||
return value
|
||||
|
||||
|
||||
def _encrypt_secrets(data: dict) -> dict:
|
||||
"""Deep-copy dict, encrypting secret fields."""
|
||||
out = {}
|
||||
for k, v in data.items():
|
||||
if isinstance(v, dict):
|
||||
out[k] = _encrypt_secrets(v)
|
||||
elif k in _SECRET_FIELDS and isinstance(v, str) and not v.startswith("ENC:"):
|
||||
out[k] = _encrypt(v)
|
||||
else:
|
||||
out[k] = v
|
||||
return out
|
||||
|
||||
|
||||
def _decrypt_secrets(data: dict) -> dict:
|
||||
"""Deep-copy dict, decrypting secret fields."""
|
||||
out = {}
|
||||
for k, v in data.items():
|
||||
if isinstance(v, dict):
|
||||
out[k] = _decrypt_secrets(v)
|
||||
elif k in _SECRET_FIELDS and isinstance(v, str):
|
||||
out[k] = _decrypt(v)
|
||||
else:
|
||||
out[k] = v
|
||||
return out
|
||||
|
||||
|
||||
class ConfigService:
|
||||
"""Read / write config/dialectica.yaml."""
|
||||
|
||||
@staticmethod
|
||||
def load() -> dict:
|
||||
"""Load config, returning decrypted values. Empty dict if file missing."""
|
||||
if not CONFIG_PATH.exists():
|
||||
return {}
|
||||
with open(CONFIG_PATH) as f:
|
||||
raw = yaml.safe_load(f) or {}
|
||||
return _decrypt_secrets(raw)
|
||||
|
||||
@staticmethod
|
||||
def save(config: dict):
|
||||
"""Save config, encrypting secret fields."""
|
||||
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
encrypted = _encrypt_secrets(config)
|
||||
with open(CONFIG_PATH, "w") as f:
|
||||
yaml.dump(encrypted, f, default_flow_style=False, allow_unicode=True)
|
||||
|
||||
@staticmethod
|
||||
def is_db_configured() -> bool:
|
||||
config = ConfigService.load()
|
||||
db = config.get("database", {})
|
||||
return bool(db.get("host") and db.get("database"))
|
||||
|
||||
@staticmethod
|
||||
def get_database_url() -> str | None:
|
||||
config = ConfigService.load()
|
||||
db = config.get("database", {})
|
||||
if not (db.get("host") and db.get("database")):
|
||||
return None
|
||||
user = db.get("user", "root")
|
||||
password = db.get("password", "")
|
||||
host = db["host"]
|
||||
port = db.get("port", 3306)
|
||||
database = db["database"]
|
||||
return f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"
|
||||
|
||||
@staticmethod
|
||||
def is_initialized() -> bool:
|
||||
return ConfigService.load().get("initialized", False)
|
||||
@@ -1,104 +0,0 @@
|
||||
from typing import List, Optional
|
||||
from models.debate import SearchResult, SearchEvidence
|
||||
|
||||
|
||||
class SearchService:
|
||||
"""
|
||||
Tavily web search wrapper for debate research.
|
||||
"""
|
||||
|
||||
def __init__(self, api_key: str):
|
||||
from tavily import TavilyClient
|
||||
self.client = TavilyClient(api_key=api_key)
|
||||
|
||||
def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
|
||||
"""
|
||||
Perform a web search using Tavily and return structured results.
|
||||
"""
|
||||
try:
|
||||
response = self.client.search(
|
||||
query=query,
|
||||
max_results=max_results,
|
||||
search_depth="basic"
|
||||
)
|
||||
results = []
|
||||
for item in response.get("results", []):
|
||||
results.append(SearchResult(
|
||||
title=item.get("title", ""),
|
||||
url=item.get("url", ""),
|
||||
snippet=item.get("content", "")[:500], # Truncate to avoid token bloat
|
||||
score=item.get("score")
|
||||
))
|
||||
return results
|
||||
except Exception as e:
|
||||
print(f"Tavily search error: {e}")
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def format_results_for_context(evidence: SearchEvidence) -> str:
|
||||
"""
|
||||
Format search results into a string suitable for injecting into the LLM context.
|
||||
"""
|
||||
if not evidence or not evidence.results:
|
||||
return ""
|
||||
lines = [f"\n[网络搜索结果] 搜索词: \"{evidence.query}\""]
|
||||
for i, r in enumerate(evidence.results, 1):
|
||||
lines.append(f" {i}. {r.title}")
|
||||
lines.append(f" {r.snippet}")
|
||||
lines.append(f" 来源: {r.url}")
|
||||
lines.append("[搜索结果结束]\n")
|
||||
return "\n".join(lines)
|
||||
|
||||
@staticmethod
|
||||
def generate_search_query(topic: str, last_opponent_argument: Optional[str] = None) -> str:
|
||||
"""
|
||||
Generate a search query from the debate topic and the opponent's last argument.
|
||||
"""
|
||||
if last_opponent_argument:
|
||||
# Extract key phrases from the opponent's argument (first ~100 chars)
|
||||
snippet = last_opponent_argument[:100].strip()
|
||||
return f"{topic} {snippet}"
|
||||
return topic
|
||||
|
||||
@staticmethod
|
||||
def get_tool_definition() -> dict:
|
||||
"""
|
||||
Return the web_search tool definition for function calling.
|
||||
"""
|
||||
return {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "web_search",
|
||||
"description": "Search the web for current information relevant to the debate topic. Use this to find facts, statistics, or recent news that support your argument.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {
|
||||
"type": "string",
|
||||
"description": "The search query to look up"
|
||||
}
|
||||
},
|
||||
"required": ["query"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_tool_definition_anthropic() -> dict:
|
||||
"""
|
||||
Return the web_search tool definition in Anthropic format.
|
||||
"""
|
||||
return {
|
||||
"name": "web_search",
|
||||
"description": "Search the web for current information relevant to the debate topic. Use this to find facts, statistics, or recent news that support your argument.",
|
||||
"input_schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {
|
||||
"type": "string",
|
||||
"description": "The search query to look up"
|
||||
}
|
||||
},
|
||||
"required": ["query"]
|
||||
}
|
||||
}
|
||||
@@ -1,109 +0,0 @@
|
||||
from sqlalchemy import Column, Integer, String, DateTime, Text
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from models.debate import DebateSession
|
||||
from exceptions import ServiceNotConfiguredError
|
||||
from services.config_service import ConfigService
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class DebateSessionDB(Base):
|
||||
__tablename__ = "debate_sessions"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
session_id = Column(String(255), unique=True, index=True, nullable=False)
|
||||
topic = Column(Text, nullable=False)
|
||||
participants = Column(Text, nullable=False) # JSON string
|
||||
constraints = Column(Text, nullable=False) # JSON string
|
||||
rounds = Column(Text, nullable=False) # JSON string
|
||||
status = Column(String(50), nullable=False)
|
||||
created_at = Column(DateTime, nullable=False)
|
||||
completed_at = Column(DateTime, nullable=True)
|
||||
summary = Column(Text, nullable=True)
|
||||
evidence_library = Column(Text, nullable=True) # JSON string
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lazy engine / session factory
|
||||
# ---------------------------------------------------------------------------
|
||||
_engine = None
|
||||
_SessionLocal = None
|
||||
|
||||
|
||||
def _get_engine():
|
||||
from sqlalchemy import create_engine
|
||||
global _engine
|
||||
if _engine is None:
|
||||
db_url = ConfigService.get_database_url()
|
||||
if not db_url:
|
||||
raise ServiceNotConfiguredError("数据库未配置")
|
||||
_engine = create_engine(db_url)
|
||||
return _engine
|
||||
|
||||
|
||||
def _get_session_factory():
|
||||
global _SessionLocal
|
||||
if _SessionLocal is None:
|
||||
_SessionLocal = sessionmaker(
|
||||
autocommit=False, autoflush=False, bind=_get_engine()
|
||||
)
|
||||
return _SessionLocal
|
||||
|
||||
|
||||
def init_db():
|
||||
"""Create all tables if DB is configured; silently skip otherwise."""
|
||||
if not ConfigService.is_db_configured():
|
||||
print("WARNING: Database not configured, skipping table creation.")
|
||||
return
|
||||
try:
|
||||
from db_models import Base as ApiBase
|
||||
engine = _get_engine()
|
||||
Base.metadata.create_all(bind=engine)
|
||||
ApiBase.metadata.create_all(bind=engine)
|
||||
except Exception as e:
|
||||
global _engine, _SessionLocal
|
||||
print(f"WARNING: Database connection failed, skipping table creation: {e}")
|
||||
if _engine is not None:
|
||||
_engine.dispose()
|
||||
_engine = None
|
||||
_SessionLocal = None
|
||||
|
||||
|
||||
def debate_session_from_db(db_session) -> DebateSession:
|
||||
"""Convert database session to Pydantic model."""
|
||||
evidence_library = []
|
||||
if db_session.evidence_library:
|
||||
evidence_library = json.loads(db_session.evidence_library)
|
||||
|
||||
return DebateSession(
|
||||
session_id=db_session.session_id,
|
||||
topic=db_session.topic,
|
||||
participants=json.loads(db_session.participants),
|
||||
constraints=json.loads(db_session.constraints),
|
||||
rounds=json.loads(db_session.rounds),
|
||||
status=db_session.status,
|
||||
created_at=db_session.created_at,
|
||||
completed_at=db_session.completed_at,
|
||||
summary=db_session.summary,
|
||||
evidence_library=evidence_library
|
||||
)
|
||||
|
||||
|
||||
def debate_session_to_db(session: DebateSession) -> DebateSessionDB:
|
||||
"""Convert Pydantic model to database model."""
|
||||
return DebateSessionDB(
|
||||
session_id=session.session_id,
|
||||
topic=session.topic,
|
||||
participants=json.dumps([p.dict() for p in session.participants]),
|
||||
constraints=json.dumps(session.constraints.dict()),
|
||||
rounds=json.dumps([r.dict() for r in session.rounds], default=str),
|
||||
status=session.status,
|
||||
created_at=session.created_at,
|
||||
completed_at=session.completed_at,
|
||||
summary=session.summary,
|
||||
evidence_library=json.dumps([e.dict() for e in session.evidence_library], default=str) if session.evidence_library else None
|
||||
)
|
||||
@@ -1,67 +0,0 @@
|
||||
from sqlalchemy.orm import Session
|
||||
from datetime import datetime
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from models.debate import DebateSession
|
||||
from storage.database import DebateSessionDB, debate_session_from_db, debate_session_to_db
|
||||
|
||||
|
||||
class SessionManager:
|
||||
"""
|
||||
Manages debate sessions in storage
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
async def save_session(self, db: Session, session: DebateSession):
|
||||
"""
|
||||
Save a debate session to the database
|
||||
"""
|
||||
db_session = debate_session_to_db(session)
|
||||
db.add(db_session)
|
||||
db.commit()
|
||||
db.refresh(db_session)
|
||||
|
||||
async def get_session(self, db: Session, session_id: str) -> Optional[DebateSession]:
|
||||
"""
|
||||
Retrieve a debate session from the database
|
||||
"""
|
||||
db_session = db.query(DebateSessionDB).filter(DebateSessionDB.session_id == session_id).first()
|
||||
if not db_session:
|
||||
return None
|
||||
return debate_session_from_db(db_session)
|
||||
|
||||
async def update_session(self, db: Session, session: DebateSession):
|
||||
"""
|
||||
Update an existing debate session in the database
|
||||
"""
|
||||
db_session = db.query(DebateSessionDB).filter(DebateSessionDB.session_id == session.session_id).first()
|
||||
if db_session:
|
||||
db_session.topic = session.topic
|
||||
db_session.participants = json.dumps([p.dict() for p in session.participants])
|
||||
db_session.constraints = json.dumps(session.constraints.dict())
|
||||
db_session.rounds = json.dumps([r.dict() for r in session.rounds], default=str)
|
||||
db_session.status = session.status
|
||||
db_session.completed_at = session.completed_at
|
||||
db_session.summary = session.summary
|
||||
db_session.evidence_library = json.dumps([e.dict() for e in session.evidence_library], default=str) if session.evidence_library else None
|
||||
|
||||
db.commit()
|
||||
db.refresh(db_session)
|
||||
|
||||
async def list_sessions(self, db: Session):
|
||||
"""
|
||||
List all debate sessions
|
||||
"""
|
||||
db_sessions = db.query(DebateSessionDB).all()
|
||||
return [
|
||||
{
|
||||
"session_id": db_session.session_id,
|
||||
"topic": db_session.topic,
|
||||
"status": db_session.status,
|
||||
"created_at": db_session.created_at.isoformat() if db_session.created_at else None
|
||||
}
|
||||
for db_session in db_sessions
|
||||
]
|
||||
@@ -1,39 +0,0 @@
|
||||
async def summarize_debate(session):
|
||||
"""
|
||||
Generate a summary of the debate session
|
||||
"""
|
||||
if not session.rounds:
|
||||
return "No rounds were completed in this debate."
|
||||
|
||||
# Extract key points from each side
|
||||
pro_points = []
|
||||
con_points = []
|
||||
|
||||
for round_data in session.rounds:
|
||||
if round_data.stance.value == "pro":
|
||||
pro_points.append(round_data.content)
|
||||
else:
|
||||
con_points.append(round_data.content)
|
||||
|
||||
# Create a summary
|
||||
summary_parts = [
|
||||
f"辩论主题: {session.topic}",
|
||||
"",
|
||||
"正方主要观点:",
|
||||
]
|
||||
|
||||
for i, point in enumerate(pro_points, 1):
|
||||
summary_parts.append(f"{i}. {point[:100]}...") # Truncate for brevity
|
||||
|
||||
summary_parts.append("")
|
||||
summary_parts.append("反方主要观点:")
|
||||
|
||||
for i, point in enumerate(con_points, 1):
|
||||
summary_parts.append(f"{i}. {point[:100]}...") # Truncate for brevity
|
||||
|
||||
summary_parts.append("")
|
||||
summary_parts.append("总结: 本次辩论完成了 {} 轮,双方就 '{}' 主题进行了充分的讨论。".format(
|
||||
len(session.rounds), session.topic
|
||||
))
|
||||
|
||||
return "\n".join(summary_parts)
|
||||
Reference in New Issue
Block a user