Files
HarborForge.Backend/app/api/routers/calendar.py
hzhang dcaaa4259a feat(calendar): maintenance window + schedule_type special slots
## What this adds

1. **Maintenance window on ScheduleType**
   - New columns: maintenance_from / maintenance_to (UTC hours, 0-23)
   - Invariant: window is exactly 1 hour (validated in pydantic;
     maintenance_to must equal (maintenance_from + 1) % 24)
   - Default applied via additive migration: 8:00-9:00 UTC for existing
     rows so deployments don't crash on first boot

2. **ScheduleTypeSpecialSlot** — admin-managed slot template
   - New table schedule_type_special_slots
   - Admin (schedule_type.manage) CRUD via
     /schedule-types/{id}/special-slots
   - Fields: name, description, minute_in_window (0-59 inside the
     parent maintenance window), estimated_duration, priority,
     event_data (JSON merged into materialised slot), is_active
   - Unique constraint (schedule_type_id, name) — name is the stable
     human-readable identifier per cohort

3. **Per-agent materialisation**
   - New service app/services/special_slot_materialiser.py
   - GET /calendar/sync calls materialise_special_slots_for_claw
     (idempotent, one row per agent per template per date)
   - GET /calendar/day calls materialise_special_slots_for_user
   - Materialised rows are slot_type=system, event_type=system_event,
     is_admin_locked=true, special_slot_id pointing back to template
   - Plugin's runSync picks them up like any other due slot via the
     normal real-slots query path

4. **Admin-locked enforcement**
   - New TimeSlot columns: is_admin_locked, special_slot_id (FK to
     schedule_type_special_slots, ON DELETE SET NULL)
   - PATCH /calendar/slots/{id}: refuses any edit on admin-locked
     slots (423)
   - POST /calendar/slots/{id}/cancel: refuses cancel on admin-locked
     (423)
   - PATCH /calendar/slots/{id}/agent-update: admin-locked slots accept
     only ongoing/paused/finished/aborted statuses (423 on other transitions)

5. **Maintenance-window guard on slot creation**
   - POST /calendar/slots: rejects slot_type=system outright (only
     materialiser may create system slots) and rejects any non-system
     slot whose [scheduled_at, +duration] intersects the calling
     user's schedule_type maintenance window (422). Handles 23->0 wrap

6. **Schema response**
   - TimeSlotResponse / CalendarSlotItem now include is_admin_locked
     and special_slot_id so clients can render the lock indicator and
     trace back to the template

## Migration

Additive only — no destructive changes. Lives in _migrate_schema()
in app/main.py; the new schedule_type_special_slots table is created
by Base.metadata.create_all() on first boot.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 19:18:42 +01:00

1403 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Calendar API router.
BE-CAL-004: MinimumWorkload CRUD endpoints.
BE-CAL-API-001: Single-slot creation endpoint.
BE-CAL-API-002: Day-view calendar query endpoint.
BE-CAL-API-003: Calendar slot edit endpoints (real + virtual).
BE-CAL-API-004: Calendar slot cancel endpoints (real + virtual).
BE-CAL-API-005: Plan schedule / plan list endpoints.
BE-CAL-API-006: Plan edit / plan cancel endpoints.
BE-CAL-API-007: Date-list endpoint.
"""
from datetime import date as date_type, datetime, timezone
from typing import Optional
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_user
from app.core.config import get_db
from app.models.calendar import SchedulePlan, SlotStatus, SlotType, TimeSlot
from app.models.models import User
from app.models.agent import Agent, AgentStatus, ExhaustReason
from app.models.schedule_type import ScheduleType
from app.services.special_slot_materialiser import (
materialise_special_slots_for_claw,
materialise_special_slots_for_user,
)
from app.schemas.calendar import (
AgentHeartbeatResponse,
AgentStatusUpdateRequest,
CalendarDayResponse,
CalendarSlotItem,
DateListResponse,
MinimumWorkloadConfig,
MinimumWorkloadResponse,
MinimumWorkloadUpdate,
SchedulePlanCancelResponse,
SchedulePlanCreate,
SchedulePlanEdit,
SchedulePlanListResponse,
SchedulePlanResponse,
SlotStatusEnum,
SlotConflictItem,
SlotAgentUpdate,
TimeSlotCancelResponse,
TimeSlotCreate,
TimeSlotCreateResponse,
TimeSlotEdit,
TimeSlotEditResponse,
TimeSlotResponse,
)
from app.services.agent_heartbeat import get_pending_slots_for_agent
from app.services.agent_status import (
record_heartbeat,
transition_to_busy,
transition_to_idle,
transition_to_offline,
transition_to_exhausted,
)
from app.services.discord_wakeup import create_private_wakeup_channel
from app.services.minimum_workload import (
get_workload_config,
get_workload_warnings_for_date,
replace_workload_config,
upsert_workload_config,
)
from app.services.overlap import check_overlap_for_create, check_overlap_for_edit
from app.services.plan_slot import (
detach_slot_from_plan,
get_virtual_slots_for_date,
materialize_from_virtual_id,
parse_virtual_slot_id,
)
from app.services.slot_immutability import (
ImmutableSlotError,
guard_cancel_real_slot,
guard_cancel_virtual_slot,
guard_edit_real_slot,
guard_edit_virtual_slot,
guard_plan_cancel_no_past_retroaction,
guard_plan_edit_no_past_retroaction,
)
from app.models.role_permission import Permission, RolePermission
# Router for every endpoint in this module: paths are prefixed with
# ``/calendar`` and the operations carry the "Calendar" tag.
router = APIRouter(prefix="/calendar", tags=["Calendar"])
def _has_global_permission(db: Session, user: User, permission_name: str) -> bool:
    """Report whether *user* holds the permission called *permission_name*.

    Admin users always pass. Any other user needs a role, the named
    ``Permission`` row must exist, and a ``RolePermission`` row must link
    that role to that permission.
    """
    if user.is_admin:
        return True
    if not user.role_id:
        return False

    permission = (
        db.query(Permission)
        .filter(Permission.name == permission_name)
        .first()
    )
    if not permission:
        return False

    grant = (
        db.query(RolePermission)
        .filter(
            RolePermission.role_id == user.role_id,
            RolePermission.permission_id == permission.id,
        )
        .first()
    )
    return grant is not None
def _require_calendar_permission(db: Session, user: User, permission_name: str) -> User:
    """Return *user* when they hold *permission_name*; otherwise raise HTTP 403."""
    if not _has_global_permission(db, user, permission_name):
        raise HTTPException(
            status_code=403,
            detail=f"Calendar permission '{permission_name}' required",
        )
    return user
def require_calendar_read(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """FastAPI dependency: yield the current user if they hold ``calendar.read``.

    Raises HTTP 403 (via ``_require_calendar_permission``) otherwise.
    """
    return _require_calendar_permission(
        db=db,
        user=current_user,
        permission_name="calendar.read",
    )
def require_calendar_write(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """FastAPI dependency: yield the current user if they hold ``calendar.write``.

    Raises HTTP 403 (via ``_require_calendar_permission``) otherwise.
    """
    return _require_calendar_permission(
        db=db,
        user=current_user,
        permission_name="calendar.write",
    )
def require_calendar_manage(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """FastAPI dependency: yield the current user if they hold ``calendar.manage``.

    Raises HTTP 403 (via ``_require_calendar_permission``) otherwise.
    """
    return _require_calendar_permission(
        db=db,
        user=current_user,
        permission_name="calendar.manage",
    )
# ---------------------------------------------------------------------------
# TimeSlot creation (BE-CAL-API-001)
# ---------------------------------------------------------------------------
def _slot_to_response(slot: TimeSlot) -> TimeSlotResponse:
    """Build the ``TimeSlotResponse`` schema for a real ``TimeSlot`` row.

    Enum-like columns are flattened to plain strings, timestamps of the
    slot itself are rendered with ``isoformat()``, and the admin-lock
    fields are read defensively with ``getattr`` defaults.
    """

    def _enum_text(raw):
        # The attribute may hold an Enum member or an already-plain value.
        return raw.value if hasattr(raw, "value") else str(raw)

    scheduled_text = slot.scheduled_at.isoformat() if slot.scheduled_at else ""
    started_text = slot.started_at.isoformat() if slot.started_at else None
    event_type_text = _enum_text(slot.event_type) if slot.event_type else None

    return TimeSlotResponse(
        id=slot.id,
        user_id=slot.user_id,
        date=slot.date,
        slot_type=_enum_text(slot.slot_type),
        estimated_duration=slot.estimated_duration,
        scheduled_at=scheduled_text,
        started_at=started_text,
        attended=slot.attended,
        actual_duration=slot.actual_duration,
        event_type=event_type_text,
        event_data=slot.event_data,
        priority=slot.priority,
        status=_enum_text(slot.status),
        plan_id=slot.plan_id,
        is_admin_locked=bool(getattr(slot, "is_admin_locked", False)),
        special_slot_id=getattr(slot, "special_slot_id", None),
        created_at=slot.created_at,
        updated_at=slot.updated_at,
    )
@router.post(
    "/slots",
    response_model=TimeSlotCreateResponse,
    status_code=201,
    summary="Create a single calendar slot",
)
def create_slot(
    payload: TimeSlotCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_write),
):
    """Create a one-off calendar slot.

    - **Overlap detection**: rejects the request if the proposed slot
      overlaps with existing real or virtual slots on the same day.
    - **Workload warnings**: after successful creation, returns any
      minimum-workload warnings (advisory only, does not block creation).
    """
    # Checks run in order and the first failure wins:
    #   1. slot_type == system            -> 422 (reserved for the materialiser)
    #   2. maintenance-window intersection -> 422
    #   3. overlap with existing schedule  -> 409
    target_date = payload.date or date_type.today()

    # `system` slots are created server-side from
    # `schedule_type_special_slots` templates only; refusing them here keeps
    # the public API from manufacturing a fake admin-locked slot.
    if payload.slot_type == SlotType.SYSTEM:
        raise HTTPException(
            status_code=422,
            detail=(
                "slot_type='system' is reserved for schedule_type special slots "
                "and cannot be created via this endpoint"
            ),
        )

    # Resolve the caller's schedule_type (through their Agent row, if any).
    # Its 1-hour maintenance window is admin territory, so non-system slots
    # may not intersect it.
    owner_agent = db.query(Agent).filter(Agent.user_id == current_user.id).first()
    schedule_type = None
    if owner_agent and owner_agent.schedule_type_id:
        schedule_type = (
            db.query(ScheduleType)
            .filter(ScheduleType.id == owner_agent.schedule_type_id)
            .first()
        )
    if schedule_type and _scheduled_inside_window(
        payload.scheduled_at,
        payload.estimated_duration,
        schedule_type.maintenance_from,
        schedule_type.maintenance_to,
    ):
        raise HTTPException(
            status_code=422,
            detail=(
                f"slot at {payload.scheduled_at} duration {payload.estimated_duration}min "
                f"intersects the maintenance window "
                f"{schedule_type.maintenance_from:02d}:00-{schedule_type.maintenance_to:02d}:00 UTC of "
                f"schedule_type '{schedule_type.name}' — that window is admin-reserved"
            ),
        )

    # Hard reject on any overlap with the user's existing schedule.
    clashes = check_overlap_for_create(
        db,
        user_id=current_user.id,
        target_date=target_date,
        scheduled_at=payload.scheduled_at,
        estimated_duration=payload.estimated_duration,
    )
    if clashes:
        raise HTTPException(
            status_code=409,
            detail={
                "message": "Slot overlaps with existing schedule",
                "conflicts": [clash.to_dict() for clash in clashes],
            },
        )

    # Persist the new slot.
    event_type_value = payload.event_type.value if payload.event_type else None
    new_slot = TimeSlot(
        user_id=current_user.id,
        date=target_date,
        slot_type=payload.slot_type.value,
        estimated_duration=payload.estimated_duration,
        scheduled_at=payload.scheduled_at,
        event_type=event_type_value,
        event_data=payload.event_data,
        priority=payload.priority,
        status=SlotStatus.NOT_STARTED,
    )
    db.add(new_slot)
    db.commit()
    db.refresh(new_slot)

    # Workload warnings are advisory and computed after the commit.
    advisory = get_workload_warnings_for_date(db, current_user.id, target_date)
    return TimeSlotCreateResponse(
        slot=_slot_to_response(new_slot),
        warnings=advisory,
    )
# ---------------------------------------------------------------------------
# Day-view query (BE-CAL-API-002)
# ---------------------------------------------------------------------------
# Statuses that no longer occupy calendar time — hidden from default view.
# Holds the enum members' ``.value`` (not the members themselves); the slot
# queries in this module pass it to ``TimeSlot.status.notin_(...)``.
_INACTIVE_STATUSES = {SlotStatus.SKIPPED.value, SlotStatus.ABORTED.value}
def _real_slot_to_item(slot: TimeSlot) -> CalendarSlotItem:
    """Build a ``CalendarSlotItem`` for a real (persisted) ``TimeSlot`` row.

    ``virtual_id`` is always ``None`` for real slots. Enum-like columns are
    flattened to plain strings and the admin-lock fields are read with
    ``getattr`` defaults.
    """

    def _enum_text(raw):
        # The attribute may hold an Enum member or an already-plain value.
        return raw.value if hasattr(raw, "value") else str(raw)

    scheduled_text = slot.scheduled_at.isoformat() if slot.scheduled_at else ""
    started_text = slot.started_at.isoformat() if slot.started_at else None
    event_type_text = _enum_text(slot.event_type) if slot.event_type else None

    return CalendarSlotItem(
        id=slot.id,
        virtual_id=None,
        user_id=slot.user_id,
        date=slot.date,
        slot_type=_enum_text(slot.slot_type),
        estimated_duration=slot.estimated_duration,
        scheduled_at=scheduled_text,
        started_at=started_text,
        attended=slot.attended,
        actual_duration=slot.actual_duration,
        event_type=event_type_text,
        event_data=slot.event_data,
        priority=slot.priority,
        status=_enum_text(slot.status),
        plan_id=slot.plan_id,
        is_admin_locked=bool(getattr(slot, "is_admin_locked", False)),
        special_slot_id=getattr(slot, "special_slot_id", None),
        created_at=slot.created_at,
        updated_at=slot.updated_at,
    )
def _virtual_slot_to_item(vs: dict) -> CalendarSlotItem:
    """Build a ``CalendarSlotItem`` from a virtual-slot dict.

    Required keys (``KeyError`` if absent): ``slot_type``, ``status``,
    ``scheduled_at``, ``virtual_id``, ``user_id``, ``date``,
    ``estimated_duration``. The remaining keys are optional and fall back
    to the defaults used below. ``id``, ``started_at``, ``created_at`` and
    ``updated_at`` are always ``None`` because no row exists yet.
    """

    def _enum_text(raw):
        # Values may be Enum members or already-plain strings.
        return raw.value if hasattr(raw, "value") else str(raw)

    slot_type_text = _enum_text(vs["slot_type"])

    raw_event_type = vs.get("event_type")
    event_type_text = None if raw_event_type is None else _enum_text(raw_event_type)

    status_text = _enum_text(vs["status"])

    raw_scheduled = vs["scheduled_at"]
    if hasattr(raw_scheduled, "isoformat"):
        scheduled_text = raw_scheduled.isoformat()
    else:
        scheduled_text = str(raw_scheduled)

    return CalendarSlotItem(
        id=None,
        virtual_id=vs["virtual_id"],
        user_id=vs["user_id"],
        date=vs["date"],
        slot_type=slot_type_text,
        estimated_duration=vs["estimated_duration"],
        scheduled_at=scheduled_text,
        started_at=None,
        attended=vs.get("attended", False),
        actual_duration=vs.get("actual_duration"),
        event_type=event_type_text,
        event_data=vs.get("event_data"),
        priority=vs.get("priority", 0),
        status=status_text,
        plan_id=vs.get("plan_id"),
        created_at=None,
        updated_at=None,
    )
def _require_agent(db: Session, agent_id: str, claw_identifier: str) -> Agent:
    """Look up the agent matching both identifiers, or raise HTTP 404."""
    match = (
        db.query(Agent)
        .filter(
            Agent.agent_id == agent_id,
            Agent.claw_identifier == claw_identifier,
        )
        .first()
    )
    if match is not None:
        return match
    raise HTTPException(status_code=404, detail="Agent not found")
def _scheduled_inside_window(
scheduled_at,
estimated_duration_minutes: int,
window_from_hour: int,
window_to_hour: int,
) -> bool:
"""True if [scheduled_at, scheduled_at+duration] intersects [from:00, to:00].
Handles the 23→0 wrap case (window straddles UTC midnight).
"""
start_min = scheduled_at.hour * 60 + scheduled_at.minute
end_min = start_min + max(estimated_duration_minutes, 1)
win_start_min = window_from_hour * 60
win_end_min = window_to_hour * 60
if win_end_min > win_start_min:
# normal same-day window
return start_min < win_end_min and end_min > win_start_min
# wrap-around: window = [from..24:00) [00:00..to)
return (start_min < 24 * 60 and end_min > win_start_min) or end_min > win_end_min
# Admin-locked special slots accept only these agent-driven status
# transitions; movement / cancellation / arbitrary status edits are
# rejected because the schedule_type owner is the source of truth.
# Consulted by `_apply_agent_slot_update`, which answers HTTP 423 when an
# admin-locked slot is asked to move to any status not listed here.
_ADMIN_LOCKED_ALLOWED_STATUSES = {
    SlotStatusEnum.ONGOING,
    SlotStatusEnum.PAUSED,
    SlotStatusEnum.FINISHED,
    SlotStatusEnum.ABORTED,
}
def _apply_agent_slot_update(slot: TimeSlot, payload: SlotAgentUpdate) -> None:
    """Copy an agent-reported status update from *payload* onto *slot*.

    Admin-locked slots only accept the statuses listed in
    ``_ADMIN_LOCKED_ALLOWED_STATUSES``; anything else raises HTTP 423
    before the slot is touched. The slot is marked attended when a start
    time is supplied or the new status is ONGOING. Nothing is committed
    here.
    """
    is_locked = getattr(slot, "is_admin_locked", False)
    if is_locked and payload.status not in _ADMIN_LOCKED_ALLOWED_STATUSES:
        raise HTTPException(
            status_code=423,
            detail=(
                f"slot {slot.id} is admin-locked (special slot); only "
                f"ongoing/paused/finished/aborted are allowed via agent-update"
            ),
        )

    slot.status = payload.status.value

    reported_start = payload.started_at
    if reported_start is not None:
        slot.started_at = reported_start

    reported_duration = payload.actual_duration
    if reported_duration is not None:
        slot.actual_duration = reported_duration

    # Either a reported start time or an ONGOING status counts as attendance.
    if reported_start is not None or payload.status == SlotStatusEnum.ONGOING:
        slot.attended = True
def _maybe_trigger_discord_wakeup(db: Session, slot: TimeSlot) -> dict | None:
    """Trigger a Discord wakeup if *slot* is ONGOING and none was sent yet.

    Returns:
        ``None`` when no wakeup is attempted: the slot is not ongoing,
        ``wakeup_sent_at`` is already set, or the slot's user is missing or
        has no ``discord_user_id``. Otherwise ``{"ok": True, "channel_id": ...}``
        on success or ``{"ok": False, "error": "<message>"}`` on failure.

    Side effects:
        On success sets ``slot.wakeup_sent_at`` to the current UTC time.
        Nothing is committed here. Failures are logged and swallowed so
        they never fail the surrounding slot update.
    """
    import logging  # local import keeps this block self-contained

    def _text(raw) -> str:
        # Slot attributes may hold an Enum member or a plain string (callers
        # in this module assign ``.value`` strings before committing), so
        # normalise before comparing or formatting.
        return raw.value if hasattr(raw, "value") else str(raw)

    # Only trigger for ONGOING status and if not already sent.
    if _text(slot.status) != SlotStatus.ONGOING.value or slot.wakeup_sent_at is not None:
        return None

    # The slot's user must exist and have a Discord account linked.
    user = db.query(User).filter(User.id == slot.user_id).first()
    if not user or not user.discord_user_id:
        return None

    # The agent id is informational only; fall back to "unknown".
    agent = db.query(Agent).filter(Agent.user_id == user.id).first()
    agent_id_str = agent.agent_id if agent else "unknown"

    # Build the wakeup message. Guarded accessors keep a string-valued
    # column from raising AttributeError outside the try block below.
    event_label = _text(slot.event_type) if slot.event_type else 'work'
    title = f"HarborForge Slot: {event_label}"
    message = (
        f"🎯 **Slot started**\n"
        f"Agent: `{agent_id_str}`\n"
        f"Type: {_text(slot.slot_type)}\n"
        f"Duration: {slot.estimated_duration}min\n"
        f"Priority: {slot.priority}\n"
        f"Use `hf calendar slot {slot.id}` for details."
    )

    try:
        result = create_private_wakeup_channel(
            discord_user_id=user.discord_user_id,
            title=title,
            message=message,
        )
        slot.wakeup_sent_at = datetime.now(timezone.utc)
        return {"ok": True, "channel_id": result.get("channel_id")}
    except Exception as e:
        # Deliberate best-effort boundary: log but don't fail the slot update.
        logging.getLogger(__name__).warning(
            "Discord wakeup failed for slot %s: %s", slot.id, e
        )
        return {"ok": False, "error": str(e)}
# Plugin-facing heartbeat: authenticates the agent by its two headers,
# records liveness, and returns the slots currently due for the agent's user.
@router.api_route(
    "/agent/heartbeat",
    methods=["GET", "POST"],
    response_model=AgentHeartbeatResponse,
    summary="Get all due slots for the calling agent",
)
def agent_heartbeat(
    x_agent_id: str = Header(..., alias="X-Agent-ID"),
    x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"),
    db: Session = Depends(get_db),
):
    agent = _require_agent(db, x_agent_id, x_claw_identifier)
    record_heartbeat(db, agent)

    now_utc = datetime.now(timezone.utc)
    due_slots = get_pending_slots_for_agent(db, agent.user_id, now=now_utc)
    db.commit()

    # Read the status after the commit; it may be an Enum or a plain string.
    current_status = agent.status
    if hasattr(current_status, 'value'):
        status_text = current_status.value
    else:
        status_text = str(current_status)

    return AgentHeartbeatResponse(
        slots=[_real_slot_to_item(due) for due in due_slots],
        agent_status=status_text,
        message=f"{len(due_slots)} due slot(s)",
    )
@router.get(
    "/sync",
    summary="Sync today's schedules for all agents on a claw instance",
)
def sync_schedules(
    x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"),
    db: Session = Depends(get_db),
):
    """Return today's slots for all agents belonging to the given claw instance.

    Used by the HF OpenClaw plugin to maintain a local schedule cache.
    Returns a dict of { agent_id: [slots] } for all agents with matching
    claw_identifier.
    """
    today = date_type.today()

    # Materialise today's special slots for every agent on this claw before
    # reading, so they come back through the ordinary real-slot query below.
    # Called on every sync with commit=True.
    materialise_special_slots_for_claw(db, x_claw_identifier, today, commit=True)

    # Every agent registered on this claw instance.
    claw_agents = (
        db.query(Agent)
        .filter(Agent.claw_identifier == x_claw_identifier)
        .all()
    )

    hidden_statuses = list(_INACTIVE_STATUSES)
    schedules: dict[str, list[dict]] = {}
    for agent in claw_agents:
        # Persisted slots for today, minus skipped/aborted ones.
        stored_slots = (
            db.query(TimeSlot)
            .filter(
                TimeSlot.user_id == agent.user_id,
                TimeSlot.date == today,
                TimeSlot.status.notin_(hidden_statuses),
            )
            .all()
        )
        entries = [
            _real_slot_to_item(row).model_dump(mode="json") for row in stored_slots
        ]
        # Followed by plan-generated slots that have no row yet.
        entries.extend(
            _virtual_slot_to_item(planned).model_dump(mode="json")
            for planned in get_virtual_slots_for_date(db, agent.user_id, today)
        )
        schedules[agent.agent_id] = entries

    # A sync also counts as a liveness signal for each agent.
    for agent in claw_agents:
        record_heartbeat(db, agent)
    db.commit()

    return {
        "schedules": schedules,
        "date": today.isoformat(),
        "agent_count": len(claw_agents),
    }
# Plugin-facing status update for a persisted slot. The slot must belong to
# the user behind the calling agent; otherwise the request gets a 404.
@router.patch(
    "/slots/{slot_id}/agent-update",
    response_model=TimeSlotEditResponse,
    summary="Agent updates a real slot status",
)
def agent_update_real_slot(
    slot_id: int,
    payload: SlotAgentUpdate,
    x_agent_id: str = Header(..., alias="X-Agent-ID"),
    x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"),
    db: Session = Depends(get_db),
):
    agent = _require_agent(db, x_agent_id, x_claw_identifier)

    target_slot = (
        db.query(TimeSlot)
        .filter(TimeSlot.id == slot_id, TimeSlot.user_id == agent.user_id)
        .first()
    )
    if target_slot is None:
        raise HTTPException(status_code=404, detail="Slot not found")

    # Apply the status change, then attempt the Discord wakeup (its result
    # is not part of the response).
    _apply_agent_slot_update(target_slot, payload)
    _maybe_trigger_discord_wakeup(db, target_slot)

    db.commit()
    db.refresh(target_slot)
    return TimeSlotEditResponse(slot=_slot_to_response(target_slot), warnings=[])
# Plugin-facing status update for a plan-generated (virtual) slot. The slot
# is first materialised into a real row, then updated like any real slot.
#
# Responses: 404 when the agent is unknown, when the virtual id cannot be
# materialised, or when the materialised slot belongs to another user;
# 423 when the slot is admin-locked and the requested status is not allowed.
@router.patch(
    "/slots/virtual/{virtual_id}/agent-update",
    response_model=TimeSlotEditResponse,
    summary="Agent materializes and updates a virtual slot status",
)
def agent_update_virtual_slot(
    virtual_id: str,
    payload: SlotAgentUpdate,
    x_agent_id: str = Header(..., alias="X-Agent-ID"),
    x_claw_identifier: str = Header(..., alias="X-Claw-Identifier"),
    db: Session = Depends(get_db),
):
    agent = _require_agent(db, x_agent_id, x_claw_identifier)

    # Map a materialisation failure to 404, matching the user-facing
    # virtual edit/cancel endpoints, instead of letting the ValueError
    # surface as a 500.
    try:
        slot = materialize_from_virtual_id(db, virtual_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e

    # Ownership is checked after materialisation; roll back on a mismatch
    # so nothing done for a foreign slot is persisted.
    if slot.user_id != agent.user_id:
        db.rollback()
        raise HTTPException(status_code=404, detail="Slot not found")

    _apply_agent_slot_update(slot, payload)
    _maybe_trigger_discord_wakeup(db, slot)
    db.commit()
    db.refresh(slot)
    return TimeSlotEditResponse(slot=_slot_to_response(slot), warnings=[])
# Plugin-facing runtime status report. The agent is identified by the
# agent_id / claw_identifier carried in the request body. The status string
# is matched case-insensitively after trimming; unknown values yield HTTP 400.
@router.post(
    "/agent/status",
    summary="Update agent runtime status from plugin",
)
def update_agent_status(
    payload: AgentStatusUpdateRequest,
    db: Session = Depends(get_db),
):
    agent = _require_agent(db, payload.agent_id, payload.claw_identifier)
    requested = (payload.status or '').lower().strip()

    # Both "busy" flavours go through transition_to_busy and differ only in
    # the slot type they pass along.
    busy_slot_types = {
        AgentStatus.BUSY.value: SlotType.WORK,
        AgentStatus.ON_CALL.value: SlotType.ON_CALL,
    }

    if requested == AgentStatus.IDLE.value:
        transition_to_idle(db, agent)
    elif requested in busy_slot_types:
        transition_to_busy(db, agent, slot_type=busy_slot_types[requested])
    elif requested == AgentStatus.OFFLINE.value:
        transition_to_offline(db, agent)
    elif requested == AgentStatus.EXHAUSTED.value:
        # Anything other than an explicit 'billing' is treated as rate limiting.
        if payload.exhaust_reason == 'billing':
            reason = ExhaustReason.BILLING
        else:
            reason = ExhaustReason.RATE_LIMIT
        transition_to_exhausted(db, agent, reason=reason, recovery_at=payload.recovery_at)
    else:
        raise HTTPException(status_code=400, detail="Unsupported agent status")

    db.commit()

    resulting = agent.status
    status_text = resulting.value if hasattr(resulting, 'value') else str(resulting)
    return {"ok": True, "agent_id": agent.agent_id, "status": status_text}
@router.get(
    "/day",
    response_model=CalendarDayResponse,
    summary="Get all calendar slots for a given day",
)
def get_calendar_day(
    date: Optional[date_type] = Query(None, description="Target date (defaults to today)"),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_read),
):
    """Return all calendar slots for the authenticated user on the given date.

    The response merges:

    1. **Real (materialized) slots** — rows from the ``time_slots`` table.
    2. **Virtual (plan-generated) slots** — synthesized from active
       ``SchedulePlan`` rules that match the date but have not yet been
       materialized.

    All slots are sorted by ``scheduled_at`` ascending. Inactive slots
    (skipped / aborted) are excluded by default.
    """
    target_date = date or date_type.today()

    # Materialise the user's special slots for the target date first so the
    # day view shows them next to user-created slots.
    materialise_special_slots_for_user(db, current_user.id, target_date, commit=True)

    # Persisted slots for the day, minus skipped/aborted ones.
    stored_slots = (
        db.query(TimeSlot)
        .filter(
            TimeSlot.user_id == current_user.id,
            TimeSlot.date == target_date,
            TimeSlot.status.notin_(list(_INACTIVE_STATUSES)),
        )
        .all()
    )
    merged: list[CalendarSlotItem] = [_real_slot_to_item(row) for row in stored_slots]

    # Plan-generated slots that have no row yet.
    planned_slots = get_virtual_slots_for_date(db, current_user.id, target_date)
    merged += [_virtual_slot_to_item(planned) for planned in planned_slots]

    # scheduled_at is already a string on the items, so this orders the
    # merged list by that text (stable sort).
    ordered = sorted(merged, key=lambda entry: entry.scheduled_at)

    return CalendarDayResponse(
        date=target_date,
        user_id=current_user.id,
        slots=ordered,
    )
# ---------------------------------------------------------------------------
# Slot edit (BE-CAL-API-003)
# ---------------------------------------------------------------------------
def _apply_edit_fields(slot: TimeSlot, payload: TimeSlotEdit) -> None:
    """Copy every field of *payload* that is not ``None`` onto *slot*.

    ``slot_type`` and ``event_type`` are stored as their ``.value``; the
    other fields are copied as-is. A ``None`` field means "leave
    unchanged", so this helper cannot clear a value.
    """
    # (attribute name, store the enum's .value instead of the object)
    editable_fields = (
        ("slot_type", True),
        ("scheduled_at", False),
        ("estimated_duration", False),
        ("event_type", True),
        ("event_data", False),
        ("priority", False),
    )
    for field_name, stored_as_value in editable_fields:
        incoming = getattr(payload, field_name)
        if incoming is None:
            continue
        setattr(slot, field_name, incoming.value if stored_as_value else incoming)
@router.patch(
    "/slots/{slot_id}",
    response_model=TimeSlotEditResponse,
    summary="Edit a real (materialized) calendar slot",
)
def edit_real_slot(
    slot_id: int,
    payload: TimeSlotEdit,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_write),
):
    """Edit an existing real (materialized) slot.

    - **Immutability**: rejects edits to past slots.
    - **Overlap detection**: if time/duration changed, rejects on overlap
      (excluding the slot being edited).
    - **Plan detach**: if the slot was materialized from a plan, editing
      detaches it (sets ``plan_id`` to NULL).
    - **Workload warnings**: returned after successful edit (advisory only).
    """
    # NOTE(review): the maintenance-window / slot_type=system guard used by
    # slot creation is not applied here — confirm whether edits should be
    # subject to it as well.
    target_slot = (
        db.query(TimeSlot)
        .filter(TimeSlot.id == slot_id, TimeSlot.user_id == current_user.id)
        .first()
    )
    if target_slot is None:
        raise HTTPException(status_code=404, detail="Slot not found")

    # Special slots materialised from a schedule_type template are
    # admin-owned. Agents may still drive their status through the
    # agent-update endpoint, but this user-facing edit is refused outright.
    if getattr(target_slot, "is_admin_locked", False):
        raise HTTPException(
            status_code=423,
            detail=(
                f"slot {target_slot.id} is admin-locked (materialised from a special "
                f"slot template); only the schedule_type owner can edit it"
            ),
        )

    # Past slots are immutable.
    try:
        guard_edit_real_slot(db, target_slot)
    except ImmutableSlotError as e:
        raise HTTPException(status_code=422, detail=str(e))

    # Work out the time and duration the slot would have after the edit.
    requested_start = payload.scheduled_at
    requested_duration = payload.estimated_duration
    effective_start = (
        target_slot.scheduled_at if requested_start is None else requested_start
    )
    effective_duration = (
        target_slot.estimated_duration
        if requested_duration is None
        else requested_duration
    )

    # The overlap check only runs when the time or duration really changes.
    start_changed = (
        requested_start is not None and requested_start != target_slot.scheduled_at
    )
    duration_changed = (
        requested_duration is not None
        and requested_duration != target_slot.estimated_duration
    )
    if start_changed or duration_changed:
        clashes = check_overlap_for_edit(
            db,
            user_id=current_user.id,
            slot_id=target_slot.id,
            target_date=target_slot.date,
            scheduled_at=effective_start,
            estimated_duration=effective_duration,
        )
        if clashes:
            raise HTTPException(
                status_code=409,
                detail={
                    "message": "Edited slot overlaps with existing schedule",
                    "conflicts": [clash.to_dict() for clash in clashes],
                },
            )

    # An edited slot no longer follows its plan.
    if target_slot.plan_id is not None:
        detach_slot_from_plan(target_slot)

    _apply_edit_fields(target_slot, payload)
    db.commit()
    db.refresh(target_slot)

    # Workload warnings are advisory and computed after the commit.
    advisory = get_workload_warnings_for_date(db, current_user.id, target_slot.date)
    return TimeSlotEditResponse(
        slot=_slot_to_response(target_slot),
        warnings=advisory,
    )
@router.patch(
    "/slots/virtual/{virtual_id}",
    response_model=TimeSlotEditResponse,
    summary="Edit a virtual (plan-generated) calendar slot",
)
def edit_virtual_slot(
    virtual_id: str,
    payload: TimeSlotEdit,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_write),
):
    """Edit a virtual (plan-generated) slot.

    This triggers **materialization**: the virtual slot is first converted
    into a real TimeSlot row, then the edits are applied, and the slot is
    detached from its plan (``plan_id`` set to NULL).

    - **Immutability**: rejects edits to past virtual slots.
    - **Overlap detection**: checks overlap with the edited time/duration
      against existing slots on the same day.
    - **Workload warnings**: returned after successful edit (advisory only).
    """
    # A malformed virtual id is a client error (400).
    parsed = parse_virtual_slot_id(virtual_id)
    if parsed is None:
        raise HTTPException(status_code=400, detail=f"Invalid virtual slot id: {virtual_id}")
    _plan_id, _slot_date = parsed  # unpacked only; neither part is used below

    # Past virtual slots are immutable.
    try:
        guard_edit_virtual_slot(virtual_id)
    except ImmutableSlotError as e:
        raise HTTPException(status_code=422, detail=str(e))

    # Turn the virtual slot into a real row.
    try:
        real_slot = materialize_from_virtual_id(db, virtual_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

    # Ownership is checked after materialisation; roll back on a mismatch.
    if real_slot.user_id != current_user.id:
        db.rollback()
        raise HTTPException(status_code=404, detail="Slot not found")

    # Time and duration the slot would have after the edit.
    effective_start = (
        real_slot.scheduled_at if payload.scheduled_at is None else payload.scheduled_at
    )
    effective_duration = (
        real_slot.estimated_duration
        if payload.estimated_duration is None
        else payload.estimated_duration
    )

    # Always check overlap, excluding the row that was just materialised.
    clashes = check_overlap_for_edit(
        db,
        user_id=current_user.id,
        slot_id=real_slot.id,
        target_date=real_slot.date,
        scheduled_at=effective_start,
        estimated_duration=effective_duration,
    )
    if clashes:
        db.rollback()
        raise HTTPException(
            status_code=409,
            detail={
                "message": "Edited slot overlaps with existing schedule",
                "conflicts": [clash.to_dict() for clash in clashes],
            },
        )

    # An edited slot no longer follows its plan.
    detach_slot_from_plan(real_slot)
    _apply_edit_fields(real_slot, payload)
    db.commit()
    db.refresh(real_slot)

    # Workload warnings are advisory and computed after the commit.
    advisory = get_workload_warnings_for_date(db, current_user.id, real_slot.date)
    return TimeSlotEditResponse(
        slot=_slot_to_response(real_slot),
        warnings=advisory,
    )
# ---------------------------------------------------------------------------
# Slot cancel (BE-CAL-API-004)
# ---------------------------------------------------------------------------
@router.post(
    "/slots/{slot_id}/cancel",
    response_model=TimeSlotCancelResponse,
    summary="Cancel a real (materialized) calendar slot",
)
def cancel_real_slot(
    slot_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_write),
):
    """Cancel an existing real (materialized) slot.

    - **Immutability**: rejects cancellation of past slots.
    - **Plan detach**: if the slot was materialized from a plan, cancelling
      detaches it (sets ``plan_id`` to NULL) so the plan no longer claims
      that date.
    - **Status**: sets the slot status to ``Skipped``.
    """
    target_slot = (
        db.query(TimeSlot)
        .filter(TimeSlot.id == slot_id, TimeSlot.user_id == current_user.id)
        .first()
    )
    if target_slot is None:
        raise HTTPException(status_code=404, detail="Slot not found")

    # Admin-locked special slots cannot be cancelled through this endpoint.
    if getattr(target_slot, "is_admin_locked", False):
        raise HTTPException(
            status_code=423,
            detail=(
                f"slot {target_slot.id} is admin-locked (materialised from a special "
                f"slot template); only the schedule_type owner can cancel it"
            ),
        )

    # Past slots are immutable.
    try:
        guard_cancel_real_slot(db, target_slot)
    except ImmutableSlotError as e:
        raise HTTPException(status_code=422, detail=str(e))

    # Release the plan's claim on this slot before marking it skipped.
    if target_slot.plan_id is not None:
        detach_slot_from_plan(target_slot)
    target_slot.status = SlotStatus.SKIPPED

    db.commit()
    db.refresh(target_slot)
    return TimeSlotCancelResponse(
        slot=_slot_to_response(target_slot),
        message="Slot cancelled successfully",
    )
@router.post(
    "/slots/virtual/{virtual_id}/cancel",
    response_model=TimeSlotCancelResponse,
    summary="Cancel a virtual (plan-generated) calendar slot",
)
def cancel_virtual_slot(
    virtual_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_write),
):
    """Cancel a virtual (plan-generated) slot.

    This triggers **materialization**: the virtual slot is first converted
    into a real TimeSlot row, then immediately set to ``Skipped`` status
    and detached from its plan (``plan_id`` set to NULL).

    - **Immutability**: rejects cancellation of past virtual slots.
    """
    # A malformed virtual id is a client error (400).
    parsed = parse_virtual_slot_id(virtual_id)
    if parsed is None:
        raise HTTPException(status_code=400, detail=f"Invalid virtual slot id: {virtual_id}")
    _plan_id, _slot_date = parsed  # unpacked only; neither part is used below

    # Past virtual slots are immutable.
    try:
        guard_cancel_virtual_slot(virtual_id)
    except ImmutableSlotError as e:
        raise HTTPException(status_code=422, detail=str(e))

    # Turn the virtual slot into a real row so the cancellation can be stored.
    try:
        real_slot = materialize_from_virtual_id(db, virtual_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

    # Ownership is checked after materialisation; roll back on a mismatch.
    if real_slot.user_id != current_user.id:
        db.rollback()
        raise HTTPException(status_code=404, detail="Slot not found")

    # Detach from the plan, then record the cancellation.
    detach_slot_from_plan(real_slot)
    real_slot.status = SlotStatus.SKIPPED

    db.commit()
    db.refresh(real_slot)
    return TimeSlotCancelResponse(
        slot=_slot_to_response(real_slot),
        message="Virtual slot materialized and cancelled successfully",
    )
# ---------------------------------------------------------------------------
# SchedulePlan (BE-CAL-API-005)
# ---------------------------------------------------------------------------
def _plan_to_response(plan: SchedulePlan) -> SchedulePlanResponse:
    """Build the API response schema for a SchedulePlan ORM row.

    Enum-like columns are emitted as their ``.value`` when they have one and
    as ``str(...)`` otherwise. Optional period/event columns that are falsy
    become ``None``; a falsy ``at_time`` becomes an empty string.
    """

    def _as_text(raw):
        # Enum members expose ``.value``; anything else is stringified.
        return raw.value if hasattr(raw, "value") else str(raw)

    def _optional_text(raw):
        # Falsy (unset) columns are reported as None rather than "None".
        return _as_text(raw) if raw else None

    at_time_text = plan.at_time.isoformat() if plan.at_time else ""

    return SchedulePlanResponse(
        id=plan.id,
        user_id=plan.user_id,
        slot_type=_as_text(plan.slot_type),
        estimated_duration=plan.estimated_duration,
        at_time=at_time_text,
        on_day=_optional_text(plan.on_day),
        on_week=plan.on_week,
        on_month=_optional_text(plan.on_month),
        event_type=_optional_text(plan.event_type),
        event_data=plan.event_data,
        is_active=plan.is_active,
        created_at=plan.created_at,
        updated_at=plan.updated_at,
    )
@router.post(
    "/plans",
    response_model=SchedulePlanResponse,
    status_code=201,
    summary="Create a recurring schedule plan",
)
def create_plan(
    payload: SchedulePlanCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_write),
):
    """Create a new recurring schedule plan for the authenticated user.

    A plan is the template from which virtual slots are generated on
    matching dates. The period parameters form a hierarchy:
    ``on_month`` requires ``on_week``, which requires ``on_day``.
    ``at_time`` is always required. New plans start out active.
    """

    def _value_or_none(member):
        # Optional enum fields are stored by value; unset fields stay NULL.
        return member.value if member else None

    new_plan = SchedulePlan(
        user_id=current_user.id,
        slot_type=payload.slot_type.value,
        estimated_duration=payload.estimated_duration,
        at_time=payload.at_time,
        on_day=_value_or_none(payload.on_day),
        on_week=payload.on_week,
        on_month=_value_or_none(payload.on_month),
        event_type=_value_or_none(payload.event_type),
        event_data=payload.event_data,
        is_active=True,
    )

    db.add(new_plan)
    db.commit()
    # Reload the row after commit before serialising it.
    db.refresh(new_plan)

    return _plan_to_response(new_plan)
@router.get(
    "/plans",
    response_model=SchedulePlanListResponse,
    summary="List all schedule plans for the current user",
)
def list_plans(
    include_inactive: bool = Query(False, description="Include cancelled/inactive plans"),
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_read),
):
    """List the authenticated user's schedule plans, newest first.

    Only active plans are returned unless ``include_inactive=true`` is
    passed, in which case cancelled plans are included as well.
    """
    criteria = [SchedulePlan.user_id == current_user.id]
    if not include_inactive:
        criteria.append(SchedulePlan.is_active.is_(True))

    owned_plans = (
        db.query(SchedulePlan)
        .filter(*criteria)
        .order_by(SchedulePlan.created_at.desc())
        .all()
    )

    return SchedulePlanListResponse(
        plans=[_plan_to_response(item) for item in owned_plans],
    )
@router.get(
    "/plans/{plan_id}",
    response_model=SchedulePlanResponse,
    summary="Get a single schedule plan by ID",
)
def get_plan(
    plan_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_read),
):
    """Fetch one schedule plan belonging to the authenticated user.

    Responds with 404 when no plan with that id is owned by the caller.
    """
    owned_plan_query = db.query(SchedulePlan).filter(
        SchedulePlan.id == plan_id,
        SchedulePlan.user_id == current_user.id,
    )
    found = owned_plan_query.first()

    if found is not None:
        return _plan_to_response(found)

    raise HTTPException(status_code=404, detail="Plan not found")
# ---------------------------------------------------------------------------
# Plan edit / cancel (BE-CAL-API-006)
# ---------------------------------------------------------------------------
def _validate_plan_hierarchy(
on_day: str | None,
on_week: int | None,
on_month: str | None,
) -> None:
"""Enforce period-parameter hierarchy after merging edited values.
Raises HTTPException(422) if the hierarchy is violated.
"""
if on_month is not None and on_week is None:
raise HTTPException(
status_code=422,
detail="on_month requires on_week to be set",
)
if on_week is not None and on_day is None:
raise HTTPException(
status_code=422,
detail="on_week requires on_day to be set",
)
@router.patch(
    "/plans/{plan_id}",
    response_model=SchedulePlanResponse,
    summary="Edit a recurring schedule plan",
)
def edit_plan(
    plan_id: int,
    payload: SchedulePlanEdit,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_write),
):
    """Edit an existing schedule plan owned by the authenticated user.

    - ``clear_on_month`` / ``clear_on_week`` / ``clear_on_day`` reset the
      matching period field first; any value supplied in the same request
      is applied afterwards and therefore wins.
    - The period-parameter hierarchy (``on_month`` → ``on_week`` →
      ``on_day``) is validated against the merged result (422 on violation).
    - Materialized slots dated today or later that still reference the plan
      are detached (``plan_id`` set to NULL) so they keep their old data;
      earlier materialized slots are left untouched.
    - Unknown plans give 404; inactive (cancelled) plans give 422.
    """
    target_plan = (
        db.query(SchedulePlan)
        .filter(SchedulePlan.id == plan_id, SchedulePlan.user_id == current_user.id)
        .first()
    )
    if target_plan is None:
        raise HTTPException(status_code=404, detail="Plan not found")
    if not target_plan.is_active:
        raise HTTPException(status_code=422, detail="Cannot edit an inactive (cancelled) plan")

    # Run the past-slot guard; its return value (past slot ids) is not
    # needed by this endpoint.
    guard_plan_edit_no_past_retroaction(db, plan_id)

    # --- Clear flags are honoured before any new value is applied ---
    clear_requests = (
        (payload.clear_on_month, "on_month"),
        (payload.clear_on_week, "on_week"),
        (payload.clear_on_day, "on_day"),
    )
    for requested, column in clear_requests:
        if requested:
            setattr(target_plan, column, None)

    # --- Merge provided values; ``None`` means "leave unchanged" ---
    # Enum-typed payload fields are stored by their ``.value``.
    for column in ("slot_type", "on_day", "on_month", "event_type"):
        incoming = getattr(payload, column)
        if incoming is not None:
            setattr(target_plan, column, incoming.value)
    # Plain fields are copied as-is.
    for column in ("estimated_duration", "at_time", "on_week", "event_data"):
        incoming = getattr(payload, column)
        if incoming is not None:
            setattr(target_plan, column, incoming)

    # --- Validate the hierarchy on the merged state ---
    _validate_plan_hierarchy(target_plan.on_day, target_plan.on_week, target_plan.on_month)

    # --- Detach today's and later materialized slots ---
    # They were generated from the old template; virtual slots produced
    # from now on reflect the edited plan.
    from datetime import date as date_type

    cutoff = date_type.today()
    still_linked = (
        db.query(TimeSlot)
        .filter(
            TimeSlot.plan_id == plan_id,
            TimeSlot.date >= cutoff,
        )
        .all()
    )
    for linked_slot in still_linked:
        linked_slot.plan_id = None

    db.commit()
    db.refresh(target_plan)

    return _plan_to_response(target_plan)
@router.post(
    "/plans/{plan_id}/cancel",
    response_model=SchedulePlanCancelResponse,
    summary="Cancel a recurring schedule plan",
)
def cancel_plan(
    plan_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_write),
):
    """Cancel (soft-delete) a schedule plan owned by the authenticated user.

    - The plan's ``is_active`` flag is set to ``False``.
    - Materialized slots dated before today are preserved untouched; their
      ids are returned in ``preserved_past_slot_ids``.
    - Materialized slots dated today or later that still reference the plan
      are detached (``plan_id`` set to NULL) and stay on the calendar as
      standalone slots. To cancel those as well, use the slot-cancel
      endpoints on each of them.
    - Unknown plans give 404; already-cancelled plans give 422.
    """
    doomed_plan = (
        db.query(SchedulePlan)
        .filter(SchedulePlan.id == plan_id, SchedulePlan.user_id == current_user.id)
        .first()
    )
    if doomed_plan is None:
        raise HTTPException(status_code=404, detail="Plan not found")
    if not doomed_plan.is_active:
        raise HTTPException(status_code=422, detail="Plan is already cancelled")

    # Ids reported by the guard are echoed back to the caller below.
    preserved_ids = guard_plan_cancel_no_past_retroaction(db, plan_id)

    # --- Detach today's and later materialized slots ---
    from datetime import date as date_type

    cutoff = date_type.today()
    still_linked = (
        db.query(TimeSlot)
        .filter(
            TimeSlot.plan_id == plan_id,
            TimeSlot.date >= cutoff,
        )
        .all()
    )
    for linked_slot in still_linked:
        linked_slot.plan_id = None

    # --- Soft-delete the plan itself ---
    doomed_plan.is_active = False

    db.commit()
    db.refresh(doomed_plan)

    return SchedulePlanCancelResponse(
        plan=_plan_to_response(doomed_plan),
        message="Plan cancelled successfully",
        preserved_past_slot_ids=preserved_ids,
    )
# ---------------------------------------------------------------------------
# Date list (BE-CAL-API-007)
# ---------------------------------------------------------------------------
# Stored status values that the date-list endpoint treats as inactive.
# Slots in one of these states no longer occupy calendar time, so they are
# filtered out of the date-list result.
_DATE_LIST_EXCLUDED_STATUSES = {
    status.value for status in (SlotStatus.SKIPPED, SlotStatus.ABORTED)
}
@router.get(
    "/dates",
    response_model=DateListResponse,
    summary="List future dates that have materialized slots",
)
def list_dates(
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_read),
):
    """List, in ascending order, the dates on which the authenticated user
    has at least one materialized (real) slot.

    - Only dates **today or later** are returned.
    - Slots whose status is skipped or aborted are ignored.
    - Dates that exist only as plan-generated (virtual) slots are **not**
      returned, because only stored slot rows are queried.
    """
    earliest = date_type.today()

    dated_rows = (
        db.query(TimeSlot.date)
        .filter(
            TimeSlot.user_id == current_user.id,
            TimeSlot.date >= earliest,
            TimeSlot.status.notin_(list(_DATE_LIST_EXCLUDED_STATUSES)),
        )
        # One row per distinct date.
        .group_by(TimeSlot.date)
        .order_by(TimeSlot.date.asc())
        .all()
    )

    distinct_dates = [row[0] for row in dated_rows]
    return DateListResponse(dates=distinct_dates)
# ---------------------------------------------------------------------------
# MinimumWorkload
# ---------------------------------------------------------------------------
@router.get(
    "/workload-config",
    response_model=MinimumWorkloadResponse,
    summary="Get current user's minimum workload configuration",
)
def get_my_workload_config(
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_manage),
):
    """Return the minimum-workload thresholds of the authenticated user.

    When nothing has been saved yet, default (all-zero) thresholds are
    returned.
    """
    # NOTE(review): the all-zero default comes from get_workload_config,
    # which is defined outside this section; confirm there.
    thresholds = get_workload_config(db, current_user.id)
    return MinimumWorkloadResponse(
        user_id=current_user.id,
        config=thresholds,
    )
@router.put(
    "/workload-config",
    response_model=MinimumWorkloadResponse,
    summary="Replace the current user's minimum workload configuration",
)
def put_my_workload_config(
    payload: MinimumWorkloadConfig,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_manage),
):
    """Replace the authenticated user's workload configuration in full."""
    stored = replace_workload_config(db, current_user.id, payload)

    # Persist, then reload the row before reading its config back.
    db.commit()
    db.refresh(stored)

    return MinimumWorkloadResponse(
        user_id=current_user.id,
        config=stored.config,
    )
@router.patch(
    "/workload-config",
    response_model=MinimumWorkloadResponse,
    summary="Partially update the current user's minimum workload configuration",
)
def patch_my_workload_config(
    payload: MinimumWorkloadUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(require_calendar_manage),
):
    """Partially update the authenticated user's workload configuration.

    Only the periods present in the payload are overwritten.
    """
    stored = upsert_workload_config(db, current_user.id, payload)

    # Persist, then reload the row before reading its config back.
    db.commit()
    db.refresh(stored)

    return MinimumWorkloadResponse(
        user_id=current_user.id,
        config=stored.config,
    )
# ---------------------------------------------------------------------------
# Admin: manage another user's workload config
# ---------------------------------------------------------------------------
def _require_admin(current_user: User = Depends(get_current_user)):
    """Dependency that lets only administrators through.

    Returns the authenticated user when ``is_admin`` is truthy; otherwise
    raises ``HTTPException`` with status 403.
    """
    if current_user.is_admin:
        return current_user
    raise HTTPException(status_code=403, detail="Admin required")
@router.get(
    "/workload-config/{user_id}",
    response_model=MinimumWorkloadResponse,
    summary="[Admin] Get a specific user's minimum workload configuration",
)
def get_user_workload_config(
    user_id: int,
    db: Session = Depends(get_db),
    _admin: User = Depends(_require_admin),
):
    """Return the minimum-workload thresholds of the given user.

    Admin only. Responds with 404 when the user does not exist.
    """
    target_user = (
        db.query(User)
        .filter(User.id == user_id)
        .first()
    )
    if not target_user:
        raise HTTPException(status_code=404, detail="User not found")

    thresholds = get_workload_config(db, user_id)
    return MinimumWorkloadResponse(
        user_id=user_id,
        config=thresholds,
    )
@router.put(
    "/workload-config/{user_id}",
    response_model=MinimumWorkloadResponse,
    summary="[Admin] Replace a specific user's minimum workload configuration",
)
def put_user_workload_config(
    user_id: int,
    payload: MinimumWorkloadConfig,
    db: Session = Depends(get_db),
    _admin: User = Depends(_require_admin),
):
    """Replace the given user's workload configuration in full.

    Admin only. Responds with 404 when the user does not exist.
    """
    target_user = (
        db.query(User)
        .filter(User.id == user_id)
        .first()
    )
    if not target_user:
        raise HTTPException(status_code=404, detail="User not found")

    stored = replace_workload_config(db, user_id, payload)

    # Persist, then reload the row before reading its config back.
    db.commit()
    db.refresh(stored)

    return MinimumWorkloadResponse(
        user_id=user_id,
        config=stored.config,
    )
@router.patch(
    "/workload-config/{user_id}",
    response_model=MinimumWorkloadResponse,
    summary="[Admin] Partially update a specific user's minimum workload configuration",
)
def patch_user_workload_config(
    user_id: int,
    payload: MinimumWorkloadUpdate,
    db: Session = Depends(get_db),
    _admin: User = Depends(_require_admin),
):
    """Partially update the given user's workload configuration.

    Admin only. Responds with 404 when the user does not exist.
    """
    target_user = (
        db.query(User)
        .filter(User.id == user_id)
        .first()
    )
    if not target_user:
        raise HTTPException(status_code=404, detail="User not found")

    stored = upsert_workload_config(db, user_id, payload)

    # Persist, then reload the row before reading its config back.
    db.commit()
    db.refresh(stored)

    return MinimumWorkloadResponse(
        user_id=user_id,
        config=stored.config,
    )