"""Database migration script for memory system pgvector upgrade.
Run this once against the PostgreSQL database after deploying the new code:
.venv/bin/python navi/memory/migrate_pgvector.py
What it does:
1. Verifies pgvector extension is installed
2. Adds missing columns to memory_facts: embedding, source, confidence, expires_at, last_verified_at, source_context
3. Creates indexes: hnsw on embedding, expires (partial), source+category
Safe to run multiple times — all operations use IF NOT EXISTS.
"""
import asyncio
import sys
import asyncpg
async def migrate(dsn: str) -> None:
conn = await asyncpg.connect(dsn)
try:
has_vec = bool(await conn.fetchval("SELECT 1 FROM pg_extension WHERE extname = 'vector'"))
print(f"pgvector available: {has_vec}")
alter_stmts: list[str] = []
if has_vec:
alter_stmts.append("ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS embedding vector(768)")
alter_stmts.extend(
[
"ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS source TEXT NOT NULL DEFAULT 'conversation'",
"ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS confidence SMALLINT NOT NULL DEFAULT 70",
"ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS expires_at TIMESTAMPTZ",
"ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS last_verified_at TIMESTAMPTZ",
"ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS source_context TEXT",
]
)
for stmt in alter_stmts:
try:
await conn.execute(stmt)
print(f" OK: {stmt[:70]}...")
except Exception as e:
print(f" FAIL: {stmt[:70]}... - {e}")
index_stmts = [
"CREATE INDEX IF NOT EXISTS idx_memory_facts_expires ON memory_facts (expires_at) WHERE expires_at IS NOT NULL",
"CREATE INDEX IF NOT EXISTS idx_memory_facts_source_cat ON memory_facts (source, category)",
]
if has_vec:
index_stmts.append(
"CREATE INDEX IF NOT EXISTS idx_memory_facts_embedding ON memory_facts USING hnsw (embedding vector_cosine_ops)"
)
for stmt in index_stmts:
try:
await conn.execute(stmt)
print(f" OK: {stmt[:80]}...")
except Exception as e:
print(f" FAIL: {stmt[:80]}... - {e}")
print("Migration complete.")
finally:
await conn.close()
if __name__ == "__main__":
from navi.config import settings
dsn = settings.database_url
if not dsn:
print("No DATABASE_URL set — nothing to migrate.")
sys.exit(0)
asyncio.run(migrate(dsn))