"""Session model and in-memory session store."""
import uuid
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pydantic import BaseModel, Field
from navi.llm.base import Message
class Session(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
profile_id: str
user_id: str | None = None # owner; null for legacy single-user sessions
messages: list[Message] = Field(default_factory=list) # full display history (never compressed)
context: list[Message] = Field(default_factory=list) # LLM context (may be compressed)
context_token_count: int = 0 # accumulated total; reset to 0 after compression
pinned: bool = False
name: str | None = None
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
last_active: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
planning_logs: list[dict] = Field(default_factory=list) # raw planning phase outputs per turn
db_message_count: int = Field(default=0, exclude=True) # messages already persisted in DB
db_next_sequence: int = Field(default=0, exclude=True) # next global sequence_number for this session
archive_threshold: int = Field(default=0, exclude=True) # seq < threshold are archived
session_metadata: dict = Field(default_factory=dict, exclude=True) # ephemeral session-level metadata (e.g. terminal configs)
class SessionStore(ABC):
@abstractmethod
async def create(self, profile_id: str, user_id: str | None = None) -> Session: ...
@abstractmethod
async def get(self, session_id: str) -> Session | None: ...
@abstractmethod
async def save(self, session: Session) -> None: ...
@abstractmethod
async def list_all(self, user_id: str | None = None, is_admin: bool = False) -> list[Session]: ...
@abstractmethod
async def list_page(
self,
*,
limit: int,
offset: int,
profile_id: str | None = None,
user_id: str | None = None,
is_admin: bool = False,
) -> list[Session]: ...
@abstractmethod
async def count_all(
self,
*,
user_id: str | None = None,
is_admin: bool = False,
search: str | None = None,
) -> int: ...
@abstractmethod
async def search_list(
self,
*,
limit: int,
offset: int,
user_id: str | None = None,
is_admin: bool = False,
search: str | None = None,
sort_by: str = "last_active",
sort_order: str = "desc",
) -> list[Session]: ...
@abstractmethod
async def delete(self, session_id: str) -> bool: ...
@abstractmethod
async def set_pinned(self, session_id: str, pinned: bool) -> bool: ...
@abstractmethod
async def set_name(self, session_id: str, name: str) -> bool: ...
@abstractmethod
async def archive_old_messages(self, session_id: str, keep_seq_threshold: int) -> int: ...
@abstractmethod
async def get_archived_messages(
self, session_id: str, before_seq: int | None = None, limit: int = 50
) -> list[Message]: ...
class InMemorySessionStore(SessionStore):
def __init__(self) -> None:
self._sessions: dict[str, Session] = {}
async def create(self, profile_id: str, user_id: str | None = None) -> Session:
session = Session(profile_id=profile_id, user_id=user_id)
self._sessions[session.id] = session
return session
async def get(self, session_id: str) -> Session | None:
return self._sessions.get(session_id)
async def save(self, session: Session) -> None:
session.last_active = datetime.now(timezone.utc)
self._sessions[session.id] = session
async def list_all(self, user_id: str | None = None, is_admin: bool = False) -> list[Session]:
sessions = self._sessions.values()
if not is_admin and user_id is not None:
sessions = [s for s in sessions if s.user_id == user_id]
return sorted(
sessions,
key=lambda s: (s.pinned, s.last_active),
reverse=True,
)
async def list_page(
self,
*,
limit: int,
offset: int,
profile_id: str | None = None,
user_id: str | None = None,
is_admin: bool = False,
) -> list[Session]:
sessions = await self.list_all(user_id=user_id, is_admin=is_admin)
if profile_id:
sessions = [s for s in sessions if s.profile_id == profile_id]
return sessions[offset:offset + limit]
async def count_all(
self,
*,
user_id: str | None = None,
is_admin: bool = False,
search: str | None = None,
) -> int:
sessions = await self.search_list(
limit=10000, offset=0, user_id=user_id, is_admin=is_admin, search=search
)
return len(sessions)
async def search_list(
self,
*,
limit: int,
offset: int,
user_id: str | None = None,
is_admin: bool = False,
search: str | None = None,
sort_by: str = "last_active",
sort_order: str = "desc",
) -> list[Session]:
sessions = list(self._sessions.values())
if not is_admin and user_id is not None:
sessions = [s for s in sessions if s.user_id == user_id]
if search:
q = search.lower()
sessions = [
s
for s in sessions
if (s.id and q in s.id.lower())
or (s.name and q in s.name.lower())
or (s.user_id and q in s.user_id.lower())
or (s.profile_id and q in s.profile_id.lower())
or any(q in (m.content or "").lower() for m in s.messages)
]
key_map = {
"created_at": lambda s: s.created_at,
"last_active": lambda s: s.last_active,
"name": lambda s: (s.name or "").lower(),
"message_count": lambda s: len(s.messages),
}
key = key_map.get(sort_by, key_map["last_active"])
reverse = sort_order == "desc"
sessions = sorted(sessions, key=key, reverse=reverse)
return sessions[offset:offset + limit]
async def delete(self, session_id: str) -> bool:
if session_id in self._sessions:
del self._sessions[session_id]
return True
return False
async def set_pinned(self, session_id: str, pinned: bool) -> bool:
s = self._sessions.get(session_id)
if s is None:
return False
s.pinned = pinned
return True
async def set_name(self, session_id: str, name: str) -> bool:
s = self._sessions.get(session_id)
if s is None:
return False
s.name = name
return True
async def archive_old_messages(self, session_id: str, keep_seq_threshold: int) -> int:
# In-memory store: no-op, everything stays in RAM
return 0
async def get_archived_messages(
self, session_id: str, before_seq: int | None = None, limit: int = 50
) -> list[Message]:
# In-memory store: no archive; return empty
return []