"""Connessione asyncpg al database ml. Pool singleton.""" from __future__ import annotations import asyncpg from typing import Optional from core.config import settings _pool: Optional[asyncpg.Pool] = None async def init_pool() -> asyncpg.Pool: global _pool if _pool is None: _pool = await asyncpg.create_pool( host=settings.pg_host, port=settings.pg_port, user=settings.pg_user, password=settings.pg_password, database=settings.pg_db, min_size=1, max_size=10, command_timeout=30, ) return _pool async def close_pool() -> None: global _pool if _pool is not None: await _pool.close() _pool = None def pool() -> asyncpg.Pool: if _pool is None: raise RuntimeError("DB pool not initialized — call init_pool() at startup") return _pool async def fetch(sql: str, *args): async with pool().acquire() as c: return await c.fetch(sql, *args) async def fetchrow(sql: str, *args): async with pool().acquire() as c: return await c.fetchrow(sql, *args) async def execute(sql: str, *args): async with pool().acquire() as c: return await c.execute(sql, *args)