diff --git a/internal/config/config.go b/internal/config/config.go index 5fee442..9a03278 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,6 +11,7 @@ import ( "fmt" "os" "strings" + "time" ) type Config struct { @@ -57,23 +58,44 @@ type Config struct { // real JWKS validation. OIDCIssuer string OIDCClientID string + + // Fabric announce coupling (Phase 2D). All four required to enable; + // any empty → announcer becomes a no-op (logs intent, skips post). + // This lets the orchestrator run in environments where the Fabric + // coupling hasn't been wired yet. + FabricGuildBaseURL string // e.g. https://fabric-api.hangman-lab.top + FabricAnnounceChannelID string + FabricSystemAPIKey string // x-fabric-system-key value (env: FABRIC_SYSTEM_API_KEY) + FabricBotBearerToken string // Authorization Bearer for the dialectic-system Fabric user + + // Orchestrator tick interval. 0 / unset → default 15s. + OrchestratorTickInterval time.Duration } func LoadFromEnv() (*Config, error) { c := &Config{ - Mode: getenv("ENV_MODE", "dev"), - HTTPAddr: getenv("HTTP_ADDR", "0.0.0.0:8090"), - CORSAllowOrigins: splitCSV(getenv("CORS_ALLOW_ORIGINS", "*")), - DBHost: getenv("DB_HOST", "127.0.0.1"), - DBPort: getenv("DB_PORT", "3306"), - DBName: getenv("DB_NAME", "dialectic"), - DBUser: getenv("DB_USER", "dialectic"), - DBPassword: os.Getenv("DB_PASSWORD"), - SystemAPIKey: os.Getenv("SYSTEM_API_KEY"), - AgentAPIKeyPepper: os.Getenv("AGENT_API_KEY_PEPPER"), - OIDCDevBypassToken: os.Getenv("OIDC_DEV_BYPASS_TOKEN"), - OIDCIssuer: os.Getenv("OIDC_ISSUER"), - OIDCClientID: os.Getenv("OIDC_CLIENT_ID"), + Mode: getenv("ENV_MODE", "dev"), + HTTPAddr: getenv("HTTP_ADDR", "0.0.0.0:8090"), + CORSAllowOrigins: splitCSV(getenv("CORS_ALLOW_ORIGINS", "*")), + DBHost: getenv("DB_HOST", "127.0.0.1"), + DBPort: getenv("DB_PORT", "3306"), + DBName: getenv("DB_NAME", "dialectic"), + DBUser: getenv("DB_USER", "dialectic"), + DBPassword: os.Getenv("DB_PASSWORD"), + SystemAPIKey: os.Getenv("SYSTEM_API_KEY"), + AgentAPIKeyPepper: os.Getenv("AGENT_API_KEY_PEPPER"), + OIDCDevBypassToken: os.Getenv("OIDC_DEV_BYPASS_TOKEN"), + OIDCIssuer: os.Getenv("OIDC_ISSUER"), + OIDCClientID: os.Getenv("OIDC_CLIENT_ID"), + FabricGuildBaseURL: os.Getenv("FABRIC_GUILD_BASE_URL"), + FabricAnnounceChannelID: os.Getenv("FABRIC_ANNOUNCE_CHANNEL_ID"), + FabricSystemAPIKey: os.Getenv("FABRIC_SYSTEM_API_KEY"), + FabricBotBearerToken: os.Getenv("FABRIC_BOT_BEARER_TOKEN"), + } + if d := os.Getenv("ORCHESTRATOR_TICK_INTERVAL"); d != "" { + if parsed, err := time.ParseDuration(d); err == nil { + c.OrchestratorTickInterval = parsed + } } if c.Mode != "dev" && c.Mode != "prod" { diff --git a/internal/fabric/announce.go b/internal/fabric/announce.go new file mode 100644 index 0000000..a3ba71e --- /dev/null +++ b/internal/fabric/announce.go @@ -0,0 +1,123 @@ +// Package fabric provides an HTTP client for posting system messages +// to a Fabric Guild's announce-type channel. +// +// Auth model: Dialectic backend holds a system api key in +// FABRIC_SYSTEM_API_KEY env. It POSTs to +// `/api/channels//messages` with both: +// - `Authorization: Bearer ` (the +// "user" that posts; needs to be a Fabric Center user that owns +// the channel) +// - `x-fabric-system-key: ` (the new header added +// in Phase 1 that gates announce-channel POSTs) +// +// Phase 2D ships this as an OPTIONAL coupling: if any of the three +// config values is unset, the announcer is a no-op (logs the would-be +// post and returns nil). This lets the orchestrator pipeline land + run +// in environments where the Fabric coupling isn't configured yet. +package fabric + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "time" +) + +type AnnounceConfig struct { + GuildBaseURL string // e.g. https://fabric-api.hangman-lab.top + ChannelID string // the announce-type channel uuid + SystemAPIKey string // x-fabric-system-key value + BotBearerToken string // Authorization: Bearer — Fabric user posting; same + // user is the "channel author" the backend records + RequestTimeout time.Duration +} + +type Announcer struct { + cfg AnnounceConfig + client *http.Client +} + +func NewAnnouncer(cfg AnnounceConfig) *Announcer { + if cfg.RequestTimeout <= 0 { + cfg.RequestTimeout = 5 * time.Second + } + return &Announcer{ + cfg: cfg, + client: &http.Client{Timeout: cfg.RequestTimeout}, + } +} + +// Enabled returns true iff every required Announcer config field is set. +// Orchestrator checks this before each broadcast and silently skips when +// false (with a one-time startup log). +func (a *Announcer) Enabled() bool { + return a.cfg.GuildBaseURL != "" && a.cfg.ChannelID != "" && + a.cfg.SystemAPIKey != "" && a.cfg.BotBearerToken != "" +} + +// PostTopicAnnouncement posts a one-line system message describing a +// new topic that's ready for signup. Best-effort: log + nil on transport +// errors so a Fabric outage doesn't block topic transitions. +func (a *Announcer) PostTopicAnnouncement(ctx context.Context, topicID, title, summary string, + signupOpen, signupClose, debateStart, debateEnd time.Time, verdictSchemaID string) error { + + if !a.Enabled() { + log.Printf("announce: skipped (fabric coupling not configured) topic=%s", topicID) + return nil + } + + body := map[string]any{ + "content": formatAnnouncement(topicID, title, summary, signupOpen, signupClose, debateStart, debateEnd, verdictSchemaID), + } + raw, _ := json.Marshal(body) + url := fmt.Sprintf("%s/api/channels/%s/messages", + trimRightSlash(a.cfg.GuildBaseURL), a.cfg.ChannelID) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(raw)) + if err != nil { + return err + } + req.Header.Set("content-type", "application/json") + req.Header.Set("authorization", "Bearer "+a.cfg.BotBearerToken) + req.Header.Set("x-fabric-system-key", a.cfg.SystemAPIKey) + + resp, err := a.client.Do(req) + if err != nil { + log.Printf("announce: POST %s failed: %v", url, err) + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + b, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) + log.Printf("announce: POST %s -> %d body=%s", url, resp.StatusCode, string(b)) + return fmt.Errorf("announce post: status %d", resp.StatusCode) + } + return nil +} + +func formatAnnouncement(id, title, summary string, + signupOpen, signupClose, debateStart, debateEnd time.Time, schema string) string { + const layout = "2006-01-02 15:04 UTC" + return fmt.Sprintf( + "🆕 **Debate signup open** [%s]\n\n**%s**\n%s\n\n"+ + "• Signup: %s → %s\n"+ + "• Debate: %s → %s\n"+ + "• Verdict schema: `%s`\n\n"+ + "To volunteer, use the `dialectic_signup` tool with this topic_id and your willing camp(s) — pro / con / judge. "+ + "You must have an `on_call` slot covering the debate window.", + id, title, summary, + signupOpen.UTC().Format(layout), signupClose.UTC().Format(layout), + debateStart.UTC().Format(layout), debateEnd.UTC().Format(layout), + schema, + ) +} + +func trimRightSlash(s string) string { + for len(s) > 0 && s[len(s)-1] == '/' { + s = s[:len(s)-1] + } + return s +} diff --git a/internal/httpapi/handlers/arguments.go b/internal/httpapi/handlers/arguments.go new file mode 100644 index 0000000..e866ef0 --- /dev/null +++ b/internal/httpapi/handlers/arguments.go @@ -0,0 +1,133 @@ +package handlers + +import ( + "encoding/json" + "errors" + "net/http" + + "github.com/go-chi/chi/v5" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" +) + +type ArgumentsHandler struct { + topics *store.TopicStore + camps *store.CampStore + rounds *store.RoundStore + arguments *store.ArgumentStore +} + +func NewArgumentsHandler( + t *store.TopicStore, + c *store.CampStore, + r *store.RoundStore, + a *store.ArgumentStore, +) *ArgumentsHandler { + return &ArgumentsHandler{topics: t, camps: c, rounds: r, arguments: a} +} + +type postArgumentBody struct { + Content string `json:"content"` +} + +// POST /api/topics/{id}/arguments +// +// Agent-only. Caller must be allocated to one of the topic's camps; +// rejected otherwise. Topic must be `debating` (status state machine +// enforces; argument outside that window is meaningless). Content is +// stored as-is (no markdown rendering server-side; frontend renders). +// +// Round: argument is attached to the LATEST open round. Round-advance +// policy is the orchestrator's call (Phase 2D ships with manual/single +// round 0; round bumping logic comes when the rule is decided). +func (h *ArgumentsHandler) Post(w http.ResponseWriter, r *http.Request) { + caller := auth.FromContext(r.Context()) + if caller.Kind != auth.CallerAgent { + http.Error(w, "argument posting is agent-only", http.StatusForbidden) + return + } + topicID := chi.URLParam(r, "id") + + topic, err := h.topics.GetByID(r.Context(), topicID) + if errors.Is(err, store.ErrNotFound) { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + if err != nil { + http.Error(w, "lookup failed", http.StatusInternalServerError) + return + } + if topic.Status != models.TopicStatusDebating { + http.Error(w, "topic not in debate window (status="+string(topic.Status)+")", http.StatusConflict) + return + } + + camp, err := h.camps.AgentCampInTopic(r.Context(), topicID, caller.ID) + if err != nil { + http.Error(w, "you are not allocated to any camp on this topic", http.StatusForbidden) + return + } + + round, err := h.rounds.Latest(r.Context(), topicID) + if err != nil { + http.Error(w, "no open round (orchestrator hasn't opened round 0 yet?)", http.StatusConflict) + return + } + + var body postArgumentBody + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad body", http.StatusBadRequest) + return + } + if body.Content == "" { + http.Error(w, "content required", http.StatusBadRequest) + return + } + const maxContent = 32_000 // arbitrary upper bound; arguments shouldn't be book-length + if len(body.Content) > maxContent { + http.Error(w, "content too long", http.StatusRequestEntityTooLarge) + return + } + + arg, err := h.arguments.Post(r.Context(), store.PostArgumentInput{ + TopicID: topicID, + RoundID: round.ID, + Camp: camp, + AgentID: caller.ID, + Content: body.Content, + }) + if err != nil { + http.Error(w, "post failed: "+err.Error(), http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusCreated, arg) +} + +// GET /api/topics/{id}/arguments — full transcript in posted order. +// Visibility: anonymous can read for public topics; private requires +// any-auth (enforced upstream by middleware composition). +func (h *ArgumentsHandler) List(w http.ResponseWriter, r *http.Request) { + topicID := chi.URLParam(r, "id") + topic, err := h.topics.GetByID(r.Context(), topicID) + if errors.Is(err, store.ErrNotFound) { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + if err != nil { + http.Error(w, "lookup failed", http.StatusInternalServerError) + return + } + caller := auth.FromContext(r.Context()) + if caller.Kind == "" && topic.Visibility != models.VisibilityPublic { + http.Error(w, "not found", http.StatusNotFound) + return + } + rows, err := h.arguments.ListByTopic(r.Context(), topicID) + if err != nil { + http.Error(w, "list failed", http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusOK, map[string]any{"arguments": rows, "count": len(rows)}) +} diff --git a/internal/httpapi/handlers/verdict.go b/internal/httpapi/handlers/verdict.go new file mode 100644 index 0000000..7054ba9 --- /dev/null +++ b/internal/httpapi/handlers/verdict.go @@ -0,0 +1,165 @@ +package handlers + +import ( + "encoding/json" + "errors" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" +) + +type VerdictHandler struct { + db *struct{} // placeholder; we don't use the raw conn directly here + topics *store.TopicStore + camps *store.CampStore + verdicts *store.VerdictStore +} + +func NewVerdictHandler(t *store.TopicStore, c *store.CampStore, v *store.VerdictStore) *VerdictHandler { + return &VerdictHandler{topics: t, camps: c, verdicts: v} +} + +type submitVerdictBody struct { + Verdict json.RawMessage `json:"verdict"` // shape matches topic.verdict_schema_id + Rationale string `json:"rationale"` + TokensInput int `json:"tokens_input"` + TokensOutput int `json:"tokens_output"` +} + +// POST /api/topics/{id}/verdict +// +// Judge-only. Caller must be allocated to the judge camp. Topic must be +// in `debating` status AND past `debate_end_at` (the ticker doesn't +// flip to `judging` in v1, see ticker.go note — the gate enforces the +// time crossing instead). +// +// Schema validation (Phase 2D): shallow — confirm verdict is valid JSON +// and not empty. Real schema-shape validation lands when we wire the +// verdict_schemas.shape_json against a JSON-schema validator. +func (h *VerdictHandler) Submit(w http.ResponseWriter, r *http.Request) { + caller := auth.FromContext(r.Context()) + if caller.Kind != auth.CallerAgent { + http.Error(w, "verdict submission is agent-only", http.StatusForbidden) + return + } + topicID := chi.URLParam(r, "id") + topic, err := h.topics.GetByID(r.Context(), topicID) + if errors.Is(err, store.ErrNotFound) { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + if err != nil { + http.Error(w, "lookup failed", http.StatusInternalServerError) + return + } + if topic.Status != models.TopicStatusDebating { + http.Error(w, "topic not in debate state (status="+string(topic.Status)+")", http.StatusConflict) + return + } + if time.Now().Before(topic.DebateEndAt) { + http.Error(w, "debate window still open; verdict premature", http.StatusConflict) + return + } + camp, err := h.camps.AgentCampInTopic(r.Context(), topicID, caller.ID) + if err != nil || camp != models.CampJudge { + http.Error(w, "only the judge can submit a verdict", http.StatusForbidden) + return + } + + var body submitVerdictBody + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "bad body", http.StatusBadRequest) + return + } + if len(body.Verdict) == 0 || string(body.Verdict) == "null" { + http.Error(w, "verdict required (non-empty JSON object matching schema)", http.StatusBadRequest) + return + } + // Sanity: ensure it parses as a JSON object/value. + var probe any + if err := json.Unmarshal(body.Verdict, &probe); err != nil { + http.Error(w, "verdict must be valid JSON", http.StatusBadRequest) + return + } + if body.Rationale == "" { + http.Error(w, "rationale required", http.StatusBadRequest) + return + } + + verdict, err := h.verdicts.Submit(r.Context(), store.SubmitVerdictInput{ + TopicID: topicID, + JudgeAgentID: caller.ID, + VerdictJSON: body.Verdict, + Rationale: body.Rationale, + TokensInput: body.TokensInput, + TokensOutput: body.TokensOutput, + }) + if err != nil { + // Most likely cause: unique-key conflict (already submitted). + http.Error(w, "submit failed: "+err.Error(), http.StatusConflict) + return + } + + // Transition topic to completed. Best-effort; if it fails, the + // verdict row exists and the ticker will retry on next scan + // (well — once we add that transition; v1 leaves it to a manual + // flip via SQL or a follow-up endpoint). + if _, err := h.topics.SetStatus(r.Context(), topicID, models.TopicStatusCompleted); err != nil { + // non-fatal: log via response header (caller can spot-check) + w.Header().Set("x-warn", "verdict saved but status update failed: "+err.Error()) + } + + writeJSON(w, http.StatusCreated, map[string]any{ + "id": verdict.ID, + "topic_id": verdict.TopicID, + "judge_agent_id": verdict.JudgeAgentID, + "verdict": json.RawMessage(verdict.VerdictJSON), + "rationale": verdict.Rationale, + "tokens_input": verdict.TokensInput, + "tokens_output": verdict.TokensOutput, + "produced_at": verdict.ProducedAt, + }) +} + +// GET /api/topics/{id}/verdict — fetch the published verdict (404 if +// not yet produced). Visibility-gated like other read endpoints. +func (h *VerdictHandler) Get(w http.ResponseWriter, r *http.Request) { + topicID := chi.URLParam(r, "id") + topic, err := h.topics.GetByID(r.Context(), topicID) + if errors.Is(err, store.ErrNotFound) { + http.Error(w, "topic not found", http.StatusNotFound) + return + } + if err != nil { + http.Error(w, "lookup failed", http.StatusInternalServerError) + return + } + caller := auth.FromContext(r.Context()) + if caller.Kind == "" && topic.Visibility != models.VisibilityPublic { + http.Error(w, "not found", http.StatusNotFound) + return + } + + verdict, err := h.verdicts.GetByTopic(r.Context(), topicID) + if errors.Is(err, store.ErrNotFound) { + http.Error(w, "verdict not yet produced", http.StatusNotFound) + return + } + if err != nil { + http.Error(w, "lookup failed", http.StatusInternalServerError) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "id": verdict.ID, + "topic_id": verdict.TopicID, + "judge_agent_id": verdict.JudgeAgentID, + "verdict": json.RawMessage(verdict.VerdictJSON), + "rationale": verdict.Rationale, + "produced_at": verdict.ProducedAt, + }) +} diff --git a/internal/httpapi/routes.go b/internal/httpapi/routes.go index e2bc297..a27f914 100644 --- a/internal/httpapi/routes.go +++ b/internal/httpapi/routes.go @@ -53,9 +53,16 @@ func Mount(cfg *config.Config, db *sqlx.DB, version string) http.Handler { // Handler instances. topicStore := store.NewTopicStore(db) signupStore := store.NewSignupStore(db) + campStore := store.NewCampStore(db) + roundStore := store.NewRoundStore(db) + argStore := store.NewArgumentStore(db) + verdictStore := store.NewVerdictStore(db) + health := handlers.NewHealthHandler(db, version) topicsH := handlers.NewTopicsHandler(topicStore) signupsH := handlers.NewSignupsHandler(topicStore, signupStore) + argsH := handlers.NewArgumentsHandler(topicStore, campStore, roundStore, argStore) + verdictH := handlers.NewVerdictHandler(topicStore, campStore, verdictStore) // Routes. r.Route("/api", func(r chi.Router) { @@ -67,6 +74,8 @@ func Mount(cfg *config.Config, db *sqlx.DB, version string) http.Handler { r.Use(optionalAuth) r.Get("/topics", topicsH.List) r.Get("/topics/{id}", topicsH.Get) + r.Get("/topics/{id}/arguments", argsH.List) + r.Get("/topics/{id}/verdict", verdictH.Get) }) r.Group(func(r chi.Router) { r.Use(requireAnyAuth) @@ -74,10 +83,12 @@ func Mount(cfg *config.Config, db *sqlx.DB, version string) http.Handler { r.Put("/topics/{id}/visibility", topicsH.SetVisibility) }) - // Signups: agent-only. + // Signups, arguments, verdict POST: agent-only. r.Group(func(r chi.Router) { r.Use(requireAgent) r.Post("/topics/{id}/signups", signupsH.Create) + r.Post("/topics/{id}/arguments", argsH.Post) + r.Post("/topics/{id}/verdict", verdictH.Submit) }) // List signups: any authenticated caller. r.Group(func(r chi.Router) { diff --git a/internal/orchestrator/allocator.go b/internal/orchestrator/allocator.go new file mode 100644 index 0000000..5a85299 --- /dev/null +++ b/internal/orchestrator/allocator.go @@ -0,0 +1,116 @@ +// Package orchestrator owns the topic lifecycle state machine: +// signup-window allocator, round driver, judge invocation, Fabric +// announce broadcaster. All long-running coordination logic lives +// here so handlers stay thin. +package orchestrator + +import ( + "math/rand" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" +) + +// AllocateResult is the outcome of running the camp allocator on a +// topic's signup pool. Either Allocation is set (one agent per camp, +// no duplicates) or CancelReason is set (signup pool insufficient +// after backfill). +type AllocateResult struct { + Allocation map[models.Camp]string // camp → agentId + CancelReason string // non-empty when allocation could not complete +} + +// Allocate runs the 3-camp self-enrollment allocation algorithm +// agreed on in the 2026-05-23 design session. +// +// for each camp in [pro, con, judge]: +// if any signup has that camp in willing_camps AND that agent +// isn't already locked → random pick, lock. +// if camps still unfilled AND remaining unallocated signups >= unfilled: +// random pick from remaining to fill, one each. +// if any camp still unfilled (i.e. signup pool < 3 effective): +// return CancelReason; creator re-times. +// +// Invariants: same agent never lands in two camps; allocation order +// respects [pro, con, judge] so the test seed produces deterministic +// results when rng is seeded. +// +// `rng` is injected so tests can supply a deterministic source. Pass +// `rand.New(rand.NewSource(time.Now().UnixNano()))` in prod. +func Allocate(signups []models.SignupView, rng *rand.Rand) AllocateResult { + allocated := make(map[models.Camp]string, 3) + used := make(map[string]struct{}, len(signups)) + + // Pass 1 — fill each camp from its volunteers. + for _, camp := range models.AllCamps { + candidates := make([]string, 0) + for _, s := range signups { + if _, taken := used[s.AgentID]; taken { + continue + } + for _, w := range s.WillingCamps { + if w == camp { + candidates = append(candidates, s.AgentID) + break + } + } + } + if len(candidates) == 0 { + continue + } + pick := candidates[rng.Intn(len(candidates))] + allocated[camp] = pick + used[pick] = struct{}{} + } + + // Pass 2 — backfill unfilled camps from any remaining signup. + if len(allocated) < 3 { + remaining := make([]string, 0) + for _, s := range signups { + if _, taken := used[s.AgentID]; taken { + continue + } + remaining = append(remaining, s.AgentID) + } + // We can only fill if we have enough remaining for every still-empty camp. + unfilled := make([]models.Camp, 0, 3) + for _, c := range models.AllCamps { + if _, ok := allocated[c]; !ok { + unfilled = append(unfilled, c) + } + } + if len(remaining) >= len(unfilled) { + // Shuffle remaining, then take one per unfilled camp in order. + rng.Shuffle(len(remaining), func(i, j int) { + remaining[i], remaining[j] = remaining[j], remaining[i] + }) + for i, c := range unfilled { + allocated[c] = remaining[i] + used[remaining[i]] = struct{}{} + } + } + } + + // Verdict — all 3 filled or we cancel. + if len(allocated) < 3 { + filled := len(allocated) + return AllocateResult{ + CancelReason: cancelReason(filled, len(signups)), + } + } + return AllocateResult{Allocation: allocated} +} + +func cancelReason(filled, totalSignups int) string { + switch { + case totalSignups == 0: + return "no signups received" + case totalSignups < 3: + return "insufficient signups: need at least 3 distinct agents across pro/con/judge" + default: + // Edge case: pool >= 3 but allocator still couldn't fill — e.g. + // every signup volunteered for the same one camp and the same + // person was somehow used (shouldn't happen with current rules, + // but make the message honest). + return "allocation infeasible: signup distribution does not cover all 3 camps" + } +} diff --git a/internal/orchestrator/allocator_test.go b/internal/orchestrator/allocator_test.go new file mode 100644 index 0000000..8502068 --- /dev/null +++ b/internal/orchestrator/allocator_test.go @@ -0,0 +1,137 @@ +package orchestrator + +import ( + "math/rand" + "testing" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" +) + +func sig(agentID string, camps ...models.Camp) models.SignupView { + return models.SignupView{AgentID: agentID, WillingCamps: camps} +} + +// Helper: assert no duplicate agents across the 3 camps. +func assertDistinct(t *testing.T, alloc map[models.Camp]string) { + t.Helper() + seen := map[string]models.Camp{} + for c, a := range alloc { + if prev, ok := seen[a]; ok { + t.Fatalf("agent %q allocated to both %s and %s", a, prev, c) + } + seen[a] = c + } +} + +func TestAllocate_EmptyPoolCancels(t *testing.T) { + r := Allocate(nil, rand.New(rand.NewSource(1))) + if r.CancelReason == "" { + t.Fatal("expected cancel reason, got allocation") + } +} + +func TestAllocate_TwoSignupsCancels(t *testing.T) { + r := Allocate([]models.SignupView{ + sig("a", models.CampPro), + sig("b", models.CampCon), + }, rand.New(rand.NewSource(1))) + if r.CancelReason == "" { + t.Fatalf("expected cancel reason (pool<3), got %v", r.Allocation) + } +} + +func TestAllocate_OneVolunteerPerCampFills(t *testing.T) { + signups := []models.SignupView{ + sig("a", models.CampPro), + sig("b", models.CampCon), + sig("c", models.CampJudge), + } + r := Allocate(signups, rand.New(rand.NewSource(1))) + if r.CancelReason != "" { + t.Fatalf("unexpected cancel: %s", r.CancelReason) + } + if r.Allocation[models.CampPro] != "a" || r.Allocation[models.CampCon] != "b" || r.Allocation[models.CampJudge] != "c" { + t.Fatalf("wrong allocation: %v", r.Allocation) + } + assertDistinct(t, r.Allocation) +} + +func TestAllocate_AgentMultiVolunteerPicksOnlyOnce(t *testing.T) { + // 'a' volunteers for all 3 camps. Should only be allocated to one + // (pro, since it's first in iteration order); other camps need + // other volunteers or get filled via backfill. + signups := []models.SignupView{ + sig("a", models.CampPro, models.CampCon, models.CampJudge), + sig("b", models.CampCon), + sig("c", models.CampJudge), + } + r := Allocate(signups, rand.New(rand.NewSource(1))) + if r.CancelReason != "" { + t.Fatalf("unexpected cancel: %s", r.CancelReason) + } + if r.Allocation[models.CampPro] != "a" { + t.Fatalf("expected 'a' in pro, got %v", r.Allocation) + } + if r.Allocation[models.CampCon] != "b" { + t.Fatalf("expected 'b' in con, got %v", r.Allocation) + } + if r.Allocation[models.CampJudge] != "c" { + t.Fatalf("expected 'c' in judge, got %v", r.Allocation) + } + assertDistinct(t, r.Allocation) +} + +func TestAllocate_BackfillFromUnallocated(t *testing.T) { + // pro has 2 volunteers ('a','c'), con has 1 ('b'), judge has 0. + // Allocator picks one of {a,c} for pro, then b for con, then + // backfills judge from whichever of {a,c} is unallocated. + signups := []models.SignupView{ + sig("a", models.CampPro), + sig("b", models.CampCon), + sig("c", models.CampPro), + } + r := Allocate(signups, rand.New(rand.NewSource(1))) + if r.CancelReason != "" { + t.Fatalf("unexpected cancel: %s; alloc=%v", r.CancelReason, r.Allocation) + } + assertDistinct(t, r.Allocation) + if len(r.Allocation) != 3 { + t.Fatalf("expected all 3 camps filled; got %d (%v)", len(r.Allocation), r.Allocation) + } + // Con must be 'b' (only volunteer); pro and judge must be {a, c} in some order. + if r.Allocation[models.CampCon] != "b" { + t.Fatalf("expected con=b, got %v", r.Allocation) + } + pro := r.Allocation[models.CampPro] + judge := r.Allocation[models.CampJudge] + if !(pro == "a" && judge == "c") && !(pro == "c" && judge == "a") { + t.Fatalf("expected pro/judge to be {a,c} permutation, got pro=%s judge=%s", pro, judge) + } +} + +func TestAllocate_BackfillInsufficientCancels(t *testing.T) { + // pro filled by 'a'; con filled by 'b'; judge has no volunteer + // AND no remaining unallocated signups → cancel. + signups := []models.SignupView{ + sig("a", models.CampPro), + sig("b", models.CampCon), + } + r := Allocate(signups, rand.New(rand.NewSource(1))) + if r.CancelReason == "" { + t.Fatalf("expected cancel; got allocation %v", r.Allocation) + } +} + +func TestAllocate_LargePoolDistinctness(t *testing.T) { + // Many signups, all willing for all camps. Allocation should pick 3 + // distinct agents, randomly. + signups := []models.SignupView{} + for i := 0; i < 20; i++ { + signups = append(signups, sig(string(rune('a'+i)), models.CampPro, models.CampCon, models.CampJudge)) + } + r := Allocate(signups, rand.New(rand.NewSource(42))) + if r.CancelReason != "" { + t.Fatalf("unexpected cancel: %s", r.CancelReason) + } + assertDistinct(t, r.Allocation) +} diff --git a/internal/orchestrator/ticker.go b/internal/orchestrator/ticker.go new file mode 100644 index 0000000..43efbe9 --- /dev/null +++ b/internal/orchestrator/ticker.go @@ -0,0 +1,237 @@ +package orchestrator + +import ( + "context" + "log" + "math/rand" + "time" + + "github.com/jmoiron/sqlx" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" +) + +// Ticker drives the topic state machine. Every TickInterval it scans +// for topics with timestamps that have crossed a transition boundary +// and applies the transition atomically per topic. +// +// State transitions handled by the ticker: +// +// created → signup_open (when now >= signup_open_at) +// + post Fabric announcement +// signup_open → signup_closed (when now >= signup_close_at, allocator succeeded) +// → cancelled (allocator returned CancelReason) +// signup_closed → debating (when now >= debate_start_at; opens round 0) +// +// NOT handled by the ticker (driven elsewhere): +// +// debating → completed driven by POST /api/topics/{id}/verdict +// (judge submits; handler flips status). +// The "judging" sub-state is implicit: +// status==debating AND now>=debate_end_at. +// +// Per-topic transitions use SELECT FOR UPDATE so concurrent ticker +// instances (or future replicas) don't double-fire. +type Ticker struct { + db *sqlx.DB + topics *store.TopicStore + signups *store.SignupStore + camps *store.CampStore + rounds *store.RoundStore + announcer *fabric.Announcer + interval time.Duration + rng *rand.Rand +} + +func NewTicker( + db *sqlx.DB, + topics *store.TopicStore, + signups *store.SignupStore, + camps *store.CampStore, + rounds *store.RoundStore, + announcer *fabric.Announcer, + interval time.Duration, +) *Ticker { + if interval <= 0 { + interval = 15 * time.Second + } + return &Ticker{ + db: db, + topics: topics, + signups: signups, + camps: camps, + rounds: rounds, + announcer: announcer, + interval: interval, + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + } +} + +// Run blocks until ctx is cancelled. Caller goroutines it. +func (t *Ticker) Run(ctx context.Context) { + log.Printf("orchestrator: ticker started (interval=%s, announce=%v)", t.interval, t.announcer.Enabled()) + tk := time.NewTicker(t.interval) + defer tk.Stop() + // First tick immediately so startup is responsive — don't wait + // 15s for the first scan. + t.tickOnce(ctx) + for { + select { + case <-ctx.Done(): + log.Printf("orchestrator: ticker stopped") + return + case <-tk.C: + t.tickOnce(ctx) + } + } +} + +// tickOnce scans + applies. Errors are logged per topic; one topic +// failing doesn't stall the others. +func (t *Ticker) tickOnce(ctx context.Context) { + now := time.Now() + + // 1. created → signup_open + if err := t.transitionByStatus(ctx, now, + models.TopicStatusCreated, "signup_open_at", + func(ctx context.Context, tx *sqlx.Tx, topicID string) error { + topic, err := t.topics.GetByID(ctx, topicID) + if err != nil { + return err + } + if _, err := tx.ExecContext(ctx, + `UPDATE topics SET status = ? WHERE id = ?`, + models.TopicStatusSignupOpen, topicID); err != nil { + return err + } + // Announcement is best-effort, outside the tx (network call). + go t.broadcastAnnouncement(topic) + return nil + }); err != nil { + log.Printf("orchestrator: created→signup_open scan: %v", err) + } + + // 2. signup_open → signup_closed | cancelled + if err := t.transitionByStatus(ctx, now, + models.TopicStatusSignupOpen, "signup_close_at", + func(ctx context.Context, tx *sqlx.Tx, topicID string) error { + signups, err := t.signups.ListByTopic(ctx, topicID) + if err != nil { + return err + } + res := Allocate(signups, t.rng) + if res.CancelReason != "" { + _, err := tx.ExecContext(ctx, + `UPDATE topics SET status = ?, cancelled_reason = ? WHERE id = ?`, + models.TopicStatusCancelled, res.CancelReason, topicID) + log.Printf("orchestrator: topic %s cancelled at signup_close: %s", + topicID, res.CancelReason) + return err + } + if err := t.camps.WriteAllocation(ctx, tx, topicID, res.Allocation); err != nil { + return err + } + _, err = tx.ExecContext(ctx, + `UPDATE topics SET status = ? WHERE id = ?`, + models.TopicStatusSignupClosed, topicID) + log.Printf("orchestrator: topic %s allocated pro=%s con=%s judge=%s", + topicID, + res.Allocation[models.CampPro], res.Allocation[models.CampCon], res.Allocation[models.CampJudge]) + return err + }); err != nil { + log.Printf("orchestrator: signup_open→signup_closed scan: %v", err) + } + + // 3. signup_closed → debating (opens round 0) + if err := t.transitionByStatus(ctx, now, + models.TopicStatusSignupClosed, "debate_start_at", + func(ctx context.Context, tx *sqlx.Tx, topicID string) error { + if _, err := tx.ExecContext(ctx, + `UPDATE topics SET status = ? WHERE id = ?`, + models.TopicStatusDebating, topicID); err != nil { + return err + } + // Round 0 inserted within the tx — if commit fails we don't + // leak a half-state. + _, err := tx.ExecContext(ctx, + `INSERT INTO rounds (id, topic_id, round_no) VALUES (UUID(), ?, 0)`, + topicID) + log.Printf("orchestrator: topic %s entered debating; round 0 opened", topicID) + return err + }); err != nil { + log.Printf("orchestrator: signup_closed→debating scan: %v", err) + } + + // Note: there's no explicit `debating → judging` transition in v1. + // The verdict handler enforces "status==debating AND now>=debate_end_at" + // as its preconditions; that's equivalent to a "judging" gate without + // adding a new enum value. Migration 002 will introduce the explicit + // 'judging' state when we want richer UI (e.g. "Awaiting verdict" + // distinct from "In debate"); until then this comment serves as the + // state-machine documentation for future maintainers. +} + +// transitionByStatus is the shared "scan + per-row tx + apply" pattern. +// Picks all topics in `currentStatus` whose `dueColumn` <= now, opens a +// tx with SELECT FOR UPDATE, re-checks status (someone else may have +// already moved it), calls apply, commits. Errors per topic logged. +func (t *Ticker) transitionByStatus(ctx context.Context, now time.Time, + currentStatus models.TopicStatus, dueColumn string, + apply func(context.Context, *sqlx.Tx, string) error) error { + + // Pull candidate IDs first (no lock); we lock per row inside the loop. + var ids []string + q := "SELECT id FROM topics WHERE status = ? AND " + dueColumn + " <= ? LIMIT 50" + if err := t.db.SelectContext(ctx, &ids, q, currentStatus, now); err != nil { + return err + } + for _, id := range ids { + if err := t.applyOne(ctx, id, currentStatus, apply); err != nil { + log.Printf("orchestrator: apply topic=%s: %v", id, err) + } + } + return nil +} + +func (t *Ticker) applyOne(ctx context.Context, topicID string, + expected models.TopicStatus, + apply func(context.Context, *sqlx.Tx, string) error) error { + + tx, err := t.db.BeginTxx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() // safe no-op after commit + + var actual models.TopicStatus + if err := tx.GetContext(ctx, &actual, + `SELECT status FROM topics WHERE id = ? FOR UPDATE`, topicID); err != nil { + return err + } + if actual != expected { + // Already transitioned by some other process — skip. + return nil + } + if err := apply(ctx, tx, topicID); err != nil { + return err + } + return tx.Commit() +} + +func (t *Ticker) broadcastAnnouncement(topic *models.Topic) { + if topic == nil { + return + } + if err := t.announcer.PostTopicAnnouncement( + context.Background(), + topic.ID, topic.Title, topic.Summary, + topic.SignupOpenAt, topic.SignupCloseAt, + topic.DebateStartAt, topic.DebateEndAt, + topic.VerdictSchemaID, + ); err != nil { + log.Printf("orchestrator: announce topic=%s failed: %v", topic.ID, err) + } +} + diff --git a/internal/store/argument_store.go b/internal/store/argument_store.go new file mode 100644 index 0000000..f43b979 --- /dev/null +++ b/internal/store/argument_store.go @@ -0,0 +1,62 @@ +package store + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/jmoiron/sqlx" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" +) + +type Argument struct { + ID string `db:"id" json:"id"` + TopicID string `db:"topic_id" json:"topic_id"` + RoundID string `db:"round_id" json:"round_id"` + Camp models.Camp `db:"camp" json:"camp"` + AgentID string `db:"agent_id" json:"agent_id"` + Content string `db:"content" json:"content"` + PostedAt time.Time `db:"posted_at" json:"posted_at"` +} + +type ArgumentStore struct { + db *sqlx.DB +} + +func NewArgumentStore(db *sqlx.DB) *ArgumentStore { return &ArgumentStore{db: db} } + +type PostArgumentInput struct { + TopicID string + RoundID string + Camp models.Camp + AgentID string + Content string +} + +func (s *ArgumentStore) Post(ctx context.Context, in PostArgumentInput) (*Argument, error) { + id := uuid.NewString() + if _, err := s.db.ExecContext(ctx, + `INSERT INTO arguments (id, topic_id, round_id, camp, agent_id, content) + VALUES (?, ?, ?, ?, ?, ?)`, + id, in.TopicID, in.RoundID, in.Camp, in.AgentID, in.Content); err != nil { + return nil, err + } + var a Argument + if err := s.db.GetContext(ctx, &a, `SELECT * FROM arguments WHERE id = ?`, id); err != nil { + return nil, err + } + return &a, nil +} + +// ListByTopic returns the full transcript in posted order. Used by the +// judge agent at end-of-debate to write the verdict, by the frontend +// to render, and by observer agents querying via plugin. +func (s *ArgumentStore) ListByTopic(ctx context.Context, topicID string) ([]Argument, error) { + var rows []Argument + if err := s.db.SelectContext(ctx, &rows, + `SELECT * FROM arguments WHERE topic_id = ? ORDER BY posted_at ASC, id ASC`, topicID); err != nil { + return nil, err + } + return rows, nil +} diff --git a/internal/store/camp_store.go b/internal/store/camp_store.go new file mode 100644 index 0000000..1ddd045 --- /dev/null +++ b/internal/store/camp_store.go @@ -0,0 +1,66 @@ +package store + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/jmoiron/sqlx" + + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" +) + +type Camp struct { + ID string `db:"id" json:"id"` + TopicID string `db:"topic_id" json:"topic_id"` + Camp models.Camp `db:"camp" json:"camp"` + AgentID string `db:"agent_id" json:"agent_id"` + AllocatedAt time.Time `db:"allocated_at" json:"allocated_at"` +} + +type CampStore struct { + db *sqlx.DB +} + +func NewCampStore(db *sqlx.DB) *CampStore { return &CampStore{db: db} } + +// WriteAllocation inserts all 3 camp rows for a topic atomically. Must +// be called within a tx the orchestrator owns (so signup_close transition +// + camps insert + status update are all-or-nothing). Receives an open +// *sqlx.Tx, returns nothing on success. +func (s *CampStore) WriteAllocation(ctx context.Context, tx *sqlx.Tx, topicID string, alloc map[models.Camp]string) error { + for _, c := range models.AllCamps { + agentID, ok := alloc[c] + if !ok { + continue + } + if _, err := tx.ExecContext(ctx, + `INSERT INTO camps (id, topic_id, camp, agent_id) VALUES (?, ?, ?, ?)`, + uuid.NewString(), topicID, c, agentID); err != nil { + return err + } + } + return nil +} + +func (s *CampStore) ListByTopic(ctx context.Context, topicID string) ([]Camp, error) { + var rows []Camp + if err := s.db.SelectContext(ctx, &rows, + `SELECT * FROM camps WHERE topic_id = ? ORDER BY allocated_at ASC`, topicID); err != nil { + return nil, err + } + return rows, nil +} + +// AgentCampInTopic returns the camp `agentID` was allocated to, or empty +// if the agent isn't in any camp on this topic. Used by argument/verdict +// handlers to enforce "only camp members can post". +func (s *CampStore) AgentCampInTopic(ctx context.Context, topicID, agentID string) (models.Camp, error) { + var camp models.Camp + err := s.db.GetContext(ctx, &camp, + `SELECT camp FROM camps WHERE topic_id = ? AND agent_id = ? LIMIT 1`, topicID, agentID) + if err != nil { + return "", err + } + return camp, nil +} diff --git a/internal/store/round_store.go b/internal/store/round_store.go new file mode 100644 index 0000000..54c29f0 --- /dev/null +++ b/internal/store/round_store.go @@ -0,0 +1,65 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "time" + + "github.com/google/uuid" + "github.com/jmoiron/sqlx" +) + +type Round struct { + ID string `db:"id" json:"id"` + TopicID string `db:"topic_id" json:"topic_id"` + RoundNo int `db:"round_no" json:"round_no"` + OpenedAt time.Time `db:"opened_at" json:"opened_at"` + ClosedAt *time.Time `db:"closed_at" json:"closed_at,omitempty"` +} + +type RoundStore struct { + db *sqlx.DB +} + +func NewRoundStore(db *sqlx.DB) *RoundStore { return &RoundStore{db: db} } + +// Open creates the round-0 row for a topic entering `debating`. Subsequent +// rounds (1, 2, ...) are inserted by the round driver as the debate +// advances; we leave round-bumping logic outside the store so the policy +// (time-based? all-participants-posted?) can evolve without DB churn. +func (s *RoundStore) Open(ctx context.Context, topicID string, roundNo int) (*Round, error) { + id := uuid.NewString() + if _, err := s.db.ExecContext(ctx, + `INSERT INTO rounds (id, topic_id, round_no) VALUES (?, ?, ?)`, + id, topicID, roundNo); err != nil { + return nil, err + } + var r Round + if err := s.db.GetContext(ctx, &r, `SELECT * FROM rounds WHERE id = ?`, id); err != nil { + return nil, err + } + return &r, nil +} + +func (s *RoundStore) Latest(ctx context.Context, topicID string) (*Round, error) { + var r Round + err := s.db.GetContext(ctx, &r, + `SELECT * FROM rounds WHERE topic_id = ? ORDER BY round_no DESC LIMIT 1`, topicID) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return &r, nil +} + +func (s *RoundStore) ListByTopic(ctx context.Context, topicID string) ([]Round, error) { + var rows []Round + if err := s.db.SelectContext(ctx, &rows, + `SELECT * FROM rounds WHERE topic_id = ? ORDER BY round_no ASC`, topicID); err != nil { + return nil, err + } + return rows, nil +} diff --git a/internal/store/topic_store.go b/internal/store/topic_store.go index 833e191..d883911 100644 --- a/internal/store/topic_store.go +++ b/internal/store/topic_store.go @@ -94,6 +94,17 @@ func (s *TopicStore) List(ctx context.Context, f ListFilter) ([]models.Topic, er return rows, nil } +// SetStatus is a low-level status update. Most transitions go through +// the orchestrator's tx-wrapped paths; this is for the verdict handler +// (debating → completed on successful judge submission) and admin tools. +func (s *TopicStore) SetStatus(ctx context.Context, id string, status models.TopicStatus) (*models.Topic, error) { + if _, err := s.db.ExecContext(ctx, + `UPDATE topics SET status = ? WHERE id = ?`, status, id); err != nil { + return nil, err + } + return s.GetByID(ctx, id) +} + // SetVisibility flips public/private; records who/when. Returns updated row. func (s *TopicStore) SetVisibility(ctx context.Context, id string, v models.Visibility, byUserID string) (*models.Topic, error) { _, err := s.db.ExecContext(ctx, ` diff --git a/internal/store/verdict_store.go b/internal/store/verdict_store.go new file mode 100644 index 0000000..3de6a9d --- /dev/null +++ b/internal/store/verdict_store.go @@ -0,0 +1,63 @@ +package store + +import ( + "context" + "database/sql" + "errors" + "time" + + "github.com/google/uuid" + "github.com/jmoiron/sqlx" +) + +type Verdict struct { + ID string `db:"id" json:"id"` + TopicID string `db:"topic_id" json:"topic_id"` + JudgeAgentID string `db:"judge_agent_id" json:"judge_agent_id"` + VerdictJSON []byte `db:"verdict_json" json:"-"` // surface raw via Render + Rationale string `db:"rationale" json:"rationale"` + TokensInput int `db:"tokens_input" json:"tokens_input"` + TokensOutput int `db:"tokens_output" json:"tokens_output"` + ProducedAt time.Time `db:"produced_at" json:"produced_at"` +} + +type VerdictStore struct { + db *sqlx.DB +} + +func NewVerdictStore(db *sqlx.DB) *VerdictStore { return &VerdictStore{db: db} } + +type SubmitVerdictInput struct { + TopicID string + JudgeAgentID string + VerdictJSON []byte + Rationale string + TokensInput int + TokensOutput int +} + +// Submit writes the (one-and-only) verdict for a topic. Unique constraint +// on topic_id means a second submission returns a duplicate-key error; +// caller surfaces that as 409. +func (s *VerdictStore) Submit(ctx context.Context, in SubmitVerdictInput) (*Verdict, error) { + id := uuid.NewString() + if _, err := s.db.ExecContext(ctx, + `INSERT INTO verdicts (id, topic_id, judge_agent_id, verdict_json, rationale, tokens_input, tokens_output) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + id, in.TopicID, in.JudgeAgentID, in.VerdictJSON, in.Rationale, in.TokensInput, in.TokensOutput); err != nil { + return nil, err + } + return s.GetByTopic(ctx, in.TopicID) +} + +func (s *VerdictStore) GetByTopic(ctx context.Context, topicID string) (*Verdict, error) { + var v Verdict + err := s.db.GetContext(ctx, &v, `SELECT * FROM verdicts WHERE topic_id = ?`, topicID) + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + return &v, nil +} diff --git a/main.go b/main.go index 12244af..96a85f3 100644 --- a/main.go +++ b/main.go @@ -19,7 +19,10 @@ import ( "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config" "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/db" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric" "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/orchestrator" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" ) // Version is overridden at build time via -ldflags="-X main.Version=...". @@ -48,6 +51,21 @@ func main() { } log.Printf("migrations: ok") + // Wire orchestrator + Fabric announcer + start the ticker. + announcer := fabric.NewAnnouncer(fabric.AnnounceConfig{ + GuildBaseURL: cfg.FabricGuildBaseURL, + ChannelID: cfg.FabricAnnounceChannelID, + SystemAPIKey: cfg.FabricSystemAPIKey, + BotBearerToken: cfg.FabricBotBearerToken, + }) + topicStore := store.NewTopicStore(conn) + signupStore := store.NewSignupStore(conn) + campStore := store.NewCampStore(conn) + roundStore := store.NewRoundStore(conn) + ticker := orchestrator.NewTicker(conn, topicStore, signupStore, campStore, roundStore, + announcer, cfg.OrchestratorTickInterval) + go ticker.Run(ctx) + srv := &http.Server{ Addr: cfg.HTTPAddr, Handler: httpapi.Mount(cfg, conn, Version),