core/services/viewed.py

223 lines
10 KiB
Python
Raw Normal View History

2024-01-25 19:41:27 +00:00
import asyncio
import json
2024-01-23 01:03:15 +00:00
import logging
2024-01-25 19:41:27 +00:00
import os
import time
2023-12-17 20:30:20 +00:00
from datetime import datetime, timedelta, timezone
2024-01-25 19:41:27 +00:00
from typing import Dict
2024-01-23 13:04:38 +00:00
2024-01-23 01:03:15 +00:00
# ga
from apiclient.discovery import build
from google.oauth2.service_account import Credentials
2022-11-21 22:23:16 +00:00
2024-01-23 13:04:38 +00:00
from orm.author import Author
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from services.db import local_session
2024-01-25 19:41:27 +00:00
2024-01-23 13:04:38 +00:00
# Google Analytics access settings.
# Read-only OAuth scope requested for the service account.
GOOGLE_ANALYTICS_SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
# Reporting API v4 endpoint, kept for reference only (not used by the code):
# GOOGLE_ANALYTICS_API = 'https://analyticsreporting.googleapis.com/v4'
# Path to the service-account key file; overridable through the environment.
GOOGLE_KEYFILE_PATH = os.getenv('GOOGLE_KEYFILE_PATH', '/dump/google-service.json')

# Logging setup: root logger and this module's logger both emit DEBUG and above.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('\t[services.viewed]\t')
logger.setLevel(logging.DEBUG)
2024-01-23 13:04:38 +00:00
2024-01-13 12:44:56 +00:00
2024-01-23 13:04:38 +00:00
def get_service():
    """Create the Analytics Reporting API v4 service object.

    Credentials are loaded from the service-account key file at
    ``GOOGLE_KEYFILE_PATH`` and restricted to ``GOOGLE_ANALYTICS_SCOPES``.

    Returns:
        The client object produced by ``build`` for ``analyticsreporting`` v4.
    """
    return build(
        serviceName='analyticsreporting',
        version='v4',
        credentials=Credentials.from_service_account_file(
            GOOGLE_KEYFILE_PATH,
            scopes=GOOGLE_ANALYTICS_SCOPES,
        ),
    )
2022-11-18 17:54:37 +00:00
2024-01-25 19:41:27 +00:00
2022-11-18 17:54:37 +00:00
class ViewedStorage:
    """Class-level (singleton-style) in-memory storage of page-view counters.

    All state lives in class attributes and every method is a static method
    operating on ``ViewedStorage`` itself. Counters start from a JSON dump of
    precounted views and are refreshed periodically from Google Analytics by
    a background task started in :meth:`init`.
    """

    # Guards reads and writes of the counter dictionaries.
    lock = asyncio.Lock()
    # shout slug -> total number of views
    views_by_shout = {}
    # topic slug -> list of shout slugs
    shouts_by_topic = {}
    # author slug -> list of shout slugs
    shouts_by_author = {}
    views = None
    period = 60 * 60  # refresh interval in seconds (every hour)
    analytics_client = None
    auth_result = None
    # Set to True when no Google key file is available; disables refreshing.
    disabled = False
    # Age of the precounted dump in whole days; start of the requested range.
    days_ago = 0
    # Location of the precounted views dump.
    views_dump_path = '/dump/views.json'
    # Baseline loaded from the dump: shout slug -> views counted before it.
    precounted_by_slug = {}
    # Strong reference to the background refresh task (the event loop itself
    # only keeps weak references to tasks).
    worker_task = None

    @staticmethod
    async def init():
        """Connect to Google Analytics and start the background refresh task.

        If the key file at ``GOOGLE_KEYFILE_PATH`` is missing, the storage is
        marked as disabled and nothing else happens.
        """
        self = ViewedStorage
        async with self.lock:
            if os.path.exists(GOOGLE_KEYFILE_PATH):
                self.analytics_client = get_service()
                logger.info(f' * Постоянная авторизация в Google Analytics {self.analytics_client}')

                # Load the precounted views from the JSON dump.
                self.load_precounted_views()

                file_path = self.views_dump_path
                if os.path.exists(file_path):
                    # NOTE(review): on Unix ``getctime`` is the last metadata
                    # change, not the creation time - confirm this is intended.
                    creation_time = os.path.getctime(file_path)
                    current_time = datetime.now().timestamp()
                    time_difference_seconds = current_time - creation_time
                    # Convert seconds to whole days.
                    self.days_ago = int(time_difference_seconds / (24 * 3600))
                    logger.info(f'The file {file_path} was created {self.days_ago} days ago.')
                else:
                    logger.info(f'The file {file_path} does not exist.')

                # Start the background task and keep a reference to it so it
                # cannot be garbage-collected while still running.
                self.worker_task = asyncio.create_task(self.worker())
            else:
                logger.info(' * Пожалуйста, добавьте ключевой файл Google Analytics')
                self.disabled = True

    @staticmethod
    def load_precounted_views():
        """Load precounted views from the JSON dump.

        The loaded mapping is remembered as the baseline
        (``precounted_by_slug``) and also merged into ``views_by_shout``.
        Any error is logged and otherwise ignored (best effort).
        """
        self = ViewedStorage
        try:
            with open(self.views_dump_path, 'r') as file:
                precounted_views = json.load(file)
            self.precounted_by_slug.update(precounted_views)
            self.views_by_shout.update(precounted_views)
            logger.info(
                f' * {len(precounted_views)} публикаций с просмотрами успешно загружены.'
            )
        except Exception as e:
            logger.error(f'Ошибка загрузки предварительно подсчитанных просмотров: {e}')

    @staticmethod
    def _collect_views(data):
        """Fold a report response into a ``{slug: views}`` mapping.

        Only the first report is read. Rows that are not dicts or that have
        no dimensions are skipped; several rows resolving to the same slug
        are summed. Returns an empty dict for a response of unexpected shape.

        Raises:
            KeyError, IndexError, ValueError, TypeError: if a row with
                dimensions lacks a parsable ``metrics[0].values[0]``.
        """
        fresh = {}
        if not isinstance(data, dict):
            return fresh
        reports = data.get('reports', [])
        if not (reports and isinstance(reports, list)):
            return fresh
        for row in reports[0].get('data', {}).get('rows', []):
            if not isinstance(row, dict):
                continue
            dimensions = row.get('dimensions', [])
            if not (isinstance(dimensions, list) and dimensions):
                continue
            # Keep only what follows the site host in the page path.
            slug = dimensions[0].split('discours.io/')[-1]
            views_count = int(row['metrics'][0]['values'][0])
            fresh[slug] = fresh.get(slug, 0) + views_count
        return fresh

    @staticmethod
    def _apply_fresh_views(fresh):
        """Set each counter to its precounted baseline plus the fresh total.

        The report always covers the whole range since the dump, so the
        counter is recomputed rather than incremented; incrementing would add
        the same views again on every refresh.
        """
        self = ViewedStorage
        for slug, views_count in fresh.items():
            self.views_by_shout[slug] = self.precounted_by_slug.get(slug, 0) + views_count

    @staticmethod
    async def update_pages():
        """Request page views from Google Analytics and refresh the counters.

        Does nothing (apart from logging) when the storage is disabled or has
        no client. All exceptions are logged and swallowed, so callers never
        see a failure from this method.
        """
        self = ViewedStorage
        logger.info(' ⎧ Обновление данных просмотров от Google Analytics ---')
        if self.disabled:
            return
        try:
            start = time.time()
            async with self.lock:
                if self.analytics_client:
                    # NOTE(review): the Reporting API v4 client documents
                    # ``reports().batchGet(body=...)`` with a ``viewId``;
                    # confirm that ``data().batchRunReports`` resolves here.
                    # NOTE(review): ``execute()`` and the DB queries below are
                    # synchronous calls made while the lock is held.
                    data = (
                        self.analytics_client.data().batchRunReports(
                            {
                                'requests': [
                                    {
                                        'dateRanges': [{'startDate': f'{self.days_ago}daysAgo', 'endDate': 'today'}],
                                        'metrics': [{'expression': 'ga:pageviews'}],
                                        'dimensions': [{'name': 'ga:pagePath'}],
                                    }
                                ]
                            }
                        )
                        .execute()
                    )
                    if isinstance(data, dict):
                        fresh = self._collect_views(data)
                        # Update the counters in the storage.
                        self._apply_fresh_views(fresh)
                        # One lookup per distinct slug is enough: the topic
                        # and author grouping is idempotent.
                        for slug in fresh:
                            self.update_topics(slug)
                        logger.info(f' ⎪ Собрано страниц: {len(fresh)} ')
            end = time.time()
            logger.info(' ⎪ Обновление страниц заняло %fs ' % (end - start))
        except Exception as error:
            logger.error(error)

    @staticmethod
    async def get_shout(shout_slug) -> int:
        """Return the view counter of a shout by slug (0 when unknown)."""
        self = ViewedStorage
        async with self.lock:
            return self.views_by_shout.get(shout_slug, 0)

    @staticmethod
    async def get_shout_media(shout_slug) -> Dict[str, int]:
        """Return the playback metric of a shout by slug.

        NOTE(review): despite the annotation this currently returns the same
        integer view counter as ``get_shout`` (0 when unknown).
        """
        self = ViewedStorage
        async with self.lock:
            return self.views_by_shout.get(shout_slug, 0)

    @staticmethod
    async def get_topic(topic_slug) -> int:
        """Return the summed views of all shouts grouped under a topic."""
        self = ViewedStorage
        async with self.lock:
            return sum(
                self.views_by_shout.get(shout_slug, 0)
                for shout_slug in self.shouts_by_topic.get(topic_slug, [])
            )

    @staticmethod
    async def get_author(author_slug) -> int:
        """Return the summed views of all shouts grouped under an author."""
        self = ViewedStorage
        async with self.lock:
            return sum(
                self.views_by_shout.get(shout_slug, 0)
                for shout_slug in self.shouts_by_author.get(author_slug, [])
            )

    @staticmethod
    def update_topics(shout_slug):
        """Register a shout slug under each of its topics and authors.

        Queries the database for the topics and authors joined to the shout
        and adds the slug to ``shouts_by_topic`` / ``shouts_by_author``.
        """
        self = ViewedStorage
        with local_session() as session:
            # Helper shared by both groupings to avoid repeating the code.
            def update_groups(dictionary, key, value):
                dictionary[key] = list(set(dictionary.get(key, []) + [value]))

            # Update topics and authors using the helper.
            for [_shout_topic, topic] in (
                session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all()
            ):
                update_groups(self.shouts_by_topic, topic.slug, shout_slug)
            for [_shout_author, author] in (
                session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all()
            ):
                update_groups(self.shouts_by_author, author.slug, shout_slug)

    @staticmethod
    async def worker():
        """Background task: refresh the counters every ``period`` seconds.

        After a failed refresh it retries in 10 seconds and gives up after
        more than three consecutive failures.

        NOTE(review): ``update_pages`` logs and swallows its own exceptions,
        so the failure branch below is only reached if that changes.
        """
        failed = 0
        self = ViewedStorage
        if self.disabled:
            return

        while True:
            try:
                await self.update_pages()
                failed = 0
            except Exception:
                failed += 1
                logger.info(' - Обновление не удалось #%d, ожидание 10 секунд' % failed)
                if failed > 3:
                    logger.info(' - Больше не пытаемся обновить')
                    break
            if failed == 0:
                when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
                t = format(when.astimezone().isoformat())
                logger.info(' ⎩ Следующее обновление: %s' % (t.split('T')[0] + ' ' + t.split('T')[1].split('.')[0]))
                await asyncio.sleep(self.period)
            else:
                await asyncio.sleep(10)
                logger.info(' - Попытка снова обновить данные')