0.2.21-ga

This commit is contained in:
2024-01-23 16:04:38 +03:00
parent 954e6dabb7
commit 3f65652a5f
9 changed files with 249 additions and 209 deletions

View File

@@ -23,7 +23,6 @@ class SearchService:
try:
# TODO: add ttl for redis cached search results
cached = await redis.execute("GET", text)
if not cached:
async with SearchService.lock:
# Use aiohttp to send a request to ElasticSearch
@@ -35,7 +34,7 @@ class SearchService:
await redis.execute("SET", text, json.dumps(payload)) # use redis as cache
else:
logging.error(f"[services.search] response: {response.status} {await response.text()}")
else:
elif isinstance(cached, str):
payload = json.loads(cached)
except Exception as e:
logging.error(f"[services.search] Error during search: {e}")

View File

@@ -1,138 +1,156 @@
import asyncio
import os
from typing import Dict
from typing import Dict, List
import logging
import time
import json
import asyncio
from datetime import datetime, timedelta, timezone
from os import environ
# ga
from apiclient.discovery import build
from google.oauth2.service_account import Credentials
import pandas as pd
from orm.author import Author
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from services.db import local_session
# Logging setup: the root logger and this module's logger both emit at DEBUG level.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("\t[services.viewed]\t")
logger.setLevel(logging.DEBUG)
# Path to the Google service-account key file and the Google Analytics view ID;
# both are read from the environment, with the defaults shown here.
GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH", '/dump/google-service.json')
GOOGLE_GA_VIEW_ID = os.environ.get("GOOGLE_GA_VIEW_ID", "")
# NOTE(review): gaBaseUrl is not referenced anywhere in the visible part of this
# file - confirm whether it is still needed.
gaBaseUrl = "https://analyticsreporting.googleapis.com/v4"
def get_service():
    """Build an Analytics Reporting API V4 service object.

    Service-account credentials are loaded from ``GOOGLE_KEYFILE_PATH`` with
    the read-only Analytics scope and passed to ``build``.

    Returns:
        The object produced by ``build`` for ``analyticsreporting`` ``v4``.
    """
    scopes = ['https://www.googleapis.com/auth/analytics.readonly']
    # Read the key file exactly once; a second identical load only repeats
    # the disk read and parsing without changing the result.
    credentials = Credentials.from_service_account_file(GOOGLE_KEYFILE_PATH, scopes=scopes)
    return build(serviceName='analyticsreporting', version='v4', credentials=credentials)
class ViewedStorage:
# Class-level state shared by the static methods below, which all bind
# `self = ViewedStorage` instead of working on instances.
# Lock taken by the methods below around reads and writes of the counters.
lock = asyncio.Lock()
# shout slug -> accumulated number of views
views_by_shout = {}
# topic slug -> list of shout slugs (filled by update_topics)
shouts_by_topic = {}
# author slug -> list of shout slugs (filled by update_topics)
shouts_by_author = {}
# NOTE(review): `views` is not referenced in the visible code - confirm it is still used.
views = None
# NOTE(review): `pages` and `facts` are only assigned by the ackee-based code
# paths visible in this file - confirm whether they are still needed.
pages = None
facts = None
period = 60 * 60 # every hour
period = 60 * 60 # every hour
# Set by init() to the object returned by get_service(); None until then.
analytics_client = None
# NOTE(review): `auth_result` is not referenced in the visible code.
auth_result = None
# Set to True by init() when the Google key file is missing.
disabled = False
# "start,end" date string (YYYY-MM-DD,YYYY-MM-DD) assigned by init().
date_range = ""
@staticmethod
async def init():
    """Connect to Google Analytics and start the periodic update worker.

    When the key file at ``GOOGLE_KEYFILE_PATH`` exists, this builds the
    Analytics client, loads the pre-counted views, derives ``date_range``
    from the change time of ``/dump/views.json`` and schedules ``worker()``.
    When the key file is missing, the storage is marked as disabled and
    nothing else happens.
    """
    self = ViewedStorage
    async with self.lock:
        if not os.path.exists(GOOGLE_KEYFILE_PATH):
            logger.info(" * Пожалуйста, добавьте ключевой файл Google Analytics")
            self.disabled = True
            return

        self.analytics_client = get_service()
        logger.info(f" * Постоянная авторизация в Google Analytics {self.analytics_client}")

        # Load the pre-counted views from the JSON dump.
        self.load_precounted_views()

        # The date range starts at the timestamp reported for views.json and
        # ends today. Both ends are rendered in UTC so that the start can
        # never land on the calendar day after the end.
        views_json_path = "/dump/views.json"
        end_date = datetime.now(timezone.utc).strftime('%Y-%m-%d')
        try:
            changed_at = datetime.fromtimestamp(os.path.getctime(views_json_path), tz=timezone.utc)
            start_date = changed_at.strftime('%Y-%m-%d')
        except OSError:
            # A missing dump is already reported by load_precounted_views()
            # and is not fatal there, so it must not abort init() here
            # either; fall back to a one-day range.
            start_date = end_date
        self.date_range = f'{start_date},{end_date}'

        # Keep a reference to the task: the event loop itself only holds
        # weak references to tasks, so an unreferenced one may be collected
        # before it finishes.
        self.views_stat_task = asyncio.create_task(self.worker())
        logger.info(self.views_stat_task)
@staticmethod
def load_precounted_views():
    """Merge the pre-counted views from ``/dump/views.json`` into ``views_by_shout``.

    Loading is best-effort: any failure (missing file, invalid JSON,
    unexpected structure) is logged and the counters are left as they were.
    """
    self = ViewedStorage
    try:
        # Explicit encoding so decoding does not depend on the host locale.
        with open("/dump/views.json", "r", encoding="utf-8") as file:
            precounted_views = json.load(file)
        self.views_by_shout.update(precounted_views)
        logger.info(f" * {len(precounted_views)} предварительно подсчитанных просмотров shouts успешно загружены.")
    except Exception as e:
        # Deliberately broad: a broken dump must not prevent startup.
        logger.error(f"Ошибка загрузки предварительно подсчитанных просмотров: {e}")
@staticmethod
async def update_pages():
    """Query page views from Google Analytics and add them to the counters.

    Does nothing when the storage is disabled or ``GOOGLE_GA_VIEW_ID`` is
    empty. Otherwise it requests ``ga:pageviews`` per ``ga:pagePath`` for
    ``date_range``, adds every row's count to ``views_by_shout`` and
    refreshes the topic/author groupings via ``update_topics``.

    Any exception is printed with ``traceback`` and not re-raised, so
    callers never observe a failure from this method.

    NOTE(review): each call adds the full count reported for ``date_range``
    on top of the existing counter, and ``date_range`` is only assigned in
    ``init()``; repeated runs therefore look like they count the same views
    again - confirm the intended accumulation.
    NOTE(review): the slug is whatever follows "discours.io/" in the
    reported path; if the path does not contain that host the whole path is
    used as the slug - confirm against real report rows.
    """
    self = ViewedStorage
    if self.disabled or not GOOGLE_GA_VIEW_ID:
        return

    logger.info(" ⎧ Обновление данных просмотров от Google Analytics ---")
    try:
        start = time.time()
        async with self.lock:
            if self.analytics_client:
                report_request = {
                    'viewId': GOOGLE_GA_VIEW_ID,
                    'metrics': [{'expression': 'ga:pageviews'}],
                    'dimensions': [{'name': 'ga:pagePath'}],
                }
                # date_range is stored as "start,end"; the API expects a
                # list of {startDate, endDate} objects. When no range has
                # been set yet the key is omitted and the API default applies.
                start_date, _, end_date = self.date_range.partition(",")
                if start_date and end_date:
                    report_request['dateRanges'] = [{'startDate': start_date, 'endDate': end_date}]

                request = self.analytics_client.reports().batchGet(
                    body={'reportRequests': [report_request]}
                )
                # execute() performs a blocking HTTP call; run it in a
                # worker thread so the event loop stays responsive.
                data = await asyncio.to_thread(request.execute)

                if isinstance(data, dict):
                    slugs = set()
                    reports = data.get('reports', [])
                    if reports and isinstance(reports, list):
                        for row in reports[0].get('data', {}).get('rows', []):
                            if not isinstance(row, dict):
                                continue
                            dimensions = row.get('dimensions', [])
                            if not (isinstance(dimensions, list) and dimensions):
                                continue
                            # Extract the page path reported by Google Analytics.
                            page_path = dimensions[0]
                            slug = page_path.split("discours.io/")[-1]
                            views_count = int(row['metrics'][0]['values'][0])

                            # Update the stored counters and groupings.
                            self.views_by_shout[slug] = self.views_by_shout.get(slug, 0) + views_count
                            self.update_topics(slug)

                            # Remember the slug for the summary log line.
                            slugs.add(slug)
                    logger.info(f" ⎪ Собрано страниц: {len(slugs)} ")
        end = time.time()
        logger.info(" ⎪ Обновление страниц заняло %fs " % (end - start))
    except Exception:
        import traceback

        traceback.print_exc()
@staticmethod
async def get_shout(shout_slug) -> int:
"""getting shout views metric by slug"""
"""Получение метрики просмотров shout по slug"""
self = ViewedStorage
async with self.lock:
return self.views_by_shout.get(shout_slug, 0)
@staticmethod
async def get_shout_media(shout_slug) -> Dict[str, int]:
"""getting shout plays metric by slug"""
"""Получение метрики воспроизведения shout по slug"""
self = ViewedStorage
async with self.lock:
return self.views_by_shout.get(shout_slug, 0)
@staticmethod
async def get_topic(topic_slug) -> int:
"""getting topic views value summed"""
"""Получение суммарного значения просмотров темы"""
self = ViewedStorage
topic_views = 0
async with self.lock:
@@ -142,7 +160,7 @@ class ViewedStorage:
@staticmethod
async def get_author(author_slug) -> int:
"""getting author views value summed"""
"""Получение суммарного значения просмотров автора"""
self = ViewedStorage
author_views = 0
async with self.lock:
@@ -152,38 +170,23 @@ class ViewedStorage:
@staticmethod
def update_topics(shout_slug):
    """Register ``shout_slug`` under each of its topics and authors.

    Queries the database for the topics and authors joined to the shout with
    this slug and records the slug in ``shouts_by_topic`` and
    ``shouts_by_author`` without duplicates.
    """
    storage = ViewedStorage

    def remember(index, group_slug):
        # Rebuild the group's list through a set so the slug appears only once.
        members = index.get(group_slug, [])
        index[group_slug] = list(set(members + [shout_slug]))

    with local_session() as session:
        topic_rows = (
            session.query(ShoutTopic, Topic)
            .join(Topic)
            .join(Shout)
            .where(Shout.slug == shout_slug)
            .all()
        )
        for _link, topic in topic_rows:
            remember(storage.shouts_by_topic, topic.slug)

        author_rows = (
            session.query(ShoutAuthor, Author)
            .join(Author)
            .join(Shout)
            .where(Shout.slug == shout_slug)
            .all()
        )
        for _link, author in author_rows:
            remember(storage.shouts_by_author, author.slug)
@staticmethod
async def increment(shout_slug):
"""Add one view to the counter of ``shout_slug`` and report the hit upstream.

Under the class lock the in-memory counter is increased by one and
``update_topics`` is called for the slug. If ``self.client`` is truthy,
``self.client.execute(create_record_mutation, variables)`` is run in a
worker thread; an exception from that call is logged, not raised.
"""
# NOTE(review): `ackee_site`, `domain_id`, `create_record_mutation` and the
# `client` attribute are not defined in the visible part of this file -
# confirm this method is still live code.
# NOTE(review): indentation is not visible here, so it cannot be told whether
# the remote call below runs while the lock is still held.
resource = ackee_site + shout_slug
self = ViewedStorage
async with self.lock:
self.views_by_shout[shout_slug] = self.views_by_shout.get(shout_slug, 0) + 1
self.update_topics(shout_slug)
# Payload for the record-creating call: the domain plus the page location.
variables = {"domainId": domain_id, "input": {"siteLocation": resource}}
if self.client:
try:
# The client call is handed to a worker thread via asyncio.to_thread.
await asyncio.to_thread(self.client.execute, create_record_mutation, variables)
except Exception as e:
logger.error(f"Error during threaded execution: {e}")
@staticmethod
async def worker():
"""async task worker"""
"""Асинхронная задача обновления"""
failed = 0
self = ViewedStorage
if self.disabled:
@@ -191,20 +194,20 @@ class ViewedStorage:
while True:
try:
logger.info(" - updating records...")
logger.info(" - Обновление записей...")
await self.update_pages()
failed = 0
except Exception:
failed += 1
logger.info(" - update failed #%d, wait 10 seconds" % failed)
logger.info(" - Обновление не удалось #%d, ожидание 10 секунд" % failed)
if failed > 3:
logger.info(" - not trying to update anymore")
logger.info(" - Больше не пытаемся обновить")
break
if failed == 0:
when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
t = format(when.astimezone().isoformat())
logger.info("next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]))
logger.info("Следующее обновление: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]))
await asyncio.sleep(self.period)
else:
await asyncio.sleep(10)
logger.info(" - trying to update data again")
logger.info(" - Попытка снова обновить данные")