diff --git a/navi/api/deps.py b/navi/api/deps.py index 38e9ec4..6b547a0 100644 --- a/navi/api/deps.py +++ b/navi/api/deps.py @@ -48,6 +48,13 @@ memory_store=_memory_store, session_store=_session_store, ) + # Wire embedding backend into memory store for vector search + try: + backend = _registries[2].get("ollama") + if hasattr(_memory_store, "set_embedding_backend"): + _memory_store.set_embedding_backend(backend) + except Exception: + pass return _registries diff --git a/navi/config.py b/navi/config.py index 36bc0a1..e6c4030 100644 --- a/navi/config.py +++ b/navi/config.py @@ -13,6 +13,10 @@ ollama_num_ctx: int = 65536 ollama_think: bool = True + # Embedding model for memory vector search (Ollama API) + embedding_model: str = "nomic-embed-text:latest" + embedding_dimensions: int = 768 + openai_api_key: str = "" anthropic_api_key: str = "" diff --git a/navi/llm/base.py b/navi/llm/base.py index cad5915..48a08b9 100644 --- a/navi/llm/base.py +++ b/navi/llm/base.py @@ -117,3 +117,15 @@ they stream. The final chunk (finish_reason is not None) carries tool_calls when the model requested tools, or is a plain stop otherwise. """ + + async def embed( + self, + texts: list[str], + model: str | None = None, + ) -> list[list[float]]: + """Generate embeddings for a batch of texts. + + Default implementation raises NotImplementedError. + Backends that support embeddings must override this method. + """ + raise NotImplementedError(f"{type(self).__name__} does not support embeddings") diff --git a/navi/llm/fallback.py b/navi/llm/fallback.py index d1eaff7..7b8084b 100644 --- a/navi/llm/fallback.py +++ b/navi/llm/fallback.py @@ -106,6 +106,35 @@ raise LLMBackendError(f"All backends exhausted: {last_err}") from last_err + async def embed( + self, + texts: list[str], + model: "list[str] | str | None" = None, + ) -> list[list[float]]: + models = self._model_list(model) + last_err: Exception = LLMBackendError("No backends configured") + + for server in self._servers: + if server.host in _dead_servers: + continue + for m in models: + if (server.host, m) in _dead_models: + continue + try: + return await self._get_client(server).embed(texts, model=m) + except LLMConnectionError as e: + log.warning("fallback.server_dead", host=server.host, error=str(e)) + _dead_servers.add(server.host) + last_err = e + break + except LLMModelNotFoundError as e: + log.warning("fallback.model_dead", host=server.host, model=m, error=str(e)) + _dead_models.add((server.host, m)) + last_err = e + continue + + raise LLMBackendError(f"All backends exhausted: {last_err}") from last_err + async def stream( self, messages: list[Message], diff --git a/navi/llm/ollama.py b/navi/llm/ollama.py index e09fe71..ce7cad9 100644 --- a/navi/llm/ollama.py +++ b/navi/llm/ollama.py @@ -221,3 +221,17 @@ raise except Exception as e: raise _classify_error(e) from e + + async def embed( + self, + texts: list[str], + model: "list[str] | str | None" = None, + ) -> list[list[float]]: + resolved = _resolve_model(model, self.model) + try: + response = await self._client.embed(model=resolved, input=texts) + return response.embeddings + except (LLMConnectionError, LLMModelNotFoundError, LLMBackendError): + raise + except Exception as e: + raise _classify_error(e) from e diff --git a/navi/memory/extractor.py b/navi/memory/extractor.py index 000faee..9fbfbc0 100644 --- a/navi/memory/extractor.py +++ b/navi/memory/extractor.py @@ -119,7 +119,14 @@ key = str(fact.get("key", "")).strip() value = str(fact.get("value", "")).strip() if key and value: - await store.upsert_fact(category, key, value, session.id) + await store.upsert_fact( + category=category, + key=key, + value=value, + source_session_id=session.id, + source="conversation", + confidence=70, + ) count += 1 return count diff --git a/navi/memory/sqlite_store.py b/navi/memory/sqlite_store.py index 30bab70..00c93c6 100644 --- a/navi/memory/sqlite_store.py +++ b/navi/memory/sqlite_store.py @@ -1,4 +1,8 @@ -"""SQLite-backed memory store — used when DATABASE_URL is not set.""" +"""SQLite-backed memory store — used when DATABASE_URL is not set. + +SQLite does not support pgvector, so semantic search is disabled. +New columns are added for schema compatibility but embeddings are always NULL. +""" import sqlite3 import uuid @@ -15,6 +19,11 @@ created_at TEXT NOT NULL, updated_at TEXT NOT NULL, source_session_id TEXT, + source TEXT NOT NULL DEFAULT 'conversation', + confidence INTEGER NOT NULL DEFAULT 70, + expires_at TEXT, + last_verified_at TEXT, + source_context TEXT, UNIQUE(category, key) ); @@ -46,17 +55,31 @@ key: str, value: str, source_session_id: str | None = None, + source: str = "conversation", + confidence: int = 70, + expires_at: datetime | None = None, + source_context: str = "", ) -> None: now = datetime.now(timezone.utc).isoformat() + expires_str = expires_at.isoformat() if expires_at else None async with aiosqlite.connect(self._db_path) as db: await db.execute( - """INSERT INTO memory_facts (id, category, key, value, created_at, updated_at, source_session_id) - VALUES (?, ?, ?, ?, ?, ?, ?) + """INSERT INTO memory_facts + (id, category, key, value, created_at, updated_at, source_session_id, + source, confidence, expires_at, source_context) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(category, key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at, - source_session_id = excluded.source_session_id""", - (str(uuid.uuid4()), category, key, value, now, now, source_session_id), + source_session_id = excluded.source_session_id, + source = excluded.source, + confidence = excluded.confidence, + expires_at = excluded.expires_at, + source_context = excluded.source_context""", + ( + str(uuid.uuid4()), category, key, value, now, now, + source_session_id, source, confidence, expires_str, source_context, + ), ) await db.commit() @@ -72,7 +95,8 @@ async with aiosqlite.connect(self._db_path) as db: async with db.execute( - f"SELECT id, category, key, value, updated_at FROM memory_facts " + f"SELECT id, category, key, value, updated_at, source, confidence, " + f"expires_at, source_context FROM memory_facts " f"WHERE {conditions} ORDER BY updated_at DESC LIMIT ?", params + [limit], ) as cur: @@ -94,7 +118,10 @@ return cur.rowcount async def get_all_facts(self, limit: int | None = None) -> list[dict]: - q = "SELECT id, category, key, value, updated_at FROM memory_facts ORDER BY category, updated_at DESC" + q = ( + "SELECT id, category, key, value, updated_at, source, confidence, " + "expires_at, source_context FROM memory_facts ORDER BY category, updated_at DESC" + ) if limit: q += f" LIMIT {limit}" async with aiosqlite.connect(self._db_path) as db: @@ -148,4 +175,14 @@ def _row_to_dict(row: tuple) -> dict: - return {"id": row[0], "category": row[1], "key": row[2], "value": row[3], "updated_at": row[4]} + return { + "id": row[0], + "category": row[1], + "key": row[2], + "value": row[3], + "updated_at": row[4], + "source": row[5] if len(row) > 5 else "conversation", + "confidence": row[6] if len(row) > 6 else 70, + "expires_at": row[7] if len(row) > 7 else None, + "source_context": row[8] if len(row) > 8 else "", + } diff --git a/navi/memory/store.py b/navi/memory/store.py index 953628b..a670480 100644 --- a/navi/memory/store.py +++ b/navi/memory/store.py @@ -1,59 +1,91 @@ -"""Persistent memory store — facts about the user, backed by PostgreSQL.""" +"""Persistent memory store — facts about the user, backed by PostgreSQL with pgvector support.""" import asyncio import re import uuid -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta +from typing import TYPE_CHECKING import asyncpg +import structlog -_SEPARATORS = re.compile(r'[-_/\\.]') # treat as word boundaries -_NOISE = re.compile(r'[^\w\s]', re.UNICODE) # strip remaining punctuation +from navi.config import settings + +if TYPE_CHECKING: + from navi.llm.base import LLMBackend + +_SEPARATORS = re.compile(r"[-_/\\.]") +_NOISE = re.compile(r"[^\w\s]", re.UNICODE) + +log = structlog.get_logger() + +_AUTO_DUMP_THRESHOLD = 60 +_VECTOR_CUTOFF_DISTANCE = 0.3 def _normalize_query(query: str) -> list[str]: - """Return a clean list of search terms from a raw query string. - - - Hyphens, underscores, slashes, dots → spaces (web-search → [web, search]) - - All remaining punctuation stripped (commas, quotes, parens …) - - Lowercased, split on whitespace - - Single-character tokens dropped - """ - q = _SEPARATORS.sub(' ', query) - q = _NOISE.sub(' ', q) + q = _SEPARATORS.sub(" ", query) + q = _NOISE.sub(" ", q) return [t for t in q.lower().split() if len(t) > 1] -_AUTO_DUMP_THRESHOLD = 60 # if the DB has ≤ this many facts, skip search and return all +def _build_ddl(pgvector_available: bool) -> list[str]: + """Return DDL statements depending on whether pgvector is installed.""" + embedding_col = f"embedding vector({settings.embedding_dimensions})," if pgvector_available else "" + embedding_idx = ( + "CREATE INDEX IF NOT EXISTS idx_memory_facts_embedding ON memory_facts USING hnsw (embedding vector_cosine_ops)" + if pgvector_available else "" + ) + stmts = [ + """CREATE TABLE IF NOT EXISTS memory_facts ( + id TEXT PRIMARY KEY, + category TEXT NOT NULL, + key TEXT NOT NULL, + value TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + updated_at TIMESTAMPTZ NOT NULL, + source_session_id TEXT, + %s + source TEXT NOT NULL DEFAULT 'conversation', + confidence SMALLINT NOT NULL DEFAULT 70, + expires_at TIMESTAMPTZ, + last_verified_at TIMESTAMPTZ, + source_context TEXT, + UNIQUE(category, key) + )""" % embedding_col, + "CREATE INDEX IF NOT EXISTS idx_memory_facts_expires ON memory_facts (expires_at) WHERE expires_at IS NOT NULL", + "CREATE INDEX IF NOT EXISTS idx_memory_facts_source_cat ON memory_facts (source, category)", + """CREATE TABLE IF NOT EXISTS memory_summary ( + id INTEGER PRIMARY KEY DEFAULT 1, + content TEXT NOT NULL, + generated_at TIMESTAMPTZ NOT NULL + )""", + """CREATE TABLE IF NOT EXISTS session_memory_state ( + session_id TEXT PRIMARY KEY, + extracted_at TIMESTAMPTZ NOT NULL + )""", + ] + if embedding_idx: + stmts.insert(1, embedding_idx) + return stmts -_DDL_STATEMENTS = [ - """CREATE TABLE IF NOT EXISTS memory_facts ( - id TEXT PRIMARY KEY, - category TEXT NOT NULL, - key TEXT NOT NULL, - value TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL, - updated_at TIMESTAMPTZ NOT NULL, - source_session_id TEXT, - UNIQUE(category, key) - )""", - """CREATE TABLE IF NOT EXISTS memory_summary ( - id INTEGER PRIMARY KEY DEFAULT 1, - content TEXT NOT NULL, - generated_at TIMESTAMPTZ NOT NULL - )""", - """CREATE TABLE IF NOT EXISTS session_memory_state ( - session_id TEXT PRIMARY KEY, - extracted_at TIMESTAMPTZ NOT NULL - )""", -] + +def _vector_to_str(vec: list[float]) -> str: + """Serialize a vector to the PostgreSQL vector literal format.""" + return "[" + ",".join(str(v) for v in vec) + "]" class MemoryStore: - def __init__(self, dsn: str) -> None: + def __init__(self, dsn: str, embedding_backend: "LLMBackend | None" = None) -> None: self._dsn = dsn self._pool: asyncpg.Pool | None = None self._lock = asyncio.Lock() + self._embedding_backend = embedding_backend + self._pgvector_checked = False + self._pgvector_available = False + + def set_embedding_backend(self, backend: "LLMBackend | None") -> None: + self._embedding_backend = backend async def _get_pool(self) -> asyncpg.Pool: if self._pool is not None: @@ -62,11 +94,52 @@ if self._pool is None: pool = await asyncpg.create_pool(self._dsn) async with pool.acquire() as conn: - for stmt in _DDL_STATEMENTS: - await conn.execute(stmt) + # Try to create pgvector extension first + pgvector_available = False + try: + await conn.execute("CREATE EXTENSION IF NOT EXISTS vector") + row = await conn.fetchval("SELECT 1 FROM pg_extension WHERE extname = 'vector'") + pgvector_available = bool(row) + except Exception: + log.warning("memory.pgvector_not_available", exc_info=True) + + for stmt in _build_ddl(pgvector_available): + try: + await conn.execute(stmt) + except Exception: + log.warning("memory.ddl_failed", stmt=stmt[:80], exc_info=True) self._pool = pool return self._pool + async def _has_pgvector(self) -> bool: + if self._pgvector_checked: + return self._pgvector_available + try: + pool = await self._get_pool() + async with pool.acquire() as conn: + row = await conn.fetchval( + "SELECT 1 FROM pg_extension WHERE extname = 'vector'" + ) + self._pgvector_available = bool(row) + except Exception: + self._pgvector_available = False + self._pgvector_checked = True + return self._pgvector_available + + async def _generate_embedding(self, text: str) -> list[float] | None: + if not self._embedding_backend or not await self._has_pgvector(): + return None + try: + vectors = await self._embedding_backend.embed( + texts=[text], + model=settings.embedding_model, + ) + if vectors and vectors[0]: + return vectors[0] + except Exception: + log.warning("memory.embed_failed", text=text[:60], exc_info=True) + return None + # ── Facts ──────────────────────────────────────────────────────────────── async def upsert_fact( @@ -75,30 +148,90 @@ key: str, value: str, source_session_id: str | None = None, + source: str = "conversation", + confidence: int = 70, + expires_at: datetime | None = None, + source_context: str = "", ) -> None: now = datetime.now(timezone.utc) + embedding = await self._generate_embedding(value) pool = await self._get_pool() async with pool.acquire() as conn: - await conn.execute( - """INSERT INTO memory_facts (id, category, key, value, created_at, updated_at, source_session_id) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT(category, key) DO UPDATE SET - value = EXCLUDED.value, - updated_at = EXCLUDED.updated_at, - source_session_id = EXCLUDED.source_session_id""", - str(uuid.uuid4()), category, key, value, now, now, source_session_id, - ) + if embedding: + vec_str = _vector_to_str(embedding) + await conn.execute( + """INSERT INTO memory_facts + (id, category, key, value, created_at, updated_at, source_session_id, + embedding, source, confidence, expires_at, source_context) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8::vector, $9, $10, $11, $12) + ON CONFLICT(category, key) DO UPDATE SET + value = EXCLUDED.value, + updated_at = EXCLUDED.updated_at, + source_session_id = EXCLUDED.source_session_id, + embedding = EXCLUDED.embedding, + source = EXCLUDED.source, + confidence = EXCLUDED.confidence, + expires_at = EXCLUDED.expires_at, + source_context = EXCLUDED.source_context""", + str(uuid.uuid4()), category, key, value, now, now, + source_session_id, vec_str, source, confidence, expires_at, source_context, + ) + else: + await conn.execute( + """INSERT INTO memory_facts + (id, category, key, value, created_at, updated_at, source_session_id, + source, confidence, expires_at, source_context) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT(category, key) DO UPDATE SET + value = EXCLUDED.value, + updated_at = EXCLUDED.updated_at, + source_session_id = EXCLUDED.source_session_id, + source = EXCLUDED.source, + confidence = EXCLUDED.confidence, + expires_at = EXCLUDED.expires_at, + source_context = EXCLUDED.source_context""", + str(uuid.uuid4()), category, key, value, now, now, + source_session_id, source, confidence, expires_at, source_context, + ) async def search_facts(self, query: str, limit: int = 15) -> list[dict]: + # 1. Try vector search if pgvector + embedding backend are available + if self._embedding_backend and await self._has_pgvector(): + try: + query_embedding = await self._generate_embedding(query) + if query_embedding: + vec_str = _vector_to_str(query_embedding) + pool = await self._get_pool() + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT id, category, key, value, updated_at, + source, confidence, expires_at, source_context, + embedding <=> $1::vector AS distance + FROM memory_facts + WHERE (expires_at IS NULL OR expires_at > now()) + ORDER BY embedding <=> $1::vector + LIMIT $2""", + vec_str, limit, + ) + results = [ + _row_to_dict(r) + for r in rows + if r["distance"] < _VECTOR_CUTOFF_DISTANCE + ] + if results: + log.debug("memory.vector_search", hits=len(results), query=query[:40]) + return results + except Exception: + log.warning("memory.vector_search_failed", query=query[:40], exc_info=True) + + # 2. Text fallback (original ILIKE logic) terms = _normalize_query(query) if not terms: return await self.get_all_facts(limit=limit) - # Small DB — skip search entirely, just return the most recent facts if await self.fact_count() <= _AUTO_DUMP_THRESHOLD: return await self.get_all_facts(limit=limit) - # Build per-term ILIKE conditions and a shared parameter list base_params: list = [] term_conds: list[str] = [] for term in terms: @@ -115,21 +248,21 @@ pool = await self._get_pool() async with pool.acquire() as conn: - # 1. AND — all terms must match (most precise) rows = await conn.fetch( - f"SELECT id, category, key, value, updated_at FROM memory_facts " + f"SELECT id, category, key, value, updated_at, source, confidence, " + f"expires_at, source_context FROM memory_facts " f"WHERE {and_where} ORDER BY updated_at DESC LIMIT ${limit_idx}", *base_params, limit, ) if rows: return [_row_to_dict(r) for r in rows] - # 2. OR with relevance score — facts matching more terms rank higher score_expr = " + ".join( f"CASE WHEN {c} THEN 1 ELSE 0 END" for c in term_conds ) rows = await conn.fetch( - f"SELECT id, category, key, value, updated_at, ({score_expr}) AS score " + f"SELECT id, category, key, value, updated_at, source, confidence, " + f"expires_at, source_context, ({score_expr}) AS score " f"FROM memory_facts WHERE {or_where} " f"ORDER BY score DESC, updated_at DESC LIMIT ${limit_idx}", *base_params, limit, @@ -148,11 +281,13 @@ result = await conn.execute( "DELETE FROM memory_facts WHERE LOWER(key)=LOWER($1)", key ) - # asyncpg returns "DELETE N" return int(result.split()[1]) async def get_all_facts(self, limit: int | None = None) -> list[dict]: - q = "SELECT id, category, key, value, updated_at FROM memory_facts ORDER BY category, updated_at DESC" + q = ( + "SELECT id, category, key, value, updated_at, source, confidence, " + "expires_at, source_context FROM memory_facts ORDER BY category, updated_at DESC" + ) params: list = [] if limit: q += f" LIMIT ${len(params) + 1}" @@ -213,10 +348,16 @@ def _row_to_dict(row: asyncpg.Record) -> dict: val = row["updated_at"] updated_at = val.isoformat() if hasattr(val, "isoformat") else str(val) + expires_at = row.get("expires_at") + expires_str = expires_at.isoformat() if hasattr(expires_at, "isoformat") else None return { "id": row["id"], "category": row["category"], "key": row["key"], "value": row["value"], "updated_at": updated_at, + "source": row.get("source", "conversation"), + "confidence": row.get("confidence", 70), + "expires_at": expires_str, + "source_context": row.get("source_context", ""), } diff --git a/navi/tools/memory.py b/navi/tools/memory.py index 1a47fbd..a0d5a15 100644 --- a/navi/tools/memory.py +++ b/navi/tools/memory.py @@ -1,18 +1,22 @@ """Memory tool — search, save, and forget facts about the user.""" +from datetime import datetime, timedelta, timezone + from navi.memory.store import MemoryStore from navi.tools.base import current_session_id from .base import Tool, ToolResult _VALID_CATEGORIES = {"profile", "preferences", "technical", "projects", "other"} +_VALID_SOURCES = {"conversation", "tool_call", "auto_discovery", "user_explicit"} class MemoryTool(Tool): name = "memory" description = ( "Manage long-term memory about the user — facts that survive across sessions. " - "Actions: save (upsert a fact), search (find facts by query), forget (delete by key), list (all facts)." + "Actions: save (upsert a fact), search (find facts by query), forget (delete by key), list (all facts). " + "When saving system facts (IPs, hosts, services), set source='tool_call', confidence=95, and expires_days=7." ) parameters = { "type": "object", @@ -51,6 +55,29 @@ "type": "string", "description": "save only: the fact as a concise plain-text statement.", }, + "source": { + "type": "string", + "enum": ["conversation", "tool_call", "auto_discovery", "user_explicit"], + "description": ( + "save only: how the fact was obtained. " + "conversation=extracted from chat, tool_call=found via tool execution, " + "auto_discovery=system scan, user_explicit=user told me directly." + ), + }, + "confidence": { + "type": "integer", + "description": "save only: 0-100. Tool output=95, user statement=80, web=50, guess=30.", + "minimum": 0, + "maximum": 100, + }, + "expires_days": { + "type": "integer", + "description": "save only: how many days this fact stays valid. Null = never expires.", + }, + "source_context": { + "type": "string", + "description": "save only: provenance — 'found via ip addr on localhost', 'user said in session X'.", + }, }, "required": ["action"], } @@ -75,6 +102,10 @@ category = (params.get("category") or "").strip().lower() key = (params.get("key") or "").strip() value = (params.get("value") or "").strip() + source = (params.get("source") or "conversation").strip().lower() + confidence = params.get("confidence", 70) + expires_days = params.get("expires_days") + source_context = (params.get("source_context") or "").strip() if not category: return ToolResult(success=False, output="category is required for save.", error="missing category") @@ -88,9 +119,33 @@ return ToolResult(success=False, output="key is required for save.", error="missing key") if not value: return ToolResult(success=False, output="value is required for save.", error="missing value") + if source not in _VALID_SOURCES: + source = "conversation" + + try: + confidence = int(confidence) + except Exception: + confidence = 70 + confidence = max(0, min(100, confidence)) + + expires_at = None + if expires_days is not None: + try: + expires_at = datetime.now(timezone.utc) + timedelta(days=int(expires_days)) + except Exception: + pass session_id = current_session_id.get(None) - await self._store.upsert_fact(category, key, value, session_id) + await self._store.upsert_fact( + category=category, + key=key, + value=value, + source_session_id=session_id, + source=source, + confidence=confidence, + expires_at=expires_at, + source_context=source_context, + ) return ToolResult(success=True, output=f"Saved [{category}] {key}: {value}") async def _search(self, params: dict) -> ToolResult: @@ -102,7 +157,17 @@ if not facts: return ToolResult(success=True, output="No matching facts found in memory.") - lines = [f"[{f['category']}] {f['key']}: {f['value']}" for f in facts] + lines = [] + for f in facts: + prov = "" + if f.get("source"): + prov += f" (src: {f['source']}" + if f.get("confidence"): + prov += f", conf: {f['confidence']}" + if f.get("source_context"): + prov += f", ctx: {f['source_context']}" + prov += ")" + lines.append(f"[{f['category']}] {f['key']}: {f['value']}{prov}") return ToolResult(success=True, output=f"Found {len(facts)} fact(s):\n" + "\n".join(lines)) async def _forget(self, params: dict) -> ToolResult: @@ -124,5 +189,13 @@ if not facts: return ToolResult(success=True, output="Memory is empty.") - lines = [f"[{f['category']}] {f['key']}: {f['value']}" for f in facts] + lines = [] + for f in facts: + prov = "" + if f.get("source"): + prov += f" (src: {f['source']}" + if f.get("confidence"): + prov += f", conf: {f['confidence']}" + prov += ")" + lines.append(f"[{f['category']}] {f['key']}: {f['value']}{prov}") return ToolResult(success=True, output=f"{len(facts)} fact(s) in memory:\n" + "\n".join(lines)) diff --git a/requirements.txt b/requirements.txt index 712a8b1..a7b9e85 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ aiosqlite pydantic websockets +pgvector