Newer
Older
navi-1 / tests / integration / test_scheduler_loop.py
@Eugene Sukhodolskiy Eugene Sukhodolskiy on 15 May 11 KB Add self-recall (scheduled callback) system
"""Integration tests for the scheduler background loop."""

import asyncio
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock

import pytest

from navi.core.scheduler import Recall, _fire_recall, recall_scheduler_loop


@pytest.fixture(autouse=True)
def patch_scheduler_deps(monkeypatch):
    """Prevent real _fire_recall from triggering heavy dependency initialization."""
    monkeypatch.setattr("navi.api.deps.get_registries", lambda: (None, None, None, None))
    monkeypatch.setattr("navi.api.deps.get_workers", lambda: [])
    monkeypatch.setattr("navi.api.deps.get_memory_store", lambda: None)
    monkeypatch.setattr("navi.api.deps.get_mcp_manager", AsyncMock(return_value=None))


class TestSchedulerLoop:
    @pytest.mark.anyio
    async def test_loop_fires_past_due_recalls(self, monkeypatch):
        scheduler = AsyncMock()
        scheduler.get_pending_recalls.return_value = [
            Recall(
                id="r1", session_id="s1", call_type="once",
                trigger_at=datetime.now(timezone.utc) - timedelta(minutes=5),
                interval_seconds=None, internal_comment=None,
                additional_context_message="ctx", status="pending",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            )
        ]
        scheduler.get_next_trigger_at.return_value = None
        scheduler.mark_fired.return_value = None

        store = AsyncMock()
        store.get.return_value = AsyncMock()
        store.get.return_value.messages = []
        store.get.return_value.context = []

        # Patch _fire_recall to avoid full Agent construction
        fire_calls = []
        async def _fake_fire(semaphore, recall, scheduler, store):
            fire_calls.append(recall.id)
            await scheduler.mark_fired(recall.id)

        monkeypatch.setattr("navi.core.scheduler._fire_recall", _fake_fire)

        # Run loop for one iteration then cancel
        task = asyncio.create_task(recall_scheduler_loop(scheduler, store))
        await asyncio.sleep(0.1)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

        assert "r1" in fire_calls

    @pytest.mark.anyio
    async def test_loop_respects_semaphore(self, monkeypatch):
        scheduler = AsyncMock()
        scheduler.get_pending_recalls.return_value = [
            Recall(
                id=f"r{i}", session_id=f"s{i}", call_type="once",
                trigger_at=datetime.now(timezone.utc) - timedelta(minutes=5),
                interval_seconds=None, internal_comment=None,
                additional_context_message="ctx", status="pending",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            )
            for i in range(5)
        ]
        scheduler.get_next_trigger_at.return_value = None

        store = AsyncMock()

        running = asyncio.Semaphore(0)
        max_concurrent = 0
        current = 0

        async def _slow_fire(semaphore, recall, scheduler, store):
            nonlocal max_concurrent, current
            async with semaphore:
                current += 1
                max_concurrent = max(max_concurrent, current)
                await asyncio.sleep(0.1)
                current -= 1
                await scheduler.mark_fired(recall.id)

        monkeypatch.setattr("navi.core.scheduler._fire_recall", _slow_fire)

        task = asyncio.create_task(recall_scheduler_loop(scheduler, store))
        await asyncio.sleep(0.15)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

        assert max_concurrent <= 3

    @pytest.mark.anyio
    async def test_loop_defers_when_session_busy(self, monkeypatch):
        scheduler = AsyncMock()
        scheduler.get_pending_recalls.return_value = [
            Recall(
                id="r1", session_id="s1", call_type="once",
                trigger_at=datetime.now(timezone.utc) - timedelta(minutes=5),
                interval_seconds=None, internal_comment=None,
                additional_context_message="ctx", status="pending",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            )
        ]
        scheduler.get_next_trigger_at.return_value = None
        scheduler.reschedule.return_value = None

        store = AsyncMock()

        # Simulate busy session
        import navi.api.websocket as ws_mod
        ws_mod._busy_sessions.add("s1")

        async def _fake_fire(semaphore, recall, scheduler, store):
            await _fire_recall(semaphore, recall, scheduler, store)

        monkeypatch.setattr("navi.core.scheduler._fire_recall", _fake_fire)

        task = asyncio.create_task(recall_scheduler_loop(scheduler, store))
        await asyncio.sleep(0.1)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        finally:
            ws_mod._busy_sessions.discard("s1")

        scheduler.reschedule.assert_called_once()

    @pytest.mark.anyio
    async def test_loop_cancels_when_session_missing(self, monkeypatch):
        scheduler = AsyncMock()
        scheduler.get_pending_recalls.return_value = [
            Recall(
                id="r1", session_id="s1", call_type="once",
                trigger_at=datetime.now(timezone.utc) - timedelta(minutes=5),
                interval_seconds=None, internal_comment=None,
                additional_context_message="ctx", status="pending",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            )
        ]
        scheduler.get_next_trigger_at.return_value = None
        scheduler.mark_cancelled.return_value = None

        store = AsyncMock()
        store.get.return_value = None

        async def _fake_fire(semaphore, recall, scheduler, store):
            await _fire_recall(semaphore, recall, scheduler, store)

        monkeypatch.setattr("navi.core.scheduler._fire_recall", _fake_fire)

        task = asyncio.create_task(recall_scheduler_loop(scheduler, store))
        await asyncio.sleep(0.1)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

        scheduler.mark_cancelled.assert_called_once_with("r1")

    @pytest.mark.anyio
    async def test_recurring_rescheduled_on_success(self, monkeypatch):
        scheduler = AsyncMock()
        recall = Recall(
            id="r1", session_id="s1", call_type="recurring",
            trigger_at=datetime.now(timezone.utc) - timedelta(minutes=5),
            interval_seconds=3600, internal_comment=None,
            additional_context_message="ctx", status="pending",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        scheduler.get_pending_recalls.return_value = [recall]
        scheduler.get_next_trigger_at.return_value = None
        scheduler.reschedule.return_value = None

        store = AsyncMock()
        store.get.return_value = AsyncMock()
        store.get.return_value.messages = []
        store.get.return_value.context = []

        # Mock Agent.run to succeed
        agent_mock = AsyncMock()
        agent_mock.run.return_value = "done"
        monkeypatch.setattr("navi.core.agent.Agent", lambda *a, **kw: agent_mock)

        task = asyncio.create_task(recall_scheduler_loop(scheduler, store))
        await asyncio.sleep(0.1)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

        scheduler.reschedule.assert_called_once()

    @pytest.mark.anyio
    async def test_recurring_rescheduled_on_failure(self, monkeypatch):
        scheduler = AsyncMock()
        recall = Recall(
            id="r1", session_id="s1", call_type="recurring",
            trigger_at=datetime.now(timezone.utc) - timedelta(minutes=5),
            interval_seconds=3600, internal_comment=None,
            additional_context_message="ctx", status="pending",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        scheduler.get_pending_recalls.return_value = [recall]
        scheduler.get_next_trigger_at.return_value = None
        scheduler.reschedule.return_value = None

        store = AsyncMock()
        store.get.return_value = AsyncMock()
        store.get.return_value.messages = []
        store.get.return_value.context = []

        agent_mock = AsyncMock()
        agent_mock.run.side_effect = RuntimeError("boom")
        monkeypatch.setattr("navi.core.agent.Agent", lambda *a, **kw: agent_mock)

        task = asyncio.create_task(recall_scheduler_loop(scheduler, store))
        await asyncio.sleep(0.1)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

        scheduler.reschedule.assert_called_once()

    @pytest.mark.anyio
    async def test_one_time_cancelled_on_failure(self, monkeypatch):
        scheduler = AsyncMock()
        recall = Recall(
            id="r1", session_id="s1", call_type="once",
            trigger_at=datetime.now(timezone.utc) - timedelta(minutes=5),
            interval_seconds=None, internal_comment=None,
            additional_context_message="ctx", status="pending",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        scheduler.get_pending_recalls.return_value = [recall]
        scheduler.get_next_trigger_at.return_value = None
        scheduler.mark_cancelled.return_value = None

        store = AsyncMock()
        store.get.return_value = AsyncMock()
        store.get.return_value.messages = []
        store.get.return_value.context = []

        agent_mock = AsyncMock()
        agent_mock.run.side_effect = RuntimeError("boom")
        monkeypatch.setattr("navi.core.agent.Agent", lambda *a, **kw: agent_mock)

        task = asyncio.create_task(recall_scheduler_loop(scheduler, store))
        await asyncio.sleep(0.1)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

        scheduler.mark_cancelled.assert_called_once_with("r1")

    @pytest.mark.anyio
    async def test_loop_picks_up_after_restart(self, monkeypatch):
        scheduler = AsyncMock()
        scheduler.get_pending_recalls.return_value = [
            Recall(
                id="r1", session_id="s1", call_type="once",
                trigger_at=datetime.now(timezone.utc) - timedelta(hours=2),
                interval_seconds=None, internal_comment=None,
                additional_context_message="ctx", status="pending",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            )
        ]
        scheduler.get_next_trigger_at.return_value = None
        scheduler.mark_fired.return_value = None

        store = AsyncMock()

        fire_calls = []
        async def _fake_fire(semaphore, recall, scheduler, store):
            fire_calls.append(recall.id)
            await scheduler.mark_fired(recall.id)

        monkeypatch.setattr("navi.core.scheduler._fire_recall", _fake_fire)

        task = asyncio.create_task(recall_scheduler_loop(scheduler, store))
        await asyncio.sleep(0.1)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

        assert "r1" in fire_calls