Compare commits
1 Commits
754e5183f7
...
78b1ec5181
| Author | SHA1 | Date | |
|---|---|---|---|
| 78b1ec5181 |
@@ -73,12 +73,18 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
|
|||||||
"calendar_enabled": p.cfg.CalendarEnabled,
|
"calendar_enabled": p.cfg.CalendarEnabled,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
bgCtx, cancel := context.WithCancel(context.Background())
|
||||||
|
p.cancelBg = cancel
|
||||||
|
|
||||||
|
// Listers + collectors capture bgCtx (not Init ctx) — Init returns
|
||||||
|
// once MCP initialize completes, but the plugin process lives on
|
||||||
|
// and so do the goroutines + closures we registered.
|
||||||
collect := func() telemetry.Snapshot {
|
collect := func() telemetry.Snapshot {
|
||||||
return telemetry.Collect(telemetry.CollectOpts{
|
return telemetry.Collect(telemetry.CollectOpts{
|
||||||
Identifier: p.cfg.Identifier,
|
Identifier: p.cfg.Identifier,
|
||||||
Version: Version,
|
Version: Version,
|
||||||
AgentLister: func() []telemetry.AgentInfo {
|
AgentLister: func() []telemetry.AgentInfo {
|
||||||
return p.listAgents(ctx, profileRoot)
|
return p.listAgents(bgCtx, profileRoot)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -86,9 +92,6 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
|
|||||||
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
|
p.bridge = monitor.New(p.cfg.MonitorPort, collect,
|
||||||
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
|
func(level, msg string, attrs map[string]any) { host.Log(level, msg, attrs) })
|
||||||
|
|
||||||
bgCtx, cancel := context.WithCancel(context.Background())
|
|
||||||
p.cancelBg = cancel
|
|
||||||
|
|
||||||
if err := p.bridge.Start(bgCtx); err != nil {
|
if err := p.bridge.Start(bgCtx); err != nil {
|
||||||
host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()})
|
host.Log("warn", "monitor bridge failed to start", map[string]any{"err": err.Error()})
|
||||||
}
|
}
|
||||||
@@ -97,15 +100,15 @@ func (p *harborForgePlugin) Init(ctx context.Context, host sdkplugin.HostAPI) er
|
|||||||
if calBackend == "" {
|
if calBackend == "" {
|
||||||
calBackend = p.cfg.BackendURL
|
calBackend = p.cfg.BackendURL
|
||||||
}
|
}
|
||||||
bridge := calendar.New(calBackend, p.cfg.APIKey)
|
bridge := calendar.New(calBackend, p.cfg.Identifier)
|
||||||
p.sched = calendar.NewScheduler(
|
p.sched = calendar.NewScheduler(
|
||||||
calendar.Config{
|
calendar.Config{
|
||||||
HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second,
|
HeartbeatInterval: time.Duration(p.cfg.CalendarHeartbeatIntervalSeconds) * time.Second,
|
||||||
},
|
},
|
||||||
bridge, host, p.cfg.Identifier,
|
bridge, host,
|
||||||
calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"},
|
calendar.PluginInfoTag{Name: "harbor-forge", Version: Version, Backend: "plexum"},
|
||||||
func() []calendar.ReportableAgent {
|
func() []calendar.ReportableAgent {
|
||||||
return p.listReportableAgents(ctx, profileRoot)
|
return p.listReportableAgents(bgCtx, profileRoot)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
if p.cfg.CalendarEnabled {
|
if p.cfg.CalendarEnabled {
|
||||||
@@ -203,21 +206,29 @@ func mapStateToCalendar(s string) calendar.AgentStatusValue {
|
|||||||
case "offline":
|
case "offline":
|
||||||
return calendar.AgentStatusOffline
|
return calendar.AgentStatusOffline
|
||||||
}
|
}
|
||||||
return calendar.AgentStatusUnknown
|
return calendar.AgentStatusOffline
|
||||||
}
|
}
|
||||||
|
|
||||||
// bestEffortAgentID is a v1 stop-gap for tools that need the calling
|
// bestEffortAgentID is a v1 stop-gap for tools that need the calling
|
||||||
// agent's id but don't have it on the ctx (Plexum SDK doesn't yet
|
// agent's id but don't have it on the ctx (Plexum SDK doesn't yet
|
||||||
// expose this — TODO upstream). Returns the only active calendar
|
// expose this — TODO upstream). v1: if exactly one agent has an
|
||||||
// slot's agent if there's exactly one; otherwise empty. The calendar
|
// active calendar slot we return it; otherwise empty. The calendar
|
||||||
// tools (the only ones that need agent context) usually fire when
|
// tools (the only ones that need agent context) usually fire when
|
||||||
// exactly one slot is active.
|
// exactly one slot is active.
|
||||||
func (p *harborForgePlugin) bestEffortAgentID() string {
|
func (p *harborForgePlugin) bestEffortAgentID() string {
|
||||||
sch := p.sched.Status()
|
sch := p.sched.Status()
|
||||||
if len(sch.Active) == 1 {
|
if len(sch.Active) != 1 {
|
||||||
return sch.Active[0].Slot.AgentID
|
return ""
|
||||||
}
|
}
|
||||||
return ""
|
// We don't track AgentID on Slot directly — the scheduler keeps
|
||||||
|
// activeByAgentID. Iterate to find the one.
|
||||||
|
for _, a := range sch.Active {
|
||||||
|
// Slot is shared between agents only via the scheduler's maps;
|
||||||
|
// here we have just the Slot struct without owner.
|
||||||
|
_ = a
|
||||||
|
}
|
||||||
|
// Fallback to scheduler's helper:
|
||||||
|
return p.sched.SingleActiveAgentID()
|
||||||
}
|
}
|
||||||
|
|
||||||
func manifestFromDisk() sdkplugin.Manifest {
|
func manifestFromDisk() sdkplugin.Manifest {
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
// Bridge — thin HTTP client for the HarborForge backend's Calendar API.
|
// Bridge — typed HTTP client for HarborForge.Backend's calendar API.
|
||||||
// All operations carry the API key as Authorization: Bearer; absent
|
// Endpoint shapes verified via the backend's /openapi.json and against
|
||||||
// key means missing-auth errors from the backend (caller should
|
// HarborForge.OpenclawPlugin/plugin/calendar/calendar-bridge.ts so
|
||||||
// handle them as transient and log).
|
// the two plugins drop into the same backend without per-plugin
|
||||||
|
// adapters.
|
||||||
|
|
||||||
package calendar
|
package calendar
|
||||||
|
|
||||||
@@ -13,30 +14,33 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bridge is the typed wrapper around an HTTP client + backend URL.
|
// Bridge is constructed once per scheduler and reused across heartbeats.
|
||||||
type Bridge struct {
|
type Bridge struct {
|
||||||
BackendURL string
|
BaseURL string
|
||||||
APIKey string
|
ClawIdentifier string
|
||||||
HTTP *http.Client
|
HTTP *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
// New constructs a bridge with a sensible default timeout.
|
// New constructs a bridge with a 20s default timeout.
|
||||||
func New(backendURL, apiKey string) *Bridge {
|
func New(baseURL, clawIdentifier string) *Bridge {
|
||||||
return &Bridge{
|
return &Bridge{
|
||||||
BackendURL: strings.TrimRight(backendURL, "/"),
|
BaseURL: strings.TrimRight(baseURL, "/"),
|
||||||
APIKey: apiKey,
|
ClawIdentifier: clawIdentifier,
|
||||||
HTTP: &http.Client{Timeout: 20 * time.Second},
|
HTTP: &http.Client{Timeout: 20 * time.Second},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Heartbeat POSTs /calendar/agent/heartbeat. Returns the backend's
|
// Heartbeat POSTs /calendar/agent/heartbeat. Per-agent: each running
|
||||||
// reply or an error.
|
// agent on this claw drives its own heartbeat (matches OpenClaw plugin
|
||||||
func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (HeartbeatResponse, error) {
|
// semantics).
|
||||||
raw, err := b.post(ctx, "/calendar/agent/heartbeat", payload)
|
func (b *Bridge) Heartbeat(ctx context.Context, agentID string) (HeartbeatResponse, error) {
|
||||||
|
body := HeartbeatRequest{ClawIdentifier: b.ClawIdentifier, AgentID: agentID}
|
||||||
|
raw, err := b.doJSON(ctx, http.MethodPost, "/calendar/agent/heartbeat", agentID, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return HeartbeatResponse{}, err
|
return HeartbeatResponse{}, err
|
||||||
}
|
}
|
||||||
@@ -49,76 +53,64 @@ func (b *Bridge) Heartbeat(ctx context.Context, payload HeartbeatPayload) (Heart
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateSlotStatus POSTs /calendar/slot/<id>/status to mark a slot
|
// UpdateRealSlot PATCHes /calendar/slots/{id}/agent-update.
|
||||||
// completed / aborted / paused / resumed.
|
func (b *Bridge) UpdateRealSlot(ctx context.Context, agentID string, slotID int64, update SlotAgentUpdate) error {
|
||||||
func (b *Bridge) UpdateSlotStatus(ctx context.Context, slotID string, update SlotUpdate) error {
|
path := "/calendar/slots/" + strconv.FormatInt(slotID, 10) + "/agent-update"
|
||||||
if slotID == "" {
|
_, err := b.doJSON(ctx, http.MethodPatch, path, agentID, update)
|
||||||
return errors.New("calendar: slot id required")
|
|
||||||
}
|
|
||||||
_, err := b.post(ctx, "/calendar/slot/"+slotID+"/status", update)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestartPending GETs /restart/status — returns the backend's
|
// UpdateVirtualSlot PATCHes /calendar/slots/virtual/{vid}/agent-update.
|
||||||
// current restart-requested flag.
|
// The backend materialises the virtual slot first; subsequent calls
|
||||||
func (b *Bridge) RestartPending(ctx context.Context) (bool, error) {
|
// against the same logical slot should use UpdateRealSlot with the
|
||||||
raw, err := b.get(ctx, "/restart/status")
|
// id returned in the response — but for v1 we don't round-trip the
|
||||||
if err != nil {
|
// materialised id back to the scheduler (would require a separate
|
||||||
return false, err
|
// fetch); the agent-update path tolerates re-PATCHing a virtual id.
|
||||||
}
|
func (b *Bridge) UpdateVirtualSlot(ctx context.Context, agentID string, virtualID string, update SlotAgentUpdate) error {
|
||||||
var out struct {
|
path := "/calendar/slots/virtual/" + virtualID + "/agent-update"
|
||||||
Pending bool `json:"pending"`
|
_, err := b.doJSON(ctx, http.MethodPatch, path, agentID, update)
|
||||||
}
|
return err
|
||||||
if len(raw) > 0 {
|
|
||||||
if err := json.Unmarshal(raw, &out); err != nil {
|
|
||||||
return false, fmt.Errorf("decode restart-status: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out.Pending, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// post serialises body as JSON, attaches Authorization, returns
|
// PushAgentStatus POSTs /calendar/agent/status. Used to push idle ↔
|
||||||
// response body bytes. Non-2xx becomes an error with the body
|
// busy transitions out of the normal heartbeat cycle.
|
||||||
// included for diagnostics.
|
func (b *Bridge) PushAgentStatus(ctx context.Context, agentID string, status AgentStatusValue) error {
|
||||||
func (b *Bridge) post(ctx context.Context, path string, body any) ([]byte, error) {
|
body := AgentStatusPush{
|
||||||
|
ClawIdentifier: b.ClawIdentifier, AgentID: agentID, Status: status,
|
||||||
|
}
|
||||||
|
_, err := b.doJSON(ctx, http.MethodPost, "/calendar/agent/status", agentID, body)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// doJSON serialises body, attaches the two auth headers, and returns
|
||||||
|
// the response bytes. Errors on non-2xx with truncated body.
|
||||||
|
func (b *Bridge) doJSON(ctx context.Context, method, path, agentID string, body any) ([]byte, error) {
|
||||||
|
if agentID == "" {
|
||||||
|
return nil, errors.New("calendar: agent_id required for auth headers")
|
||||||
|
}
|
||||||
raw, err := json.Marshal(body)
|
raw, err := json.Marshal(body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("marshal %s: %w", path, err)
|
return nil, fmt.Errorf("marshal %s %s: %w", method, path, err)
|
||||||
}
|
}
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, b.BackendURL+path, bytes.NewReader(raw))
|
req, err := http.NewRequestWithContext(ctx, method, b.BaseURL+path, bytes.NewReader(raw))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
if b.APIKey != "" {
|
req.Header.Set("X-Agent-ID", agentID)
|
||||||
req.Header.Set("Authorization", "Bearer "+b.APIKey)
|
req.Header.Set("X-Claw-Identifier", b.ClawIdentifier)
|
||||||
}
|
|
||||||
return b.do(req)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Bridge) get(ctx context.Context, path string) ([]byte, error) {
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, b.BackendURL+path, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if b.APIKey != "" {
|
|
||||||
req.Header.Set("Authorization", "Bearer "+b.APIKey)
|
|
||||||
}
|
|
||||||
return b.do(req)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Bridge) do(req *http.Request) ([]byte, error) {
|
|
||||||
res, err := b.HTTP.Do(req)
|
res, err := b.HTTP.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("%s %s: %w", req.Method, req.URL.Path, err)
|
return nil, fmt.Errorf("%s %s: %w", method, path, err)
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
body, _ := io.ReadAll(res.Body)
|
out, _ := io.ReadAll(res.Body)
|
||||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||||
return nil, fmt.Errorf("%s %s → %d: %s",
|
return nil, fmt.Errorf("%s %s → %d: %s",
|
||||||
req.Method, req.URL.Path, res.StatusCode, truncate(body, 300))
|
method, path, res.StatusCode, truncate(out, 300))
|
||||||
}
|
}
|
||||||
return body, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func truncate(b []byte, n int) string {
|
func truncate(b []byte, n int) string {
|
||||||
|
|||||||
@@ -1,14 +1,16 @@
|
|||||||
// Scheduler — main loop that heartbeats the backend, dispatches
|
// Scheduler — loops over every Plexum agent, heartbeats per-agent,
|
||||||
// returned slots via Plexum's WakeAgent, and tracks per-agent active
|
// picks the highest-priority pending slot for each, dispatches via
|
||||||
// slot state for the calendar_* tools.
|
// host.WakeAgent. Mirrors HarborForge.OpenclawPlugin's per-agent
|
||||||
|
// scheduler loop (PLG-CAL-002).
|
||||||
//
|
//
|
||||||
// State is in-memory: a daemon restart drops everything. Next
|
// In-memory state: per-agent active slot map. A daemon restart drops
|
||||||
// heartbeat reconciles (backend keeps the canonical SlotStatus).
|
// it; next heartbeat reconciles from the backend's canonical state.
|
||||||
//
|
//
|
||||||
// Concurrency:
|
// Wake semantics: WakeAgent is fire-and-forget; the SDK's wake queue
|
||||||
// - one heartbeat ticker goroutine
|
// (depth 1 replace-newest) handles state-aware dispatch. We mark the
|
||||||
// - per-slot dispatch is fire-and-forget via WakeAgent (queue-aware)
|
// slot Ongoing optimistically the moment we call WakeAgent; agents
|
||||||
// - mu guards activeBySlot + activeByAgent maps
|
// drive complete/abort/pause/resume via the harborforge_calendar_*
|
||||||
|
// tools.
|
||||||
|
|
||||||
package calendar
|
package calendar
|
||||||
|
|
||||||
@@ -22,22 +24,20 @@ import (
|
|||||||
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
sdkplugin "git.hangman-lab.top/hzhang/Plexum-sdk-go/plugin"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Scheduler orchestrates the calendar loop.
|
// Scheduler is the long-running calendar driver.
|
||||||
type Scheduler struct {
|
type Scheduler struct {
|
||||||
cfg Config
|
cfg Config
|
||||||
bridge *Bridge
|
bridge *Bridge
|
||||||
host sdkplugin.HostAPI
|
host sdkplugin.HostAPI
|
||||||
agentLister func() []ReportableAgent
|
agentLister func() []ReportableAgent
|
||||||
identifier string
|
pluginInfo PluginInfoTag
|
||||||
pluginInfo PluginInfoTag
|
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
activeBySlotID map[string]*ActiveSlot
|
activeByAgentID map[string]*ActiveSlot
|
||||||
activeByAgentID map[string]*ActiveSlot
|
activeBySlotIdent map[string]*ActiveSlot
|
||||||
history []HistoryEntry
|
history []HistoryEntry
|
||||||
lastHeartbeat time.Time
|
lastHeartbeats map[string]time.Time
|
||||||
lastResponse HeartbeatResponse
|
lastErrors map[string]string
|
||||||
restartPending bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config bundles scheduler tunables.
|
// Config bundles scheduler tunables.
|
||||||
@@ -46,36 +46,42 @@ type Config struct {
|
|||||||
HistoryCap int // bound on activity history; default 32
|
HistoryCap int // bound on activity history; default 32
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReportableAgent is the projection of a Plexum agent the scheduler
|
// PluginInfoTag tags heartbeat reports so the backend knows which
|
||||||
// needs for heartbeat — id + model + current sm state.
|
// plugin / version is reporting.
|
||||||
type ReportableAgent struct {
|
type PluginInfoTag struct {
|
||||||
ID string
|
Name string
|
||||||
Model string
|
Version string
|
||||||
State AgentStatusValue
|
Backend string // "plexum"
|
||||||
}
|
}
|
||||||
|
|
||||||
// ActiveSlot tracks an in-flight slot (between WakeAgent dispatch and
|
// ReportableAgent is the per-agent projection the scheduler needs for
|
||||||
// terminal status update).
|
// heartbeat enumeration.
|
||||||
|
type ReportableAgent struct {
|
||||||
|
ID string
|
||||||
|
Model string
|
||||||
|
State AgentStatusValue
|
||||||
|
}
|
||||||
|
|
||||||
|
// ActiveSlot tracks an in-flight slot from dispatch to terminal state.
|
||||||
type ActiveSlot struct {
|
type ActiveSlot struct {
|
||||||
Slot Slot
|
Slot Slot
|
||||||
StartedAt time.Time
|
StartedAt time.Time
|
||||||
LastHeartbeat time.Time
|
LastHeartbeat time.Time
|
||||||
State SlotStatus
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// HistoryEntry is one resolved slot kept for the calendar_status tool.
|
// HistoryEntry records one resolved slot for the calendar_status tool.
|
||||||
type HistoryEntry struct {
|
type HistoryEntry struct {
|
||||||
SlotID string
|
Ident string
|
||||||
AgentID string
|
AgentID string
|
||||||
Status SlotStatus
|
Status SlotStatus
|
||||||
ResolvedAt time.Time
|
ResolvedAt time.Time
|
||||||
Reason string
|
Reason string
|
||||||
Summary string
|
Summary string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewScheduler constructs a Scheduler in stopped state.
|
// NewScheduler constructs a Scheduler in stopped state.
|
||||||
func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI,
|
func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI,
|
||||||
identifier string, pluginInfo PluginInfoTag,
|
pluginInfo PluginInfoTag,
|
||||||
agentLister func() []ReportableAgent) *Scheduler {
|
agentLister func() []ReportableAgent) *Scheduler {
|
||||||
if cfg.HeartbeatInterval <= 0 {
|
if cfg.HeartbeatInterval <= 0 {
|
||||||
cfg.HeartbeatInterval = 30 * time.Second
|
cfg.HeartbeatInterval = 30 * time.Second
|
||||||
@@ -84,193 +90,215 @@ func NewScheduler(cfg Config, bridge *Bridge, host sdkplugin.HostAPI,
|
|||||||
cfg.HistoryCap = 32
|
cfg.HistoryCap = 32
|
||||||
}
|
}
|
||||||
return &Scheduler{
|
return &Scheduler{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
bridge: bridge,
|
bridge: bridge,
|
||||||
host: host,
|
host: host,
|
||||||
agentLister: agentLister,
|
agentLister: agentLister,
|
||||||
identifier: identifier,
|
pluginInfo: pluginInfo,
|
||||||
pluginInfo: pluginInfo,
|
activeByAgentID: map[string]*ActiveSlot{},
|
||||||
activeBySlotID: map[string]*ActiveSlot{},
|
activeBySlotIdent: map[string]*ActiveSlot{},
|
||||||
activeByAgentID: map[string]*ActiveSlot{},
|
lastHeartbeats: map[string]time.Time{},
|
||||||
|
lastErrors: map[string]string{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run blocks until ctx cancels, ticking heartbeats every
|
// Run blocks until ctx cancels.
|
||||||
// cfg.HeartbeatInterval. Returns nil on graceful shutdown.
|
|
||||||
func (s *Scheduler) Run(ctx context.Context) error {
|
func (s *Scheduler) Run(ctx context.Context) error {
|
||||||
t := time.NewTicker(s.cfg.HeartbeatInterval)
|
t := time.NewTicker(s.cfg.HeartbeatInterval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
// First heartbeat immediately so initial state lands fast.
|
s.tick(ctx)
|
||||||
s.heartbeatOnce(ctx)
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
s.heartbeatOnce(ctx)
|
s.tick(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) heartbeatOnce(ctx context.Context) {
|
func (s *Scheduler) tick(ctx context.Context) {
|
||||||
payload := HeartbeatPayload{
|
if s.agentLister == nil {
|
||||||
Identifier: s.identifier,
|
return
|
||||||
APIKey: s.bridge.APIKey,
|
|
||||||
PluginInfo: s.pluginInfo,
|
|
||||||
CapturedAt: time.Now().UTC(),
|
|
||||||
}
|
}
|
||||||
if s.agentLister != nil {
|
now := time.Now().UTC()
|
||||||
for _, a := range s.agentLister() {
|
agents := s.agentLister()
|
||||||
payload.AgentList = append(payload.AgentList, AgentReport{
|
s.host.Log("info", "calendar tick", map[string]any{"agents": len(agents)})
|
||||||
ID: a.ID, Model: a.Model, Status: a.State,
|
for _, agent := range agents {
|
||||||
})
|
s.tickForAgent(ctx, agent, now)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
resp, err := s.bridge.Heartbeat(ctx, payload)
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) tickForAgent(ctx context.Context, agent ReportableAgent, now time.Time) {
|
||||||
|
resp, err := s.bridge.Heartbeat(ctx, agent.ID)
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.lastHeartbeat = time.Now()
|
s.lastHeartbeats[agent.ID] = now
|
||||||
if err == nil {
|
|
||||||
s.lastResponse = resp
|
|
||||||
s.restartPending = resp.RestartPending
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return // network blip; next tick retries
|
s.lastErrors[agent.ID] = err.Error()
|
||||||
|
s.mu.Unlock()
|
||||||
|
s.host.Log("warn", "calendar heartbeat failed", map[string]any{
|
||||||
|
"agent": agent.ID, "err": err.Error(),
|
||||||
|
})
|
||||||
|
return
|
||||||
}
|
}
|
||||||
for _, slot := range resp.SlotsToFire {
|
delete(s.lastErrors, agent.ID)
|
||||||
s.dispatchSlot(ctx, slot)
|
s.mu.Unlock()
|
||||||
|
s.host.Log("info", "calendar heartbeat ok", map[string]any{
|
||||||
|
"agent": agent.ID, "slots": len(resp.Slots), "agent_status": string(resp.AgentStatus),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Pick highest-priority NotStarted slot; defer the rest.
|
||||||
|
var chosen *Slot
|
||||||
|
for i := range resp.Slots {
|
||||||
|
slot := &resp.Slots[i]
|
||||||
|
if slot.Status != SlotNotStarted && slot.Status != SlotDeferred {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if chosen == nil || slot.Priority > chosen.Priority {
|
||||||
|
chosen = slot
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if chosen != nil {
|
||||||
|
s.dispatchSlot(ctx, agent.ID, *chosen)
|
||||||
|
}
|
||||||
|
// Defer the other unchosen NotStarted/Deferred slots (priority +1)
|
||||||
|
// so they bubble up next heartbeat. We don't strictly need to push
|
||||||
|
// the update; the backend's priority bookkeeping survives without
|
||||||
|
// our nudge for v1. (OpenClaw plugin DOES push priority bumps —
|
||||||
|
// future v2 work if backend feedback shows starvation.)
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatchSlot fires the slot via host.WakeAgent and records it as
|
// dispatchSlot fires WakeAgent + records the slot active. Marks the
|
||||||
// active. WakeAgent handles state-aware queueing — if the agent is
|
// slot Ongoing on the backend so the dashboard reflects the
|
||||||
// busy, our calendar slot enqueues at depth 1 and the previous wake
|
// transition immediately.
|
||||||
// is dropped per replace-newest semantics. We mark the slot
|
func (s *Scheduler) dispatchSlot(ctx context.Context, agentID string, slot Slot) {
|
||||||
// in_progress optimistically when we ENQUEUED; backend reconciles on
|
ident := slot.SlotIdent()
|
||||||
// its own watchdog.
|
|
||||||
func (s *Scheduler) dispatchSlot(ctx context.Context, slot Slot) {
|
|
||||||
// Skip already-active slots (heartbeat may re-list a slot we
|
|
||||||
// already started — backend hasn't seen our optimistic update yet).
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
if _, ok := s.activeBySlotID[slot.ID]; ok {
|
if _, dup := s.activeBySlotIdent[ident]; dup {
|
||||||
|
s.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, agentBusy := s.activeByAgentID[agentID]; agentBusy {
|
||||||
|
// Don't pick up another slot until the current one resolves.
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
act := &ActiveSlot{
|
active := &ActiveSlot{Slot: slot, StartedAt: now, LastHeartbeat: now}
|
||||||
Slot: slot, StartedAt: now, LastHeartbeat: now,
|
s.activeBySlotIdent[ident] = active
|
||||||
State: SlotInProgress,
|
s.activeByAgentID[agentID] = active
|
||||||
}
|
|
||||||
s.activeBySlotID[slot.ID] = act
|
|
||||||
s.activeByAgentID[slot.AgentID] = act
|
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
message := slot.WakeOptions.OverrideMessage
|
message := buildWakeMessage(slot)
|
||||||
if message == "" {
|
source := "calendar:" + ident
|
||||||
message = slot.PromptText
|
|
||||||
}
|
|
||||||
if message == "" {
|
|
||||||
message = fmt.Sprintf("[calendar] slot %s: %s", slot.ID, slot.Title)
|
|
||||||
}
|
|
||||||
source := fmt.Sprintf("calendar:slot-%s", slot.ID)
|
|
||||||
if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{
|
if err := s.host.WakeAgent(ctx, sdkplugin.WakeAgentRequest{
|
||||||
AgentID: slot.AgentID,
|
AgentID: agentID, Message: message, Source: source,
|
||||||
Message: message,
|
|
||||||
Source: source,
|
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
// Wake itself failed (plumbing). Mark slot aborted +
|
s.resolveLocally(ident, agentID, SlotAborted, "", "wake failed: "+err.Error())
|
||||||
// notify backend.
|
|
||||||
s.resolveSlot(ctx, slot.ID, SlotAborted, "", "wake-agent failed: "+err.Error())
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Mark Ongoing on the backend.
|
||||||
|
update := SlotAgentUpdate{
|
||||||
|
Status: SlotOngoing, StartedAt: now.Format("15:04:05"),
|
||||||
|
}
|
||||||
|
s.pushUpdate(ctx, agentID, slot, update)
|
||||||
}
|
}
|
||||||
|
|
||||||
// resolveSlot moves an active slot to a terminal status, records
|
func buildWakeMessage(slot Slot) string {
|
||||||
// history, and tells the backend. Safe to call concurrently.
|
// Backend EventData → prompt. v1 is intentionally simple; refine
|
||||||
func (s *Scheduler) resolveSlot(ctx context.Context, slotID string, status SlotStatus, summary, reason string) error {
|
// when the prompt-engineering side of the plugin matures.
|
||||||
s.mu.Lock()
|
if slot.EventType != nil {
|
||||||
act, ok := s.activeBySlotID[slotID]
|
switch *slot.EventType {
|
||||||
if !ok {
|
case EventTypeSystemEvent:
|
||||||
s.mu.Unlock()
|
if ev, ok := slot.EventData["event"].(string); ok {
|
||||||
return fmt.Errorf("calendar: slot %s not active", slotID)
|
return fmt.Sprintf("[calendar system_event] %s", ev)
|
||||||
|
}
|
||||||
|
case EventTypeJob:
|
||||||
|
code, _ := slot.EventData["code"].(string)
|
||||||
|
typ, _ := slot.EventData["type"].(string)
|
||||||
|
if code != "" {
|
||||||
|
return fmt.Sprintf("[calendar job %s/%s] please handle this", typ, code)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
delete(s.activeBySlotID, slotID)
|
return fmt.Sprintf("[calendar slot %s] scheduled work — please proceed", slot.SlotIdent())
|
||||||
delete(s.activeByAgentID, act.Slot.AgentID)
|
}
|
||||||
s.appendHistoryLocked(HistoryEntry{
|
|
||||||
SlotID: slotID, AgentID: act.Slot.AgentID, Status: status,
|
// CompleteForAgent → terminal; pushes Finished to backend.
|
||||||
|
func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error {
|
||||||
|
act, ok := s.activeSlotForAgent(agentID)
|
||||||
|
if !ok {
|
||||||
|
return ErrNoActiveSlot
|
||||||
|
}
|
||||||
|
now := time.Now().UTC()
|
||||||
|
duration := int(now.Sub(act.StartedAt).Minutes())
|
||||||
|
if duration < 1 {
|
||||||
|
duration = 1
|
||||||
|
}
|
||||||
|
if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{
|
||||||
|
Status: SlotFinished, ActualDuration: duration,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotFinished, summary, "")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AbortForAgent → terminal; pushes Aborted to backend.
|
||||||
|
func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error {
|
||||||
|
act, ok := s.activeSlotForAgent(agentID)
|
||||||
|
if !ok {
|
||||||
|
return ErrNoActiveSlot
|
||||||
|
}
|
||||||
|
if err := s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotAborted}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.resolveLocally(act.Slot.SlotIdent(), agentID, SlotAborted, "", reason)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PauseForAgent → non-terminal; pushes Paused.
|
||||||
|
func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error {
|
||||||
|
act, ok := s.activeSlotForAgent(agentID)
|
||||||
|
if !ok {
|
||||||
|
return ErrNoActiveSlot
|
||||||
|
}
|
||||||
|
return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotPaused})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResumeForAgent → non-terminal; pushes Ongoing.
|
||||||
|
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error {
|
||||||
|
act, ok := s.activeSlotForAgent(agentID)
|
||||||
|
if !ok {
|
||||||
|
return ErrNoActiveSlot
|
||||||
|
}
|
||||||
|
return s.pushUpdate(ctx, agentID, act.Slot, SlotAgentUpdate{Status: SlotOngoing})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) pushUpdate(ctx context.Context, agentID string, slot Slot, update SlotAgentUpdate) error {
|
||||||
|
if slot.HasRealID() {
|
||||||
|
return s.bridge.UpdateRealSlot(ctx, agentID, *slot.ID, update)
|
||||||
|
}
|
||||||
|
if slot.VirtualID != nil {
|
||||||
|
return s.bridge.UpdateVirtualSlot(ctx, agentID, *slot.VirtualID, update)
|
||||||
|
}
|
||||||
|
return errors.New("calendar: slot has neither real id nor virtual id")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) resolveLocally(ident, agentID string, status SlotStatus, summary, reason string) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
delete(s.activeBySlotIdent, ident)
|
||||||
|
delete(s.activeByAgentID, agentID)
|
||||||
|
s.history = append(s.history, HistoryEntry{
|
||||||
|
Ident: ident, AgentID: agentID, Status: status,
|
||||||
ResolvedAt: time.Now().UTC(), Summary: summary, Reason: reason,
|
ResolvedAt: time.Now().UTC(), Summary: summary, Reason: reason,
|
||||||
})
|
})
|
||||||
s.mu.Unlock()
|
|
||||||
return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{
|
|
||||||
Status: status, Summary: summary, Reason: reason,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetSlotState is a non-terminal status change (paused/resumed).
|
|
||||||
// Records the new state in-memory and tells the backend.
|
|
||||||
func (s *Scheduler) SetSlotState(ctx context.Context, slotID string, status SlotStatus, reason string) error {
|
|
||||||
s.mu.Lock()
|
|
||||||
act, ok := s.activeBySlotID[slotID]
|
|
||||||
if !ok {
|
|
||||||
s.mu.Unlock()
|
|
||||||
return fmt.Errorf("calendar: slot %s not active", slotID)
|
|
||||||
}
|
|
||||||
act.State = status
|
|
||||||
act.LastHeartbeat = time.Now().UTC()
|
|
||||||
s.mu.Unlock()
|
|
||||||
return s.bridge.UpdateSlotStatus(ctx, slotID, SlotUpdate{
|
|
||||||
Status: status, Reason: reason,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scheduler) appendHistoryLocked(entry HistoryEntry) {
|
|
||||||
s.history = append(s.history, entry)
|
|
||||||
if len(s.history) > s.cfg.HistoryCap {
|
if len(s.history) > s.cfg.HistoryCap {
|
||||||
s.history = s.history[len(s.history)-s.cfg.HistoryCap:]
|
s.history = s.history[len(s.history)-s.cfg.HistoryCap:]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompleteForAgent / AbortForAgent / PauseForAgent / ResumeForAgent
|
|
||||||
// are the agent-facing tool entry points. They look up the agent's
|
|
||||||
// active slot, transition or terminate it, and notify the backend.
|
|
||||||
|
|
||||||
// CompleteForAgent terminates the agent's active slot as completed.
|
|
||||||
func (s *Scheduler) CompleteForAgent(ctx context.Context, agentID, summary string) error {
|
|
||||||
slot, ok := s.activeSlotForAgent(agentID)
|
|
||||||
if !ok {
|
|
||||||
return ErrNoActiveSlot
|
|
||||||
}
|
|
||||||
return s.resolveSlot(ctx, slot.Slot.ID, SlotCompleted, summary, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
// AbortForAgent terminates the agent's active slot as aborted.
|
|
||||||
func (s *Scheduler) AbortForAgent(ctx context.Context, agentID, reason string) error {
|
|
||||||
slot, ok := s.activeSlotForAgent(agentID)
|
|
||||||
if !ok {
|
|
||||||
return ErrNoActiveSlot
|
|
||||||
}
|
|
||||||
return s.resolveSlot(ctx, slot.Slot.ID, SlotAborted, "", reason)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PauseForAgent transitions the agent's slot to paused.
|
|
||||||
func (s *Scheduler) PauseForAgent(ctx context.Context, agentID, reason string) error {
|
|
||||||
slot, ok := s.activeSlotForAgent(agentID)
|
|
||||||
if !ok {
|
|
||||||
return ErrNoActiveSlot
|
|
||||||
}
|
|
||||||
return s.SetSlotState(ctx, slot.Slot.ID, SlotPaused, reason)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResumeForAgent transitions the agent's slot back to in_progress.
|
|
||||||
func (s *Scheduler) ResumeForAgent(ctx context.Context, agentID string) error {
|
|
||||||
slot, ok := s.activeSlotForAgent(agentID)
|
|
||||||
if !ok {
|
|
||||||
return ErrNoActiveSlot
|
|
||||||
}
|
|
||||||
return s.SetSlotState(ctx, slot.Slot.ID, SlotInProgress, "")
|
|
||||||
}
|
|
||||||
|
|
||||||
// activeSlotForAgent returns the per-agent active slot copy under lock.
|
|
||||||
func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) {
|
func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
@@ -281,36 +309,59 @@ func (s *Scheduler) activeSlotForAgent(agentID string) (ActiveSlot, bool) {
|
|||||||
return *act, true
|
return *act, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Status returns the introspection shape for the calendar_status tool.
|
// Status is the introspection shape calendar_status returns.
|
||||||
func (s *Scheduler) Status() SchedulerStatus {
|
type Status struct {
|
||||||
|
Enabled bool `json:"enabled"`
|
||||||
|
LastHeartbeats map[string]time.Time `json:"last_heartbeats"`
|
||||||
|
LastErrors map[string]string `json:"last_errors,omitempty"`
|
||||||
|
HeartbeatEvery time.Duration `json:"heartbeat_every"`
|
||||||
|
Active []ActiveSlot `json:"active"`
|
||||||
|
History []HistoryEntry `json:"history"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SingleActiveAgentID returns the agent id when exactly one active
|
||||||
|
// slot exists, empty otherwise. Used by the plugin's bestEffortAgentID
|
||||||
|
// fallback for tool calls that don't carry agent context.
|
||||||
|
func (s *Scheduler) SingleActiveAgentID() string {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
active := make([]ActiveSlot, 0, len(s.activeBySlotID))
|
if len(s.activeByAgentID) != 1 {
|
||||||
for _, a := range s.activeBySlotID {
|
return ""
|
||||||
|
}
|
||||||
|
for k := range s.activeByAgentID {
|
||||||
|
return k
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status returns the introspection shape calendar_status returns.
|
||||||
|
func (s *Scheduler) Status() Status {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
active := make([]ActiveSlot, 0, len(s.activeByAgentID))
|
||||||
|
for _, a := range s.activeByAgentID {
|
||||||
active = append(active, *a)
|
active = append(active, *a)
|
||||||
}
|
}
|
||||||
|
hb := make(map[string]time.Time, len(s.lastHeartbeats))
|
||||||
|
for k, v := range s.lastHeartbeats {
|
||||||
|
hb[k] = v
|
||||||
|
}
|
||||||
|
errs := make(map[string]string, len(s.lastErrors))
|
||||||
|
for k, v := range s.lastErrors {
|
||||||
|
errs[k] = v
|
||||||
|
}
|
||||||
history := make([]HistoryEntry, len(s.history))
|
history := make([]HistoryEntry, len(s.history))
|
||||||
copy(history, s.history)
|
copy(history, s.history)
|
||||||
return SchedulerStatus{
|
return Status{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
LastHeartbeat: s.lastHeartbeat,
|
LastHeartbeats: hb,
|
||||||
|
LastErrors: errs,
|
||||||
HeartbeatEvery: s.cfg.HeartbeatInterval,
|
HeartbeatEvery: s.cfg.HeartbeatInterval,
|
||||||
Active: active,
|
Active: active,
|
||||||
History: history,
|
History: history,
|
||||||
RestartPending: s.restartPending,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SchedulerStatus is the shape calendar_status returns.
|
// ErrNoActiveSlot is returned when an agent calls calendar_complete /
|
||||||
type SchedulerStatus struct {
|
// abort / pause / resume but has no slot active.
|
||||||
Enabled bool `json:"enabled"`
|
|
||||||
LastHeartbeat time.Time `json:"last_heartbeat"`
|
|
||||||
HeartbeatEvery time.Duration `json:"heartbeat_every"`
|
|
||||||
Active []ActiveSlot `json:"active"`
|
|
||||||
History []HistoryEntry `json:"history"`
|
|
||||||
RestartPending bool `json:"restart_pending"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrNoActiveSlot is returned by calendar_complete/abort/pause/resume
|
|
||||||
// when the agent has no slot in progress.
|
|
||||||
var ErrNoActiveSlot = errors.New("calendar: no active slot for agent")
|
var ErrNoActiveSlot = errors.New("calendar: no active slot for agent")
|
||||||
|
|||||||
@@ -1,106 +1,152 @@
|
|||||||
// Package calendar talks to the HarborForge backend's Calendar API
|
// Types matching HarborForge.Backend's actual calendar API contract
|
||||||
// (heartbeat, slot fetch, status update, restart-pending check) and
|
// (verified via /openapi.json on a running backend). Aligns 1:1 with
|
||||||
// drives a scheduler loop that fires Plexum wake events when slots
|
// HarborForge.OpenclawPlugin/plugin/calendar/types.ts so the two
|
||||||
// come due. Types mirror HarborForge.OpenclawPlugin's calendar/types.ts
|
// plugins can hit the same backend interchangeably.
|
||||||
// so the backend doesn't need to know which plugin is reporting.
|
|
||||||
|
|
||||||
package calendar
|
package calendar
|
||||||
|
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
// SlotStatus enumerates the slot lifecycle.
|
// SlotStatus enumerates the lifecycle. String values match backend's
|
||||||
|
// SlotStatus enum verbatim (camelCase as stored in DB).
|
||||||
type SlotStatus string
|
type SlotStatus string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
SlotNotStarted SlotStatus = "not_started"
|
SlotNotStarted SlotStatus = "NotStarted"
|
||||||
SlotInProgress SlotStatus = "in_progress"
|
SlotOngoing SlotStatus = "Ongoing"
|
||||||
SlotCompleted SlotStatus = "completed"
|
SlotFinished SlotStatus = "Finished"
|
||||||
SlotAborted SlotStatus = "aborted"
|
SlotAborted SlotStatus = "Aborted"
|
||||||
SlotPaused SlotStatus = "paused"
|
SlotDeferred SlotStatus = "Deferred"
|
||||||
SlotDeferred SlotStatus = "deferred"
|
SlotPaused SlotStatus = "Paused"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AgentStatusValue mirrors the backend AgentStatus enum used in
|
// SlotType: work vs on_call. Affects whether the agent flips to busy.
|
||||||
// heartbeat responses (a hint about what the backend thinks the
|
type SlotType string
|
||||||
// agent is doing).
|
|
||||||
|
const (
|
||||||
|
SlotTypeWork SlotType = "work"
|
||||||
|
SlotTypeOnCall SlotType = "on_call"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EventType categorises what the slot represents.
|
||||||
|
type EventType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
EventTypeJob EventType = "job"
|
||||||
|
EventTypeSystemEvent EventType = "system_event"
|
||||||
|
EventTypeEntertainment EventType = "entertainment"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AgentStatusValue mirrors the backend AgentStatus enum.
|
||||||
type AgentStatusValue string
|
type AgentStatusValue string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
AgentStatusUnknown AgentStatusValue = "unknown"
|
AgentStatusIdle AgentStatusValue = "idle"
|
||||||
AgentStatusIdle AgentStatusValue = "idle"
|
AgentStatusBusy AgentStatusValue = "busy"
|
||||||
AgentStatusBusy AgentStatusValue = "busy"
|
AgentStatusOffline AgentStatusValue = "offline"
|
||||||
AgentStatusOffline AgentStatusValue = "offline"
|
AgentStatusOnCall AgentStatusValue = "on_call"
|
||||||
AgentStatusOnCall AgentStatusValue = "on_call"
|
AgentStatusExhausted AgentStatusValue = "exhausted"
|
||||||
AgentStatusPaused AgentStatusValue = "paused"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// SlotKind is "work" vs "on_call" — affects how the scheduler treats
|
// HeartbeatRequest is the POST /calendar/agent/heartbeat body.
|
||||||
// the slot (on_call slots don't move the agent into busy).
|
type HeartbeatRequest struct {
|
||||||
type SlotKind string
|
ClawIdentifier string `json:"claw_identifier"`
|
||||||
|
AgentID string `json:"agent_id"`
|
||||||
const (
|
|
||||||
SlotKindWork SlotKind = "work"
|
|
||||||
SlotKindOnCall SlotKind = "on_call"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Slot is one Calendar TimeSlot the backend serves.
|
|
||||||
type Slot struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
VirtualID string `json:"virtual_id,omitempty"`
|
|
||||||
AgentID string `json:"agent_id"`
|
|
||||||
ClawID string `json:"claw_identifier,omitempty"`
|
|
||||||
Kind SlotKind `json:"slot_type"`
|
|
||||||
Title string `json:"title,omitempty"`
|
|
||||||
Description string `json:"description,omitempty"`
|
|
||||||
ScheduledAt time.Time `json:"scheduled_at"`
|
|
||||||
ExpiresAt *time.Time `json:"expires_at,omitempty"`
|
|
||||||
Status SlotStatus `json:"status"`
|
|
||||||
PromptText string `json:"prompt,omitempty"`
|
|
||||||
WakeOptions WakeOpts `json:"wake_options,omitempty"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WakeOpts customise how the scheduler should drive the agent. v1
|
// HeartbeatResponse is the backend's reply.
|
||||||
// honours only Force; the rest pass through as audit trail.
|
|
||||||
type WakeOpts struct {
|
|
||||||
Force bool `json:"force,omitempty"`
|
|
||||||
OverrideMessage string `json:"override_message,omitempty"`
|
|
||||||
ScopeSessionID string `json:"scope_session_id,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// HeartbeatPayload is what the plugin POSTs every interval.
|
|
||||||
type HeartbeatPayload struct {
|
|
||||||
Identifier string `json:"identifier"`
|
|
||||||
APIKey string `json:"api_key,omitempty"`
|
|
||||||
AgentList []AgentReport `json:"agents"`
|
|
||||||
PluginInfo PluginInfoTag `json:"plugin"`
|
|
||||||
CapturedAt time.Time `json:"captured_at"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// AgentReport is one entry in HeartbeatPayload.AgentList.
|
|
||||||
type AgentReport struct {
|
|
||||||
ID string `json:"agent_id"`
|
|
||||||
Status AgentStatusValue `json:"status"`
|
|
||||||
Model string `json:"model,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// PluginInfoTag identifies which plugin / version is heartbeating.
|
|
||||||
type PluginInfoTag struct {
|
|
||||||
Name string `json:"name"` // "harbor-forge"
|
|
||||||
Version string `json:"version"` // e.g. 0.1.0
|
|
||||||
Backend string `json:"backend"` // "plexum"
|
|
||||||
}
|
|
||||||
|
|
||||||
// HeartbeatResponse is the backend's reply. SlotsToFire are slots
|
|
||||||
// the scheduler should attempt to start.
|
|
||||||
type HeartbeatResponse struct {
|
type HeartbeatResponse struct {
|
||||||
SlotsToFire []Slot `json:"slots_to_fire,omitempty"`
|
Slots []Slot `json:"slots"`
|
||||||
RestartPending bool `json:"restart_pending,omitempty"`
|
AgentStatus AgentStatusValue `json:"agent_status"`
|
||||||
ServerTime time.Time `json:"server_time"`
|
Message string `json:"message,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SlotUpdate is the body of POST /calendar/slot/<id>/status.
|
// Slot is one calendar TimeSlot — real (has ID) or virtual
|
||||||
type SlotUpdate struct {
|
// (has VirtualID). Field names mirror the backend's
|
||||||
Status SlotStatus `json:"status"`
|
// CalendarSlotResponse schema.
|
||||||
Summary string `json:"summary,omitempty"`
|
type Slot struct {
|
||||||
Reason string `json:"reason,omitempty"`
|
ID *int64 `json:"id"` // real slot db id; null for virtual
|
||||||
|
VirtualID *string `json:"virtual_id"` // plan-{plan_id}-{date}; null for real
|
||||||
|
UserID int64 `json:"user_id"`
|
||||||
|
Date string `json:"date"` // YYYY-MM-DD
|
||||||
|
SlotType SlotType `json:"slot_type"`
|
||||||
|
EstimatedDuration int `json:"estimated_duration"` // minutes
|
||||||
|
ScheduledAt string `json:"scheduled_at"` // HH:MM:SS
|
||||||
|
StartedAt *string `json:"started_at"`
|
||||||
|
Attended bool `json:"attended"`
|
||||||
|
ActualDuration *int `json:"actual_duration"`
|
||||||
|
EventType *EventType `json:"event_type"`
|
||||||
|
EventData EventData `json:"event_data"`
|
||||||
|
Priority int `json:"priority"`
|
||||||
|
Status SlotStatus `json:"status"`
|
||||||
|
PlanID *int64 `json:"plan_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventData is loosely-typed since the backend stores it as JSONB and
|
||||||
|
// the shape varies by event_type. Plugin code does best-effort
|
||||||
|
// unmarshal into JobData / SystemEventData when needed.
|
||||||
|
type EventData map[string]any
|
||||||
|
|
||||||
|
// JobData is the event_data shape when event_type=="job".
|
||||||
|
type JobData struct {
|
||||||
|
Type string `json:"type"` // Task|Support|Meeting|Essential
|
||||||
|
Code string `json:"code"` // e.g. "TASK-42"
|
||||||
|
WorkingSessions []string `json:"working_sessions"` // arbitrary session ids
|
||||||
|
}
|
||||||
|
|
||||||
|
// SystemEventData is the event_data shape when event_type=="system_event".
|
||||||
|
type SystemEventData struct {
|
||||||
|
Event string `json:"event"` // ScheduleToday | SummaryToday | ScheduledGatewayRestart
|
||||||
|
}
|
||||||
|
|
||||||
|
// SlotAgentUpdate is the body of PATCH /calendar/slots/{id}/agent-update
|
||||||
|
// (and the virtual variant). started_at + actual_duration are set
|
||||||
|
// depending on which status transition the agent is reporting.
|
||||||
|
type SlotAgentUpdate struct {
|
||||||
|
Status SlotStatus `json:"status"`
|
||||||
|
StartedAt string `json:"started_at,omitempty"` // HH:MM:SS
|
||||||
|
ActualDuration int `json:"actual_duration,omitempty"` // minutes
|
||||||
|
}
|
||||||
|
|
||||||
|
// AgentStatusPush is the body of POST /calendar/agent/status.
|
||||||
|
type AgentStatusPush struct {
|
||||||
|
ClawIdentifier string `json:"claw_identifier"`
|
||||||
|
AgentID string `json:"agent_id"`
|
||||||
|
Status AgentStatusValue `json:"status"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasRealID reports whether a Slot is the materialized (DB row) flavor.
|
||||||
|
func (s Slot) HasRealID() bool { return s.ID != nil && *s.ID > 0 }
|
||||||
|
|
||||||
|
// SlotIdent returns a stable string identifier for log + map keys —
|
||||||
|
// "real:<id>" for materialized, "virtual:<vid>" for virtual.
|
||||||
|
func (s Slot) SlotIdent() string {
|
||||||
|
if s.HasRealID() {
|
||||||
|
return formatInt("real", *s.ID)
|
||||||
|
}
|
||||||
|
if s.VirtualID != nil {
|
||||||
|
return "virtual:" + *s.VirtualID
|
||||||
|
}
|
||||||
|
return "unknown:" + time.Now().UTC().Format(time.RFC3339Nano)
|
||||||
|
}
|
||||||
|
|
||||||
|
func formatInt(prefix string, n int64) string {
|
||||||
|
// avoid pulling fmt for one call
|
||||||
|
const digits = "0123456789"
|
||||||
|
if n == 0 {
|
||||||
|
return prefix + ":0"
|
||||||
|
}
|
||||||
|
neg := n < 0
|
||||||
|
if neg {
|
||||||
|
n = -n
|
||||||
|
}
|
||||||
|
buf := make([]byte, 0, 20)
|
||||||
|
for n > 0 {
|
||||||
|
buf = append([]byte{digits[n%10]}, buf...)
|
||||||
|
n /= 10
|
||||||
|
}
|
||||||
|
if neg {
|
||||||
|
buf = append([]byte{'-'}, buf...)
|
||||||
|
}
|
||||||
|
return prefix + ":" + string(buf)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -176,11 +176,15 @@ func toolCalendarResume(ctx context.Context, deps Deps) (sdkplugin.ToolResult, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func toolRestartStatus(deps Deps) (sdkplugin.ToolResult, error) {
|
func toolRestartStatus(deps Deps) (sdkplugin.ToolResult, error) {
|
||||||
|
// HarborForge backend doesn't expose a restart-pending endpoint
|
||||||
|
// (verified via /openapi.json) so we report the most recent
|
||||||
|
// heartbeat freshness instead. Useful for operators sanity-
|
||||||
|
// checking that the plugin's calendar loop is still alive.
|
||||||
sch := deps.Scheduler.Status()
|
sch := deps.Scheduler.Status()
|
||||||
return jsonResult(map[string]any{
|
return jsonResult(map[string]any{
|
||||||
"pending": sch.RestartPending,
|
"pending": false,
|
||||||
"last_heartbeat": sch.LastHeartbeat,
|
"last_heartbeats": sch.LastHeartbeats,
|
||||||
"observed_at": time.Now().UTC(),
|
"observed_at": time.Now().UTC(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user