From eba97e967bd291e8aaab36a91c31dce0884163d2 Mon Sep 17 00:00:00 2001 From: Untone Date: Wed, 7 Aug 2024 13:30:41 +0300 Subject: [PATCH] thread-lock-fix2 --- services/viewed.py | 120 +++++++++++++++++---------------------------- 1 file changed, 46 insertions(+), 74 deletions(-) diff --git a/services/viewed.py b/services/viewed.py index 772ec739..4eddff1a 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -4,9 +4,7 @@ import os import time from datetime import datetime, timedelta, timezone from typing import Dict -from threading import Lock -# ga from google.analytics.data_v1beta import BetaAnalyticsDataClient from google.analytics.data_v1beta.types import DateRange, Dimension, Metric, RunReportRequest @@ -22,41 +20,32 @@ VIEWS_FILEPATH = "/dump/views.json" class ViewedStorage: - lock = Lock() views_by_shout = {} shouts_by_topic = {} shouts_by_author = {} - views = None period = 60 * 60 # каждый час analytics_client: BetaAnalyticsDataClient | None = None - auth_result = None disabled = False start_date = datetime.now().strftime("%Y-%m-%d") @staticmethod async def init(): - """Подключение к клиенту Google Analytics с использованием аутентификации""" + """Подключение к клиенту Google Analytics и инициализация данных.""" self = ViewedStorage - async with self.lock: - # Загрузка предварительно подсчитанных просмотров из файла JSON - self.load_precounted_views() + self.load_precounted_views() - os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", GOOGLE_KEYFILE_PATH) - if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH): - # Using a default constructor instructs the client to use the credentials - # specified in GOOGLE_APPLICATION_CREDENTIALS environment variable. - self.analytics_client = BetaAnalyticsDataClient() - logger.info(" * Клиент Google Analytics успешно авторизован") - - # Запуск фоновой задачи - _task = asyncio.create_task(self.worker()) - else: - logger.info(" * Пожалуйста, добавьте ключевой файл Google Analytics") - self.disabled = True + os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", GOOGLE_KEYFILE_PATH) + if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH): + self.analytics_client = BetaAnalyticsDataClient() + logger.info(" * Клиент Google Analytics успешно авторизован") + asyncio.create_task(self.worker()) # Запуск фоновой задачи + else: + logger.info(" * Пожалуйста, добавьте ключевой файл Google Analytics") + self.disabled = True @staticmethod def load_precounted_views(): - """Загрузка предварительно подсчитанных просмотров из файла JSON""" + """Загрузка предварительно подсчитанных просмотров из файла JSON.""" self = ViewedStorage try: if os.path.exists(VIEWS_FILEPATH): @@ -79,90 +68,73 @@ class ViewedStorage: except Exception as e: logger.error(f"Ошибка загрузки предварительно подсчитанных просмотров: {e}") - # noinspection PyTypeChecker @staticmethod async def update_pages(): - """Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров""" + """Обновление данных просмотров из Google Analytics.""" self = ViewedStorage logger.info(" ⎧ Обновление данных просмотров от Google Analytics ---") if not self.disabled: try: start = time.time() - async with self.lock: - if self.analytics_client: - request = RunReportRequest( - property=f"properties/{GOOGLE_PROPERTY_ID}", - dimensions=[Dimension(name="pagePath")], - metrics=[Metric(name="screenPageViews")], - date_ranges=[DateRange(start_date=self.start_date, end_date="today")], - ) - response = self.analytics_client.run_report(request) - if response and isinstance(response.rows, list): - slugs = set() - for row in response.rows: - print( - row.dimension_values[0].value, - row.metric_values[0].value, - ) - # Извлечение путей страниц из ответа Google Analytics - if isinstance(row.dimension_values, list): - page_path = row.dimension_values[0].value - slug = page_path.split("discours.io/")[-1] - views_count = int(row.metric_values[0].value) + if self.analytics_client: + request = RunReportRequest( + property=f"properties/{GOOGLE_PROPERTY_ID}", + dimensions=[Dimension(name="pagePath")], + metrics=[Metric(name="screenPageViews")], + date_ranges=[DateRange(start_date=self.start_date, end_date="today")], + ) + response = self.analytics_client.run_report(request) + if response and isinstance(response.rows, list): + slugs = set() + new_views_by_shout = {} # временное хранилище - # Обновление данных в хранилище - self.views_by_shout[slug] = self.views_by_shout.get(slug, 0) - self.views_by_shout[slug] += views_count - self.update_topics(slug) + for row in response.rows: + page_path = row.dimension_values[0].value + slug = page_path.split("discours.io/")[-1] + views_count = int(row.metric_values[0].value) - # Запись путей страниц для логирования - slugs.add(slug) + # Запись путей страниц для логирования + slugs.add(slug) - logger.info(f" ⎪ Собрано страниц: {len(slugs)} ") + # Обновление данных в временном хранилище + new_views_by_shout[slug] = new_views_by_shout.get(slug, 0) + views_count - end = time.time() - logger.info(" ⎪ Обновление страниц заняло %fs " % (end - start)) + self.views_by_shout = new_views_by_shout # атомарная замена + logger.info(f" ⎪ Собрано страниц: {len(slugs)} ") + + end = time.time() + logger.info(" ⎪ Обновление страниц заняло %fs " % (end - start)) except Exception as error: logger.error(error) self.disabled = True @staticmethod def get_shout(shout_slug) -> int: - """Получение метрики просмотров shout по slug""" + """Получение метрики просмотров shout по slug.""" self = ViewedStorage - with self.lock: - return self.views_by_shout.get(shout_slug, 0) + return self.views_by_shout.get(shout_slug, 0) @staticmethod def get_shout_media(shout_slug) -> Dict[str, int]: - """Получение метрики воспроизведения shout по slug""" + """Получение метрики воспроизведения shout по slug.""" self = ViewedStorage - with self.lock: - return self.views_by_shout.get(shout_slug, 0) + return self.views_by_shout.get(shout_slug, 0) @staticmethod def get_topic(topic_slug) -> int: - """Получение суммарного значения просмотров темы""" + """Получение суммарного значения просмотров темы.""" self = ViewedStorage - topic_views = 0 - with self.lock: - for shout_slug in self.shouts_by_topic.get(topic_slug, []): - topic_views += self.views_by_shout.get(shout_slug, 0) - return topic_views + return sum(self.views_by_shout.get(shout_slug, 0) for shout_slug in self.shouts_by_topic.get(topic_slug, [])) @staticmethod def get_author(author_slug) -> int: - """Получение суммарного значения просмотров автора""" + """Получение суммарного значения просмотров автора.""" self = ViewedStorage - author_views = 0 - with self.lock: - for shout_slug in self.shouts_by_author.get(author_slug, []): - author_views += self.views_by_shout.get(shout_slug, 0) - return author_views + return sum(self.views_by_shout.get(shout_slug, 0) for shout_slug in self.shouts_by_author.get(author_slug, [])) @staticmethod def update_topics(shout_slug): - """Обновление счетчиков темы по slug shout""" + """Обновление счетчиков темы по slug shout.""" self = ViewedStorage with local_session() as session: # Определение вспомогательной функции для избежания повторения кода @@ -182,7 +154,7 @@ class ViewedStorage: @staticmethod async def worker(): - """Асинхронная задача обновления""" + """Асинхронная задача обновления.""" failed = 0 self = ViewedStorage if self.disabled: