reader-query-optimized
Some checks failed
Deploy on push / deploy (push) Failing after 10s

This commit is contained in:
Untone 2024-10-31 19:06:58 +03:00
parent 1114c7766d
commit 62370b94b3
2 changed files with 133 additions and 246 deletions

View File

@ -29,23 +29,34 @@ from utils.logger import root_logger as logger
def query_shouts(): def query_shouts():
""" """
Базовый запрос для получения публикаций с подзапросами статистики, авторов и тем, Базовый запрос для получения публикаций с подзапросами статистики, авторов и тем.
с агрегированием в JSON.
""" """
comments_reaction = aliased(Reaction, name="comments_reaction") # Подзапрос для реакций и статистики (объединяем только эту часть)
ratings_reaction = aliased(Reaction, name="ratings_reaction") reactions_subquery = (
last_reaction = aliased(Reaction, name="last_reaction") select(
func.sum(
case(
(Reaction.kind == ReactionKind.LIKE.value, 1),
(Reaction.kind == ReactionKind.DISLIKE.value, -1),
else_=0,
)
).label("rating_stat"),
func.count(distinct(case((Reaction.kind == ReactionKind.COMMENT.value, Reaction.id), else_=None))).label(
"comments_stat"
),
func.max(Reaction.created_at).label("last_reacted_at"),
)
.select_from(Reaction)
.where(and_(Reaction.shout == Shout.id, Reaction.reply_to.is_(None), Reaction.deleted_at.is_(None)))
.correlate(Shout)
.scalar_subquery()
)
# Подзапрос для уникальных авторов, агрегированных в JSON # Остальные подзапросы оставляем как есть
authors_subquery = ( authors_subquery = (
select( select(
func.json_agg( func.json_agg(
func.json_build_object( func.json_build_object("id", Author.id, "name", Author.name, "slug", Author.slug, "pic", Author.pic)
"id", Author.id,
"name", Author.name,
"slug", Author.slug,
"pic", Author.pic
)
).label("authors") ).label("authors")
) )
.select_from(ShoutAuthor) .select_from(ShoutAuthor)
@ -58,13 +69,9 @@ def query_shouts():
# Подзапрос для уникальных тем, агрегированных в JSON # Подзапрос для уникальных тем, агрегированных в JSON
topics_subquery = ( topics_subquery = (
select( select(
func.json_agg( func.json_agg(func.json_build_object("id", Topic.id, "title", Topic.title, "slug", Topic.slug)).label(
func.json_build_object( "topics"
"id", Topic.id, )
"title", Topic.title,
"slug", Topic.slug
)
).label("topics")
) )
.select_from(ShoutTopic) .select_from(ShoutTopic)
.join(Topic, ShoutTopic.topic == Topic.id) .join(Topic, ShoutTopic.topic == Topic.id)
@ -83,58 +90,30 @@ def query_shouts():
.scalar_subquery() .scalar_subquery()
) )
# Подзапрос для комментариев captions_subquery = (
comments_subq = (
select(func.count(distinct(comments_reaction.id)))
.select_from(comments_reaction)
.where(
and_(
comments_reaction.shout == Shout.id,
comments_reaction.kind == ReactionKind.COMMENT.value,
comments_reaction.deleted_at.is_(None),
comments_reaction.reply_to.is_(None),
)
)
.scalar_subquery()
.label("comments_stat")
)
# Подзапрос для рейтинга
ratings_subq = (
select( select(
func.sum( func.json_agg(func.json_build_object("author_id", Author.id, "caption", ShoutAuthor.caption)).label(
case( "captions"
(ratings_reaction.kind == ReactionKind.LIKE.value, 1),
(ratings_reaction.kind == ReactionKind.DISLIKE.value, -1),
else_=0,
)
)
)
.select_from(ratings_reaction)
.where(
and_(
ratings_reaction.shout == Shout.id,
ratings_reaction.reply_to.is_(None),
ratings_reaction.deleted_at.is_(None),
) )
) )
.select_from(ShoutAuthor)
.join(Author, ShoutAuthor.author == Author.id)
.where(ShoutAuthor.shout == Shout.id)
.correlate(Shout)
.scalar_subquery() .scalar_subquery()
.label("rating_stat")
) )
# Основной запрос с использованием подзапросов # Основной запрос
q = ( q = (
select( select(
Shout, Shout,
comments_subq, reactions_subquery,
ratings_subq,
func.max(Reaction.created_at).label("last_reacted_at"),
authors_subquery, authors_subquery,
captions_subquery,
topics_subquery, topics_subquery,
main_topic_subquery, main_topic_subquery,
) )
.outerjoin(last_reaction, and_(last_reaction.shout == Shout.id, last_reaction.deleted_at.is_(None))) .outerjoin(Reaction, Reaction.shout == Shout.id)
.outerjoin(ShoutReactionsFollower, ShoutReactionsFollower.shout == Shout.id)
.where(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None))) .where(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None)))
.group_by(Shout.id) .group_by(Shout.id)
) )
@ -144,140 +123,65 @@ def query_shouts():
def get_shouts_with_stats(q, limit=20, offset=0, author_id=None): def get_shouts_with_stats(q, limit=20, offset=0, author_id=None):
""" """
Получение публикаций со статистикой, и подзапросами авторов и тем. Получение публикаций со статистикой.
:param q: Запрос :param q: Базовый запрос публикаций
:param limit: Ограничение на количество результатов. :param limit: Ограничение количества результатов
:param offset: Смещение для пагинации. :param offset: Смещение для пагинации
:return: Список публикаций с включенной статистикой. :param author_id: Опциональный ID автора для фильтрации
:return: Список публикаций с статистикой
""" """
# Скалярный подзапрос для авторов
authors_subquery = (
select(
func.json_agg(
func.json_build_object(
"id", Author.id,
"name", Author.name,
"slug", Author.slug,
"pic", Author.pic
)
).label("authors")
)
.select_from(ShoutAuthor)
.join(Author, ShoutAuthor.author == Author.id)
.where(ShoutAuthor.shout == Shout.id)
.correlate(Shout)
.scalar_subquery()
)
# Подзапрос для captions
captions_subquery = (
select(
func.json_agg(
func.json_build_object(
"author_id", Author.id,
"caption", ShoutAuthor.caption
)
).label("captions")
)
.select_from(ShoutAuthor)
.join(Author, ShoutAuthor.author == Author.id)
.where(ShoutAuthor.shout == Shout.id)
.correlate(Shout)
.scalar_subquery()
)
# Скалярный подзапрос для тем
topics_subquery = (
select(
func.json_agg(
func.json_build_object(
"id", Topic.id,
"title", Topic.title,
"slug", Topic.slug
)
).label("topics"),
)
.select_from(ShoutTopic)
.join(Topic, ShoutTopic.topic == Topic.id)
.where(ShoutTopic.shout == Shout.id)
.correlate(Shout)
.scalar_subquery()
)
# Скалярный подзапрос для основного топика
main_topic_subquery = (
select(func.max(Topic.slug).label("main_topic_slug"))
.select_from(ShoutTopic)
.join(Topic, ShoutTopic.topic == Topic.id)
.where(
and_(
ShoutTopic.shout == Shout.id,
ShoutTopic.main.is_(True),
)
)
.correlate(Shout)
.scalar_subquery()
)
# Основной запрос
query = (
select(
Shout,
func.count(distinct(Reaction.id)).label("comments_stat"),
func.sum(
case(
(Reaction.kind == ReactionKind.LIKE.value, 1),
(Reaction.kind == ReactionKind.DISLIKE.value, -1),
else_=0,
)
).label("rating_stat"),
func.max(Reaction.created_at).label("last_reacted_at"),
authors_subquery,
captions_subquery,
topics_subquery,
main_topic_subquery,
)
.outerjoin(Reaction, Reaction.shout == Shout.id)
.where(and_(Shout.published_at.is_not(None), Shout.deleted_at.is_(None), Shout.featured_at.is_not(None)))
.group_by(Shout.id)
.order_by(Shout.published_at.desc().nulls_last())
.limit(limit)
.offset(offset)
)
# Применение дополнительных фильтров, если необходимо
if author_id: if author_id:
query = query.filter(Shout.created_by == author_id) q = q.filter(Shout.created_by == author_id)
q = q.order_by(Shout.published_at.desc().nulls_last())
if limit:
q = q.limit(limit)
if offset:
q = q.offset(offset)
# Формирование списка публикаций с их данными
shouts = [] shouts = []
with local_session() as session: with local_session() as session:
for [shout, comments_stat, rating_stat, last_reacted_at, authors_json, captions_json, topics_json, main_topic_slug] in ( results = session.execute(q).all()
session.execute(query).all() or []
): for [shout, reactions_stat, authors_json, captions_json, topics_json, main_topic_slug] in results:
viewed_stat = ViewedStorage.get_shout(shout.id) # Базовые данные публикации
shout_dict = shout.dict() shout_dict = shout.dict()
# Преобразование JSON данных в объекты
captions = {int(ca['author_id']): ca['caption'] for ca in captions_json} if captions_json else {} # Добавление статистики просмотров
# patch json to add captions to authors viewed_stat = ViewedStorage.get_shout(shout_slug=shout.slug)
authors = [
{**a, "caption": captions.get(int(a["id"]), "")} for a in authors_json # Обработка авторов и их подписей
] if authors_json else [] authors = authors_json or []
shout_dict["authors"] = authors captions = captions_json or []
# patch json to add is_main to topics
topics = [ # Объединяем авторов с их подписями
{**t, "is_main": t["slug"] == main_topic_slug} for t in topics_json for author in authors:
] if topics_json else [] caption_item = next((c for c in captions if c["author_id"] == author["id"]), None)
shout_dict["topics"] = topics if caption_item:
shout_dict["stat"] = { author["caption"] = caption_item["caption"]
"viewed": viewed_stat or 0,
"rating": rating_stat or 0, # Обработка тем
"commented": comments_stat or 0, topics = topics_json or []
"last_reacted_at": last_reacted_at, for topic in topics:
} topic["is_main"] = topic["slug"] == main_topic_slug
shout_dict["main_topic"] = main_topic_slug # Присваиваем основной топик
# Формирование финальной структуры
shout_dict.update(
{
"authors": authors,
"topics": topics,
"main_topic": main_topic_slug,
"stat": {
"viewed": viewed_stat or 0,
"rating": reactions_stat.rating_stat or 0,
"commented": reactions_stat.comments_stat or 0,
"last_reacted_at": reactions_stat.last_reacted_at,
},
}
)
shouts.append(shout_dict) shouts.append(shout_dict)
return shouts return shouts
@ -362,55 +266,35 @@ def apply_filters(q, filters, author_id=None):
@query.field("get_shout") @query.field("get_shout")
async def get_shout(_, _info, slug="", shout_id=0): async def get_shout(_, _info, slug="", shout_id=0):
""" """
Получение публикации по slug. Получение публикации по slug или id.
:param _: Корневой объект запроса (не используется). :param _: Корневой объект запроса (не используется)
:param info: Информация о контексте GraphQL. :param _info: Информация о контексте GraphQL
:param slug: Уникальный идентификатор шута. :param slug: Уникальный идентификатор публикации
:return: Данные шута с включенной статистикой. :param shout_id: ID публикации
:return: Данные публикации с включенной статистикой
""" """
try: try:
with local_session() as session: # Получаем базовый запрос с подзапросами статистики
# Отключение автосохранения q = query_shouts()
with session.no_autoflush:
q = query_shouts()
if slug: # Применяем фильтр по slug или id
q = q.where(Shout.slug == slug) if slug:
elif shout_id: q = q.where(Shout.slug == slug)
q = q.where(Shout.id == shout_id) elif shout_id:
q = q.where(Shout.id == shout_id)
else:
return None
results = session.execute(q).first() # Получаем результат через get_shouts_with_stats с limit=1
if results: shouts = get_shouts_with_stats(q, limit=1)
[
shout,
commented_stat,
# followers_stat,
rating_stat,
last_reaction_at,
authors_json,
topics_json,
main_topic_slug,
] = results
viewed_stat = ViewedStorage.get_shout(shout.id)
shout_dict = shout.dict()
shout_dict["stat"] = {
"viewed": viewed_stat or 0,
"commented": commented_stat or 0,
"rating": rating_stat or 0,
"last_reacted_at": last_reaction_at or 0,
}
shout_dict["authors"] = authors_json or [] # Возвращаем первую (и единственную) публикацию, если она найдена
shout_dict["topics"] = topics_json or [] return shouts[0] if shouts else None
shout_dict["main_topic"] = main_topic_slug
return shout_dict except Exception as exc:
except Exception as _exc: logger.error(f"Error in get_shout: {exc}", exc_info=True)
import traceback return None
logger.error(traceback.format_exc())
return None
@query.field("load_shouts_by") @query.field("load_shouts_by")
@ -514,7 +398,7 @@ async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0):
Загрузка публикаций с наименьшим количеством оценок. Загрузка публикаций с наименьшим количеством оценок.
""" """
rating_reaction = aliased(Reaction, name="rating_reaction") rating_reaction = aliased(Reaction, name="rating_reaction")
# Подзапрос для подсчета количества оценок (лайков и дизлайков) # Подзапрос для подсчета количества оценок (лайков и дизлайков)
ratings_count = ( ratings_count = (
select(func.count(distinct(rating_reaction.id))) select(func.count(distinct(rating_reaction.id)))
@ -524,7 +408,7 @@ async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0):
rating_reaction.shout == Shout.id, rating_reaction.shout == Shout.id,
rating_reaction.reply_to.is_(None), rating_reaction.reply_to.is_(None),
rating_reaction.deleted_at.is_(None), rating_reaction.deleted_at.is_(None),
rating_reaction.kind.in_([ReactionKind.LIKE.value, ReactionKind.DISLIKE.value]) rating_reaction.kind.in_([ReactionKind.LIKE.value, ReactionKind.DISLIKE.value]),
) )
) )
.correlate(Shout) .correlate(Shout)
@ -533,18 +417,13 @@ async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0):
) )
q = query_shouts() q = query_shouts()
# Добавляем подсчет рейтингов в основной запрос # Добавляем подсчет рейтингов в основной запрос
q = q.add_columns(ratings_count) q = q.add_columns(ratings_count)
# Фильтруем только опубликованные и не удаленные публикации # Фильтруем только опубликованные и не удаленные публикации
q = q.filter( q = q.filter(and_(Shout.deleted_at.is_(None), Shout.published_at.is_not(None)))
and_(
Shout.deleted_at.is_(None),
Shout.published_at.is_not(None)
)
)
# Сортируем по количеству оценок (по возрастанию) и случайно среди равных # Сортируем по количеству оценок (по возрастанию) и случайно среди равных
q = q.order_by(ratings_count.asc(), func.random()) q = q.order_by(ratings_count.asc(), func.random())
@ -618,10 +497,10 @@ async def load_shouts_coauthored(_, info, limit=50, offset=0):
""" """
Загрузка публикаций, написанных в соавторстве с пользователем. Загрузка публикаций, написанных в соавторстве с пользователем.
:param info: Информация о контексте GraphQL. :param info: Информаци<EFBFBD><EFBFBD> о контексте GraphQL.
:param limit: Максимальное количество публикаций. :param limit: Максимальное количество публикаций.
:param offset: Смещение для пагинации. :param offset: Смещение для пагинации.
:return: Список публикаций в соавторстве. :return: Список публикаций в соавто<EFBFBD><EFBFBD>стве.
""" """
author_id = info.context.get("author", {}).get("id") author_id = info.context.get("author", {}).get("id")
if not author_id: if not author_id:
@ -639,7 +518,7 @@ async def load_shouts_discussed(_, info, limit=50, offset=0):
:param info: Информация о контексте GraphQL. :param info: Информация о контексте GraphQL.
:param limit: Максимальное количество публикаций. :param limit: Максимальное количество публикаций.
:param offset: Смещение для пагинации. :param offset: Смещене для пагинации.
:return: Список публикаций, обсужденных пользователем. :return: Список публикаций, обсужденных пользователем.
""" """
author_id = info.context.get("author", {}).get("id") author_id = info.context.get("author", {}).get("id")

View File

@ -27,7 +27,8 @@ VIEWS_FILEPATH = "/dump/views.json"
class ViewedStorage: class ViewedStorage:
lock = asyncio.Lock() lock = asyncio.Lock()
views_by_shout = {} views_by_shout_slug = {}
views_by_shout_id = {}
shouts_by_topic = {} shouts_by_topic = {}
shouts_by_author = {} shouts_by_author = {}
views = None views = None
@ -83,9 +84,16 @@ class ViewedStorage:
with open(viewfile_path, "r") as file: with open(viewfile_path, "r") as file:
precounted_views = json.load(file) precounted_views = json.load(file)
self.views_by_shout.update(precounted_views) self.views_by_shout_slug.update(precounted_views)
logger.info(f" * {len(precounted_views)} shouts with views was loaded.") logger.info(f" * {len(precounted_views)} shouts with views was loaded.")
# get shout_id by slug
with local_session() as session:
for slug, views_count in self.views_by_shout_slug.items():
shout_id = session.query(Shout.id).filter(Shout.slug == slug).scalar()
if isinstance(shout_id, int):
self.views_by_shout_id.update({shout_id: views_count})
except Exception as e: except Exception as e:
logger.error(f"precounted views loading error: {e}") logger.error(f"precounted views loading error: {e}")
@ -137,10 +145,10 @@ class ViewedStorage:
self.disabled = True self.disabled = True
@staticmethod @staticmethod
def get_shout(shout_slug) -> int: def get_shout(shout_slug="", shout_id=0) -> int:
"""Получение метрики просмотров shout по slug.""" """Получение метрики просмотров shout по slug или id."""
self = ViewedStorage self = ViewedStorage
return self.views_by_shout.get(shout_slug, 0) return self.views_by_shout_slug.get(shout_slug, self.views_by_shout_id.get(shout_id, 0))
@staticmethod @staticmethod
def get_shout_media(shout_slug) -> Dict[str, int]: def get_shout_media(shout_slug) -> Dict[str, int]: