package orchestrator

import (
	"context"
	"log"
	"math/rand"
	"time"

	"github.com/jmoiron/sqlx"

	"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric"
	"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
	"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
)

// Ticker advances topics through their lifecycle on a fixed cadence.
// Once per interval it looks for topics whose scheduled timestamp for
// the next state has passed and applies that transition, one topic at a
// time, each inside its own transaction.
//
// Transitions owned by the ticker:
//
//	created       → signup_open    (now >= signup_open_at;
//	                                also posts the Fabric announcement)
//	signup_open   → signup_closed  (now >= signup_close_at and the
//	                                allocator succeeded)
//	              → cancelled      (allocator returned a CancelReason)
//	signup_closed → debating       (now >= debate_start_at; opens round 0)
//
// Transitions NOT owned by the ticker (driven elsewhere):
//
//	debating → completed  is driven by POST /api/topics/{id}/verdict
//	                      (the judge submits; the handler flips status).
//	                      The "judging" sub-state is implicit:
//	                      status==debating AND now>=debate_end_at.
//
// Every per-topic transition locks the row with SELECT FOR UPDATE so
// concurrent ticker instances (or future replicas) don't double-fire.
type Ticker struct {
	db        *sqlx.DB
	topics    *store.TopicStore
	signups   *store.SignupStore
	camps     *store.CampStore
	rounds    *store.RoundStore
	announcer *fabric.Announcer
	interval  time.Duration
	rng       *rand.Rand
}

// NewTicker wires a Ticker to its stores and announcer. A non-positive
// interval is replaced by the 15-second default. The ticker's random
// source is seeded from the current wall-clock time in nanoseconds.
func NewTicker(
	db *sqlx.DB,
	topics *store.TopicStore,
	signups *store.SignupStore,
	camps *store.CampStore,
	rounds *store.RoundStore,
	announcer *fabric.Announcer,
	interval time.Duration,
) *Ticker {
	period := interval
	if period <= 0 {
		period = 15 * time.Second
	}

	seed := time.Now().UnixNano()

	ticker := &Ticker{
		interval: period,
		rng:      rand.New(rand.NewSource(seed)),
	}
	ticker.db = db
	ticker.topics = topics
	ticker.signups = signups
	ticker.camps = camps
	ticker.rounds = rounds
	ticker.announcer = announcer
	return ticker
}

// Run blocks until ctx is cancelled. Caller goroutines it.
func (t *Ticker) Run(ctx context.Context) { log.Printf("orchestrator: ticker started (interval=%s, announce=%v)", t.interval, t.announcer.Enabled()) tk := time.NewTicker(t.interval) defer tk.Stop() // First tick immediately so startup is responsive — don't wait // 15s for the first scan. t.tickOnce(ctx) for { select { case <-ctx.Done(): log.Printf("orchestrator: ticker stopped") return case <-tk.C: t.tickOnce(ctx) } } } // tickOnce scans + applies. Errors are logged per topic; one topic // failing doesn't stall the others. func (t *Ticker) tickOnce(ctx context.Context) { now := time.Now() // 1. created → signup_open if err := t.transitionByStatus(ctx, now, models.TopicStatusCreated, "signup_open_at", func(ctx context.Context, tx *sqlx.Tx, topicID string) error { topic, err := t.topics.GetByID(ctx, topicID) if err != nil { return err } if _, err := tx.ExecContext(ctx, `UPDATE topics SET status = ? WHERE id = ?`, models.TopicStatusSignupOpen, topicID); err != nil { return err } // Announcement is best-effort, outside the tx (network call). go t.broadcastAnnouncement(topic) return nil }); err != nil { log.Printf("orchestrator: created→signup_open scan: %v", err) } // 2. signup_open → signup_closed | cancelled if err := t.transitionByStatus(ctx, now, models.TopicStatusSignupOpen, "signup_close_at", func(ctx context.Context, tx *sqlx.Tx, topicID string) error { signups, err := t.signups.ListByTopic(ctx, topicID) if err != nil { return err } res := Allocate(signups, t.rng) if res.CancelReason != "" { _, err := tx.ExecContext(ctx, `UPDATE topics SET status = ?, cancelled_reason = ? WHERE id = ?`, models.TopicStatusCancelled, res.CancelReason, topicID) log.Printf("orchestrator: topic %s cancelled at signup_close: %s", topicID, res.CancelReason) return err } if err := t.camps.WriteAllocation(ctx, tx, topicID, res.Allocation); err != nil { return err } _, err = tx.ExecContext(ctx, `UPDATE topics SET status = ? 
WHERE id = ?`, models.TopicStatusSignupClosed, topicID) log.Printf("orchestrator: topic %s allocated pro=%s con=%s judge=%s", topicID, res.Allocation[models.CampPro], res.Allocation[models.CampCon], res.Allocation[models.CampJudge]) return err }); err != nil { log.Printf("orchestrator: signup_open→signup_closed scan: %v", err) } // 3. signup_closed → debating (opens round 0) if err := t.transitionByStatus(ctx, now, models.TopicStatusSignupClosed, "debate_start_at", func(ctx context.Context, tx *sqlx.Tx, topicID string) error { if _, err := tx.ExecContext(ctx, `UPDATE topics SET status = ? WHERE id = ?`, models.TopicStatusDebating, topicID); err != nil { return err } // Round 0 inserted within the tx — if commit fails we don't // leak a half-state. _, err := tx.ExecContext(ctx, `INSERT INTO rounds (id, topic_id, round_no) VALUES (UUID(), ?, 0)`, topicID) log.Printf("orchestrator: topic %s entered debating; round 0 opened", topicID) return err }); err != nil { log.Printf("orchestrator: signup_closed→debating scan: %v", err) } // Note: there's no explicit `debating → judging` transition in v1. // The verdict handler enforces "status==debating AND now>=debate_end_at" // as its preconditions; that's equivalent to a "judging" gate without // adding a new enum value. Migration 002 will introduce the explicit // 'judging' state when we want richer UI (e.g. "Awaiting verdict" // distinct from "In debate"); until then this comment serves as the // state-machine documentation for future maintainers. } // transitionByStatus is the shared "scan + per-row tx + apply" pattern. // Picks all topics in `currentStatus` whose `dueColumn` <= now, opens a // tx with SELECT FOR UPDATE, re-checks status (someone else may have // already moved it), calls apply, commits. Errors per topic logged. 
func (t *Ticker) transitionByStatus(ctx context.Context, now time.Time, currentStatus models.TopicStatus, dueColumn string, apply func(context.Context, *sqlx.Tx, string) error) error { // Pull candidate IDs first (no lock); we lock per row inside the loop. var ids []string q := "SELECT id FROM topics WHERE status = ? AND " + dueColumn + " <= ? LIMIT 50" if err := t.db.SelectContext(ctx, &ids, q, currentStatus, now); err != nil { return err } for _, id := range ids { if err := t.applyOne(ctx, id, currentStatus, apply); err != nil { log.Printf("orchestrator: apply topic=%s: %v", id, err) } } return nil } func (t *Ticker) applyOne(ctx context.Context, topicID string, expected models.TopicStatus, apply func(context.Context, *sqlx.Tx, string) error) error { tx, err := t.db.BeginTxx(ctx, nil) if err != nil { return err } defer func() { _ = tx.Rollback() }() // safe no-op after commit var actual models.TopicStatus if err := tx.GetContext(ctx, &actual, `SELECT status FROM topics WHERE id = ? FOR UPDATE`, topicID); err != nil { return err } if actual != expected { // Already transitioned by some other process — skip. return nil } if err := apply(ctx, tx, topicID); err != nil { return err } return tx.Commit() } func (t *Ticker) broadcastAnnouncement(topic *models.Topic) { if topic == nil { return } if err := t.announcer.PostTopicAnnouncement( context.Background(), topic.ID, topic.Title, topic.Summary, topic.SignupOpenAt, topic.SignupCloseAt, topic.DebateStartAt, topic.DebateEndAt, topic.VerdictSchemaID, ); err != nil { log.Printf("orchestrator: announce topic=%s failed: %v", topic.ID, err) } }