Files
Dialectic.Backend/internal/orchestrator/ticker.go
hzhang a43ff2de62 feat: per-topic announce target (move guild+channel from env to topic row)
Operator decision: backend env hard-coding a single guild/channel was
wrong because (a) one Center can host many guilds and (b) one guild
can have many announce channels for different purposes. The
proposing agent now chooses where this topic's lifecycle events go,
passed as create-topic params and stored on the topic row.

Schema migration 002:
- ALTER topics ADD announce_guild_base_url VARCHAR(255) NULL,
                  announce_channel_id     VARCHAR(64)  NULL.
- Both nullable; one-of-two is rejected at POST time; both null =
  topic creator opted out of broadcasts (announcer skips with log).

handlers/topics.go: createTopicBody adds announce_guild_base_url +
announce_channel_id; validates both-or-neither.

fabric/announce.go: rewritten signature. NewAnnouncer takes only
the system api key. PostTopicAnnouncement + PostLifecycleEvent take
a Target {GuildBaseURL, ChannelID} per call. Zero-value Target -> skip.

orchestrator/ticker.go: new helper topicTarget(topic) extracts the
target from the topic row; all broadcasts route through it.

verdict.go: same per-topic target extraction at completion.

config: removed FabricGuildBaseURL, FabricAnnounceChannelID,
FabricBotBearerToken from the Config struct + env reads.
FabricSystemAPIKey env renamed to DIALECTIC_FABRIC_SYSTEM_API_KEY
to disambiguate from the Fabric backend's own
FABRIC_BACKEND_GUILD_SYSTEM_API_KEY (operator: paste the same value
into both - one says "I am the system caller", the other says "I
accept this caller as system").

FABRIC_BOT_BEARER_TOKEN is gone entirely. The upgraded Guild
ApiKeyGuard accepts x-fabric-system-key alone for announce posts;
no per-user Bearer needed. Pairs with the matching change on
nav/Fabric.Backend.Guild commit 985b06a.
2026-05-23 17:53:30 +01:00

293 lines
9.8 KiB
Go

package orchestrator
import (
"context"
"fmt"
"log"
"math/rand"
"time"
"github.com/jmoiron/sqlx"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
)
// Ticker is the periodic driver of the topic state machine. On every
// interval it looks for topics whose due timestamp has passed and
// moves each one to its next status inside its own transaction.
//
// Transitions the ticker performs:
//
//	created       → signup_open    when now >= signup_open_at
//	                               (also posts the Fabric announcement)
//	signup_open   → signup_closed  when now >= signup_close_at and the
//	                               allocator succeeded
//	              → cancelled      when the allocator returned a CancelReason
//	signup_closed → debating       when now >= debate_start_at (opens round 0)
//
// Transitions the ticker does NOT perform:
//
//	debating → completed  is driven by POST /api/topics/{id}/verdict
//	                      (the judge submits; the handler flips status).
//	                      "Judging" is an implicit sub-state:
//	                      status==debating AND now>=debate_end_at.
//
// Each per-topic transition locks the row with SELECT ... FOR UPDATE
// and re-checks its status, so concurrent ticker instances (or future
// replicas) do not fire the same transition twice.
type Ticker struct {
	// Database handle used for the candidate scan and per-topic transactions.
	db *sqlx.DB

	// Stores consulted while applying transitions.
	topics  *store.TopicStore
	signups *store.SignupStore
	camps   *store.CampStore
	rounds  *store.RoundStore

	// announcer posts topic announcements and lifecycle events.
	announcer *fabric.Announcer

	// interval is the delay between scans.
	interval time.Duration

	// rng is the randomness source handed to the allocator.
	rng *rand.Rand
}
// NewTicker assembles a Ticker from its dependencies. A non-positive
// interval is replaced with a 15-second default. The allocator's
// random source is seeded from the current wall-clock time.
func NewTicker(
	db *sqlx.DB,
	topics *store.TopicStore,
	signups *store.SignupStore,
	camps *store.CampStore,
	rounds *store.RoundStore,
	announcer *fabric.Announcer,
	interval time.Duration,
) *Ticker {
	const defaultInterval = 15 * time.Second

	tk := &Ticker{
		db:        db,
		topics:    topics,
		signups:   signups,
		camps:     camps,
		rounds:    rounds,
		announcer: announcer,
		interval:  interval,
		rng:       rand.New(rand.NewSource(time.Now().UnixNano())),
	}
	if tk.interval <= 0 {
		tk.interval = defaultInterval
	}
	return tk
}
// Run executes the scan loop and only returns once ctx is cancelled,
// so callers are expected to start it in its own goroutine.
func (t *Ticker) Run(ctx context.Context) {
	log.Printf("orchestrator: ticker started (interval=%s, announce=%v)", t.interval, t.announcer.Enabled())

	timer := time.NewTicker(t.interval)
	defer timer.Stop()

	// Scan once right away so startup does not have to wait a full
	// interval before the first transitions are applied.
	t.tickOnce(ctx)

	done := ctx.Done()
	for {
		select {
		case <-timer.C:
			t.tickOnce(ctx)
		case <-done:
			log.Printf("orchestrator: ticker stopped")
			return
		}
	}
}
// tickOnce performs one pass over the state machine: for each of the
// three ticker-driven transitions it asks transitionByStatus to find
// the due topics and apply the matching step. A failing scan is logged
// and does not prevent the remaining transitions from being attempted;
// per-topic failures are logged by transitionByStatus.
//
// NOTE(review): the apply steps start their broadcast goroutines
// before applyOne commits the transaction. If that commit fails, the
// event has already been sent, and the retry on a later tick will send
// it again. Fixing this needs a post-commit hook in the apply contract.
//
// There is no explicit `debating → judging` transition in v1. The
// verdict handler enforces "status==debating AND now>=debate_end_at"
// as its preconditions, which is equivalent to a "judging" gate
// without adding a new enum value. A later migration can introduce an
// explicit 'judging' state if richer UI is wanted (e.g. "Awaiting
// verdict" distinct from "In debate"); until then this comment serves
// as the state-machine documentation for future maintainers.
func (t *Ticker) tickOnce(ctx context.Context) {
	now := time.Now()

	// 1. created → signup_open
	if err := t.transitionByStatus(ctx, now,
		models.TopicStatusCreated, "signup_open_at", t.applySignupOpen); err != nil {
		log.Printf("orchestrator: created→signup_open scan: %v", err)
	}

	// 2. signup_open → signup_closed | cancelled
	if err := t.transitionByStatus(ctx, now,
		models.TopicStatusSignupOpen, "signup_close_at", t.applySignupClose); err != nil {
		log.Printf("orchestrator: signup_open→signup_closed scan: %v", err)
	}

	// 3. signup_closed → debating (opens round 0)
	if err := t.transitionByStatus(ctx, now,
		models.TopicStatusSignupClosed, "debate_start_at", t.applyDebateStart); err != nil {
		log.Printf("orchestrator: signup_closed→debating scan: %v", err)
	}
}

// applySignupOpen moves one topic from created to signup_open inside
// tx and then fires the topic announcement.
func (t *Ticker) applySignupOpen(ctx context.Context, tx *sqlx.Tx, topicID string) error {
	topic, err := t.topics.GetByID(ctx, topicID)
	if err != nil {
		return fmt.Errorf("load topic: %w", err)
	}
	if _, err := tx.ExecContext(ctx,
		`UPDATE topics SET status = ? WHERE id = ?`,
		models.TopicStatusSignupOpen, topicID); err != nil {
		return fmt.Errorf("mark signup_open: %w", err)
	}
	// Announcement is best-effort and runs in its own goroutine so the
	// network call does not hold the transaction open.
	go t.broadcastAnnouncement(topic)
	return nil
}

// applySignupClose runs the allocator over the topic's signups and
// either cancels the topic (allocator returned a CancelReason) or
// persists the allocation and moves the topic to signup_closed.
func (t *Ticker) applySignupClose(ctx context.Context, tx *sqlx.Tx, topicID string) error {
	topic, err := t.topics.GetByID(ctx, topicID)
	if err != nil {
		return fmt.Errorf("load topic: %w", err)
	}
	signups, err := t.signups.ListByTopic(ctx, topicID)
	if err != nil {
		return fmt.Errorf("list signups: %w", err)
	}

	res := Allocate(signups, t.rng)
	if res.CancelReason != "" {
		if _, err := tx.ExecContext(ctx,
			`UPDATE topics SET status = ?, cancelled_reason = ? WHERE id = ?`,
			models.TopicStatusCancelled, res.CancelReason, topicID); err != nil {
			return fmt.Errorf("mark cancelled: %w", err)
		}
		// Log and broadcast only once the UPDATE has succeeded, so a
		// failed statement is never reported as a cancellation.
		log.Printf("orchestrator: topic %s cancelled at signup_close: %s",
			topicID, res.CancelReason)
		go t.broadcastLifecycle(topic, "cancelled",
			fmt.Sprintf("debate cancelled at signup close - %s", res.CancelReason))
		return nil
	}

	if err := t.camps.WriteAllocation(ctx, tx, topicID, res.Allocation); err != nil {
		return fmt.Errorf("write allocation: %w", err)
	}
	if _, err := tx.ExecContext(ctx,
		`UPDATE topics SET status = ? WHERE id = ?`,
		models.TopicStatusSignupClosed, topicID); err != nil {
		return fmt.Errorf("mark signup_closed: %w", err)
	}

	pro := res.Allocation[models.CampPro]
	con := res.Allocation[models.CampCon]
	judge := res.Allocation[models.CampJudge]
	log.Printf("orchestrator: topic %s allocated pro=%s con=%s judge=%s",
		topicID, pro, con, judge)
	go t.broadcastLifecycle(topic, "signup_closed",
		fmt.Sprintf("camps allocated — pro=%s con=%s judge=%s. Debate starts at %s",
			pro, con, judge,
			topic.DebateStartAt.UTC().Format("2006-01-02 15:04 UTC")))
	return nil
}

// applyDebateStart moves one topic from signup_closed to debating and
// opens round 0 for it, both inside tx.
func (t *Ticker) applyDebateStart(ctx context.Context, tx *sqlx.Tx, topicID string) error {
	topic, err := t.topics.GetByID(ctx, topicID)
	if err != nil {
		return fmt.Errorf("load topic: %w", err)
	}
	if _, err := tx.ExecContext(ctx,
		`UPDATE topics SET status = ? WHERE id = ?`,
		models.TopicStatusDebating, topicID); err != nil {
		return fmt.Errorf("mark debating: %w", err)
	}
	// Round 0 is inserted within the same tx, so a failed commit cannot
	// leave a debating topic without its first round (or vice versa).
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO rounds (id, topic_id, round_no) VALUES (UUID(), ?, 0)`,
		topicID); err != nil {
		return fmt.Errorf("open round 0: %w", err)
	}
	log.Printf("orchestrator: topic %s entered debating; round 0 opened", topicID)
	go t.broadcastLifecycle(topic, "debating",
		fmt.Sprintf("debate is live — pro/con post arguments; judge stays mostly silent until debate_end_at (%s). Use participate-debate workflow.",
			topic.DebateEndAt.UTC().Format("2006-01-02 15:04 UTC")))
	return nil
}
// transitionByStatus implements the shared "scan, then apply per row"
// pattern. It selects up to 50 topic IDs that are in currentStatus and
// whose dueColumn value is <= now, then hands each ID to applyOne,
// which runs apply inside a per-topic transaction. Only a failure of
// the scan query is returned; per-topic failures are logged and the
// loop moves on to the next topic.
//
// dueColumn is spliced directly into the SQL text, so it must be a
// trusted column name supplied by code, never external input.
func (t *Ticker) transitionByStatus(ctx context.Context, now time.Time,
	currentStatus models.TopicStatus, dueColumn string,
	apply func(context.Context, *sqlx.Tx, string) error) error {
	// The candidate scan takes no locks; each row is locked individually
	// inside applyOne.
	query := "SELECT id FROM topics WHERE status = ? AND " + dueColumn + " <= ? LIMIT 50"

	var due []string
	if err := t.db.SelectContext(ctx, &due, query, currentStatus, now); err != nil {
		return err
	}

	for i := range due {
		topicID := due[i]
		err := t.applyOne(ctx, topicID, currentStatus, apply)
		if err == nil {
			continue
		}
		log.Printf("orchestrator: apply topic=%s: %v", topicID, err)
	}
	return nil
}
// applyOne runs a single topic transition in its own transaction. It
// locks the topic row with SELECT ... FOR UPDATE, and if the row is no
// longer in the expected status it returns nil without calling apply.
// Otherwise it calls apply and commits. Any error from beginning the
// tx, reading the status, apply, or the commit is returned; the
// deferred rollback undoes whatever was not committed.
func (t *Ticker) applyOne(ctx context.Context, topicID string,
	expected models.TopicStatus,
	apply func(context.Context, *sqlx.Tx, string) error) error {
	tx, err := t.db.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	// Rolling back after a successful Commit is a harmless no-op, so the
	// result is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	var current models.TopicStatus
	err = tx.GetContext(ctx, &current,
		`SELECT status FROM topics WHERE id = ? FOR UPDATE`, topicID)
	switch {
	case err != nil:
		return err
	case current != expected:
		// Another process has already moved this topic on; nothing to do.
		return nil
	}

	if err = apply(ctx, tx, topicID); err != nil {
		return err
	}
	return tx.Commit()
}
// broadcastLifecycle posts one lifecycle event (kind plus a
// human-readable summary) for topic through the announcer. It is
// best-effort: a nil topic is ignored and a failed post is only
// logged. Callers start it in its own goroutine, outside any tx, and
// it uses a background context rather than the caller's.
//
// The destination comes from the topic's own announce columns via
// topicTarget. When either column is null the target is the zero
// value, which the announcer is expected to treat as "creator opted
// out of broadcasts" (that handling lives in the fabric package).
func (t *Ticker) broadcastLifecycle(topic *models.Topic, kind, summary string) {
	if topic == nil {
		return
	}

	target := topicTarget(topic)
	err := t.announcer.PostLifecycleEvent(
		context.Background(), target, topic.ID, topic.Title, kind, summary,
	)
	if err == nil {
		return
	}
	log.Printf("orchestrator: lifecycle broadcast topic=%s kind=%s failed: %v", topic.ID, kind, err)
}
// broadcastAnnouncement posts the initial announcement for topic
// (ID, title, summary, the four schedule timestamps and the verdict
// schema ID) to the topic's own announce target. It is best-effort: a
// nil topic is ignored and a failed post is only logged. It uses a
// background context rather than the caller's.
func (t *Ticker) broadcastAnnouncement(topic *models.Topic) {
	if topic == nil {
		return
	}

	target := topicTarget(topic)
	err := t.announcer.PostTopicAnnouncement(
		context.Background(), target,
		topic.ID, topic.Title, topic.Summary,
		topic.SignupOpenAt, topic.SignupCloseAt,
		topic.DebateStartAt, topic.DebateEndAt,
		topic.VerdictSchemaID,
	)
	if err == nil {
		return
	}
	log.Printf("orchestrator: announce topic=%s failed: %v", topic.ID, err)
}
// topicTarget builds the announce target for a topic from its
// per-topic guild base URL and channel ID columns. Both must be set;
// if either is null the zero-value Target is returned, which callers
// pass on to the announcer as the "skip" signal. topic must be
// non-nil.
func topicTarget(topic *models.Topic) fabric.Target {
	var target fabric.Target

	guild, channel := topic.AnnounceGuildBaseURL, topic.AnnounceChannelID
	if guild != nil && channel != nil {
		target.GuildBaseURL = *guild
		target.ChannelID = *channel
	}
	return target
}