core/services/db.py

76 lines
2.4 KiB
Python
Raw Normal View History

2024-01-25 19:41:27 +00:00
import logging
2024-02-19 07:06:46 +00:00
import sys
2023-12-24 17:46:50 +00:00
import math
2023-12-24 14:25:57 +00:00
import time
2023-11-22 16:38:39 +00:00
from typing import Any, Callable, Dict, TypeVar
2023-12-24 14:25:57 +00:00
from sqlalchemy import Column, Integer, create_engine, event
2024-01-25 19:41:27 +00:00
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
2023-11-22 16:38:39 +00:00
from sqlalchemy.orm import Session
from sqlalchemy.sql.schema import Table
2023-11-22 16:38:39 +00:00
from settings import DB_URL
2023-11-03 10:10:22 +00:00
2024-01-25 19:41:27 +00:00
2024-02-16 09:16:00 +00:00
# Настройка журнала
logging.basicConfig(level=logging.DEBUG)
2024-02-19 07:06:46 +00:00
# Создание обработчика журнала для записи сообщений в stdout
2024-02-16 09:16:00 +00:00
logger = logging.getLogger('sqlalchemy.profiler')
2024-02-19 07:06:46 +00:00
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
2023-12-24 14:25:57 +00:00
2024-01-25 19:41:27 +00:00
@event.listens_for(Engine, 'before_cursor_execute')
2023-12-24 14:25:57 +00:00
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
2024-01-25 19:41:27 +00:00
conn.info.setdefault('query_start_time', []).append(time.time())
2023-12-24 14:25:57 +00:00
2024-01-25 19:41:27 +00:00
@event.listens_for(Engine, 'after_cursor_execute')
2023-12-24 14:25:57 +00:00
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
2024-01-25 19:41:27 +00:00
total = time.time() - conn.info['query_start_time'].pop(-1)
2023-12-25 07:48:50 +00:00
total = math.floor(total * 10000) / 10000
2024-02-16 09:16:00 +00:00
if total > 25:
logger.debug(f'Long running query: {statement}, Execution Time: {total} s')
2023-12-24 14:25:57 +00:00
2023-11-03 10:10:22 +00:00
engine = create_engine(DB_URL, echo=False, pool_size=10, max_overflow=20)
2024-01-25 19:41:27 +00:00
T = TypeVar('T')
REGISTRY: Dict[str, type] = {}
2024-02-16 09:16:00 +00:00
Base = declarative_base()
2022-09-03 10:50:14 +00:00
2024-01-25 19:41:27 +00:00
def local_session(src=''):
2023-11-22 16:38:39 +00:00
return Session(bind=engine, expire_on_commit=False)
class Base(declarative_base()):
2022-09-03 10:50:14 +00:00
__table__: Table
__tablename__: str
__new__: Callable
__init__: Callable
2023-01-31 07:36:54 +00:00
__allow_unmapped__ = True
2023-01-31 07:44:06 +00:00
__abstract__ = True
2024-01-25 19:41:27 +00:00
__table_args__ = {'extend_existing': True}
2023-01-31 07:44:06 +00:00
id = Column(Integer, primary_key=True)
2022-09-03 10:50:14 +00:00
def __init_subclass__(cls, **kwargs):
REGISTRY[cls.__name__] = cls
def dict(self) -> Dict[str, Any]:
column_names = self.__table__.columns.keys()
2024-01-25 19:41:27 +00:00
if '_sa_instance_state' in column_names:
column_names.remove('_sa_instance_state')
2023-11-03 10:10:22 +00:00
try:
return {c: getattr(self, c) for c in column_names}
except Exception as e:
2024-02-16 09:16:00 +00:00
logger.error(f'Error occurred while converting object to dictionary: {e}')
2023-11-03 10:10:22 +00:00
return {}
2023-11-22 16:38:39 +00:00
def update(self, values: Dict[str, Any]) -> None:
for key, value in values.items():
if hasattr(self, key):
setattr(self, key, value)