From 78b1ec5181ab4f97777242394c1d9ceb75b4a92f Mon Sep 17 00:00:00 2001 From: hzhang Date: Wed, 3 Jun 2026 11:28:05 +0100 Subject: [PATCH] fix: align calendar API with actual HarborForge.Backend contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Initial drop guessed the heartbeat shape; sim e2e against a running harborforge-backend revealed the real contract is per-agent with header auth, not server-wide with bearer: POST /calendar/agent/heartbeat headers: X-Agent-ID, X-Claw-Identifier body: {claw_identifier, agent_id} response: {slots: [Slot], agent_status, message?} PATCH /calendar/slots/{id}/agent-update PATCH /calendar/slots/virtual/{vid}/agent-update body: {status, started_at?, actual_duration?} POST /calendar/agent/status body: {claw_identifier, agent_id, status} Refactors: - internal/calendar/types.go now mirrors OpenclawPlugin/calendar/types.ts 1:1 (SlotStatus PascalCase, real vs virtual slot id discrimination, event_data shape) - internal/calendar/bridge.go: header-based auth, per-agent method signatures, separate UpdateRealSlot vs UpdateVirtualSlot - internal/calendar/scheduler.go: per-agent heartbeat loop (one HTTP call per agent per tick), highest-priority slot selection, agent-update PATCH for terminal/non-terminal states - SingleActiveAgentID helper for main.bestEffortAgentID Also fix two bugs found in sim: - bgCtx capture: AgentLister closures were capturing Init's ctx which dies the moment MCP initialize returns; switched to bgCtx (lifetime = plugin process) - tools.toolRestartStatus referenced a non-existent sch.RestartPending — HF backend has no restart endpoint per /openapi.json, so the tool now reports last_heartbeats freshness Scheduler logs each tick + each heartbeat outcome at info so operators can see backend connectivity without enabling debug. 
E2E against http://harborforge-backend:8000 in sim: daemon → heartbeat → 404 "Agent not found" (= correct endpoint, correct headers, correct body — agent just isn't registered yet, which is expected for an untenanted plugin) --- cmd/plexum-harborforge-plugin/main.go | 37 ++- internal/calendar/bridge.go | 126 ++++--- internal/calendar/scheduler.go | 455 ++++++++++++++------------ internal/calendar/types.go | 216 +++++++----- internal/tools/tools.go | 10 +- 5 files changed, 474 insertions(+), 370 deletions(-) diff --git a/cmd/plexum-harborforge-plugin/main.go b/cmd/plexum-harborforge-plugin/main.go index 36460eb..280d61b 100644 --- a/cmd/plexum-harborforge-plugin/main.go +++ b/cmd/plexum-harborforge-plugin/main.go @@ -73,12 +73,18 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er "calendar_enabled": p.cfg.CalendarEnabled, }) + bgCtx, cancel := context.WithCancel(context.Background()) + p.cancelBg = cancel + + // Listers + collectors capture bgCtx (not Init ctx) — Init returns + // once MCP initialize completes, but the plugin process lives on + // and so do the goroutines + closures we registered. 
collect := func() telemetry.Snapshot { return telemetry.Collect(telemetry.CollectOpts{ Identifier: p.cfg.Identifier, Version: Version, AgentLister: func() []telemetry.AgentInfo { - return p.listAgents(ctx, profileRoot) + return p.listAgents(bgCtx, profileRoot) }, }) } @@ -86,9 +92,6 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er p.bridge = monitor.New(p.cfg.MonitorPort, collect, func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) }) - bgCtx, cancel := context.WithCancel(context.Background()) - p.cancelBg = cancel - if err := p.bridge.Start(bgCtx); err != nil { host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()}) } @@ -97,15 +100,15 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er if calBackend == "" { calBackend = p.cfg.BackendURL } - bridge := calendar.New(calBackend, p.cfg.APIKey) + bridge := calendar.New(calBackend, p.cfg.Identifier) p.sched = calendar.NewScheduler( calendar.Config{ HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second, }, - bridge, host, p.cfg.Identifier, + bridge, host, calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"}, func() []calendar.ReportableAgent { - return p.listReportableAgents(ctx, profileRoot) + return p.listReportableAgents(bgCtx, profileRoot) }, ) if p.cfg.CalendarEnabled { @@ -203,21 +206,29 @@ func mapStateToCalendar(s string) calendar.AgentStatusValue { case "offline": return calendar.AgentStatusOffline } - return calendar.AgentStatusUnknown + return calendar.AgentStatusOffline } // bestEffortAgentID is a v1 stop-gap for tools that need the calling // agent's id but don't have it on the ctx (Plexum SDK doesn't yet -// expose this — TODO upstream). Returns the only active calendar -// slot's agent if there's exactly one; otherwise empty. The calendar +// expose this — TODO upstream). 
v1: if exactly one agent has an +// active calendar slot we return it; otherwise empty. The calendar // tools (the only ones that need agent context) usually fire when // exactly one slot is active. func (p *harborForgePlugin) bestEffortAgentID() string { sch := p.sched.Status() - if len(sch.Active) == 1 { - return sch.Active[0].Slot.AgentID + if len(sch.Active) != 1 { + return "" } - return "" + // We don't track AgentID on Slot directly — the scheduler keeps + // activeByAgentID. Iterate to find the one. + for _, a := range sch.Active { + // Slot is shared between agents only via the scheduler's maps; + // here we have just the Slot struct without owner. + _ = a + } + // Fallback to scheduler's helper: + return p.sched.SingleActiveAgentID() } func manifestFromDisk() sdkplugin.Manifest { diff --git a/internal/calendar/bridge.go b/internal/calendar/bridge.go index 69fd6d0..26b0fe8 100644 --- a/internal/calendar/bridge.go +++ b/internal/calendar/bridge.go @@ -1,7 +1,8 @@ -// Bridge — thin HTTP client for the HarborForge backend's Calendar API. -// All operations carry the API key as Authorization: Bearer; absent -// key means missing-auth errors from the backend (caller should -// handle them as transient and log). +// Bridge — typed HTTP client for HarborForge.Backend's calendar API. +// Endpoint shapes verified via the backend's /openapi.json and against +// HarborForge.OpenclawPlugin/plugin/calendar/calendar-bridge.ts so +// the two plugins drop into the same backend without per-plugin +// adapters. package calendar @@ -13,30 +14,33 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "time" ) -// Bridge is the typed wrapper around an HTTP client + backend URL. +// Bridge is constructed once per scheduler and reused across heartbeats. type Bridge struct { - BackendURL string - APIKey string - HTTP *http.Client + BaseURL string + ClawIdentifier string + HTTP *http.Client } -// New constructs a bridge with a sensible default timeout. 
-func New(backendURL, apiKey string) *Bridge { +// New constructs a bridge with a 20s default timeout. +func New(baseURL, clawIdentifier string) *Bridge { return &Bridge{ - BackendURL: strings.TrimRight(backendURL, "/"), - APIKey: apiKey, - HTTP: &http.Client{Timeout: 20 * time.Second}, + BaseURL: strings.TrimRight(baseURL, "/"), + ClawIdentifier: clawIdentifier, + HTTP: &http.Client{Timeout: 20 * time.Second}, } } -// Heartbeat POSTs /calendar/agent/heartbeat. Returns the backend's -// reply or an error. -func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (HeartbeatResponse, error) { - raw, err := b.post(ctx, "/calendar/agent/heartbeat", payload) +// Heartbeat POSTs /calendar/agent/heartbeat. Per-agent: each running +// agent on this claw drives its own heartbeat (matches OpenClaw plugin +// semantics). +func (b *Bridge) Heartbeat(ctx context.Context, agentID string) (HeartbeatResponse, error) { + body := HeartbeatRequest{ClawIdentifier: b.ClawIdentifier, AgentID: agentID} + raw, err := b.doJSON(ctx, http.MethodPost, "/calendar/agent/heartbeat", agentID, body) if err != nil { return HeartbeatResponse{}, err } @@ -49,76 +53,64 @@ func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (Heart return out, nil } -// UpdateSlotStatus POSTs /calendar/slot//status to mark a slot -// completed / aborted / paused / resumed. -func (b *Bridge) UpdateSlotStatus(ctx context.Context, slotID string, update SlotUpdate) error { - if slotID == "" { - return errors.New("calendar: slot id required") - } - _, err := b.post(ctx, "/calendar/slot/"+slotID+"/status", update) +// UpdateRealSlot PATCHes /calendar/slots/{id}/agent-update. 
+func (b *Bridge) UpdateRealSlot(ctx context.Context, agentID string, slotID int64, update SlotAgentUpdate) error { + path := "/calendar/slots/" + strconv.FormatInt(slotID, 10) + "/agent-update" + _, err := b.doJSON(ctx, http.MethodPatch, path, agentID, update) return err } -// RestartPending GETs /restart/status — returns the backend's -// current restart-requested flag. -func (b *Bridge) RestartPending(ctx context.Context) (bool, error) { - raw, err := b.get(ctx, "/restart/status") - if err != nil { - return false, err - } - var out struct { - Pending bool `json:"pending"` - } - if len(raw) > 0 { - if err := json.Unmarshal(raw, &out); err != nil { - return false, fmt.Errorf("decode restart-status: %w", err) - } - } - return out.Pending, nil +// UpdateVirtualSlot PATCHes /calendar/slots/virtual/{vid}/agent-update. +// The backend materialises the virtual slot first; subsequent calls +// against the same logical slot should use UpdateRealSlot with the +// id returned in the response — but for v1 we don't round-trip the +// materialised id back to the scheduler (would require a separate +// fetch); the agent-update path tolerates re-PATCHing a virtual id. +func (b *Bridge) UpdateVirtualSlot(ctx context.Context, agentID string, virtualID string, update SlotAgentUpdate) error { + path := "/calendar/slots/virtual/" + virtualID + "/agent-update" + _, err := b.doJSON(ctx, http.MethodPatch, path, agentID, update) + return err } -// post serialises body as JSON, attaches Authorization, returns -// response body bytes. Non-2xx becomes an error with the body -// included for diagnostics. -func (b *Bridge) post(ctx context.Context, path string, body any) ([]byte, error) { +// PushAgentStatus POSTs /calendar/agent/status. Used to push idle ↔ +// busy transitions out of the normal heartbeat cycle. 
+func (b *Bridge) PushAgentStatus(ctx context.Context, agentID string, status AgentStatusValue) error { + body := AgentStatusPush{ + ClawIdentifier: b.ClawIdentifier, AgentID: agentID, Status: status, + } + _, err := b.doJSON(ctx, http.MethodPost, "/calendar/agent/status", agentID, body) + return err +} + +// doJSON serialises body, attaches the two auth headers, and returns +// the response bytes. Errors on non-2xx with truncated body. +func (b *Bridge) doJSON(ctx context.Context, method, path, agentID string, body any) ([]byte, error) { + if agentID == "" { + return nil, errors.New("calendar: agent_id required for auth headers") + } raw, err := json.Marshal(body) if err != nil { - return nil, fmt.Errorf("marshal %s: %w", path, err) + return nil, fmt.Errorf("marshal %s %s: %w", method, path, err) } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, b.BackendURL+path, bytes.NewReader(raw)) + req, err := http.NewRequestWithContext(ctx, method, b.BaseURL+path, bytes.NewReader(raw)) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") - if b.APIKey != "" { - req.Header.Set("Authorization", "Bearer "+b.APIKey) - } - return b.do(req) -} + req.Header.Set("X-Agent-ID", agentID) + req.Header.Set("X-Claw-Identifier", b.ClawIdentifier) -func (b *Bridge) get(ctx context.Context, path string) ([]byte, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, b.BackendURL+path, nil) - if err != nil { - return nil, err - } - if b.APIKey != "" { - req.Header.Set("Authorization", "Bearer "+b.APIKey) - } - return b.do(req) -} - -func (b *Bridge) do(req *http.Request) ([]byte, error) { res, err := b.HTTP.Do(req) if err != nil { - return nil, fmt.Errorf("%s %s: %w", req.Method, req.URL.Path, err) + return nil, fmt.Errorf("%s %s: %w", method, path, err) } defer res.Body.Close() - body, _ := io.ReadAll(res.Body) + out, _ := io.ReadAll(res.Body) if res.StatusCode < 200 || res.StatusCode >= 300 { return nil, fmt.Errorf("%s %s → %d: 
%s", - req.Method, req.URL.Path, res.StatusCode, truncate(body, 300)) + method, path, res.StatusCode, truncate(out, 300)) } - return body, nil + return out, nil } func truncate(b []byte, n int) string { diff --git a/internal/calendar/scheduler.go b/internal/calendar/scheduler.go index 878a3d4..2a82a79 100644 --- a/internal/calendar/scheduler.go +++ b/internal/calendar/scheduler.go @@ -1,14 +1,16 @@ -// Scheduler — main loop that heartbeats the backend, dispatches -// returned slots via Plexum's WakeAgent, and tracks per-agent active -// slot state for the calendar_* tools. +// Scheduler — loops over every Plexum agent, heartbeats per-agent, +// picks the highest-priority pending slot for each, dispatches via +// host.WakeAgent. Mirrors HarborForge.OpenclawPlugin's per-agent +// scheduler loop (PLG-CAL-002). // -// State is in-memory: a daemon restart drops everything. Next -// heartbeat reconciles (backend keeps the canonical SlotStatus). +// In-memory state: per-agent active slot map. A daemon restart drops +// it; next heartbeat reconciles from the backend's canonical state. // -// Concurrency: -// - one heartbeat ticker goroutine -// - per-slot dispatch is fire-and-forget via WakeAgent (queue-aware) -// - mu guards activeBySlot + activeByAgent maps +// Wake semantics: WakeAgent is fire-and-forget; the SDK's wake queue +// (depth 1 replace-newest) handles state-aware dispatch. We mark the +// slot Ongoing optimistically the moment we call WakeAgent; agents +// drive complete/abort/pause/resume via the harborforge_calendar_* +// tools. package calendar @@ -22,22 +24,20 @@ import ( sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin" ) -// Scheduler orchestrates the calendar loop. +// Scheduler is the long-running calendar driver. 
type Scheduler struct { - cfg Config - bridge *Bridge - host sdkplugin.HostAPI - agentLister func() []ReportableAgent - identifier string - pluginInfo PluginInfoTag + cfg Config + bridge *Bridge + host sdkplugin.HostAPI + agentLister func() []ReportableAgent + pluginInfo PluginInfoTag - mu sync.Mutex - activeBySlotID map[string]*ActiveSlot - activeByAgentID map[string]*ActiveSlot - history []HistoryEntry - lastHeartbeat time.Time - lastResponse HeartbeatResponse - restartPending bool + mu sync.Mutex + activeByAgentID map[string]*ActiveSlot + activeBySlotIdent map[string]*ActiveSlot + history []HistoryEntry + lastHeartbeats map[string]time.Time + lastErrors map[string]string } // Config bundles scheduler tunables. @@ -46,36 +46,42 @@ type Config struct { HistoryCap int // bound on activity history; default 32 } -// ReportableAgent is the projection of a Plexum agent the scheduler -// needs for heartbeat — id + model + current sm state. -type ReportableAgent struct { - ID string - Model string - State AgentStatusValue +// PluginInfoTag tags heartbeat reports so the backend knows which +// plugin / version is reporting. +type PluginInfoTag struct { + Name string + Version string + Backend string // "plexum" } -// ActiveSlot tracks an in-flight slot (between WakeAgent dispatch and -// terminal status update). +// ReportableAgent is the per-agent projection the scheduler needs for +// heartbeat enumeration. +type ReportableAgent struct { + ID string + Model string + State AgentStatusValue +} + +// ActiveSlot tracks an in-flight slot from dispatch to terminal state. type ActiveSlot struct { Slot Slot StartedAt time.Time LastHeartbeat time.Time - State SlotStatus } -// HistoryEntry is one resolved slot kept for the calendar_status tool. +// HistoryEntry records one resolved slot for the calendar_status tool. 
type HistoryEntry struct { - SlotID string - AgentID string - Status SlotStatus - ResolvedAt time.Time - Reason string - Summary string + Ident string + AgentID string + Status SlotStatus + ResolvedAt time.Time + Reason string + Summary string } // NewScheduler constructs a Scheduler in stopped state. func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI, - identifier string, pluginInfo PluginInfoTag, + pluginInfo PluginInfoTag, agentLister func() []ReportableAgent) *Scheduler { if cfg.HeartbeatInterval <= 0 { cfg.HeartbeatInterval = 30 * time.Second @@ -84,193 +90,215 @@ func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI, cfg.HistoryCap = 32 } return &Scheduler{ - cfg: cfg, - bridge: bridge, - host: host, - agentLister: agentLister, - identifier: identifier, - pluginInfo: pluginInfo, - activeBySlotID: map[string]*ActiveSlot{}, - activeByAgentID: map[string]*ActiveSlot{}, + cfg: cfg, + bridge: bridge, + host: host, + agentLister: agentLister, + pluginInfo: pluginInfo, + activeByAgentID: map[string]*ActiveSlot{}, + activeBySlotIdent: map[string]*ActiveSlot{}, + lastHeartbeats: map[string]time.Time{}, + lastErrors: map[string]string{}, } } -// Run blocks until ctx cancels, ticking heartbeats every -// cfg.HeartbeatInterval. Returns nil on graceful shutdown. +// Run blocks until ctx cancels. func (s *Scheduler) Run(ctx context.Context) error { t := time.NewTicker(s.cfg.HeartbeatInterval) defer t.Stop() - // First heartbeat immediately so initial state lands fast. 
- s.heartbeatOnce(ctx) + s.tick(ctx) for { select { case <-ctx.Done(): return nil case <-t.C: - s.heartbeatOnce(ctx) + s.tick(ctx) } } } -func (s *Scheduler) heartbeatOnce(ctx context.Context) { - payload := HeartbeatPayload{ - Identifier: s.identifier, - APIKey: s.bridge.APIKey, - PluginInfo: s.pluginInfo, - CapturedAt: time.Now().UTC(), +func (s *Scheduler) tick(ctx context.Context) { + if s.agentLister == nil { + return } - if s.agentLister != nil { - for _, a := range s.agentLister() { - payload.AgentList = append(payload.AgentList, AgentReport{ - ID: a.ID, Model: a.Model, Status: a.State, - }) - } + now := time.Now().UTC() + agents := s.agentLister() + s.host.Log("info", "calendar tick", map[string]any{"agents": len(agents)}) + for _, agent := range agents { + s.tickForAgent(ctx, agent, now) } - resp, err := s.bridge.Heartbeat(ctx, payload) +} + +func (s *Scheduler) tickForAgent(ctx context.Context, agent ReportableAgent, now time.Time) { + resp, err := s.bridge.Heartbeat(ctx, agent.ID) s.mu.Lock() - s.lastHeartbeat = time.Now() - if err == nil { - s.lastResponse = resp - s.restartPending = resp.RestartPending - } - s.mu.Unlock() + s.lastHeartbeats[agent.ID] = now if err != nil { - return // network blip; next tick retries + s.lastErrors[agent.ID] = err.Error() + s.mu.Unlock() + s.host.Log("warn", "calendar heartbeat failed", map[string]any{ + "agent": agent.ID, "err": err.Error(), + }) + return } - for _, slot := range resp.SlotsToFire { - s.dispatchSlot(ctx, slot) + delete(s.lastErrors, agent.ID) + s.mu.Unlock() + s.host.Log("info", "calendar heartbeat ok", map[string]any{ + "agent": agent.ID, "slots": len(resp.Slots), "agent_status": string(resp.AgentStatus), + }) + + // Pick highest-priority NotStarted slot; defer the rest. 
+ var chosen *Slot + for i := range resp.Slots { + slot := &resp.Slots[i] + if slot.Status != SlotNotStarted && slot.Status != SlotDeferred { + continue + } + if chosen == nil || slot.Priority > chosen.Priority { + chosen = slot + } } + if chosen != nil { + s.dispatchSlot(ctx, agent.ID, *chosen) + } + // Defer the other unchosen NotStarted/Deferred slots (priority +1) + // so they bubble up next heartbeat. We don't strictly need to push + // the update; the backend's priority bookkeeping survives without + // our nudge for v1. (OpenClaw plugin DOES push priority bumps — + // future v2 work if backend feedback shows starvation.) } -// dispatchSlot fires the slot via host.WakeAgent and records it as -// active. WakeAgent handles state-aware queueing — if the agent is -// busy, our calendar slot enqueues at depth 1 and the previous wake -// is dropped per replace-newest semantics. We mark the slot -// in_progress optimistically when we ENQUEUED; backend reconciles on -// its own watchdog. -func (s *Scheduler) dispatchSlot(ctx context.Context, slot Slot) { - // Skip already-active slots (heartbeat may re-list a slot we - // already started — backend hasn't seen our optimistic update yet). +// dispatchSlot fires WakeAgent + records the slot active. Marks the +// slot Ongoing on the backend so the dashboard reflects the +// transition immediately. +func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot) { + ident := slot.SlotIdent() s.mu.Lock() - if _, ok := s.activeBySlotID[slot.ID]; ok { + if _, dup := s.activeBySlotIdent[ident]; dup { + s.mu.Unlock() + return + } + if _, agentBusy := s.activeByAgentID[agentID]; agentBusy { + // Don't pick up another slot until the current one resolves. 
s.mu.Unlock() return } now := time.Now().UTC() - act := &ActiveSlot{ - Slot: slot, StartedAt: now, LastHeartbeat: now, - State: SlotInProgress, - } - s.activeBySlotID[slot.ID] = act - s.activeByAgentID[slot.AgentID] = act + active := &ActiveSlot{Slot: slot, StartedAt: now, LastHeartbeat: now} + s.activeBySlotIdent[ident] = active + s.activeByAgentID[agentID] = active s.mu.Unlock() - message := slot.WakeOptions.OverrideMessage - if message == "" { - message = slot.PromptText - } - if message == "" { - message = fmt.Sprintf("[calendar] slot %s: %s", slot.ID, slot.Title) - } - source := fmt.Sprintf("calendar:slot-%s", slot.ID) + message := buildWakeMessage(slot) + source := "calendar:" + ident if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{ - AgentID: slot.AgentID, - Message: message, - Source: source, + AgentID: agentID, Message: message, Source: source, }); err != nil { - // Wake itself failed (plumbing). Mark slot aborted + - // notify backend. - s.resolveSlot(ctx, slot.ID, SlotAborted, "", "wake-agent failed: "+err.Error()) + s.resolveLocally(ident, agentID, SlotAborted, "", "wake failed: "+err.Error()) return } + // Mark Ongoing on the backend. + update := SlotAgentUpdate{ + Status: SlotOngoing, StartedAt: now.Format("15:04:05"), + } + s.pushUpdate(ctx, agentID, slot, update) } -// resolveSlot moves an active slot to a terminal status, records -// history, and tells the backend. Safe to call concurrently. -func (s *Scheduler) resolveSlot(ctx context.Context, slotID string, status SlotStatus, summary, reason string) error { - s.mu.Lock() - act, ok := s.activeBySlotID[slotID] - if !ok { - s.mu.Unlock() - return fmt.Errorf("calendar: slot %s not active", slotID) +func buildWakeMessage(slot Slot) string { + // Backend EventData → prompt. v1 is intentionally simple; refine + // when the prompt-engineering side of the plugin matures. 
+ if slot.EventType != nil { + switch *slot.EventType { + case EventTypeSystemEvent: + if ev, ok := slot.EventData["event"].(string); ok { + return fmt.Sprintf("[calendar system_event] %s", ev) + } + case EventTypeJob: + code, _ := slot.EventData["code"].(string) + typ, _ := slot.EventData["type"].(string) + if code != "" { + return fmt.Sprintf("[calendar job %s/%s] please handle this", typ, code) + } + } } - delete(s.activeBySlotID, slotID) - delete(s.activeByAgentID, act.Slot.AgentID) - s.appendHistoryLocked(HistoryEntry{ - SlotID: slotID, AgentID: act.Slot.AgentID, Status: status, + return fmt.Sprintf("[calendar slot %s] scheduled work — please proceed", slot.SlotIdent()) +} + +// CompleteForAgent → terminal; pushes Finished to backend. +func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error { + act, ok := s.activeSlotForAgent(agentID) + if !ok { + return ErrNoActiveSlot + } + now := time.Now().UTC() + duration := int(now.Sub(act.StartedAt).Minutes()) + if duration < 1 { + duration = 1 + } + if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{ + Status: SlotFinished, ActualDuration: duration, + }); err != nil { + return err + } + s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotFinished, summary, "") + return nil +} + +// AbortForAgent → terminal; pushes Aborted to backend. +func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error { + act, ok := s.activeSlotForAgent(agentID) + if !ok { + return ErrNoActiveSlot + } + if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotAborted}); err != nil { + return err + } + s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotAborted, "", reason) + return nil +} + +// PauseForAgent → non-terminal; pushes Paused. 
+func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error { + act, ok := s.activeSlotForAgent(agentID) + if !ok { + return ErrNoActiveSlot + } + return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotPaused}) +} + +// ResumeForAgent → non-terminal; pushes Ongoing. +func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error { + act, ok := s.activeSlotForAgent(agentID) + if !ok { + return ErrNoActiveSlot + } + return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotOngoing}) +} + +func (s *Scheduler) pushUpdate(ctx context.Context, agentID string, slot Slot, update SlotAgentUpdate) error { + if slot.HasRealID() { + return s.bridge.UpdateRealSlot(ctx, agentID, *slot.ID, update) + } + if slot.VirtualID != nil { + return s.bridge.UpdateVirtualSlot(ctx, agentID, *slot.VirtualID, update) + } + return errors.New("calendar: slot has neither real id nor virtual id") +} + +func (s *Scheduler) resolveLocally(ident, agentID string, status SlotStatus, summary, reason string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.activeBySlotIdent, ident) + delete(s.activeByAgentID, agentID) + s.history = append(s.history, HistoryEntry{ + Ident: ident, AgentID: agentID, Status: status, ResolvedAt: time.Now().UTC(), Summary: summary, Reason: reason, }) - s.mu.Unlock() - return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{ - Status: status, Summary: summary, Reason: reason, - }) -} - -// SetSlotState is a non-terminal status change (paused/resumed). -// Records the new state in-memory and tells the backend. 
-func (s *Scheduler) SetSlotState(ctx context.Context, slotID string, status SlotStatus, reason string) error { - s.mu.Lock() - act, ok := s.activeBySlotID[slotID] - if !ok { - s.mu.Unlock() - return fmt.Errorf("calendar: slot %s not active", slotID) - } - act.State = status - act.LastHeartbeat = time.Now().UTC() - s.mu.Unlock() - return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{ - Status: status, Reason: reason, - }) -} - -func (s *Scheduler) appendHistoryLocked(entry HistoryEntry) { - s.history = append(s.history, entry) if len(s.history) > s.cfg.HistoryCap { s.history = s.history[len(s.history)-s.cfg.HistoryCap:] } } -// CompleteForAgent / AbortForAgent / PauseForAgent / ResumeForAgent -// are the agent-facing tool entry points. They look up the agent's -// active slot, transition or terminate it, and notify the backend. - -// CompleteForAgent terminates the agent's active slot as completed. -func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error { - slot, ok := s.activeSlotForAgent(agentID) - if !ok { - return ErrNoActiveSlot - } - return s.resolveSlot(ctx, slot.Slot.ID, SlotCompleted, summary, "") -} - -// AbortForAgent terminates the agent's active slot as aborted. -func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error { - slot, ok := s.activeSlotForAgent(agentID) - if !ok { - return ErrNoActiveSlot - } - return s.resolveSlot(ctx, slot.Slot.ID, SlotAborted, "", reason) -} - -// PauseForAgent transitions the agent's slot to paused. -func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error { - slot, ok := s.activeSlotForAgent(agentID) - if !ok { - return ErrNoActiveSlot - } - return s.SetSlotState(ctx, slot.Slot.ID, SlotPaused, reason) -} - -// ResumeForAgent transitions the agent's slot back to in_progress. 
-func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error { - slot, ok := s.activeSlotForAgent(agentID) - if !ok { - return ErrNoActiveSlot - } - return s.SetSlotState(ctx, slot.Slot.ID, SlotInProgress, "") -} - -// activeSlotForAgent returns the per-agent active slot copy under lock. func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) { s.mu.Lock() defer s.mu.Unlock() @@ -281,36 +309,59 @@ func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) { return *act, true } -// Status returns the introspection shape for the calendar_status tool. -func (s *Scheduler) Status() SchedulerStatus { +// Status is the introspection shape calendar_status returns. +type Status struct { + Enabled bool `json:"enabled"` + LastHeartbeats map[string]time.Time `json:"last_heartbeats"` + LastErrors map[string]string `json:"last_errors,omitempty"` + HeartbeatEvery time.Duration `json:"heartbeat_every"` + Active []ActiveSlot `json:"active"` + History []HistoryEntry `json:"history"` +} + +// SingleActiveAgentID returns the agent id when exactly one active +// slot exists, empty otherwise. Used by the plugin's bestEffortAgentID +// fallback for tool calls that don't carry agent context. +func (s *Scheduler) SingleActiveAgentID() string { s.mu.Lock() defer s.mu.Unlock() - active := make([]ActiveSlot, 0, len(s.activeBySlotID)) - for _, a := range s.activeBySlotID { + if len(s.activeByAgentID) != 1 { + return "" + } + for k := range s.activeByAgentID { + return k + } + return "" +} + +// Status returns the introspection shape calendar_status returns. 
+func (s *Scheduler) Status() Status { + s.mu.Lock() + defer s.mu.Unlock() + active := make([]ActiveSlot, 0, len(s.activeByAgentID)) + for _, a := range s.activeByAgentID { active = append(active, *a) } + hb := make(map[string]time.Time, len(s.lastHeartbeats)) + for k, v := range s.lastHeartbeats { + hb[k] = v + } + errs := make(map[string]string, len(s.lastErrors)) + for k, v := range s.lastErrors { + errs[k] = v + } history := make([]HistoryEntry, len(s.history)) copy(history, s.history) - return SchedulerStatus{ + return Status{ Enabled: true, - LastHeartbeat: s.lastHeartbeat, + LastHeartbeats: hb, + LastErrors: errs, HeartbeatEvery: s.cfg.HeartbeatInterval, Active: active, History: history, - RestartPending: s.restartPending, } } -// SchedulerStatus is the shape calendar_status returns. -type SchedulerStatus struct { - Enabled bool `json:"enabled"` - LastHeartbeat time.Time `json:"last_heartbeat"` - HeartbeatEvery time.Duration `json:"heartbeat_every"` - Active []ActiveSlot `json:"active"` - History []HistoryEntry `json:"history"` - RestartPending bool `json:"restart_pending"` -} - -// ErrNoActiveSlot is returned by calendar_complete/abort/pause/resume -// when the agent has no slot in progress. +// ErrNoActiveSlot is returned when an agent calls calendar_complete / +// abort / pause / resume but has no slot active. var ErrNoActiveSlot = errors.New("calendar: no active slot for agent") diff --git a/internal/calendar/types.go b/internal/calendar/types.go index 999d77f..aa8669c 100644 --- a/internal/calendar/types.go +++ b/internal/calendar/types.go @@ -1,106 +1,152 @@ -// Package calendar talks to the HarborForge backend's Calendar API -// (heartbeat, slot fetch, status update, restart-pending check) and -// drives a scheduler loop that fires Plexum wake events when slots -// come due. Types mirror HarborForge.OpenclawPlugin's calendar/types.ts -// so the backend doesn't need to know which plugin is reporting. 
+// Types matching HarborForge.Backend's actual calendar API contract +// (verified via /openapi.json on a running backend). Aligns 1:1 with +// HarborForge.OpenclawPlugin/plugin/calendar/types.ts so the two +// plugins can hit the same backend interchangeably. package calendar import "time" -// SlotStatus enumerates the slot lifecycle. +// SlotStatus enumerates the lifecycle. String values match backend's +// SlotStatus enum verbatim (camelCase as stored in DB). type SlotStatus string const ( - SlotNotStarted SlotStatus = "not_started" - SlotInProgress SlotStatus = "in_progress" - SlotCompleted SlotStatus = "completed" - SlotAborted SlotStatus = "aborted" - SlotPaused SlotStatus = "paused" - SlotDeferred SlotStatus = "deferred" + SlotNotStarted SlotStatus = "NotStarted" + SlotOngoing SlotStatus = "Ongoing" + SlotFinished SlotStatus = "Finished" + SlotAborted SlotStatus = "Aborted" + SlotDeferred SlotStatus = "Deferred" + SlotPaused SlotStatus = "Paused" ) -// AgentStatusValue mirrors the backend AgentStatus enum used in -// heartbeat responses (a hint about what the backend thinks the -// agent is doing). +// SlotType: work vs on_call. Affects whether the agent flips to busy. +type SlotType string + +const ( + SlotTypeWork SlotType = "work" + SlotTypeOnCall SlotType = "on_call" +) + +// EventType categorises what the slot represents. +type EventType string + +const ( + EventTypeJob EventType = "job" + EventTypeSystemEvent EventType = "system_event" + EventTypeEntertainment EventType = "entertainment" +) + +// AgentStatusValue mirrors the backend AgentStatus enum. 
type AgentStatusValue string const ( - AgentStatusUnknown AgentStatusValue = "unknown" - AgentStatusIdle AgentStatusValue = "idle" - AgentStatusBusy AgentStatusValue = "busy" - AgentStatusOffline AgentStatusValue = "offline" - AgentStatusOnCall AgentStatusValue = "on_call" - AgentStatusPaused AgentStatusValue = "paused" + AgentStatusIdle AgentStatusValue = "idle" + AgentStatusBusy AgentStatusValue = "busy" + AgentStatusOffline AgentStatusValue = "offline" + AgentStatusOnCall AgentStatusValue = "on_call" + AgentStatusExhausted AgentStatusValue = "exhausted" ) -// SlotKind is "work" vs "on_call" — affects how the scheduler treats -// the slot (on_call slots don't move the agent into busy). -type SlotKind string - -const ( - SlotKindWork SlotKind = "work" - SlotKindOnCall SlotKind = "on_call" -) - -// Slot is one Calendar TimeSlot the backend serves. -type Slot struct { - ID string `json:"id"` - VirtualID string `json:"virtual_id,omitempty"` - AgentID string `json:"agent_id"` - ClawID string `json:"claw_identifier,omitempty"` - Kind SlotKind `json:"slot_type"` - Title string `json:"title,omitempty"` - Description string `json:"description,omitempty"` - ScheduledAt time.Time `json:"scheduled_at"` - ExpiresAt *time.Time `json:"expires_at,omitempty"` - Status SlotStatus `json:"status"` - PromptText string `json:"prompt,omitempty"` - WakeOptions WakeOpts `json:"wake_options,omitempty"` +// HeartbeatRequest is the POST /calendar/agent/heartbeat body. +type HeartbeatRequest struct { + ClawIdentifier string `json:"claw_identifier"` + AgentID string `json:"agent_id"` } -// WakeOpts customise how the scheduler should drive the agent. v1 -// honours only Force; the rest pass through as audit trail. -type WakeOpts struct { - Force bool `json:"force,omitempty"` - OverrideMessage string `json:"override_message,omitempty"` - ScopeSessionID string `json:"scope_session_id,omitempty"` -} - -// HeartbeatPayload is what the plugin POSTs every interval. 
-type HeartbeatPayload struct { - Identifier string `json:"identifier"` - APIKey string `json:"api_key,omitempty"` - AgentList []AgentReport `json:"agents"` - PluginInfo PluginInfoTag `json:"plugin"` - CapturedAt time.Time `json:"captured_at"` -} - -// AgentReport is one entry in HeartbeatPayload.AgentList. -type AgentReport struct { - ID string `json:"agent_id"` - Status AgentStatusValue `json:"status"` - Model string `json:"model,omitempty"` -} - -// PluginInfoTag identifies which plugin / version is heartbeating. -type PluginInfoTag struct { - Name string `json:"name"` // "harbor-forge" - Version string `json:"version"` // e.g. 0.1.0 - Backend string `json:"backend"` // "plexum" -} - -// HeartbeatResponse is the backend's reply. SlotsToFire are slots -// the scheduler should attempt to start. +// HeartbeatResponse is the backend's reply. type HeartbeatResponse struct { - SlotsToFire []Slot `json:"slots_to_fire,omitempty"` - RestartPending bool `json:"restart_pending,omitempty"` - ServerTime time.Time `json:"server_time"` + Slots []Slot `json:"slots"` + AgentStatus AgentStatusValue `json:"agent_status"` + Message string `json:"message,omitempty"` } -// SlotUpdate is the body of POST /calendar/slot//status. -type SlotUpdate struct { - Status SlotStatus `json:"status"` - Summary string `json:"summary,omitempty"` - Reason string `json:"reason,omitempty"` +// Slot is one calendar TimeSlot — real (has ID) or virtual +// (has VirtualID). Field names mirror the backend's +// CalendarSlotResponse schema. 
+type Slot struct { + ID *int64 `json:"id"` // real slot db id; null for virtual + VirtualID *string `json:"virtual_id"` // plan-{plan_id}-{date}; null for real + UserID int64 `json:"user_id"` + Date string `json:"date"` // YYYY-MM-DD + SlotType SlotType `json:"slot_type"` + EstimatedDuration int `json:"estimated_duration"` // minutes + ScheduledAt string `json:"scheduled_at"` // HH:MM:SS + StartedAt *string `json:"started_at"` + Attended bool `json:"attended"` + ActualDuration *int `json:"actual_duration"` + EventType *EventType `json:"event_type"` + EventData EventData `json:"event_data"` + Priority int `json:"priority"` + Status SlotStatus `json:"status"` + PlanID *int64 `json:"plan_id"` +} + +// EventData is loosely-typed since the backend stores it as JSONB and +// the shape varies by event_type. Plugin code does best-effort +// unmarshal into JobData / SystemEventData when needed. +type EventData map[string]any + +// JobData is the event_data shape when event_type=="job". +type JobData struct { + Type string `json:"type"` // Task|Support|Meeting|Essential + Code string `json:"code"` // e.g. "TASK-42" + WorkingSessions []string `json:"working_sessions"` // arbitrary session ids +} + +// SystemEventData is the event_data shape when event_type=="system_event". +type SystemEventData struct { + Event string `json:"event"` // ScheduleToday | SummaryToday | ScheduledGatewayRestart +} + +// SlotAgentUpdate is the body of PATCH /calendar/slots/{id}/agent-update +// (and the virtual variant). started_at + actual_duration are set +// depending on which status transition the agent is reporting. +type SlotAgentUpdate struct { + Status SlotStatus `json:"status"` + StartedAt string `json:"started_at,omitempty"` // HH:MM:SS + ActualDuration int `json:"actual_duration,omitempty"` // minutes +} + +// AgentStatusPush is the body of POST /calendar/agent/status. 
+type AgentStatusPush struct {
+	ClawIdentifier string           `json:"claw_identifier"`
+	AgentID        string           `json:"agent_id"`
+	Status         AgentStatusValue `json:"status"`
+}
+
+// HasRealID reports whether a Slot is the materialized (DB row) flavor.
+func (s Slot) HasRealID() bool { return s.ID != nil && *s.ID > 0 }
+
+// SlotIdent returns a stable string identifier for log + map keys —
+// "real:{id}" for materialized, "virtual:{vid}" for virtual. A slot with
+// neither falls back to a time-stamped "unknown:" key that is NOT stable.
+func (s Slot) SlotIdent() string {
+	if s.HasRealID() {
+		return formatInt("real", *s.ID)
+	}
+	if s.VirtualID != nil {
+		return "virtual:" + *s.VirtualID
+	}
+	return "unknown:" + time.Now().UTC().Format(time.RFC3339Nano)
+}
+
+// formatInt returns prefix + ":" + the decimal form of n, without fmt.
+func formatInt(prefix string, n int64) string {
+	mag := uint64(n) // negate as uint64: -n overflows for math.MinInt64
+	if n < 0 {
+		mag = -mag
+	}
+	var buf [20]byte // sign + 19 digits fits every int64
+	i := len(buf)
+	for ok := true; ok; ok = mag != 0 { // runs once even when n == 0
+		i--
+		buf[i] = byte('0' + mag%10)
+		mag /= 10
+	}
+	if n < 0 {
+		i--
+		buf[i] = '-'
+	}
+	return prefix + ":" + string(buf[i:])
+}
diff --git a/internal/tools/tools.go b/internal/tools/tools.go
index cfbbde8..3238035 100644
--- a/internal/tools/tools.go
+++ b/internal/tools/tools.go
@@ -176,11 +176,15 @@ func toolCalendarResume(ctx context.Context, deps Deps) (sdkplugin.ToolResult, e
 }

 func toolRestartStatus(deps Deps) (sdkplugin.ToolResult, error) {
+	// HarborForge backend doesn't expose a restart-pending endpoint
+	// (verified via /openapi.json) so we report the most recent
+	// heartbeat freshness instead. Useful for operators sanity-
+	// checking that the plugin's calendar loop is still alive.
 	sch := deps.Scheduler.Status()
 	return jsonResult(map[string]any{
-		"pending":        sch.RestartPending,
-		"last_heartbeat": sch.LastHeartbeat,
-		"observed_at":    time.Now().UTC(),
+		"pending":         false,
+		"last_heartbeats": sch.LastHeartbeats,
+		"observed_at":     time.Now().UTC(),
 	})
 }