Newer
Older
navi-1 / navi / db.py
"""Database service — manages a single asyncpg connection pool for the application."""

import asyncio

import asyncpg
import structlog

log = structlog.get_logger()


class Database:
    """Manages a single asyncpg pool shared by all stores."""

    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._pool: asyncpg.Pool | None = None
        self._lock = asyncio.Lock()

    async def pool(self) -> asyncpg.Pool:
        """Return the connection pool, creating it lazily if needed."""
        if self._pool is not None:
            return self._pool
        async with self._lock:
            if self._pool is not None:
                return self._pool
            self._pool = await asyncpg.create_pool(self._dsn)
            log.info("db.pool_created")
        return self._pool

    async def close(self) -> None:
        """Close the pool if it exists."""
        if self._pool is not None:
            await self._pool.close()
            self._pool = None