diff --git a/navi/config.py b/navi/config.py index 9ca6ffa..c10de85 100644 --- a/navi/config.py +++ b/navi/config.py @@ -71,6 +71,7 @@ # Session file uploads session_files_dir: str = "session_files" session_files_max_size_mb: int = 200 + session_messages_window: int = 1000 # max hot messages per session; older -> archive share_file_max_size_mb: int = 1024 # Public base URL used by share_file tool to build download links. diff --git a/navi/core/agent.py b/navi/core/agent.py index e44743a..3a2756e 100644 --- a/navi/core/agent.py +++ b/navi/core/agent.py @@ -544,6 +544,18 @@ session.context_token_count = self._compressor.estimate_context_tokens(new_context) await self._sessions.save(session) + # Archive old messages if the hot table exceeds the configured window. + if settings.session_messages_window > 0 and session.db_next_sequence > settings.session_messages_window: + threshold = session.db_next_sequence - settings.session_messages_window + archived = await self._sessions.archive_old_messages(session_id, threshold) + if archived > 0: + log.info( + "agent.archive_messages", + session_id=session_id, + archived=archived, + threshold=threshold, + ) + log.info( "agent.context_compress", session_id=session_id, diff --git a/tests/unit/core/test_pg_session_store.py b/tests/unit/core/test_pg_session_store.py new file mode 100644 index 0000000..0072d41 --- /dev/null +++ b/tests/unit/core/test_pg_session_store.py @@ -0,0 +1,81 @@ +"""Unit tests for PgSessionStore (sequence_number + archive).""" + +import pytest + +from navi.core.pg_session_store import PgSessionStore +from navi.core.session import Session +from navi.llm.base import Message +from tests.conftest_factory import FakeConnection, FakePool + + +@pytest.mark.asyncio +async def test_save_assigns_sequence_numbers_to_new_messages(): + conn = FakeConnection() + conn.enqueue("OK") # UPDATE sessions + conn.enqueue(None) # executemany UPDATE (empty) + conn.enqueue("OK") # INSERT executemany + conn.enqueue("OK") # UPDATE next_sequence + + pool = FakePool(conn) + store = PgSessionStore(pool) + store._initialized = True + + session = Session(profile_id="test") + session.messages = [ + Message(role="user", content="hi"), + Message(role="assistant", content="hello"), + ] + for m in session.messages: + assert m.sequence_number == -1 + + await store.save(session) + + assert session.messages[0].sequence_number == 0 + assert session.messages[1].sequence_number == 1 + assert session.db_next_sequence == 2 + + +@pytest.mark.asyncio +async def test_save_updates_existing_rows_by_sequence_number(): + conn = FakeConnection() + conn.enqueue("OK") # UPDATE sessions + conn.enqueue(None) # executemany UPDATE + conn.enqueue("OK") # INSERT (empty) + conn.enqueue("OK") # UPDATE next_sequence + + pool = FakePool(conn) + store = PgSessionStore(pool) + store._initialized = True + + session = Session(profile_id="test") + m1 = Message(role="user", content="hi") + m1.sequence_number = 0 + m2 = Message(role="assistant", content="hello") + m2.sequence_number = 1 + session.messages = [m1, m2] + session.db_next_sequence = 2 + + await store.save(session) + + executemany_calls = [c for c in conn.calls if c[0] == "executemany"] + assert len(executemany_calls) == 1 + assert "UPDATE session_messages" in executemany_calls[0][1] + + +@pytest.mark.asyncio +async def test_archive_old_messages_sends_correct_sql(): + conn = FakeConnection() + conn.enqueue("INSERT 0 3") # copy to archive + conn.enqueue("OK") # DELETE + conn.enqueue("OK") # UPDATE archive_threshold + + pool = FakePool(conn) + store = PgSessionStore(pool) + store._initialized = True + + n = await store.archive_old_messages("sid-1", keep_seq_threshold=100) + assert n == 3 + + execute_calls = [c for c in conn.calls if c[0] == "execute"] + assert any("DELETE FROM session_messages" in c[1] for c in execute_calls) + assert any("UPDATE sessions SET archive_threshold" in c[1] for c in execute_calls)