Files
Dialectic.Backend/internal/orchestrator/ticker.go
hzhang 15bb942d9b feat: POST /api/admin/agent-keys — system-keyed raw key minting
New admin endpoint for provisioning per-agent dialectic API keys
during recruitment. Auth via separate x-dialectic-admin-key header
matching env DIALECTIC_ADMIN_API_KEY (not bearer — admin lifecycle
is independent of agent identity).

Behavior:
- Body {agent_id, force?}; generates 32-byte hex raw key; stores
  sha256-peppered hash in agent_keys; returns raw key (ONLY time
  exposed — caller stores in agent secret-mgr)
- 409 on existing agent_id unless force:true (rotates the hash,
  clears last_used_at + revoked_at)
- Closed-by-default: if DIALECTIC_ADMIN_API_KEY env is empty, every
  request 401s

Caller pattern: skills/dialectic-hangman-lab/scripts/dialectic-ctrl
(to be added) reads admin key from
/root/.openclaw/system-secrets/dialectic-admin-key on the openclaw
host, POSTs to admin endpoint, stores returned raw key in the proxy-
for agent secret-mgr (inherits the proxy-pcexec context from
recruitment/onboard).

Unblocks Phase 3.5 plan to provision all prod agents and integrate
into recruitment skill.
2026-05-23 14:53:39 +01:00

237 lines
7.7 KiB
Go

package orchestrator
import (
"context"
"log"
"math/rand"
"time"
"github.com/jmoiron/sqlx"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
)
// Ticker drives the topic state machine. Every TickInterval it scans
// for topics with timestamps that have crossed a transition boundary
// and applies the transition atomically per topic.
//
// State transitions handled by the ticker:
//
//	created       → signup_open    when now >= signup_open_at
//	                               (also posts the Fabric announcement)
//	signup_open   → signup_closed  when now >= signup_close_at and the allocator succeeded
//	              → cancelled      when the allocator returned a CancelReason
//	signup_closed → debating       when now >= debate_start_at (opens round 0)
//
// NOT handled by the ticker (driven elsewhere):
//
//	debating → completed           driven by POST /api/topics/{id}/verdict
//	                               (judge submits; handler flips status).
//
// The "judging" sub-state is implicit: status==debating AND
// now>=debate_end_at.
//
// Per-topic transitions use SELECT FOR UPDATE so concurrent ticker
// instances (or future replicas) don't double-fire.
type Ticker struct {
	// db runs the candidate scan and opens the per-topic transactions.
	db *sqlx.DB
	// topics loads the full topic record used for the announcement.
	topics *store.TopicStore
	// signups lists a topic's signups when its signup window closes.
	signups *store.SignupStore
	// camps persists the allocation produced at signup close.
	camps *store.CampStore
	// rounds is stored by NewTicker; the methods in this file insert
	// round 0 with raw SQL and do not reference it.
	rounds *store.RoundStore
	// announcer posts the topic announcement when signup opens.
	announcer *fabric.Announcer
	// interval is the delay between scans (NewTicker defaults it to 15s).
	interval time.Duration
	// rng is the random source handed to Allocate.
	rng *rand.Rand
}
// NewTicker assembles a Ticker from the database handle, the stores, the
// announcer and the scan interval. A zero or negative interval is replaced
// by 15 seconds. The ticker's random source is seeded from the current
// wall-clock time in nanoseconds.
func NewTicker(
	db *sqlx.DB,
	topics *store.TopicStore,
	signups *store.SignupStore,
	camps *store.CampStore,
	rounds *store.RoundStore,
	announcer *fabric.Announcer,
	interval time.Duration,
) *Ticker {
	// Start from the default cadence and override it only when the caller
	// supplied a usable value.
	ticker := &Ticker{
		db:        db,
		topics:    topics,
		signups:   signups,
		camps:     camps,
		rounds:    rounds,
		announcer: announcer,
		interval:  15 * time.Second,
		rng:       rand.New(rand.NewSource(time.Now().UnixNano())),
	}
	if interval > 0 {
		ticker.interval = interval
	}
	return ticker
}
// Run performs one scan immediately and then one scan per interval until
// ctx is cancelled, at which point it logs and returns. The call blocks,
// so the caller is expected to start it in its own goroutine.
func (t *Ticker) Run(ctx context.Context) {
	log.Printf("orchestrator: ticker started (interval=%s, announce=%v)", t.interval, t.announcer.Enabled())

	periodic := time.NewTicker(t.interval)
	defer periodic.Stop()

	// Scan right away so a freshly started process does not sit idle for a
	// whole interval before its first pass.
	t.tickOnce(ctx)

	cancelled := ctx.Done()
	for {
		select {
		case <-periodic.C:
			t.tickOnce(ctx)
		case <-cancelled:
			log.Printf("orchestrator: ticker stopped")
			return
		}
	}
}
// tickOnce performs a single scan. It captures the current time once and
// runs the three ticker-driven transitions in order. A failed scan query is
// logged and the remaining transitions still run; per-topic failures are
// logged by transitionByStatus, so one failing topic doesn't stall the
// others.
func (t *Ticker) tickOnce(ctx context.Context) {
	now := time.Now()

	// 1. created → signup_open
	//
	// The announcement is a best-effort network call and must not be sent
	// for a transition that never became durable. The apply callback runs
	// before the transaction commits, so it only records what to announce;
	// the announcements are dispatched after the scan, once the outcome of
	// each transaction can be checked.
	type pendingAnnouncement struct {
		id    string
		topic *models.Topic
	}
	var pending []pendingAnnouncement
	if err := t.transitionByStatus(ctx, now,
		models.TopicStatusCreated, "signup_open_at",
		func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
			topic, err := t.topics.GetByID(ctx, topicID)
			if err != nil {
				return err
			}
			if _, err := tx.ExecContext(ctx,
				`UPDATE topics SET status = ? WHERE id = ?`,
				models.TopicStatusSignupOpen, topicID); err != nil {
				return err
			}
			pending = append(pending, pendingAnnouncement{id: topicID, topic: topic})
			return nil
		}); err != nil {
		log.Printf("orchestrator: created→signup_open scan: %v", err)
	}
	for _, p := range pending {
		// A topic that is still `created` means its commit failed; the next
		// tick will retry the transition and announce then, so announcing
		// now would produce a duplicate.
		var status models.TopicStatus
		err := t.db.GetContext(ctx, &status,
			`SELECT status FROM topics WHERE id = ?`, p.id)
		switch {
		case err != nil:
			// Outcome unknown: stay best-effort and announce rather than
			// risk dropping the announcement altogether.
			log.Printf("orchestrator: confirm signup_open topic=%s: %v", p.id, err)
		case status == models.TopicStatusCreated:
			log.Printf("orchestrator: topic %s not committed to signup_open; announcement skipped", p.id)
			continue
		}
		go t.broadcastAnnouncement(p.topic)
	}

	// 2. signup_open → signup_closed | cancelled
	if err := t.transitionByStatus(ctx, now,
		models.TopicStatusSignupOpen, "signup_close_at",
		func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
			signups, err := t.signups.ListByTopic(ctx, topicID)
			if err != nil {
				return err
			}
			res := Allocate(signups, t.rng)
			if res.CancelReason != "" {
				// Log only after the UPDATE succeeded so a failed write is
				// not reported as a cancellation.
				if _, err := tx.ExecContext(ctx,
					`UPDATE topics SET status = ?, cancelled_reason = ? WHERE id = ?`,
					models.TopicStatusCancelled, res.CancelReason, topicID); err != nil {
					return err
				}
				log.Printf("orchestrator: topic %s cancelled at signup_close: %s",
					topicID, res.CancelReason)
				return nil
			}
			if err := t.camps.WriteAllocation(ctx, tx, topicID, res.Allocation); err != nil {
				return err
			}
			if _, err := tx.ExecContext(ctx,
				`UPDATE topics SET status = ? WHERE id = ?`,
				models.TopicStatusSignupClosed, topicID); err != nil {
				return err
			}
			log.Printf("orchestrator: topic %s allocated pro=%s con=%s judge=%s",
				topicID,
				res.Allocation[models.CampPro], res.Allocation[models.CampCon], res.Allocation[models.CampJudge])
			return nil
		}); err != nil {
		log.Printf("orchestrator: signup_open→signup_closed scan: %v", err)
	}

	// 3. signup_closed → debating (opens round 0)
	if err := t.transitionByStatus(ctx, now,
		models.TopicStatusSignupClosed, "debate_start_at",
		func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
			if _, err := tx.ExecContext(ctx,
				`UPDATE topics SET status = ? WHERE id = ?`,
				models.TopicStatusDebating, topicID); err != nil {
				return err
			}
			// Round 0 is inserted within the same tx as the status flip, so
			// a failed commit cannot leave a half-applied state.
			if _, err := tx.ExecContext(ctx,
				`INSERT INTO rounds (id, topic_id, round_no) VALUES (UUID(), ?, 0)`,
				topicID); err != nil {
				return err
			}
			log.Printf("orchestrator: topic %s entered debating; round 0 opened", topicID)
			return nil
		}); err != nil {
		log.Printf("orchestrator: signup_closed→debating scan: %v", err)
	}

	// Note: there's no explicit `debating → judging` transition in v1.
	// The verdict handler enforces "status==debating AND now>=debate_end_at"
	// as its preconditions; that's equivalent to a "judging" gate without
	// adding a new enum value. Migration 002 will introduce the explicit
	// 'judging' state when we want richer UI (e.g. "Awaiting verdict"
	// distinct from "In debate"); until then this comment serves as the
	// state-machine documentation for future maintainers.
}
// transitionByStatus is the shared "scan + per-row tx + apply" pattern.
// It selects up to 50 topic IDs in currentStatus whose dueColumn <= now
// (without locking), then hands each one to applyOne, which locks the row,
// re-checks the status and runs apply inside a transaction.
//
// dueColumn is concatenated directly into the SQL text, so it must be a
// trusted column name supplied by code in this package, never external
// input.
//
// Per-topic errors are logged and do not stop the loop. The returned error
// is either the scan query's error or, when ctx is cancelled part-way
// through the batch, ctx.Err().
func (t *Ticker) transitionByStatus(ctx context.Context, now time.Time,
	currentStatus models.TopicStatus, dueColumn string,
	apply func(context.Context, *sqlx.Tx, string) error) error {
	// Pull candidate IDs first (no lock); we lock per row inside the loop.
	var ids []string
	q := "SELECT id FROM topics WHERE status = ? AND " + dueColumn + " <= ? LIMIT 50"
	if err := t.db.SelectContext(ctx, &ids, q, currentStatus, now); err != nil {
		return err
	}
	for _, id := range ids {
		// Stop as soon as the context is cancelled instead of starting a
		// transaction per remaining topic that is bound to fail and log.
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := t.applyOne(ctx, id, currentStatus, apply); err != nil {
			log.Printf("orchestrator: apply topic=%s: %v", id, err)
		}
	}
	return nil
}
// applyOne runs one topic's transition inside its own transaction. It
// locks the topic row with SELECT ... FOR UPDATE and reads its status; if
// the status no longer equals expected it returns nil without calling
// apply. Otherwise it calls apply with the open transaction and commits.
// Any error from beginning the transaction, the locking read, apply or the
// commit is returned, and the deferred rollback discards uncommitted work.
func (t *Ticker) applyOne(ctx context.Context, topicID string,
	expected models.TopicStatus,
	apply func(context.Context, *sqlx.Tx, string) error) error {
	tx, beginErr := t.db.BeginTxx(ctx, nil)
	if beginErr != nil {
		return beginErr
	}
	// The rollback result is deliberately ignored: after a successful
	// Commit there is nothing left to undo.
	defer func() { _ = tx.Rollback() }()

	var current models.TopicStatus
	lockErr := tx.GetContext(ctx, &current,
		`SELECT status FROM topics WHERE id = ? FOR UPDATE`, topicID)
	if lockErr != nil {
		return lockErr
	}

	// Another process moved the topic on between the scan and the lock.
	if current != expected {
		return nil
	}

	applyErr := apply(ctx, tx, topicID)
	if applyErr != nil {
		return applyErr
	}
	return tx.Commit()
}
// broadcastAnnouncement posts the announcement for topic through the
// announcer and logs any failure; it never returns an error because the
// announcement is best-effort. A nil topic is ignored.
//
// It is started in its own goroutine by the ticker, so it uses a fresh
// background context rather than the tick's context. That context carries
// a deadline so a stalled network call cannot keep the goroutine alive
// indefinitely.
func (t *Ticker) broadcastAnnouncement(topic *models.Topic) {
	if topic == nil {
		return
	}

	// Upper bound on a single announcement attempt.
	const announceTimeout = 30 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), announceTimeout)
	defer cancel()

	if err := t.announcer.PostTopicAnnouncement(
		ctx,
		topic.ID, topic.Title, topic.Summary,
		topic.SignupOpenAt, topic.SignupCloseAt,
		topic.DebateStartAt, topic.DebateEndAt,
		topic.VerdictSchemaID,
	); err != nil {
		log.Printf("orchestrator: announce topic=%s failed: %v", topic.ID, err)
	}
}