From 57d25b637dad43a905cdc0eee36dd6a73cd28fb2 Mon Sep 17 00:00:00 2001 From: Untone Date: Wed, 7 Aug 2024 13:15:58 +0300 Subject: [PATCH] sync-viewed-stat --- resolvers/reaction.py | 1 + resolvers/reader.py | 16 +++++++++++----- schema/type.graphql | 3 ++- services/viewed.py | 41 ++++++++++++++++++++--------------------- 4 files changed, 34 insertions(+), 27 deletions(-) diff --git a/resolvers/reaction.py b/resolvers/reaction.py index 9dd79345..5bcd06c9 100644 --- a/resolvers/reaction.py +++ b/resolvers/reaction.py @@ -275,6 +275,7 @@ async def update_reaction(_, info, reaction): session.commit() r.stat = { + # FIXME: "viewed": ViewedStorage.get_shout(r.shuot), sure, it is possible to collect reaction vews "commented": commented_stat, "rating": rating_stat, } diff --git a/resolvers/reader.py b/resolvers/reader.py index a81db9d3..48582c31 100644 --- a/resolvers/reader.py +++ b/resolvers/reader.py @@ -21,6 +21,7 @@ from services.db import local_session from utils.logger import root_logger as logger from services.schema import query from services.search import search_text +from services.viewed import ViewedStorage def query_shouts(): @@ -94,10 +95,14 @@ def get_shouts_with_stats(q, limit, offset=0, author_id=None): :return: Список публикаций с включенной статистикой. """ # Основной запрос для получения публикаций и объединения их с подзапросами - q = q.options( - selectinload(Shout.authors), # Eagerly load authors - selectinload(Shout.topics) # Eagerly load topics - ).limit(limit).offset(offset) + q = ( + q.options( + selectinload(Shout.authors), # Eagerly load authors + selectinload(Shout.topics), # Eagerly load topics + ) + .limit(limit) + .offset(offset) + ) # Выполнение запроса и обработка результатов with local_session() as session: @@ -109,7 +114,7 @@ def get_shouts_with_stats(q, limit, offset=0, author_id=None): shout.authors = authors or [] shout.topics = topics or [] shout.stat = { - "viewed": 0, # FIXME: use separate resolver + "viewed": ViewedStorage.get_shout(shout.id), "followers": 0, # FIXME: implement followers_stat "rating": rating_stat or 0, "commented": comments_stat or 0, @@ -215,6 +220,7 @@ async def get_shout(_, info, slug: str): [shout, commented_stat, rating_stat, last_reaction_at, authors, topics] = results shout.stat = { + "viewed": ViewedStorage.get_shout(shout.id), "commented": commented_stat, "rating": rating_stat, "last_reacted_at": last_reaction_at, diff --git a/schema/type.graphql b/schema/type.graphql index aac2979a..4a88fd51 100644 --- a/schema/type.graphql +++ b/schema/type.graphql @@ -94,7 +94,8 @@ type Shout { type Stat { rating: Int commented: Int - followers: Int + viewed: Int + followed: Int last_reacted_at: Int } diff --git a/services/viewed.py b/services/viewed.py index 42d81ba3..bbfd284b 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -1,9 +1,9 @@ -import asyncio import json import os import time from datetime import datetime, timedelta, timezone from typing import Dict +from threading import Lock # ga from google.analytics.data_v1beta import BetaAnalyticsDataClient @@ -21,7 +21,7 @@ VIEWS_FILEPATH = "/dump/views.json" class ViewedStorage: - lock = asyncio.Lock() + lock = Lock() views_by_shout = {} shouts_by_topic = {} shouts_by_author = {} @@ -33,10 +33,10 @@ class ViewedStorage: start_date = datetime.now().strftime("%Y-%m-%d") @staticmethod - async def init(): + def init(): """Подключение к клиенту Google Analytics с использованием аутентификации""" self = ViewedStorage - async with self.lock: + with self.lock: # Загрузка предварительно подсчитанных просмотров из файла JSON self.load_precounted_views() @@ -48,7 +48,7 @@ class ViewedStorage: logger.info(" * Клиент Google Analytics успешно авторизован") # Запуск фоновой задачи - _task = asyncio.create_task(self.worker()) + self.worker() else: logger.info(" * Пожалуйста, добавьте ключевой файл Google Analytics") self.disabled = True @@ -78,16 +78,15 @@ class ViewedStorage: except Exception as e: logger.error(f"Ошибка загрузки предварительно подсчитанных просмотров: {e}") - # noinspection PyTypeChecker @staticmethod - async def update_pages(): + def update_pages(): """Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров""" self = ViewedStorage logger.info(" ⎧ Обновление данных просмотров от Google Analytics ---") if not self.disabled: try: start = time.time() - async with self.lock: + with self.lock: if self.analytics_client: request = RunReportRequest( property=f"properties/{GOOGLE_PROPERTY_ID}", @@ -126,35 +125,35 @@ class ViewedStorage: self.disabled = True @staticmethod - async def get_shout(shout_slug) -> int: + def get_shout(shout_slug) -> int: """Получение метрики просмотров shout по slug""" self = ViewedStorage - async with self.lock: + with self.lock: return self.views_by_shout.get(shout_slug, 0) @staticmethod - async def get_shout_media(shout_slug) -> Dict[str, int]: + def get_shout_media(shout_slug) -> Dict[str, int]: """Получение метрики воспроизведения shout по slug""" self = ViewedStorage - async with self.lock: + with self.lock: return self.views_by_shout.get(shout_slug, 0) @staticmethod - async def get_topic(topic_slug) -> int: + def get_topic(topic_slug) -> int: """Получение суммарного значения просмотров темы""" self = ViewedStorage topic_views = 0 - async with self.lock: + with self.lock: for shout_slug in self.shouts_by_topic.get(topic_slug, []): topic_views += self.views_by_shout.get(shout_slug, 0) return topic_views @staticmethod - async def get_author(author_slug) -> int: + def get_author(author_slug) -> int: """Получение суммарного значения просмотров автора""" self = ViewedStorage author_views = 0 - async with self.lock: + with self.lock: for shout_slug in self.shouts_by_author.get(author_slug, []): author_views += self.views_by_shout.get(shout_slug, 0) return author_views @@ -180,8 +179,8 @@ class ViewedStorage: update_groups(self.shouts_by_author, author.slug, shout_slug) @staticmethod - async def worker(): - """Асинхронная задача обновления""" + def worker(): + """Задача обновления""" failed = 0 self = ViewedStorage if self.disabled: @@ -189,7 +188,7 @@ class ViewedStorage: while True: try: - await self.update_pages() + self.update_pages() failed = 0 except Exception as exc: failed += 1 @@ -204,7 +203,7 @@ class ViewedStorage: logger.info( " ⎩ Следующее обновление: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]) ) - await asyncio.sleep(self.period) + time.sleep(self.period) else: - await asyncio.sleep(10) + time.sleep(10) logger.info(" - Попытка снова обновить данные")