feat: lifecycle broadcasts on signup_closed / cancelled / debating / completed
Phase 3 push-wakeup mechanism without adding a new push channel. Topic state transitions now post short messages to the same Fabric announce channel used for the initial signup announcement. Agents subscribed to announce + not currently busy get woken via the existing Phase 1 inbound path; busy-discard already filters appropriately. No SSE, no per-agent DM fanout, no plugin changes — reuses existing infra end-to-end. Changes: - ticker.go: after signup_close transition, broadcasts signup_closed (with pro/con/judge agent IDs + debate-start time) OR cancelled (with reason). After debate_start transition, broadcasts debating with debate-end time. - announce.go: new PostLifecycleEvent helper - same headers/auth as PostTopicAnnouncement, different format. - verdict.go: after successful judge submission, broadcasts completed with the judge id. Best-effort + async so a slow Fabric does not slow the judge response. - routes.go: instantiates the announcer once + passes to VerdictHandler. Workflow participate-debate step 5 should be updated to expect wakeups instead of polling - separate follow-up edit on lyn/ClawSkills.
This commit is contained in:
@@ -115,6 +115,47 @@ func formatAnnouncement(id, title, summary string,
|
||||
)
|
||||
}
|
||||
|
||||
// PostLifecycleEvent broadcasts a non-signup state change to the
|
||||
// announce channel. The agents' Fabric inbound + busy-discard
|
||||
// (Phase 1) handles per-agent wakeup gating — no per-agent fanout
|
||||
// needed from this side. Sole purpose is to let agents see "your
|
||||
// debate moved to X" without polling.
|
||||
//
|
||||
// Currently called for: signup_closed (with allocated camps),
|
||||
// cancelled, debating-start, completed. Format is plain text with the
|
||||
// topic_id so agents' workflows can parse and route.
|
||||
func (a *Announcer) PostLifecycleEvent(ctx context.Context, topicID, title, kind, summary string) error {
|
||||
if !a.Enabled() {
|
||||
log.Printf("announce: lifecycle skipped (fabric coupling not configured) topic=%s kind=%s", topicID, kind)
|
||||
return nil
|
||||
}
|
||||
body := map[string]any{
|
||||
"content": fmt.Sprintf("📣 **[%s]** %s [%s]\n%s", kind, title, topicID, summary),
|
||||
}
|
||||
raw, _ := json.Marshal(body)
|
||||
url := fmt.Sprintf("%s/api/channels/%s/messages",
|
||||
trimRightSlash(a.cfg.GuildBaseURL), a.cfg.ChannelID)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(raw))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("content-type", "application/json")
|
||||
req.Header.Set("authorization", "Bearer "+a.cfg.BotBearerToken)
|
||||
req.Header.Set("x-fabric-system-key", a.cfg.SystemAPIKey)
|
||||
resp, err := a.client.Do(req)
|
||||
if err != nil {
|
||||
log.Printf("announce: lifecycle POST %s failed: %v", url, err)
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
|
||||
log.Printf("announce: lifecycle POST %s -> %d body=%s", url, resp.StatusCode, string(b))
|
||||
return fmt.Errorf("lifecycle post: status %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func trimRightSlash(s string) string {
|
||||
for len(s) > 0 && s[len(s)-1] == '/' {
|
||||
s = s[:len(s)-1]
|
||||
|
||||
@@ -1,27 +1,30 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/models"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
|
||||
type VerdictHandler struct {
|
||||
db *struct{} // placeholder; we don't use the raw conn directly here
|
||||
topics *store.TopicStore
|
||||
camps *store.CampStore
|
||||
verdicts *store.VerdictStore
|
||||
topics *store.TopicStore
|
||||
camps *store.CampStore
|
||||
verdicts *store.VerdictStore
|
||||
announcer *fabric.Announcer // optional; nil-safe via Enabled() check
|
||||
}
|
||||
|
||||
func NewVerdictHandler(t *store.TopicStore, c *store.CampStore, v *store.VerdictStore) *VerdictHandler {
|
||||
return &VerdictHandler{topics: t, camps: c, verdicts: v}
|
||||
func NewVerdictHandler(t *store.TopicStore, c *store.CampStore, v *store.VerdictStore, ann *fabric.Announcer) *VerdictHandler {
|
||||
return &VerdictHandler{topics: t, camps: c, verdicts: v, announcer: ann}
|
||||
}
|
||||
|
||||
type submitVerdictBody struct {
|
||||
@@ -114,6 +117,18 @@ func (h *VerdictHandler) Submit(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("x-warn", "verdict saved but status update failed: "+err.Error())
|
||||
}
|
||||
|
||||
// Lifecycle broadcast — completed event. Best-effort; runs async
|
||||
// outside the request context so a slow Fabric doesn't slow the
|
||||
// judge's response.
|
||||
if h.announcer != nil {
|
||||
go func(t *fabric.Announcer, tID, title string, judge string) {
|
||||
summary := fmt.Sprintf("verdict published by judge=%s. Use dialectic_view_verdict to see the structured result.", judge)
|
||||
if err := t.PostLifecycleEvent(context.Background(), tID, title, "completed", summary); err != nil {
|
||||
// silent failure; the announcer logs internally
|
||||
}
|
||||
}(h.announcer, verdict.TopicID, topic.Title, caller.ID)
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, map[string]any{
|
||||
"id": verdict.ID,
|
||||
"topic_id": verdict.TopicID,
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/auth"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/config"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/fabric"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/httpapi/handlers"
|
||||
"git.hangman-lab.top/hzhang/Dialectic.Backend/internal/store"
|
||||
)
|
||||
@@ -62,7 +63,13 @@ func Mount(cfg *config.Config, db *sqlx.DB, version string) http.Handler {
|
||||
topicsH := handlers.NewTopicsHandler(topicStore)
|
||||
signupsH := handlers.NewSignupsHandler(topicStore, signupStore)
|
||||
argsH := handlers.NewArgumentsHandler(topicStore, campStore, roundStore, argStore)
|
||||
verdictH := handlers.NewVerdictHandler(topicStore, campStore, verdictStore)
|
||||
announcer := fabric.NewAnnouncer(fabric.AnnounceConfig{
|
||||
GuildBaseURL: cfg.FabricGuildBaseURL,
|
||||
ChannelID: cfg.FabricAnnounceChannelID,
|
||||
SystemAPIKey: cfg.FabricSystemAPIKey,
|
||||
BotBearerToken: cfg.FabricBotBearerToken,
|
||||
})
|
||||
verdictH := handlers.NewVerdictHandler(topicStore, campStore, verdictStore, announcer)
|
||||
adminH := handlers.NewAdminHandler(db, cfg.AgentAPIKeyPepper, cfg.DialecticAdminAPIKey)
|
||||
|
||||
// Routes.
|
||||
|
||||
@@ -2,6 +2,7 @@ package orchestrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"time"
|
||||
@@ -117,6 +118,10 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
if err := t.transitionByStatus(ctx, now,
|
||||
models.TopicStatusSignupOpen, "signup_close_at",
|
||||
func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
|
||||
topic, err := t.topics.GetByID(ctx, topicID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
signups, err := t.signups.ListByTopic(ctx, topicID)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -128,6 +133,10 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
models.TopicStatusCancelled, res.CancelReason, topicID)
|
||||
log.Printf("orchestrator: topic %s cancelled at signup_close: %s",
|
||||
topicID, res.CancelReason)
|
||||
if err == nil {
|
||||
go t.broadcastLifecycle(topic, "cancelled",
|
||||
fmt.Sprintf("debate cancelled at signup close — %s", res.CancelReason))
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err := t.camps.WriteAllocation(ctx, tx, topicID, res.Allocation); err != nil {
|
||||
@@ -139,6 +148,14 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
log.Printf("orchestrator: topic %s allocated pro=%s con=%s judge=%s",
|
||||
topicID,
|
||||
res.Allocation[models.CampPro], res.Allocation[models.CampCon], res.Allocation[models.CampJudge])
|
||||
if err == nil {
|
||||
go t.broadcastLifecycle(topic, "signup_closed",
|
||||
fmt.Sprintf("camps allocated — pro=%s con=%s judge=%s. Debate starts at %s",
|
||||
res.Allocation[models.CampPro],
|
||||
res.Allocation[models.CampCon],
|
||||
res.Allocation[models.CampJudge],
|
||||
topic.DebateStartAt.UTC().Format("2006-01-02 15:04 UTC")))
|
||||
}
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Printf("orchestrator: signup_open→signup_closed scan: %v", err)
|
||||
@@ -148,6 +165,10 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
if err := t.transitionByStatus(ctx, now,
|
||||
models.TopicStatusSignupClosed, "debate_start_at",
|
||||
func(ctx context.Context, tx *sqlx.Tx, topicID string) error {
|
||||
topic, err := t.topics.GetByID(ctx, topicID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`UPDATE topics SET status = ? WHERE id = ?`,
|
||||
models.TopicStatusDebating, topicID); err != nil {
|
||||
@@ -155,10 +176,15 @@ func (t *Ticker) tickOnce(ctx context.Context) {
|
||||
}
|
||||
// Round 0 inserted within the tx — if commit fails we don't
|
||||
// leak a half-state.
|
||||
_, err := tx.ExecContext(ctx,
|
||||
_, err = tx.ExecContext(ctx,
|
||||
`INSERT INTO rounds (id, topic_id, round_no) VALUES (UUID(), ?, 0)`,
|
||||
topicID)
|
||||
log.Printf("orchestrator: topic %s entered debating; round 0 opened", topicID)
|
||||
if err == nil {
|
||||
go t.broadcastLifecycle(topic, "debating",
|
||||
fmt.Sprintf("debate is live — pro/con post arguments; judge stays mostly silent until debate_end_at (%s). Use participate-debate workflow.",
|
||||
topic.DebateEndAt.UTC().Format("2006-01-02 15:04 UTC")))
|
||||
}
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Printf("orchestrator: signup_closed→debating scan: %v", err)
|
||||
@@ -220,6 +246,20 @@ func (t *Ticker) applyOne(ctx context.Context, topicID string,
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// broadcastLifecycle wraps the announcer's lifecycle-event post with
|
||||
// the standard signup_closed / cancelled / debating / completed
|
||||
// formats. Best-effort; runs in its own goroutine outside any tx.
|
||||
func (t *Ticker) broadcastLifecycle(topic *models.Topic, kind, summary string) {
|
||||
if topic == nil {
|
||||
return
|
||||
}
|
||||
if err := t.announcer.PostLifecycleEvent(
|
||||
context.Background(), topic.ID, topic.Title, kind, summary,
|
||||
); err != nil {
|
||||
log.Printf("orchestrator: lifecycle broadcast topic=%s kind=%s failed: %v", topic.ID, kind, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Ticker) broadcastAnnouncement(topic *models.Topic) {
|
||||
if topic == nil {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user