diff --git a/navi/core/compressor.py b/navi/core/compressor.py index 3cd2869..d8ee93a 100644 --- a/navi/core/compressor.py +++ b/navi/core/compressor.py @@ -247,6 +247,7 @@ role="user", content=f"[Context Summary - historical context only, not a new user request]\n{summary_text}", is_summary=True, + is_display=False, created_at=datetime.now(timezone.utc), ) diff --git a/navi/core/pg_session_store.py b/navi/core/pg_session_store.py index c342274..660add9 100644 --- a/navi/core/pg_session_store.py +++ b/navi/core/pg_session_store.py @@ -1,12 +1,12 @@ """PostgreSQL-backed session store using asyncpg connection pool. -Phase-1 migration to normalized tables: +Normalized storage: - session_messages — one row per Message, with is_display / is_context flags - session_images — base64 images referenced by session_messages (future use) -Dual-write: every save() writes to both JSON columns (legacy) and normalized -rows (new). get() prefers normalized rows when they exist, falling back to JSON. -This lets us migrate incrementally without breaking existing sessions. +All session message I/O goes through the normalized table. The legacy JSON +columns (sessions.messages, sessions.context) remain in the schema for backward +compatibility but are no longer read or written. """ import asyncio @@ -208,6 +208,53 @@ ) +async def _load_messages_map(conn: asyncpg.Connection, session_ids: list[str]) -> dict[str, list[Message]]: + """Batch-load all messages for the given session IDs.""" + if not session_ids: + return {} + rows = await conn.fetch( + "SELECT * FROM session_messages WHERE session_id = ANY($1) ORDER BY sequence_number", + session_ids, + ) + result: dict[str, list[Message]] = {sid: [] for sid in session_ids} + for row in rows: + sid = row["session_id"] + result[sid].append(_row_to_message(row)) + return result + + +async def _build_sessions( + conn: asyncpg.Connection, + rows: list[asyncpg.Record], +) -> list[Session]: + """Hydrate session rows with messages from the normalized table.""" + session_ids = [r["id"] for r in rows] + messages_map = await _load_messages_map(conn, session_ids) + sessions: list[Session] = [] + for row in rows: + all_msgs = messages_map.get(row["id"], []) + messages = [m for m in all_msgs if m.is_display] + context = [m for m in all_msgs if m.is_context] + planning_logs_raw = row["planning_logs"] + planning_logs = json.loads(planning_logs_raw) if planning_logs_raw else [] + sessions.append( + Session( + id=row["id"], + profile_id=row["profile_id"], + user_id=row["user_id"], + messages=messages, + context=context, + pinned=bool(row["pinned"]), + name=row["name"], + created_at=row["created_at"], + last_active=row["last_active"], + context_token_count=row["context_token_count"] or 0, + planning_logs=planning_logs, + ) + ) + return sessions + + class PgSessionStore(SessionStore): def __init__(self, pool: asyncpg.Pool) -> None: self._pool = pool @@ -423,7 +470,7 @@ if search: like = f"%{search}%" conditions.append( - f"(id ILIKE {add_param(like)} OR name ILIKE {add_param(like)} OR user_id ILIKE {add_param(like)} OR profile_id ILIKE {add_param(like)} OR EXISTS (SELECT 1 FROM session_messages m WHERE m.session_id = sessions.id AND m.content ILIKE {add_param(like)}))" + f"(id ILIKE {add_param(like)} OR name ILIKE {add_param(like)} OR user_id ILIKE {add_param(like)} OR profile_id ILIKE {add_param(like)} OR EXISTS (SELECT 1 FROM session_messages m WHERE m.session_id = sessions.id AND m.is_display = true AND m.content ILIKE {add_param(like)}))" ) where = "WHERE " + " AND ".join(conditions) if conditions else "" @@ -458,7 +505,7 @@ if search: like = f"%{search}%" conditions.append( - f"(id ILIKE {add_param(like)} OR name ILIKE {add_param(like)} OR user_id ILIKE {add_param(like)} OR profile_id ILIKE {add_param(like)} OR EXISTS (SELECT 1 FROM session_messages m WHERE m.session_id = sessions.id AND m.content ILIKE {add_param(like)}))" + f"(id ILIKE {add_param(like)} OR name ILIKE {add_param(like)} OR user_id ILIKE {add_param(like)} OR profile_id ILIKE {add_param(like)} OR EXISTS (SELECT 1 FROM session_messages m WHERE m.session_id = sessions.id AND m.is_display = true AND m.content ILIKE {add_param(like)}))" ) where = "WHERE " + " AND ".join(conditions) if conditions else "" diff --git a/tests/unit/core/test_agent.py b/tests/unit/core/test_agent.py index f7b35b1..efd60f3 100644 --- a/tests/unit/core/test_agent.py +++ b/tests/unit/core/test_agent.py @@ -70,6 +70,15 @@ assert saved.messages[2].role == "assistant" assert saved.messages[2].content == "hello" + # Flags: display-only user message, context-only user message + assert saved.messages[0].is_display is True + assert saved.messages[0].is_context is False + assert saved.messages[1].is_display is False + assert saved.messages[1].is_context is True + # Assistant message is both display and context + assert saved.messages[2].is_display is True + assert saved.messages[2].is_context is True + @pytest.mark.asyncio async def test_run_session_not_found(self, agent): with pytest.raises(SessionNotFound): diff --git a/tests/unit/core/test_compressor.py b/tests/unit/core/test_compressor.py index 262d366..c905125 100644 --- a/tests/unit/core/test_compressor.py +++ b/tests/unit/core/test_compressor.py @@ -165,6 +165,8 @@ assert len(new_context) == 6 assert new_context[0].role == "system" assert new_context[1].is_summary is True + assert new_context[1].is_display is False + assert new_context[1].is_context is True async def test_preserves_system_messages(self): backend = FakeLLMBackend(responses=["sum"]) diff --git a/tests/unit/core/test_planning.py b/tests/unit/core/test_planning.py index 6ab3044..3af5e83 100644 --- a/tests/unit/core/test_planning.py +++ b/tests/unit/core/test_planning.py @@ -127,3 +127,32 @@ phase1_prompt = llm.calls[0][0].content assert "gnexus-book instructions" not in phase1_prompt assert "Connected MCP knowledge servers are authoritative only when the active profile exposes their tools" in phase1_prompt + + async def test_planning_flags(self): + """Planning messages must have correct is_display / is_context flags.""" + profile = make_profile("developer", planning_phase2_enabled=False) + llm = RecordingLLM([ + "TASK: test\nGOAL: done\nUNKNOWNS: NONE\nRESOURCES: NONE\n" + "KNOWLEDGE SOURCE ASSESSMENT: NONE\nKNOWLEDGE CAPTURE: NONE\n" + "COMPLEXITY: low\nSUBTASKS:\n1. Step one\nREFLECT: no\nCOMMITMENTS: none", + "## Plan\n\n**Task:** test\n**Goal:** done\n\n**Steps:**\n1. Step one → SELF\n", + ]) + engine = PlanningEngine(FakeContextBuilder()) + context = [Message(role="user", content="hello")] + messages = [] + + async for _event in engine.run(context, profile, llm, mem=None, tool_schemas=[], messages=messages): + pass + + # Messages list should contain: user, plan context (is_display=False), plan marker (is_context=False), prompt + plan_ctx = [m for m in messages if m.role == "assistant" and not m.is_plan and m.is_display is False] + plan_marker = [m for m in messages if m.is_plan is True] + prompt_msgs = [m for m in messages if m.role == "user" and m.content.startswith("Plan is ready")] + + assert len(plan_ctx) == 1 + assert plan_ctx[0].is_context is True + assert len(plan_marker) == 1 + assert plan_marker[0].is_context is False + assert plan_marker[0].is_display is True + assert len(prompt_msgs) == 1 + assert prompt_msgs[0].is_display is False