From b2a0cac4606527eaa3dd95cadb2c39ad03c73861 Mon Sep 17 00:00:00 2001 From: hzhang Date: Sat, 23 May 2026 15:02:58 +0100 Subject: [PATCH] feat: lifecycle broadcasts on signup_closed / cancelled / debating / completed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 push-wakeup mechanism without adding a new push channel. Topic state transitions now post short messages to the same Fabric announce channel used for the initial signup announcement. Agents subscribed to announce + not currently busy get woken via the existing Phase 1 inbound path; busy-discard already filters appropriately. No SSE, no per-agent DM fanout, no plugin changes — reuses existing infra end-to-end. Changes: - ticker.go: after signup_close transition, broadcasts signup_closed (with pro/con/judge agent IDs + debate-start time) OR cancelled (with reason). After debate_start transition, broadcasts debating with debate-end time. - announce.go: new PostLifecycleEvent helper - same headers/auth as PostTopicAnnouncement, different format. - verdict.go: after successful judge submission, broadcasts completed with the judge id. Best-effort + async so a slow Fabric does not slow the judge response. - routes.go: instantiates the announcer once + passes to VerdictHandler. Workflow participate-debate step 5 should be updated to expect wakeups instead of polling - separate follow-up edit on lyn/ClawSkills. --- internal/fabric/announce.go | 41 +++++++++++++++++++++++++++ internal/httpapi/handlers/verdict.go | 27 ++++++++++++++---- internal/httpapi/routes.go | 9 +++++- internal/orchestrator/ticker.go | 42 +++++++++++++++++++++++++++- 4 files changed, 111 insertions(+), 8 deletions(-) diff --git a/internal/fabric/announce.go b/internal/fabric/announce.go index a3ba71e..5211bc6 100644 --- a/internal/fabric/announce.go +++ b/internal/fabric/announce.go @@ -115,6 +115,47 @@ func formatAnnouncement(id, title, summary string, ) } +// PostLifecycleEvent broadcasts a non-signup state change to the +// announce channel. The agents' Fabric inbound + busy-discard +// (Phase 1) handles per-agent wakeup gating — no per-agent fanout +// needed from this side. Sole purpose is to let agents see "your +// debate moved to X" without polling. +// +// Currently called for: signup_closed (with allocated camps), +// cancelled, debating-start, completed. Format is plain text with the +// topic_id so agents' workflows can parse and route. +func (a *Announcer) PostLifecycleEvent(ctx context.Context, topicID, title, kind, summary string) error { + if !a.Enabled() { + log.Printf("announce: lifecycle skipped (fabric coupling not configured) topic=%s kind=%s", topicID, kind) + return nil + } + body := map[string]any{ + "content": fmt.Sprintf("📣 **[%s]** %s [%s]\n%s", kind, title, topicID, summary), + } + raw, _ := json.Marshal(body) + url := fmt.Sprintf("%s/api/channels/%s/messages", + trimRightSlash(a.cfg.GuildBaseURL), a.cfg.ChannelID) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(raw)) + if err != nil { + return err + } + req.Header.Set("content-type", "application/json") + req.Header.Set("authorization", "Bearer "+a.cfg.BotBearerToken) + req.Header.Set("x-fabric-system-key", a.cfg.SystemAPIKey) + resp, err := a.client.Do(req) + if err != nil { + log.Printf("announce: lifecycle POST %s failed: %v", url, err) + return err + } + defer resp.Body.Close() + if resp.StatusCode >= 300 { + b, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) + log.Printf("announce: lifecycle POST %s -> %d body=%s", url, resp.StatusCode, string(b)) + return fmt.Errorf("lifecycle post: status %d", resp.StatusCode) + } + return nil +} + func trimRightSlash(s string) string { for len(s) > 0 && s[len(s)-1] == '/' { s = s[:len(s)-1] diff --git a/internal/httpapi/handlers/verdict.go b/internal/httpapi/handlers/verdict.go index 7054ba9..8fb4073 100644 --- a/internal/httpapi/handlers/verdict.go +++ b/internal/httpapi/handlers/verdict.go @@ -1,27 +1,30 @@ package handlers import ( + "context" "encoding/json" "errors" + "fmt" "net/http" "time" "github.com/go-chi/chi/v5" "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric" "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models" "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" ) type VerdictHandler struct { - db *struct{} // placeholder; we don't use the raw conn directly here - topics *store.TopicStore - camps *store.CampStore - verdicts *store.VerdictStore + topics *store.TopicStore + camps *store.CampStore + verdicts *store.VerdictStore + announcer *fabric.Announcer // optional; nil-safe via Enabled() check } -func NewVerdictHandler(t *store.TopicStore, c *store.CampStore, v *store.VerdictStore) *VerdictHandler { - return &VerdictHandler{topics: t, camps: c, verdicts: v} +func NewVerdictHandler(t *store.TopicStore, c *store.CampStore, v *store.VerdictStore, ann *fabric.Announcer) *VerdictHandler { + return &VerdictHandler{topics: t, camps: c, verdicts: v, announcer: ann} } type submitVerdictBody struct { @@ -114,6 +117,18 @@ func (h *VerdictHandler) Submit(w http.ResponseWriter, r *http.Request) { w.Header().Set("x-warn", "verdict saved but status update failed: "+err.Error()) } + // Lifecycle broadcast — completed event. Best-effort; runs async + // outside the request context so a slow Fabric doesn't slow the + // judge's response. + if h.announcer != nil { + go func(t *fabric.Announcer, tID, title string, judge string) { + summary := fmt.Sprintf("verdict published by judge=%s. Use dialectic_view_verdict to see the structured result.", judge) + if err := t.PostLifecycleEvent(context.Background(), tID, title, "completed", summary); err != nil { + // silent failure; the announcer logs internally + } + }(h.announcer, verdict.TopicID, topic.Title, caller.ID) + } + writeJSON(w, http.StatusCreated, map[string]any{ "id": verdict.ID, "topic_id": verdict.TopicID, diff --git a/internal/httpapi/routes.go b/internal/httpapi/routes.go index 98ee034..a0e4854 100644 --- a/internal/httpapi/routes.go +++ b/internal/httpapi/routes.go @@ -11,6 +11,7 @@ import ( "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth" "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config" + "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric" "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi/handlers" "git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store" ) @@ -62,7 +63,13 @@ func Mount(cfg *config.Config, db *sqlx.DB, version string) http.Handler { topicsH := handlers.NewTopicsHandler(topicStore) signupsH := handlers.NewSignupsHandler(topicStore, signupStore) argsH := handlers.NewArgumentsHandler(topicStore, campStore, roundStore, argStore) - verdictH := handlers.NewVerdictHandler(topicStore, campStore, verdictStore) + announcer := fabric.NewAnnouncer(fabric.AnnounceConfig{ + GuildBaseURL: cfg.FabricGuildBaseURL, + ChannelID: cfg.FabricAnnounceChannelID, + SystemAPIKey: cfg.FabricSystemAPIKey, + BotBearerToken: cfg.FabricBotBearerToken, + }) + verdictH := handlers.NewVerdictHandler(topicStore, campStore, verdictStore, announcer) adminH := handlers.NewAdminHandler(db, cfg.AgentAPIKeyPepper, cfg.DialecticAdminAPIKey) // Routes. diff --git a/internal/orchestrator/ticker.go b/internal/orchestrator/ticker.go index c56c998..0db8501 100644 --- a/internal/orchestrator/ticker.go +++ b/internal/orchestrator/ticker.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "fmt" "log" "math/rand" "time" @@ -117,6 +118,10 @@ func (t *Ticker) tickOnce(ctx context.Context) { if err := t.transitionByStatus(ctx, now, models.TopicStatusSignupOpen, "signup_close_at", func(ctx context.Context, tx *sqlx.Tx, topicID string) error { + topic, err := t.topics.GetByID(ctx, topicID) + if err != nil { + return err + } signups, err := t.signups.ListByTopic(ctx, topicID) if err != nil { return err @@ -128,6 +133,10 @@ func (t *Ticker) tickOnce(ctx context.Context) { models.TopicStatusCancelled, res.CancelReason, topicID) log.Printf("orchestrator: topic %s cancelled at signup_close: %s", topicID, res.CancelReason) + if err == nil { + go t.broadcastLifecycle(topic, "cancelled", + fmt.Sprintf("debate cancelled at signup close — %s", res.CancelReason)) + } return err } if err := t.camps.WriteAllocation(ctx, tx, topicID, res.Allocation); err != nil { @@ -139,6 +148,14 @@ func (t *Ticker) tickOnce(ctx context.Context) { log.Printf("orchestrator: topic %s allocated pro=%s con=%s judge=%s", topicID, res.Allocation[models.CampPro], res.Allocation[models.CampCon], res.Allocation[models.CampJudge]) + if err == nil { + go t.broadcastLifecycle(topic, "signup_closed", + fmt.Sprintf("camps allocated — pro=%s con=%s judge=%s. Debate starts at %s", + res.Allocation[models.CampPro], + res.Allocation[models.CampCon], + res.Allocation[models.CampJudge], + topic.DebateStartAt.UTC().Format("2006-01-02 15:04 UTC"))) + } return err }); err != nil { log.Printf("orchestrator: signup_open→signup_closed scan: %v", err) @@ -148,6 +165,10 @@ func (t *Ticker) tickOnce(ctx context.Context) { if err := t.transitionByStatus(ctx, now, models.TopicStatusSignupClosed, "debate_start_at", func(ctx context.Context, tx *sqlx.Tx, topicID string) error { + topic, err := t.topics.GetByID(ctx, topicID) + if err != nil { + return err + } if _, err := tx.ExecContext(ctx, `UPDATE topics SET status = ? WHERE id = ?`, models.TopicStatusDebating, topicID); err != nil { @@ -155,10 +176,15 @@ func (t *Ticker) tickOnce(ctx context.Context) { } // Round 0 inserted within the tx — if commit fails we don't // leak a half-state. - _, err := tx.ExecContext(ctx, + _, err = tx.ExecContext(ctx, `INSERT INTO rounds (id, topic_id, round_no) VALUES (UUID(), ?, 0)`, topicID) log.Printf("orchestrator: topic %s entered debating; round 0 opened", topicID) + if err == nil { + go t.broadcastLifecycle(topic, "debating", + fmt.Sprintf("debate is live — pro/con post arguments; judge stays mostly silent until debate_end_at (%s). Use participate-debate workflow.", + topic.DebateEndAt.UTC().Format("2006-01-02 15:04 UTC"))) + } return err }); err != nil { log.Printf("orchestrator: signup_closed→debating scan: %v", err) @@ -220,6 +246,20 @@ func (t *Ticker) applyOne(ctx context.Context, topicID string, return tx.Commit() } +// broadcastLifecycle wraps the announcer's lifecycle-event post with +// the standard signup_closed / cancelled / debating / completed +// formats. Best-effort; runs in its own goroutine outside any tx. +func (t *Ticker) broadcastLifecycle(topic *models.Topic, kind, summary string) { + if topic == nil { + return + } + if err := t.announcer.PostLifecycleEvent( + context.Background(), topic.ID, topic.Title, kind, summary, + ); err != nil { + log.Printf("orchestrator: lifecycle broadcast topic=%s kind=%s failed: %v", topic.ID, kind, err) + } +} + func (t *Ticker) broadcastAnnouncement(topic *models.Topic) { if topic == nil { return