This commit is contained in:
@@ -2,9 +2,7 @@ import asyncio
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Dict
|
||||
|
||||
import orjson
|
||||
from typing import Dict, Optional
|
||||
|
||||
# ga
|
||||
from google.analytics.data_v1beta import BetaAnalyticsDataClient
|
||||
@@ -20,33 +18,38 @@ from orm.author import Author
|
||||
from orm.shout import Shout, ShoutAuthor, ShoutTopic
|
||||
from orm.topic import Topic
|
||||
from services.db import local_session
|
||||
from services.redis import redis
|
||||
from utils.logger import root_logger as logger
|
||||
|
||||
GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH", "/dump/google-service.json")
|
||||
GOOGLE_PROPERTY_ID = os.environ.get("GOOGLE_PROPERTY_ID", "")
|
||||
VIEWS_FILEPATH = "/dump/views.json"
|
||||
|
||||
|
||||
class ViewedStorage:
|
||||
"""
|
||||
Класс для хранения и доступа к данным о просмотрах.
|
||||
Использует Redis в качестве основного хранилища и Google Analytics для сбора новых данных.
|
||||
"""
|
||||
lock = asyncio.Lock()
|
||||
precounted_by_slug = {}
|
||||
views_by_shout = {}
|
||||
shouts_by_topic = {}
|
||||
shouts_by_author = {}
|
||||
views = None
|
||||
period = 60 * 60 # каждый час
|
||||
analytics_client: BetaAnalyticsDataClient | None = None
|
||||
analytics_client: Optional[BetaAnalyticsDataClient] = None
|
||||
auth_result = None
|
||||
running = False
|
||||
redis_views_key = None
|
||||
last_update_timestamp = 0
|
||||
start_date = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
@staticmethod
|
||||
async def init():
|
||||
"""Подключение к клиенту Google Analytics с использованием аутентификации"""
|
||||
"""Подключение к клиенту Google Analytics и загрузка данных о просмотрах из Redis"""
|
||||
self = ViewedStorage
|
||||
async with self.lock:
|
||||
# Загрузка предварительно подсчитанных просмотров из файла JSON
|
||||
self.load_precounted_views()
|
||||
# Загрузка предварительно подсчитанных просмотров из Redis
|
||||
await self.load_views_from_redis()
|
||||
|
||||
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", GOOGLE_KEYFILE_PATH)
|
||||
if GOOGLE_KEYFILE_PATH and os.path.isfile(GOOGLE_KEYFILE_PATH):
|
||||
@@ -62,40 +65,54 @@ class ViewedStorage:
|
||||
self.running = False
|
||||
|
||||
@staticmethod
|
||||
def load_precounted_views():
|
||||
"""Загрузка предварительно подсчитанных просмотров из файла JSON"""
|
||||
async def load_views_from_redis():
|
||||
"""Загрузка предварительно подсчитанных просмотров из Redis"""
|
||||
self = ViewedStorage
|
||||
viewfile_path = VIEWS_FILEPATH
|
||||
if not os.path.exists(viewfile_path):
|
||||
viewfile_path = os.path.join(os.path.curdir, "views.json")
|
||||
if not os.path.exists(viewfile_path):
|
||||
logger.warning(" * views.json not found")
|
||||
return
|
||||
|
||||
logger.info(f" * loading views from {viewfile_path}")
|
||||
try:
|
||||
start_date_int = os.path.getmtime(viewfile_path)
|
||||
start_date_str = datetime.fromtimestamp(start_date_int).strftime("%Y-%m-%d")
|
||||
self.start_date = start_date_str
|
||||
|
||||
# Подключаемся к Redis если соединение не установлено
|
||||
if not redis._client:
|
||||
await redis.connect()
|
||||
|
||||
# Получаем список всех ключей migrated_views_* и находим самый последний
|
||||
keys = await redis.execute("KEYS", "migrated_views_*")
|
||||
if not keys:
|
||||
logger.warning(" * No migrated_views keys found in Redis")
|
||||
return
|
||||
|
||||
# Фильтруем только ключи timestamp формата (исключаем migrated_views_slugs)
|
||||
timestamp_keys = [k for k in keys if k != "migrated_views_slugs"]
|
||||
if not timestamp_keys:
|
||||
logger.warning(" * No migrated_views timestamp keys found in Redis")
|
||||
return
|
||||
|
||||
# Сортируем по времени создания (в названии ключа) и берем последний
|
||||
timestamp_keys.sort()
|
||||
latest_key = timestamp_keys[-1]
|
||||
self.redis_views_key = latest_key
|
||||
|
||||
# Получаем метку времени создания для установки start_date
|
||||
timestamp = await redis.execute("HGET", latest_key, "_timestamp")
|
||||
if timestamp:
|
||||
self.last_update_timestamp = int(timestamp)
|
||||
timestamp_dt = datetime.fromtimestamp(int(timestamp))
|
||||
self.start_date = timestamp_dt.strftime("%Y-%m-%d")
|
||||
|
||||
# Если данные сегодняшние, считаем их актуальными
|
||||
now_date = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
if now_date == self.start_date:
|
||||
logger.info(" * views data is up to date!")
|
||||
logger.info(" * Views data is up to date!")
|
||||
else:
|
||||
logger.warn(f" * {viewfile_path} is too old: {self.start_date}")
|
||||
|
||||
with open(viewfile_path, "r") as file:
|
||||
precounted_views = orjson.loads(file.read())
|
||||
self.precounted_by_slug.update(precounted_views)
|
||||
logger.info(f" * {len(precounted_views)} shouts with views was loaded.")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"precounted views loading error: {e}")
|
||||
logger.warning(f" * Views data is from {self.start_date}, may need update")
|
||||
|
||||
# Выводим информацию о количестве загруженных записей
|
||||
total_entries = await redis.execute("HGET", latest_key, "_total")
|
||||
if total_entries:
|
||||
logger.info(f" * {total_entries} shouts with views loaded from Redis key: {latest_key}")
|
||||
|
||||
# noinspection PyTypeChecker
|
||||
@staticmethod
|
||||
async def update_pages():
|
||||
"""Запрос всех страниц от Google Analytics, отсортрованных по количеству просмотров"""
|
||||
"""Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров"""
|
||||
self = ViewedStorage
|
||||
logger.info(" ⎧ views update from Google Analytics ---")
|
||||
if self.running:
|
||||
@@ -140,15 +157,40 @@ class ViewedStorage:
|
||||
self.running = False
|
||||
|
||||
@staticmethod
|
||||
def get_shout(shout_slug="", shout_id=0) -> int:
|
||||
"""Получение метрики просмотров shout по slug или id."""
|
||||
async def get_shout(shout_slug="", shout_id=0) -> int:
|
||||
"""
|
||||
Получение метрики просмотров shout по slug или id.
|
||||
|
||||
Args:
|
||||
shout_slug: Slug публикации
|
||||
shout_id: ID публикации
|
||||
|
||||
Returns:
|
||||
int: Количество просмотров
|
||||
"""
|
||||
self = ViewedStorage
|
||||
|
||||
# Получаем данные из Redis для новой схемы хранения
|
||||
if not redis._client:
|
||||
await redis.connect()
|
||||
|
||||
fresh_views = self.views_by_shout.get(shout_slug, 0)
|
||||
precounted_views = self.precounted_by_slug.get(shout_slug, 0)
|
||||
return fresh_views + precounted_views
|
||||
|
||||
# Если есть id, пытаемся получить данные из Redis по ключу migrated_views_<timestamp>
|
||||
if shout_id and self.redis_views_key:
|
||||
precounted_views = await redis.execute("HGET", self.redis_views_key, str(shout_id))
|
||||
if precounted_views:
|
||||
return fresh_views + int(precounted_views)
|
||||
|
||||
# Если нет id или данных, пытаемся получить по slug из отдельного хеша
|
||||
precounted_views = await redis.execute("HGET", "migrated_views_slugs", shout_slug)
|
||||
if precounted_views:
|
||||
return fresh_views + int(precounted_views)
|
||||
|
||||
return fresh_views
|
||||
|
||||
@staticmethod
|
||||
def get_shout_media(shout_slug) -> Dict[str, int]:
|
||||
async def get_shout_media(shout_slug) -> Dict[str, int]:
|
||||
"""Получение метрики воспроизведения shout по slug."""
|
||||
self = ViewedStorage
|
||||
|
||||
@@ -157,23 +199,29 @@ class ViewedStorage:
|
||||
return self.views_by_shout.get(shout_slug, 0)
|
||||
|
||||
@staticmethod
|
||||
def get_topic(topic_slug) -> int:
|
||||
async def get_topic(topic_slug) -> int:
|
||||
"""Получение суммарного значения просмотров темы."""
|
||||
self = ViewedStorage
|
||||
return sum(self.views_by_shout.get(shout_slug, 0) for shout_slug in self.shouts_by_topic.get(topic_slug, []))
|
||||
views_count = 0
|
||||
for shout_slug in self.shouts_by_topic.get(topic_slug, []):
|
||||
views_count += await self.get_shout(shout_slug=shout_slug)
|
||||
return views_count
|
||||
|
||||
@staticmethod
|
||||
def get_author(author_slug) -> int:
|
||||
async def get_author(author_slug) -> int:
|
||||
"""Получение суммарного значения просмотров автора."""
|
||||
self = ViewedStorage
|
||||
return sum(self.views_by_shout.get(shout_slug, 0) for shout_slug in self.shouts_by_author.get(author_slug, []))
|
||||
views_count = 0
|
||||
for shout_slug in self.shouts_by_author.get(author_slug, []):
|
||||
views_count += await self.get_shout(shout_slug=shout_slug)
|
||||
return views_count
|
||||
|
||||
@staticmethod
|
||||
def update_topics(shout_slug):
|
||||
"""Обновление счетчиков темы по slug shout"""
|
||||
self = ViewedStorage
|
||||
with local_session() as session:
|
||||
# Определение вспомогательной функции для избежа<EFBFBD><EFBFBD>ия повторения кода
|
||||
# Определение вспомогательной функции для избежания повторения кода
|
||||
def update_groups(dictionary, key, value):
|
||||
dictionary[key] = list(set(dictionary.get(key, []) + [value]))
|
||||
|
||||
@@ -268,4 +316,4 @@ class ViewedStorage:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Google Analytics API Error: {e}")
|
||||
return 0
|
||||
return 0
|
Reference in New Issue
Block a user