core/services/db.py

108 lines
3.5 KiB
Python
Raw Normal View History

2024-02-26 15:04:34 +00:00
import json
2024-02-29 12:15:04 +00:00
import math
import time
2024-04-08 07:38:58 +00:00
import traceback
import warnings
2023-11-22 16:38:39 +00:00
from typing import Any, Callable, Dict, TypeVar
2024-04-08 07:38:58 +00:00
from sqlalchemy import (JSON, Column, Engine, Integer, create_engine, event,
exc, inspect)
from sqlalchemy.ext.declarative import declarative_base
2024-02-25 15:26:23 +00:00
from sqlalchemy.orm import Session, configure_mappers
from sqlalchemy.sql.schema import Table
2024-02-25 15:19:12 +00:00
from sqlalchemy_searchable import make_searchable
2024-02-25 13:43:04 +00:00
2024-02-20 16:19:46 +00:00
from services.logger import root_logger as logger
2023-11-22 16:38:39 +00:00
from settings import DB_URL
2024-02-24 21:06:54 +00:00
2024-02-19 10:16:44 +00:00
# Подключение к базе данных SQLAlchemy
engine = create_engine(DB_URL, echo=False, pool_size=10, max_overflow=20)
2024-02-25 13:43:04 +00:00
inspector = inspect(engine)
2024-02-25 15:08:02 +00:00
configure_mappers()
2024-02-21 16:14:58 +00:00
T = TypeVar('T')
2024-02-19 10:16:44 +00:00
REGISTRY: Dict[str, type] = {}
2024-03-06 09:25:55 +00:00
FILTERED_FIELDS = ['_sa_instance_state', 'search_vector']
2023-12-24 14:25:57 +00:00
2024-02-21 15:07:02 +00:00
2024-02-24 10:22:35 +00:00
# noinspection PyUnusedLocal
2024-02-21 16:14:58 +00:00
def local_session(src=''):
2023-11-22 16:38:39 +00:00
return Session(bind=engine, expire_on_commit=False)
2024-02-21 07:27:16 +00:00
class Base(declarative_base()):
2022-09-03 10:50:14 +00:00
__table__: Table
__tablename__: str
__new__: Callable
__init__: Callable
2023-01-31 07:36:54 +00:00
__allow_unmapped__ = True
2023-01-31 07:44:06 +00:00
__abstract__ = True
2024-02-21 16:14:58 +00:00
__table_args__ = {'extend_existing': True}
2023-01-31 07:44:06 +00:00
id = Column(Integer, primary_key=True)
2022-09-03 10:50:14 +00:00
def __init_subclass__(cls, **kwargs):
REGISTRY[cls.__name__] = cls
def dict(self) -> Dict[str, Any]:
2024-03-06 09:25:55 +00:00
column_names = filter(
lambda x: x not in FILTERED_FIELDS, self.__table__.columns.keys()
)
2023-11-03 10:10:22 +00:00
try:
2024-02-26 15:00:55 +00:00
data = {}
for c in column_names:
value = getattr(self, c)
if isinstance(value, JSON):
2024-02-26 15:04:34 +00:00
# save JSON column as dict
data[c] = json.loads(str(value))
2024-02-26 15:00:55 +00:00
else:
data[c] = value
2024-02-29 07:02:29 +00:00
# Add synthetic field .stat
if hasattr(self, 'stat'):
data['stat'] = self.stat
2024-02-26 15:00:55 +00:00
return data
2023-11-03 10:10:22 +00:00
except Exception as e:
2024-02-21 16:14:58 +00:00
logger.error(f'Error occurred while converting object to dictionary: {e}')
2023-11-03 10:10:22 +00:00
return {}
2023-11-22 16:38:39 +00:00
def update(self, values: Dict[str, Any]) -> None:
for key, value in values.items():
if hasattr(self, key):
setattr(self, key, value)
2024-02-19 10:16:44 +00:00
2024-02-21 07:27:16 +00:00
2024-02-25 13:43:04 +00:00
make_searchable(Base.metadata)
2024-02-25 21:06:37 +00:00
Base.metadata.create_all(bind=engine)
2024-02-25 17:58:48 +00:00
# Функция для вывода полного трейсбека при предупреждениях
2024-03-06 09:25:55 +00:00
def warning_with_traceback(
message: Warning | str, category, filename: str, lineno: int, file=None, line=None
):
2024-02-25 17:58:48 +00:00
tb = traceback.format_stack()
tb_str = ''.join(tb)
return f'{message} ({filename}, {lineno}): {category.__name__}\n{tb_str}'
# Установка функции вывода трейсбека для предупреждений SQLAlchemy
2024-02-29 12:15:04 +00:00
warnings.showwarning = warning_with_traceback
warnings.simplefilter('always', exc.SAWarning)
2024-02-25 17:58:48 +00:00
2024-02-29 12:15:04 +00:00
@event.listens_for(Engine, 'before_cursor_execute')
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
conn.query_start_time = time.time()
2024-03-28 13:05:28 +00:00
conn.last_statement = ''
2024-02-25 17:58:48 +00:00
2024-03-28 16:08:55 +00:00
2024-02-29 12:15:04 +00:00
@event.listens_for(Engine, 'after_cursor_execute')
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
2024-03-28 13:37:04 +00:00
compiled_statement = context.compiled.string
compiled_parameters = context.compiled.params
if compiled_statement:
elapsed = time.time() - conn.query_start_time
query = compiled_statement % compiled_parameters
if elapsed > 1 and conn.last_statement != query:
conn.last_statement = query
logger.debug(f"\n{query}\n{'*' * math.floor(elapsed)} {elapsed:.3f} s\n")