# Newer
# Older
# navi-1 / navi / memory / migrate_pgvector.py
"""Database migration script for memory system pgvector upgrade.

Run this once against the PostgreSQL database after deploying the new code:
    .venv/bin/python navi/memory/migrate_pgvector.py

What it does:
1. Checks whether the pgvector extension is installed (the embedding column and HNSW index are skipped if it is not)
2. Adds missing columns to memory_facts: embedding, source, confidence, expires_at, last_verified_at, source_context
3. Creates indexes: hnsw on embedding, expires (partial), source+category

Safe to run multiple times — all operations use IF NOT EXISTS.
"""

import asyncio
import sys

import asyncpg


async def _run_statements(conn: "asyncpg.Connection", statements: list[str], width: int) -> int:
    """Execute each statement on its own and return how many of them failed.

    Every statement is attempted even if an earlier one raised, so a single
    failure does not prevent the remaining schema changes from being applied.

    Args:
        conn: Open asyncpg connection used to execute the statements.
        statements: SQL statements to execute, in order.
        width: Number of leading characters of each statement echoed in the
            per-statement OK/FAIL line.

    Returns:
        The number of statements whose execution raised an exception.
    """
    failures = 0
    for stmt in statements:
        try:
            await conn.execute(stmt)
        except Exception as e:
            # Deliberately best-effort: report the failure and keep going.
            failures += 1
            print(f"  FAIL: {stmt[:width]}... - {e}")
        else:
            print(f"  OK: {stmt[:width]}...")
    return failures


async def migrate(dsn: str) -> None:
    """Apply the memory_facts schema upgrade to the database at *dsn*.

    Adds the missing columns to ``memory_facts`` and then creates the indexes.
    The ``embedding`` column and its HNSW index are only attempted when the
    ``vector`` extension is present in ``pg_extension``. Each statement is
    executed independently; a failing statement is reported and the remaining
    ones are still attempted. The final line printed reflects whether any
    statement failed, so a partially failed run is not reported as complete.

    Args:
        dsn: Connection string passed to ``asyncpg.connect``.

    Raises:
        Whatever ``asyncpg.connect`` or the extension lookup raises; errors
        from the individual ALTER / CREATE INDEX statements are caught and
        printed instead of propagated.
    """
    conn = await asyncpg.connect(dsn)
    try:
        has_vec = bool(await conn.fetchval("SELECT 1 FROM pg_extension WHERE extname = 'vector'"))
        print(f"pgvector available: {has_vec}")

        alter_stmts: list[str] = []
        if has_vec:
            # The vector type only exists when the extension is installed.
            alter_stmts.append("ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS embedding vector(768)")

        alter_stmts.extend(
            [
                "ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS source TEXT NOT NULL DEFAULT 'conversation'",
                "ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS confidence SMALLINT NOT NULL DEFAULT 70",
                "ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS expires_at TIMESTAMPTZ",
                "ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS last_verified_at TIMESTAMPTZ",
                "ALTER TABLE memory_facts ADD COLUMN IF NOT EXISTS source_context TEXT",
            ]
        )
        failures = await _run_statements(conn, alter_stmts, 70)

        # Indexes come after the columns they reference have been added.
        index_stmts = [
            "CREATE INDEX IF NOT EXISTS idx_memory_facts_expires ON memory_facts (expires_at) WHERE expires_at IS NOT NULL",
            "CREATE INDEX IF NOT EXISTS idx_memory_facts_source_cat ON memory_facts (source, category)",
        ]
        if has_vec:
            index_stmts.append(
                "CREATE INDEX IF NOT EXISTS idx_memory_facts_embedding ON memory_facts USING hnsw (embedding vector_cosine_ops)"
            )
        failures += await _run_statements(conn, index_stmts, 80)

        if failures:
            # Do not claim success when some statements were not applied.
            print(f"Migration finished with {failures} failed statement(s).")
        else:
            print("Migration complete.")
    finally:
        await conn.close()


if __name__ == "__main__":
    # Project settings are only imported when this file is run as a script.
    from navi.config import settings

    database_url = settings.database_url
    if database_url:
        asyncio.run(migrate(database_url))
    else:
        # Without a configured database URL there is no target to migrate.
        print("No DATABASE_URL set — nothing to migrate.")
        sys.exit(0)