# 2025-01-28 20:38:33 +05:00
#
# 63 lines
# 1.5 KiB
# Python

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
from sqlalchemy import NullPool, create_engine
from sqlalchemy.orm import sessionmaker, Session
from src.core.settings.base import settings
class SyncDataBaseHelper:
    """Bundle a synchronous SQLAlchemy engine with a session factory bound to it.

    Attributes:
        engine: Engine returned by ``create_engine`` for the given URL.
        session_factory: ``sessionmaker`` bound to ``engine``; sessions it
            produces have ``expire_on_commit``, ``autoflush`` and
            ``autocommit`` all switched off.
    """

    def __init__(self, url: str, echo: bool = False):
        """Create the engine and the session factory.

        Args:
            url: Database URL handed to ``create_engine``.
            echo: Forwarded to ``create_engine`` as its ``echo`` flag.
        """
        self.engine = create_engine(url=url, echo=echo)

        # Every session produced by the factory shares these settings.
        session_options = {
            "autocommit": False,
            "autoflush": False,
            "expire_on_commit": False,
        }
        self.session_factory = sessionmaker(bind=self.engine, **session_options)

    def get_sync_session(self) -> Session:
        """Return a new session produced by ``session_factory``.

        The session is handed back as-is; this method does not close it.
        """
        return self.session_factory()
class DatabaseConfig:
    """Bundle an async SQLAlchemy engine with an async session factory.

    Attributes:
        engine: Async engine returned by ``create_async_engine``; it is
            created with ``NullPool`` as its pool class.
        session_factory: ``async_sessionmaker`` bound to ``engine``; sessions
            it produces have ``autoflush``, ``expire_on_commit`` and
            ``autocommit`` all switched off.
    """

    def __init__(self, connect_url: str, echo: bool = False):
        """Create the async engine and the async session factory.

        Args:
            connect_url: Database URL handed to ``create_async_engine``.
            echo: Forwarded to ``create_async_engine`` as its ``echo`` flag.
        """
        self.engine = create_async_engine(
            url=connect_url,
            echo=echo,
            poolclass=NullPool,
        )

        # Every session produced by the factory shares these settings.
        session_options = {
            "autocommit": False,
            "autoflush": False,
            "expire_on_commit": False,
        }
        self.session_factory = async_sessionmaker(
            bind=self.engine,
            **session_options,
        )

    async def get_async_session(self):
        """Yield a single session from ``session_factory``.

        This is an async generator. The session is yielded from inside an
        ``async with`` block, so the session's async context manager is
        exited once the generator continues past the ``yield`` or is closed.
        """
        managed_session = self.session_factory()
        async with managed_session as session:
            yield session

    def get_async_session_not_closed(self) -> AsyncSession:
        """Return a new session from ``session_factory`` without entering it.

        No context manager is used here, so this method does not close the
        session it returns.
        """
        fresh_session = self.session_factory()
        return fresh_session
# Module-level DatabaseConfig instance, created when this module is imported.
# It is built from the project settings: the async Postgres connection URL
# (converted to str) and the ECHO_LOG flag, which is passed as `echo`.
db_helper = DatabaseConfig(
connect_url=str(settings.POSTGRES.async_connect_url),
echo=settings.POSTGRES.ECHO_LOG,
)