refactor(backend): drop backend-driven Fabric broadcast — agent-driven model
The backend no longer broadcasts topic lifecycle events to Fabric. The
new model (per design discussion 2026-05-23 evening):
- Proposing agent posts a single recruitment fabric-send-message
immediately after creating a topic (carries topic_id + signup
window + debate window + title).
- Downstream agents that decide to participate book a HF on_call
slot covering the debate window via `hf calendar schedule on_call
<time> <duration> --job DEBATE-<topic_id>`.
- HF wakes the agent naturally at slot start; the wake payload
carries event_data with the DEBATE-<topic_id> code so the agent
knows why it was woken.
- The backend stays a pure data + state-machine service and doesn't
know about Fabric.
Code removed:
- internal/fabric/announce.go (entire file + empty dir)
- ticker.go: broadcastLifecycle + broadcastAnnouncement + topicTarget
helpers; announcer field on Ticker; announce field/arg on NewTicker
- models/topic.go: AnnounceGuildBaseURL + AnnounceChannelID fields
- store/topic_store.go: same fields on CreateTopicInput + INSERT
- handlers/topics.go: same fields on createTopicBody + validation +
parameter passing to store
- handlers/verdict.go: announcer field + lifecycle broadcast on
verdict submit
- config/config.go: FabricSystemAPIKey field + DIALECTIC_FABRIC_SYSTEM_API_KEY
env read
- main.go + routes.go: announcer wiring
Database:
- migrations/003_drop_topic_announce_target.sql drops the two columns
added by migration 002. Counterpart commit on the deployment side
needs DIALECTIC_FABRIC_SYSTEM_API_KEY env removed from
docker-compose.yml; harmless if left as the backend no longer
reads it.
Pairs with:
- Dialectic.OpenclawPlugin: rip announce_* params from
dialectic_propose_topic (next commit)
- Fabric.Backend.Center: rip serviceEndpoint field + cli
- Fabric.Backend.Guild: rip system-key bypass on ApiKeyGuard and
announce-only-system limit on messaging.controller
- ClawSkills: rewrite participate-debate + analyze-intel step 4 +
delete rotate-fabric-system-key workflow
This commit is contained in:
@@ -2,14 +2,12 @@ package orchestrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
@@ -21,7 +19,6 @@ import (
|
||||
// State transitions handled by the ticker:
|
||||
//
|
||||
// created → signup_open (when now >= signup_open_at)
|
||||
// + post Fabric announcement
|
||||
// signup_open → signup_closed (when now >= signup_close_at, allocator succeeded)
|
||||
// → cancelled (allocator returned CancelReason)
|
||||
// signup_closed → debating (when now >= debate_start_at; opens round 0)
|
||||
@@ -35,15 +32,21 @@ import (
|
||||
//
|
||||
// Per-topic transitions use SELECT FOR UPDATE so concurrent ticker
|
||||
// instances (or future replicas) don't double-fire.
|
||||
//
|
||||
// Lifecycle broadcasting moved out-of-backend (Aug 2026): the proposing
|
||||
// agent posts a single recruitment fabric-send-message after creating a
|
||||
// topic; downstream agents book HF on_call slots covering the debate
|
||||
// window via `hf calendar schedule`, and HF wakes them naturally. The
|
||||
// backend stays a pure data + state-machine service and doesn't know
|
||||
// about Fabric.
|
||||
type Ticker struct {
|
||||
db *sqlx.DB
|
||||
topics *store.TopicStore
|
||||
signups *store.SignupStore
|
||||
camps *store.CampStore
|
||||
rounds *store.RoundStore
|
||||
announcer *fabric.Announcer
|
||||
interval time.Duration
|
||||
rng *rand.Rand
|
||||
db *sqlx.DB
|
||||
topics *store.TopicStore
|
||||
signups *store.SignupStore
|
||||
camps *store.CampStore
|
||||
rounds *store.RoundStore
|
||||
interval time.Duration
|
||||
rng *rand.Rand
|
||||
}
|
||||
|
||||
func NewTicker(
|
||||
@@ -52,27 +55,25 @@ func NewTicker(
|
||||
signups *store.SignupStore,
|
||||
camps *store.CampStore,
|
||||
rounds *store.RoundStore,
|
||||
announcer *fabric.Announcer,
|
||||
interval time.Duration,
|
||||
) *Ticker {
|
||||
if interval <= 0 {
|
||||
interval = 15 * time.Second
|
||||
}
|
||||
return &Ticker{
|
||||
db: db,
|
||||
topics: topics,
|
||||
signups: signups,
|
||||
camps: camps,
|
||||
rounds: rounds,
|
||||
announcer: announcer,
|
||||
interval: interval,
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
db: db,
|
||||
topics: topics,
|
||||
signups: signups,
|
||||
camps: camps,
|
||||
rounds: rounds,
|
||||
interval: interval,
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
}
|
||||
|
||||
// Run blocks until ctx is cancelled. Caller goroutines it.
|
||||
func (t *Ticker) Run(ctx context.Context) {
|
||||
log.Printf("orchestrator: ticker started (interval=%s, announce=%v)", t.interval, t.announcer.Enabled())
|
||||
log.Printf("orchestrator: ticker started (interval=%s)", t.interval)
|
||||
tk := time.NewTicker(t.interval)
|
||||
defer tk.Stop()
|
||||
// First tick immediately so startup is responsive — don't wait
|
||||
@@ -81,7 +82,7 @@ func (t *Ticker) Run(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("orchestrator: ticker stopped")
|
||||
log.Printf("orchestrator: ticker stopping")
|
||||
return
|
||||
case <-tk.C:
|
||||
t.tickOnce(ctx)
|
||||
@@ -89,8 +90,6 @@ func (t *Ticker) Run(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// tickOnce scans + applies. Errors are logged per topic; one topic
|
||||
// failing doesn't stall the others.
|
||||
func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
now := time.Now()
|
||||
|
||||
@@ -98,17 +97,11 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
if err := t.transitionByStatus(ctx, now,
|
||||
models.TopicStatusCreated, "signup_open_at",
|
||||
func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
|
||||
topic, err := t.topics.GetByID(ctx, topicID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`UPDATE topics SET status = ? WHERE id = ?`,
|
||||
models.TopicStatusSignupOpen, topicID); err != nil {
|
||||
return err
|
||||
}
|
||||
// Announcement is best-effort, outside the tx (network call).
|
||||
go t.broadcastAnnouncement(topic)
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.Printf("orchestrator: created→signup_open scan: %v", err)
|
||||
@@ -118,10 +111,6 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
if err := t.transitionByStatus(ctx, now,
|
||||
models.TopicStatusSignupOpen, "signup_close_at",
|
||||
func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
|
||||
topic, err := t.topics.GetByID(ctx, topicID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
signups, err := t.signups.ListByTopic(ctx, topicID)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -133,10 +122,6 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
models.TopicStatusCancelled, res.CancelReason, topicID)
|
||||
log.Printf("orchestrator: topic %s cancelled at signup_close: %s",
|
||||
topicID, res.CancelReason)
|
||||
if err == nil {
|
||||
go t.broadcastLifecycle(topic, "cancelled",
|
||||
fmt.Sprintf("debate cancelled at signup close - %s", res.CancelReason))
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err := t.camps.WriteAllocation(ctx, tx, topicID, res.Allocation); err != nil {
|
||||
@@ -148,14 +133,6 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
log.Printf("orchestrator: topic %s allocated pro=%s con=%s judge=%s",
|
||||
topicID,
|
||||
res.Allocation[models.CampPro], res.Allocation[models.CampCon], res.Allocation[models.CampJudge])
|
||||
if err == nil {
|
||||
go t.broadcastLifecycle(topic, "signup_closed",
|
||||
fmt.Sprintf("camps allocated — pro=%s con=%s judge=%s. Debate starts at %s",
|
||||
res.Allocation[models.CampPro],
|
||||
res.Allocation[models.CampCon],
|
||||
res.Allocation[models.CampJudge],
|
||||
topic.DebateStartAt.UTC().Format("2006-01-02 15:04 UTC")))
|
||||
}
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Printf("orchestrator: signup_open→signup_closed scan: %v", err)
|
||||
@@ -165,10 +142,6 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
if err := t.transitionByStatus(ctx, now,
|
||||
models.TopicStatusSignupClosed, "debate_start_at",
|
||||
func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
|
||||
topic, err := t.topics.GetByID(ctx, topicID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`UPDATE topics SET status = ? WHERE id = ?`,
|
||||
models.TopicStatusDebating, topicID); err != nil {
|
||||
@@ -176,15 +149,10 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
}
|
||||
// Round 0 inserted within the tx — if commit fails we don't
|
||||
// leak a half-state.
|
||||
_, err = tx.ExecContext(ctx,
|
||||
_, err := tx.ExecContext(ctx,
|
||||
`INSERT INTO rounds (id, topic_id, round_no) VALUES (UUID(), ?, 0)`,
|
||||
topicID)
|
||||
log.Printf("orchestrator: topic %s entered debating; round 0 opened", topicID)
|
||||
if err == nil {
|
||||
go t.broadcastLifecycle(topic, "debating",
|
||||
fmt.Sprintf("debate is live — pro/con post arguments; judge stays mostly silent until debate_end_at (%s). Use participate-debate workflow.",
|
||||
topic.DebateEndAt.UTC().Format("2006-01-02 15:04 UTC")))
|
||||
}
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Printf("orchestrator: signup_closed→debating scan: %v", err)
|
||||
@@ -193,10 +161,7 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
// Note: there's no explicit `debating → judging` transition in v1.
|
||||
// The verdict handler enforces "status==debating AND now>=debate_end_at"
|
||||
// as its preconditions; that's equivalent to a "judging" gate without
|
||||
// adding a new enum value. Migration 002 will introduce the explicit
|
||||
// 'judging' state when we want richer UI (e.g. "Awaiting verdict"
|
||||
// distinct from "In debate"); until then this comment serves as the
|
||||
// state-machine documentation for future maintainers.
|
||||
// adding a new enum value.
|
||||
}
|
||||
|
||||
// transitionByStatus is the shared "scan + per-row tx + apply" pattern.
|
||||
@@ -245,48 +210,3 @@ func (t *Ticker) applyOne(ctx context.Context, topicID string,
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// broadcastLifecycle wraps the announcer's lifecycle-event post with
|
||||
// the standard signup_closed / cancelled / debating / completed
|
||||
// formats. Best-effort; runs in its own goroutine outside any tx.
|
||||
// Target is resolved from the topic's per-topic announce columns;
|
||||
// null on either column → announcer skips with a log (creator opted
|
||||
// out of broadcasts).
|
||||
func (t *Ticker) broadcastLifecycle(topic *models.Topic, kind, summary string) {
|
||||
if topic == nil {
|
||||
return
|
||||
}
|
||||
if err := t.announcer.PostLifecycleEvent(
|
||||
context.Background(), topicTarget(topic), topic.ID, topic.Title, kind, summary,
|
||||
); err != nil {
|
||||
log.Printf("orchestrator: lifecycle broadcast topic=%s kind=%s failed: %v", topic.ID, kind, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Ticker) broadcastAnnouncement(topic *models.Topic) {
|
||||
if topic == nil {
|
||||
return
|
||||
}
|
||||
if err := t.announcer.PostTopicAnnouncement(
|
||||
context.Background(), topicTarget(topic),
|
||||
topic.ID, topic.Title, topic.Summary,
|
||||
topic.SignupOpenAt, topic.SignupCloseAt,
|
||||
topic.DebateStartAt, topic.DebateEndAt,
|
||||
topic.VerdictSchemaID,
|
||||
); err != nil {
|
||||
log.Printf("orchestrator: announce topic=%s failed: %v", topic.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// topicTarget extracts the per-topic announce target from the topic
|
||||
// row; returns zero-value Target if either column is null (which the
|
||||
// announcer treats as "skip").
|
||||
func topicTarget(topic *models.Topic) fabric.Target {
|
||||
if topic.AnnounceGuildBaseURL == nil || topic.AnnounceChannelID == nil {
|
||||
return fabric.Target{}
|
||||
}
|
||||
return fabric.Target{
|
||||
GuildBaseURL: *topic.AnnounceGuildBaseURL,
|
||||
ChannelID: *topic.AnnounceChannelID,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user