diff --git a/CHANGELOG.txt b/CHANGELOG.txt index d3f05070..e2f8780f 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,6 +1,14 @@ +[0.2.21] +- fix: rating logix +- fix: load_top_random_shouts +- resolvers: add_stat_* refactored +- services: use google analytics +- services: minor fixes search + [0.2.20] - services: ackee removed - services: following manager fixed +- services: import views.json [0.2.19] - fix: adding 'author' role diff --git a/pyproject.toml b/pyproject.toml index 0cdab90e..b4db5fff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "discoursio-core" -version = "0.2.20" +version = "0.2.21" description = "core module for discours.io" authors = ["discoursio devteam"] license = "MIT" diff --git a/resolvers/author.py b/resolvers/author.py index 9b64af41..d2c84c51 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -64,7 +64,7 @@ async def author_followings(author_id: int): "unread": await get_total_unread_counter(author_id), "topics": [t.slug for t in await followed_topics(author_id)], "authors": [a.slug for a in await followed_authors(author_id)], - "reactions": [s.slug for s in followed_reactions(author_id)], + "reactions": [s.slug for s in await followed_reactions(author_id)], "communities": [c.slug for c in [followed_communities(author_id)] if isinstance(c, Community)], } diff --git a/resolvers/collab.py b/resolvers/collab.py index add690fc..1b5d5f56 100644 --- a/resolvers/collab.py +++ b/resolvers/collab.py @@ -17,7 +17,7 @@ async def accept_invite(_, info, invite_id: int): if author: # Check if the invite exists invite = session.query(Invite).filter(Invite.id == invite_id).first() - if invite and invite.author_id == author.id and invite.status == InviteStatus.PENDING.value: + if invite and invite.author_id is author.id and invite.status is InviteStatus.PENDING.value: # Add the user to the shout authors shout = session.query(Shout).filter(Shout.id == invite.shout_id).first() if shout: @@ -46,7 +46,7 @@ async def reject_invite(_, info, invite_id: int): if author: # Check if the invite exists invite = session.query(Invite).filter(Invite.id == invite_id).first() - if invite and invite.author_id == author.id and invite.status == InviteStatus.PENDING.value: + if invite and invite.author_id is author.id and invite.status is InviteStatus.PENDING.value: # Delete the invite session.delete(invite) session.commit() @@ -59,14 +59,14 @@ async def reject_invite(_, info, invite_id: int): @mutation.field("create_invite") @login_required -async def create_invite(_, info, slug: str = "", author_id: int = None): +async def create_invite(_, info, slug: str = "", author_id: int = 0): user_id = info.context["user_id"] # Check if the inviter is the owner of the shout with local_session() as session: shout = session.query(Shout).filter(Shout.slug == slug).first() inviter = session.query(Author).filter(Author.user == user_id).first() - if inviter and shout and shout.authors and inviter.id == shout.created_by: + if inviter and shout and shout.authors and inviter.id is shout.created_by: # Check if the author is a valid author author = session.query(Author).filter(Author.id == author_id).first() if author: @@ -100,14 +100,14 @@ async def create_invite(_, info, slug: str = "", author_id: int = None): @mutation.field("remove_author") @login_required -async def remove_author(_, info, slug: str = "", author_id: int = None): +async def remove_author(_, info, slug: str = "", author_id: int = 0): user_id = info.context["user_id"] with local_session() as session: author = session.query(Author).filter(Author.user == user_id).first() if author: shout = session.query(Shout).filter(Shout.slug == slug).first() # NOTE: owner should be first in a list - if shout and author.id == shout.created_by: + if shout and author.id is shout.created_by: shout.authors = [author for author in shout.authors if author.id != author_id] session.commit() return {} @@ -125,14 +125,15 @@ async def remove_invite(_, info, invite_id: int): if author: # Check if the invite exists invite = session.query(Invite).filter(Invite.id == invite_id).first() - shout = session.query(Shout).filter(Shout.id == invite.shout_id).first() - if shout and shout.deleted_at is None and invite: - if invite.inviter_id == author.id or author.id == shout.created_by: - if invite.status == InviteStatus.PENDING.value: - # Delete the invite - session.delete(invite) - session.commit() - return {} + if isinstance(invite, Invite): + shout = session.query(Shout).filter(Shout.id == invite.shout_id).first() + if shout and shout.deleted_at is None and invite: + if invite.inviter_id is author.id or author.id is shout.created_by: + if invite.status is InviteStatus.PENDING.value: + # Delete the invite + session.delete(invite) + session.commit() + return {} else: return {"error": "Invalid invite or already accepted/rejected"} else: diff --git a/resolvers/editor.py b/resolvers/editor.py index 90f8923b..8785d82d 100644 --- a/resolvers/editor.py +++ b/resolvers/editor.py @@ -104,7 +104,7 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False): ) if not shout: return {"error": "shout not found"} - if shout.created_by != author.id and author.id not in shout.authors: + if shout.created_by is not author.id and author.id not in shout.authors: return {"error": "access denied"} if shout_input is not None: topics_input = shout_input["topics"] @@ -157,16 +157,18 @@ async def update_shout(_, info, shout_id, shout_input=None, publish=False): .first() ) main_topic = session.query(Topic).filter(Topic.slug == shout_input["main_topic"]).first() - new_main_topic = ( - session.query(ShoutTopic) - .filter(and_(ShoutTopic.shout == shout.id, ShoutTopic.topic == main_topic.id)) - .first() - ) - if old_main_topic is not new_main_topic: - old_main_topic.main = False - new_main_topic.main = True - session.add(old_main_topic) - session.add(new_main_topic) + if isinstance(main_topic, Topic): + new_main_topic = ( + session.query(ShoutTopic) + .filter(and_(ShoutTopic.shout == shout.id, ShoutTopic.topic == main_topic.id)) + .first() + ) + if isinstance(old_main_topic, ShoutTopic) and isinstance(new_main_topic, ShoutTopic) \ + and old_main_topic is not new_main_topic: + ShoutTopic.update(old_main_topic, {"main": False}) + session.add(old_main_topic) + ShoutTopic.update(new_main_topic, {"main": True}) + session.add(new_main_topic) session.commit() @@ -194,15 +196,15 @@ async def delete_shout(_, info, shout_id): shout = session.query(Shout).filter(Shout.id == shout_id).first() if not shout: return {"error": "invalid shout id"} - if author: - if shout.created_by != author.id and author.id not in shout.authors: + if isinstance(author, Author) and isinstance(shout, Shout): + # TODO: add editor role allowed here + if shout.created_by is not author.id and author.id not in shout.authors: return {"error": "access denied"} for author_id in shout.authors: reactions_unfollow(author_id, shout_id) - # Replace datetime with Unix timestamp - current_time = int(time.time()) + shout_dict = shout.dict() - shout_dict["deleted_at"] = current_time # Set deleted_at as Unix timestamp + shout_dict["deleted_at"] = int(time.time()) Shout.update(shout, shout_dict) session.add(shout) session.commit() diff --git a/resolvers/reaction.py b/resolvers/reaction.py index 9cb9ff3f..3f35f54c 100644 --- a/resolvers/reaction.py +++ b/resolvers/reaction.py @@ -92,14 +92,23 @@ def is_published_author(session, author_id): > 0 ) - -def check_to_publish(session, approver_id, reaction): - """set shout to public if publicated approvers amount > 4""" - if not reaction.reply_to and reaction.kind in [ +def is_negative(x): + return x in [ ReactionKind.ACCEPT.value, ReactionKind.LIKE.value, ReactionKind.PROOF.value, - ]: + ] + +def is_positive(x): + return x in [ + ReactionKind.ACCEPT.value, + ReactionKind.LIKE.value, + ReactionKind.PROOF.value, + ] + +def check_to_publish(session, approver_id, reaction): + """set shout to public if publicated approvers amount > 4""" + if not reaction.reply_to and is_positive(reaction.kind): if is_published_author(session, approver_id): # now count how many approvers are voted already approvers_reactions = session.query(Reaction).where(Reaction.shout == reaction.shout).all() @@ -117,20 +126,12 @@ def check_to_publish(session, approver_id, reaction): def check_to_hide(session, reaction): """hides any shout if 20% of reactions are negative""" - if not reaction.reply_to and reaction.kind in [ - ReactionKind.REJECT.value, - ReactionKind.DISLIKE.value, - ReactionKind.DISPROOF.value, - ]: + if not reaction.reply_to and is_negative(reaction.kind): # if is_published_author(author_id): approvers_reactions = session.query(Reaction).where(Reaction.shout == reaction.shout).all() rejects = 0 for r in approvers_reactions: - if r.kind in [ - ReactionKind.REJECT.value, - ReactionKind.DISLIKE.value, - ReactionKind.DISPROOF.value, - ]: + if is_negative(r.kind): rejects += 1 if len(approvers_reactions) / rejects < 5: return True @@ -155,6 +156,49 @@ def set_hidden(session, shout_id): session.add(s) session.commit() + +async def _create_reaction(session, shout, author, reaction): + r = Reaction(**reaction) + rdict = r.dict() + session.add(r) + session.commit() + + # Proposal accepting logic + if rdict.get("reply_to"): + if r.kind in ["LIKE", "APPROVE"] and author.id in shout.authors: + replied_reaction = session.query(Reaction).filter(Reaction.id == r.reply_to).first() + if replied_reaction: + if replied_reaction.kind is ReactionKind.PROPOSE.value: + if replied_reaction.range: + old_body = shout.body + start, end = replied_reaction.range.split(":") + start = int(start) + end = int(end) + new_body = old_body[:start] + replied_reaction.body + old_body[end:] + shout_dict = shout.dict() + shout_dict["body"] = new_body + Shout.update(shout, shout_dict) + session.add(shout) + session.commit() + + # Self-regulation mechanics + if check_to_hide(session, r): + set_hidden(session, shout.id) + elif check_to_publish(session, author.id, r): + await set_published(session, shout.id, author.id) + + # Reactions auto-following + reactions_follow(author.id, reaction["shout"], True) + + rdict["shout"] = shout.dict() + rdict["created_by"] = author.dict() + rdict["stat"] = {"commented": 0, "reacted": 0, "rating": 0} + + # Notifications call + await notify_reaction(rdict, "create") + + return rdict + @mutation.field("create_reaction") @login_required async def create_reaction(_, info, reaction): @@ -169,92 +213,64 @@ async def create_reaction(_, info, reaction): with local_session() as session: shout = session.query(Shout).filter(Shout.id == shout_id).one() author = session.query(Author).filter(Author.user == user_id).first() - + dont_create_new = False if shout and author: reaction["created_by"] = author.id kind = reaction.get("kind") + shout_id = shout.id + if not kind and reaction.get("body"): kind = ReactionKind.COMMENT.value + if not kind: return { "error": "cannot create reaction with this kind"} - existing_reaction = ( - session.query(Reaction) - .filter( - and_( - Reaction.shout == shout_id, - Reaction.created_by == author.id, - Reaction.kind == kind, - Reaction.reply_to == reaction.get("reply_to"), + + if kind in ["LIKE", "DISLIKE", "AGREE", "DISAGREE"]: + same_reaction = ( + session.query(Reaction) + .filter( + and_( + Reaction.shout == shout_id, + Reaction.created_by == author.id, + Reaction.kind == kind, + Reaction.reply_to == reaction.get("reply_to"), + ) ) + .first() ) - .first() - ) - if existing_reaction is not None: - return {"error": "You can't vote twice"} + if same_reaction is not None: + return {"error": "You can't vote twice"} - opposite_reaction_kind = ( - ReactionKind.DISLIKE.value - if reaction["kind"] == ReactionKind.LIKE.value - else ReactionKind.LIKE.value - ) - opposite_reaction = ( - session.query(Reaction) - .filter( - and_( - Reaction.shout == reaction["shout"], - Reaction.created_by == author.id, - Reaction.kind == opposite_reaction_kind, - Reaction.reply_to == reaction.get("reply_to"), + opposite_reaction_kind = ( + ReactionKind.DISLIKE.value + if reaction["kind"] == ReactionKind.LIKE.value + else ReactionKind.LIKE.value + ) + opposite_reaction = ( + session.query(Reaction) + .filter( + and_( + Reaction.shout == reaction["shout"], + Reaction.created_by == author.id, + Reaction.kind == opposite_reaction_kind, + Reaction.reply_to == reaction.get("reply_to"), + ) ) + .first() ) - .first() - ) - if opposite_reaction is not None: - session.delete(opposite_reaction) + if opposite_reaction is not None: + await notify_reaction(opposite_reaction, "delete") + session.delete(opposite_reaction) - r = Reaction(**reaction) - rdict = r.dict() + return {"reaction": reaction} + else: - # Proposal accepting logic - if rdict.get("reply_to"): - if r.kind is ReactionKind.ACCEPT.value and author.id in shout.authors: - replied_reaction = session.query(Reaction).filter(Reaction.id == r.reply_to).first() - if replied_reaction: - if replied_reaction.kind is ReactionKind.PROPOSE.value: - if replied_reaction.range: - old_body = shout.body - start, end = replied_reaction.range.split(":") - start = int(start) - end = int(end) - new_body = old_body[:start] + replied_reaction.body + old_body[end:] - shout_dict = shout.dict() - shout_dict["body"] = new_body - Shout.update(shout, shout_dict) + rdict = await _create_reaction(session, shout, author, reaction) - session.add(r) - session.commit() - logger.debug(r) - rdict = r.dict() + return {"reaction": rdict} - # Self-regulation mechanics - if check_to_hide(session, r): - set_hidden(session, r.shout) - elif check_to_publish(session, author.id, r): - await set_published(session, r.shout, author.id) - - # Reactions auto-following - reactions_follow(author.id, reaction["shout"], True) - - rdict["shout"] = shout.dict() - rdict["created_by"] = author.dict() - rdict["stat"] = {"commented": 0, "reacted": 0, "rating": 0} - - # Notifications call - await notify_reaction(rdict, "create") - - return {"reaction": rdict} except Exception as e: import traceback diff --git a/resolvers/reader.py b/resolvers/reader.py index 8b8b6195..aa682a16 100644 --- a/resolvers/reader.py +++ b/resolvers/reader.py @@ -342,12 +342,12 @@ async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0): with local_session() as session: author = session.query(Author).filter(Author.user == user_id).first() if author: - return get_shouts_from_query(q, author.id) + return await get_shouts_from_query(q, author.id) else: - return get_shouts_from_query(q) + return await get_shouts_from_query(q) -def get_shouts_from_query(q, author_id=None): +async def get_shouts_from_query(q, author_id=None): shouts = [] with local_session() as session: for [shout,commented_stat, likes_stat, dislikes_stat, last_comment] in session.execute( @@ -355,7 +355,7 @@ def get_shouts_from_query(q, author_id=None): ).unique(): shouts.append(shout) shout.stat = { - "viewed": shout.views, + "viewed": await ViewedStorage.get_shout(shout_slug=shout.slug), "commented": commented_stat, "rating": int(likes_stat or 0) - int(dislikes_stat or 0), } @@ -384,7 +384,18 @@ async def load_shouts_random_top(_, _info, options): subquery = select(Shout.id).outerjoin(aliased_reaction).where(Shout.deleted_at.is_(None)) subquery = apply_filters(subquery, options.get("filters", {})) - subquery = subquery.group_by(Shout.id).order_by(desc(get_rating_func(aliased_reaction))) + subquery = subquery.group_by(Shout.id).order_by(desc( + func.sum( + case( + (Reaction.kind == ReactionKind.LIKE.value, 1), + (Reaction.kind == ReactionKind.AGREE.value, 1), + (Reaction.kind == ReactionKind.DISLIKE.value, -1), + (Reaction.kind == ReactionKind.DISAGREE.value, -1), + else_=0 + ) + ) + ) + ) random_limit = options.get("random_limit") if random_limit: @@ -406,7 +417,7 @@ async def load_shouts_random_top(_, _info, options): # print(q.compile(compile_kwargs={"literal_binds": True})) - return get_shouts_from_query(q) + return await get_shouts_from_query(q) @query.field("load_shouts_random_topic") diff --git a/services/search.py b/services/search.py index 54ee4d5e..4403f8f7 100644 --- a/services/search.py +++ b/services/search.py @@ -23,7 +23,6 @@ class SearchService: try: # TODO: add ttl for redis cached search results cached = await redis.execute("GET", text) - if not cached: async with SearchService.lock: # Use aiohttp to send a request to ElasticSearch @@ -35,7 +34,7 @@ class SearchService: await redis.execute("SET", text, json.dumps(payload)) # use redis as cache else: logging.error(f"[services.search] response: {response.status} {await response.text()}") - else: + elif isinstance(cached, str): payload = json.loads(cached) except Exception as e: logging.error(f"[services.search] Error during search: {e}") diff --git a/services/viewed.py b/services/viewed.py index 43028a8b..d62887a9 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -1,138 +1,156 @@ -import asyncio import os -from typing import Dict +from typing import Dict, List import logging import time import json +import asyncio from datetime import datetime, timedelta, timezone from os import environ + # ga from apiclient.discovery import build from google.oauth2.service_account import Credentials import pandas as pd +from orm.author import Author +from orm.shout import Shout, ShoutAuthor, ShoutTopic +from orm.topic import Topic +from services.db import local_session + +# Настройка журналирования logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger("\t[services.viewed]\t") logger.setLevel(logging.DEBUG) +# Пути к ключевым файлам и идентификатор представления в Google Analytics GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH", '/dump/google-service.json') +GOOGLE_GA_VIEW_ID = os.environ.get("GOOGLE_GA_VIEW_ID", "") +gaBaseUrl = "https://analyticsreporting.googleapis.com/v4" -# Build Analytics Reporting API V4 service object. + +# Функция для создания объекта службы Analytics Reporting API V4 def get_service(): SCOPES = ['https://www.googleapis.com/auth/analytics.readonly'] - credentials = Credentials.from_service_account_file( - GOOGLE_KEYFILE_PATH, scopes=SCOPES - ) + credentials = Credentials.from_service_account_file(GOOGLE_KEYFILE_PATH, scopes=SCOPES) service = build(serviceName='analyticsreporting', version='v4', credentials=credentials) return service - class ViewedStorage: lock = asyncio.Lock() views_by_shout = {} shouts_by_topic = {} shouts_by_author = {} views = None - pages = None - facts = None - period = 60 * 60 # every hour + period = 60 * 60 # каждый час analytics_client = None auth_result = None disabled = False + date_range = "" @staticmethod async def init(): - """Google Analytics client connection using authentication""" + """Подключение к клиенту Google Analytics с использованием аутентификации""" self = ViewedStorage async with self.lock: if os.path.exists(GOOGLE_KEYFILE_PATH): self.analytics_client = get_service() - logger.info(" * authorized permanently by Google Analytics") + logger.info(f" * Постоянная авторизация в Google Analytics {self.analytics_client}") - # Load pre-counted views from the JSON file + # Загрузка предварительно подсчитанных просмотров из файла JSON self.load_precounted_views() + # Установка диапазона дат на основе времени создания файла views.json + views_json_path = "/dump/views.json" + creation_time = datetime.fromtimestamp(os.path.getctime(views_json_path)) + end_date = datetime.now(timezone.utc).strftime('%Y-%m-%d') + start_date = creation_time.strftime('%Y-%m-%d') + self.date_range = f'{start_date},{end_date}' + views_stat_task = asyncio.create_task(self.worker()) logger.info(views_stat_task) else: - logger.info(" * please add Google Analytics keyfile") + logger.info(" * Пожалуйста, добавьте ключевой файл Google Analytics") self.disabled = True @staticmethod def load_precounted_views(): + """Загрузка предварительно подсчитанных просмотров из файла JSON""" self = ViewedStorage try: with open("/dump/views.json", "r") as file: precounted_views = json.load(file) self.views_by_shout.update(precounted_views) - logger.info(f" * {len(precounted_views)} pre-counted shouts' views loaded successfully.") + logger.info(f" * {len(precounted_views)} предварительно подсчитанных просмотров shouts успешно загружены.") except Exception as e: - logger.error(f"Error loading pre-counted views: {e}") + logger.error(f"Ошибка загрузки предварительно подсчитанных просмотров: {e}") @staticmethod async def update_pages(): - """query all the pages from ackee sorted by views count""" - logger.info(" ⎧ updating ackee pages data ---") - try: - start = time.time() - self = ViewedStorage - async with self.lock: - if self.client: - # Use asyncio.run to execute asynchronous code in the main entry point - self.pages = await asyncio.to_thread(self.client.execute, load_pages) - domains = self.pages.get("domains", []) - # logger.debug(f" | domains: {domains}") - for domain in domains: - pages = domain.get("statistics", {}).get("pages", []) - if pages: - # logger.debug(f" | pages: {pages}") - shouts = {} - for page in pages: - p = page["value"].split("?")[0] - slug = p.split("discours.io/")[-1] - shouts[slug] = page["count"] - for slug in shouts.keys(): - self.views_by_shout[slug] = self.views_by_shout.get(slug, 0) + 1 - self.update_topics(slug) - logger.info(" ⎪ %d pages collected " % len(shouts.keys())) - - end = time.time() - logger.info(" ⎪ update_pages took %fs " % (end - start)) - - except Exception: - import traceback - - traceback.print_exc() - - @staticmethod - async def get_facts(): + """Запрос всех страниц от Google Analytics, отсортированных по количеству просмотров""" self = ViewedStorage - self.facts = [] - try: - if self.client: + if not self.disabled and GOOGLE_GA_VIEW_ID: + logger.info(" ⎧ Обновление данных просмотров от Google Analytics ---") + try: + start = time.time() async with self.lock: - self.facts = await asyncio.to_thread(self.client.execute, load_pages) - except Exception as er: - logger.error(f" - get_facts error: {er}") - return self.facts or [] + if self.analytics_client: + data = self.analytics_client.reports().batchGet(body={ + 'reportRequests': [{ + 'viewId': GOOGLE_GA_VIEW_ID, + 'dateRanges': self.date_range, + 'metrics': [{'expression': 'ga:pageviews'}], + 'dimensions': [{'name': 'ga:pagePath'}], + }] + }).execute() + if isinstance(data, dict): + slugs = set([]) + reports = data.get('reports', []) + if reports and isinstance(reports, list): + rows = list(reports[0].get('data', {}).get('rows', [])) + for row in rows: + # Извлечение путей страниц из ответа Google Analytics + if isinstance(row, dict): + dimensions = row.get('dimensions', []) + if isinstance(dimensions, list) and dimensions: + page_path = dimensions[0] + slug = page_path.split("discours.io/")[-1] + views_count = int(row['metrics'][0]['values'][0]) + + # Обновление данных в хранилище + self.views_by_shout[slug] = self.views_by_shout.get(slug, 0) + self.views_by_shout[slug] += views_count + self.update_topics(slug) + + # Запись путей страниц для логирования + slugs.add(slug) + + logger.info(f" ⎪ Собрано страниц: {len(slugs)} ") + + end = time.time() + logger.info(" ⎪ Обновление страниц заняло %fs " % (end - start)) + + except Exception: + import traceback + traceback.print_exc() + @staticmethod async def get_shout(shout_slug) -> int: - """getting shout views metric by slug""" + """Получение метрики просмотров shout по slug""" self = ViewedStorage async with self.lock: return self.views_by_shout.get(shout_slug, 0) @staticmethod async def get_shout_media(shout_slug) -> Dict[str, int]: - """getting shout plays metric by slug""" + """Получение метрики воспроизведения shout по slug""" self = ViewedStorage async with self.lock: return self.views_by_shout.get(shout_slug, 0) @staticmethod async def get_topic(topic_slug) -> int: - """getting topic views value summed""" + """Получение суммарного значения просмотров темы""" self = ViewedStorage topic_views = 0 async with self.lock: @@ -142,7 +160,7 @@ class ViewedStorage: @staticmethod async def get_author(author_slug) -> int: - """getting author views value summed""" + """Получение суммарного значения просмотров автора""" self = ViewedStorage author_views = 0 async with self.lock: @@ -152,38 +170,23 @@ class ViewedStorage: @staticmethod def update_topics(shout_slug): - """Updates topics counters by shout slug""" + """Обновление счетчиков темы по slug shout""" self = ViewedStorage with local_session() as session: - # Define a helper function to avoid code repetition + # Определение вспомогательной функции для избежания повторения кода def update_groups(dictionary, key, value): dictionary[key] = list(set(dictionary.get(key, []) + [value])) - # Update topics and authors using the helper function + # Обновление тем и авторов с использованием вспомогательной функции for [_shout_topic, topic] in session.query(ShoutTopic, Topic).join(Topic).join(Shout).where(Shout.slug == shout_slug).all(): update_groups(self.shouts_by_topic, topic.slug, shout_slug) for [_shout_topic, author] in session.query(ShoutAuthor, Author).join(Author).join(Shout).where(Shout.slug == shout_slug).all(): update_groups(self.shouts_by_author, author.slug, shout_slug) - @staticmethod - async def increment(shout_slug): - """the proper way to change counter""" - resource = ackee_site + shout_slug - self = ViewedStorage - async with self.lock: - self.views_by_shout[shout_slug] = self.views_by_shout.get(shout_slug, 0) + 1 - self.update_topics(shout_slug) - variables = {"domainId": domain_id, "input": {"siteLocation": resource}} - if self.client: - try: - await asyncio.to_thread(self.client.execute, create_record_mutation, variables) - except Exception as e: - logger.error(f"Error during threaded execution: {e}") - @staticmethod async def worker(): - """async task worker""" + """Асинхронная задача обновления""" failed = 0 self = ViewedStorage if self.disabled: @@ -191,20 +194,20 @@ class ViewedStorage: while True: try: - logger.info(" - updating records...") + logger.info(" - Обновление записей...") await self.update_pages() failed = 0 except Exception: failed += 1 - logger.info(" - update failed #%d, wait 10 seconds" % failed) + logger.info(" - Обновление не удалось #%d, ожидание 10 секунд" % failed) if failed > 3: - logger.info(" - not trying to update anymore") + logger.info(" - Больше не пытаемся обновить") break if failed == 0: when = datetime.now(timezone.utc) + timedelta(seconds=self.period) t = format(when.astimezone().isoformat()) - logger.info(" ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])) + logger.info(" ⎩ Следующее обновление: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])) await asyncio.sleep(self.period) else: await asyncio.sleep(10) - logger.info(" - trying to update data again") + logger.info(" - Попытка снова обновить данные")