From d4982017f678ac9417b9690bdd2b32fee58c6809 Mon Sep 17 00:00:00 2001 From: Untone Date: Wed, 7 Aug 2024 09:51:09 +0300 Subject: [PATCH] refactored-starting --- cache/cache.py | 181 ++++++++++------------- cache/precache.py | 144 +++++++----------- cache/revalidator.py | 71 +++++---- cache/triggers.py | 111 ++++++-------- cache/unread.py | 24 --- main.py | 2 +- services/notify.py | 2 +- cache/rediscache.py => services/redis.py | 4 +- services/search.py | 2 +- services/viewed.py | 2 +- 10 files changed, 220 insertions(+), 323 deletions(-) delete mode 100644 cache/unread.py rename cache/rediscache.py => services/redis.py (97%) diff --git a/cache/cache.py b/cache/cache.py index 7042d9f5..5b4f58ae 100644 --- a/cache/cache.py +++ b/cache/cache.py @@ -1,122 +1,58 @@ import asyncio import json from typing import List - from sqlalchemy import select, join, and_ from orm.author import Author, AuthorFollower from orm.topic import Topic, TopicFollower from orm.shout import Shout, ShoutAuthor, ShoutTopic from services.db import local_session from utils.encoders import CustomJSONEncoder -from cache.rediscache import redis +from services.redis import redis from utils.logger import root_logger as logger DEFAULT_FOLLOWS = { "topics": [], "authors": [], + "shouts": [], "communities": [{"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""}], } -# Кэширование данных темы +# Cache topic data async def cache_topic(topic: dict): payload = json.dumps(topic, cls=CustomJSONEncoder) - # Одновременное кэширование по id и slug для быстрого доступа + # Cache by id and slug for quick access await asyncio.gather( redis.execute("SET", f"topic:id:{topic['id']}", payload), redis.execute("SET", f"topic:slug:{topic['slug']}", payload), ) -# Кэширование данных автора +# Cache author data async def cache_author(author: dict): payload = json.dumps(author, cls=CustomJSONEncoder) - # Кэширование данных автора по user и id + # Cache author data by user and id await asyncio.gather( redis.execute("SET", f"author:user:{author['user'].strip()}", str(author["id"])), redis.execute("SET", f"author:id:{author['id']}", payload), ) -async def get_cached_topic(topic_id: int): - """ - Получает информацию о теме из кэша или базы данных. - - Args: - topic_id (int): Идентификатор темы. - - Returns: - dict: Данные темы в формате словаря или None, если тема не найдена. - """ - # Ключ для кэширования темы в Redis - topic_key = f"topic:id:{topic_id}" - cached_topic = await redis.get(topic_key) - if cached_topic: - return json.loads(cached_topic) - - # Если данных о теме нет в кэше, загружаем из базы данных - with local_session() as session: - topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none() - if topic: - # Кэшируем полученные данные - topic_dict = topic.dict() - await redis.set(topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder)) - return topic_dict - - return None - - -async def get_cached_shout_authors(shout_id: int): - """ - Retrieves a list of authors for a given shout from the cache or database if not present. - - Args: - shout_id (int): The ID of the shout for which to retrieve authors. - - Returns: - List[dict]: A list of dictionaries containing author data. - """ - # Attempt to retrieve cached author IDs for the shout - rkey = f"shout:authors:{shout_id}" - cached_author_ids = await redis.get(rkey) - if cached_author_ids: - author_ids = json.loads(cached_author_ids) - else: - # If not in cache, fetch from the database and cache the result - with local_session() as session: - query = ( - select(ShoutAuthor.author) - .where(ShoutAuthor.shout == shout_id) - .join(Author, ShoutAuthor.author == Author.id) - .filter(Author.deleted_at.is_(None)) - ) - author_ids = [author_id for (author_id,) in session.execute(query).all()] - await redis.execute("set", rkey, json.dumps(author_ids)) - - # Retrieve full author details from cached IDs - if author_ids: - authors = await get_cached_authors_by_ids(author_ids) - logger.debug(f"Shout#{shout_id} authors fetched and cached: {len(authors)} authors found.") - return authors - - return [] - - -# Кэширование данных о подписках +# Cache follows data async def cache_follows(follower_id: int, entity_type: str, entity_id: int, is_insert=True): key = f"author:follows-{entity_type}s:{follower_id}" follows_str = await redis.get(key) - follows = json.loads(follows_str) if follows_str else [] + follows = json.loads(follows_str) if follows_str else DEFAULT_FOLLOWS[entity_type] if is_insert: if entity_id not in follows: follows.append(entity_id) else: follows = [eid for eid in follows if eid != entity_id] await redis.execute("set", key, json.dumps(follows, cls=CustomJSONEncoder)) - update_follower_stat(follower_id, entity_type, len(follows)) + await update_follower_stat(follower_id, entity_type, len(follows)) -# Обновление статистики подписчика +# Update follower statistics async def update_follower_stat(follower_id, entity_type, count): follower_key = f"author:id:{follower_id}" follower_str = await redis.get(follower_key) @@ -126,13 +62,13 @@ async def update_follower_stat(follower_id, entity_type, count): await cache_author(follower) -# Получение автора из кэша +# Get author from cache async def get_cached_author(author_id: int): author_key = f"author:id:{author_id}" result = await redis.get(author_key) if result: return json.loads(result) - # Загрузка из базы данных, если не найдено в кэше + # Load from database if not found in cache with local_session() as session: author = session.execute(select(Author).where(Author.id == author_id)).scalar_one_or_none() if author: @@ -141,13 +77,41 @@ async def get_cached_author(author_id: int): return None -# Получение темы по slug из кэша +# Function to get cached topic +async def get_cached_topic(topic_id: int): + """ + Fetch topic data from cache or database by id. + + Args: + topic_id (int): The identifier for the topic. + + Returns: + dict: Topic data or None if not found. + """ + topic_key = f"topic:id:{topic_id}" + cached_topic = await redis.get(topic_key) + if cached_topic: + return json.loads(cached_topic) + + # If not in cache, fetch from the database + with local_session() as session: + topic = session.execute(select(Topic).where(Topic.id == topic_id)).scalar_one_or_none() + if topic: + topic_dict = topic.dict() + await redis.set(topic_key, json.dumps(topic_dict, cls=CustomJSONEncoder)) + return topic_dict + + return None + + + +# Get topic by slug from cache async def get_cached_topic_by_slug(slug: str): topic_key = f"topic:slug:{slug}" result = await redis.get(topic_key) if result: return json.loads(result) - # Загрузка из базы данных, если не найдено в кэше + # Load from database if not found in cache with local_session() as session: topic = session.execute(select(Topic).where(Topic.slug == slug)).scalar_one_or_none() if topic: @@ -156,13 +120,13 @@ async def get_cached_topic_by_slug(slug: str): return None -# Получение списка авторов по ID из кэша +# Get list of authors by ID from cache async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]: - # Одновременное получение данных всех авторов + # Fetch all author data concurrently keys = [f"author:id:{author_id}" for author_id in author_ids] results = await asyncio.gather(*(redis.get(key) for key in keys)) authors = [json.loads(result) if result else None for result in results] - # Загрузка отсутствующих авторов из базы данных и кэширование + # Load missing authors from database and cache missing_indices = [index for index, author in enumerate(authors) if author is None] if missing_indices: missing_ids = [author_ids[index] for index in missing_indices] @@ -170,19 +134,21 @@ async def get_cached_authors_by_ids(author_ids: List[int]) -> List[dict]: query = select(Author).where(Author.id.in_(missing_ids)) missing_authors = session.execute(query).scalars().all() await asyncio.gather(*(cache_author(author.dict()) for author in missing_authors)) - authors = [author.dict() for author in missing_authors] + for index, author in zip(missing_indices, missing_authors): + authors[index] = author.dict() return authors +# Get cached topic followers async def get_cached_topic_followers(topic_id: int): - # Попытка извлечь кэшированные данные + # Attempt to retrieve cached data cached = await redis.get(f"topic:followers:{topic_id}") if cached: followers = json.loads(cached) logger.debug(f"Cached followers for topic#{topic_id}: {len(followers)}") return followers - # Загрузка из базы данных и кэширование результатов + # Load from database and cache results with local_session() as session: followers_ids = [ f[0] @@ -196,8 +162,9 @@ async def get_cached_topic_followers(topic_id: int): return followers +# Get cached author followers async def get_cached_author_followers(author_id: int): - # Проверяем кэш на наличие данных + # Check cache for data cached = await redis.get(f"author:followers:{author_id}") if cached: followers_ids = json.loads(cached) @@ -205,7 +172,7 @@ async def get_cached_author_followers(author_id: int): logger.debug(f"Cached followers for author#{author_id}: {len(followers)}") return followers - # Запрос в базу данных если кэш пуст + # Query database if cache is empty with local_session() as session: followers_ids = [ f[0] @@ -219,13 +186,14 @@ async def get_cached_author_followers(author_id: int): return followers +# Get cached follower authors async def get_cached_follower_authors(author_id: int): - # Попытка получить авторов из кэша + # Attempt to retrieve authors from cache cached = await redis.get(f"author:follows-authors:{author_id}") if cached: authors_ids = json.loads(cached) else: - # Запрос авторов из базы данных + # Query authors from database with local_session() as session: authors_ids = [ a[0] @@ -241,13 +209,14 @@ async def get_cached_follower_authors(author_id: int): return authors +# Get cached follower topics async def get_cached_follower_topics(author_id: int): - # Попытка получить темы из кэша + # Attempt to retrieve topics from cache cached = await redis.get(f"author:follows-topics:{author_id}") if cached: topics_ids = json.loads(cached) else: - # Загрузка тем из базы данных и их кэширование + # Load topics from database and cache them with local_session() as session: topics_ids = [ t[0] @@ -270,30 +239,31 @@ async def get_cached_follower_topics(author_id: int): return topics +# Get author by user ID from cache async def get_cached_author_by_user_id(user_id: str): """ - Получает информацию об авторе по его user_id, сначала проверяя кэш, а затем базу данных. + Retrieve author information by user_id, checking the cache first, then the database. Args: - user_id (str): Идентификатор пользователя, по которому нужно получить автора. + user_id (str): The user identifier for which to retrieve the author. Returns: - dict: Словарь с данными автора или None, если автор не найден. + dict: Dictionary with author data or None if not found. """ - # Пытаемся найти ID автора по user_id в кэше Redis + # Attempt to find author ID by user_id in Redis cache author_id = await redis.get(f"author:user:{user_id.strip()}") if author_id: - # Если ID найден, получаем полные данные автора по его ID + # If ID is found, get full author data by ID author_data = await redis.get(f"author:id:{author_id}") if author_data: return json.loads(author_data) - # Если данные в кэше не найдены, выполняем запрос к базе данных + # If data is not found in cache, query the database with local_session() as session: author = session.execute(select(Author).where(Author.user == user_id)).scalar_one_or_none() if author: - # Кэшируем полученные данные автора + # Cache the retrieved author data author_dict = author.dict() await asyncio.gather( redis.execute("SET", f"author:user:{user_id.strip()}", str(author.id)), @@ -301,27 +271,28 @@ async def get_cached_author_by_user_id(user_id: str): ) return author_dict - # Возвращаем None, если автор не найден + # Return None if author is not found return None +# Get cached topic authors async def get_cached_topic_authors(topic_id: int): """ - Получает список авторов для заданной темы, используя кэш или базу данных. + Retrieve a list of authors for a given topic, using cache or database. Args: - topic_id (int): Идентификатор темы, для которой нужно получить авторов. + topic_id (int): The identifier of the topic for which to retrieve authors. Returns: - List[dict]: Список словарей, содержащих данные авторов. + List[dict]: A list of dictionaries containing author data. """ - # Пытаемся получить список ID авторов из кэша + # Attempt to get a list of author IDs from cache rkey = f"topic:authors:{topic_id}" cached_authors_ids = await redis.get(rkey) if cached_authors_ids: authors_ids = json.loads(cached_authors_ids) else: - # Если кэш пуст, получаем данные из базы данных + # If cache is empty, get data from the database with local_session() as session: query = ( select(ShoutAuthor.author) @@ -330,10 +301,10 @@ async def get_cached_topic_authors(topic_id: int): .where(and_(ShoutTopic.topic == topic_id, Shout.published_at.is_not(None), Shout.deleted_at.is_(None))) ) authors_ids = [author_id for (author_id,) in session.execute(query).all()] - # Кэшируем полученные ID авторов + # Cache the retrieved author IDs await redis.execute("set", rkey, json.dumps(authors_ids)) - # Получаем полные данные авторов по кэшированным ID + # Retrieve full author details from cached IDs if authors_ids: authors = await get_cached_authors_by_ids(authors_ids) logger.debug(f"Topic#{topic_id} authors fetched and cached: {len(authors)} authors found.") diff --git a/cache/precache.py b/cache/precache.py index 8471c0b2..729d090d 100644 --- a/cache/precache.py +++ b/cache/precache.py @@ -1,67 +1,51 @@ import json - +import asyncio from sqlalchemy import and_, join, select - from orm.author import Author, AuthorFollower -from orm.shout import Shout, ShoutAuthor, ShoutTopic +from orm.shout import Shout, ShoutAuthor, ShoutTopic, ShoutReactionsFollower from orm.topic import Topic, TopicFollower from resolvers.stat import get_with_stat from cache.cache import cache_author, cache_topic from services.db import local_session from utils.encoders import CustomJSONEncoder from utils.logger import root_logger as logger -from cache.rediscache import redis +from services.redis import redis +# Предварительное кеширование подписчиков автора async def precache_authors_followers(author_id, session): - # Precache author followers authors_followers = set() followers_query = select(AuthorFollower.follower).where(AuthorFollower.author == author_id) result = session.execute(followers_query) + authors_followers.update(row[0] for row in result if row[0]) - for row in result: - follower_id = row[0] - if follower_id: - authors_followers.add(follower_id) - - followers_payload = json.dumps( - [f for f in authors_followers], - cls=CustomJSONEncoder, - ) + followers_payload = json.dumps(list(authors_followers), cls=CustomJSONEncoder) await redis.execute("SET", f"author:followers:{author_id}", followers_payload) +# Предварительное кеширование подписок автора async def precache_authors_follows(author_id, session): - # Precache topics followed by author - follows_topics = set() follows_topics_query = select(TopicFollower.topic).where(TopicFollower.follower == author_id) - result = session.execute(follows_topics_query) - - for row in result: - followed_topic_id = row[0] - if followed_topic_id: - follows_topics.add(followed_topic_id) - - topics_payload = json.dumps([t for t in follows_topics], cls=CustomJSONEncoder) - await redis.execute("SET", f"author:follows-topics:{author_id}", topics_payload) - - # Precache authors followed by author - follows_authors = set() follows_authors_query = select(AuthorFollower.author).where(AuthorFollower.follower == author_id) - result = session.execute(follows_authors_query) + follows_shouts_query = select(ShoutReactionsFollower.shout).where(ShoutReactionsFollower.follower == author_id) - for row in result: - followed_author_id = row[0] - if followed_author_id: - follows_authors.add(followed_author_id) + follows_topics = {row[0] for row in session.execute(follows_topics_query) if row[0]} + follows_authors = {row[0] for row in session.execute(follows_authors_query) if row[0]} + follows_shouts = {row[0] for row in session.execute(follows_shouts_query) if row[0]} - authors_payload = json.dumps([a for a in follows_authors], cls=CustomJSONEncoder) - await redis.execute("SET", f"author:follows-authors:{author_id}", authors_payload) + topics_payload = json.dumps(list(follows_topics), cls=CustomJSONEncoder) + authors_payload = json.dumps(list(follows_authors), cls=CustomJSONEncoder) + shouts_payload = json.dumps(list(follows_shouts), cls=CustomJSONEncoder) + + await asyncio.gather( + redis.execute("SET", f"author:follows-topics:{author_id}", topics_payload), + redis.execute("SET", f"author:follows-authors:{author_id}", authors_payload), + redis.execute("SET", f"author:follows-shouts:{author_id}", shouts_payload), + ) +# Предварительное кеширование авторов тем async def precache_topics_authors(topic_id: int, session): - # Precache topic authors - topic_authors = set() topic_authors_query = ( select(ShoutAuthor.author) .select_from(join(ShoutTopic, Shout, ShoutTopic.shout == Shout.id)) @@ -74,29 +58,18 @@ async def precache_topics_authors(topic_id: int, session): ) ) ) - result = session.execute(topic_authors_query) + topic_authors = {row[0] for row in session.execute(topic_authors_query) if row[0]} - for row in result: - author_id = row[0] - if author_id: - topic_authors.add(author_id) - - authors_payload = json.dumps([a for a in topic_authors], cls=CustomJSONEncoder) + authors_payload = json.dumps(list(topic_authors), cls=CustomJSONEncoder) await redis.execute("SET", f"topic:authors:{topic_id}", authors_payload) +# Предварительное кеширование подписчиков тем async def precache_topics_followers(topic_id: int, session): - # Precache topic followers - topic_followers = set() followers_query = select(TopicFollower.follower).where(TopicFollower.topic == topic_id) - result = session.execute(followers_query) + topic_followers = {row[0] for row in session.execute(followers_query) if row[0]} - for row in result: - follower_id = row[0] - if follower_id: - topic_followers.add(follower_id) - - followers_payload = json.dumps([f for f in topic_followers], cls=CustomJSONEncoder) + followers_payload = json.dumps(list(topic_followers), cls=CustomJSONEncoder) await redis.execute("SET", f"topic:followers:{topic_id}", followers_payload) @@ -106,45 +79,32 @@ async def precache_data(): await redis.execute("FLUSHDB") logger.info("redis flushed") - # topics - topics_by_id = {} - topics = get_with_stat(select(Topic)) - for topic in topics: - topic_profile = topic.dict() if not isinstance(topic, dict) else topic - await cache_topic(topic_profile) - logger.info(f"{len(topics)} topics precached") - - # followings for topics with local_session() as session: - for topic_id in topics_by_id.keys(): - await precache_topics_followers(topic_id, session) - await precache_topics_authors(topic_id, session) - logger.info("topics followings precached") + # topics + topics = get_with_stat(select(Topic)) + for topic in topics: + topic_profile = topic.dict() if not isinstance(topic, dict) else topic + await cache_topic(topic_profile) + await asyncio.gather( + precache_topics_followers(topic["id"], session), precache_topics_authors(topic["id"], session) + ) + logger.info(f"{len(topics)} topics and their followings precached") - # authors - authors_by_id = {} - authors = get_with_stat(select(Author).where(Author.user.is_not(None))) - logger.debug(f"{len(authors)} authors found in database") - c = 0 - for author in authors: - if isinstance(author, Author): - profile = author.dict() - author_id = profile.get("id") - user_id = profile.get("user", "").strip() - if author_id and user_id: - authors_by_id[author_id] = profile - await cache_author(profile) - c += 1 - else: - logger.error(f"fail caching {author}") - - logger.info(f"{c} authors precached") - - # followings for authors - with local_session() as session: - for author_id in authors_by_id.keys(): - await precache_authors_followers(author_id, session) - await precache_authors_follows(author_id, session) - logger.info("authors followings precached") + # authors + authors = get_with_stat(select(Author).where(Author.user.is_not(None))) + logger.debug(f"{len(authors)} authors found in database") + for author in authors: + if isinstance(author, Author): + profile = author.dict() + author_id = profile.get("id") + user_id = profile.get("user", "").strip() + if author_id and user_id: + await cache_author(profile) + await asyncio.gather( + precache_authors_followers(author_id, session), precache_authors_follows(author_id, session) + ) + else: + logger.error(f"fail caching {author}") + logger.info(f"{len(authors)} authors and their followings precached") except Exception as exc: - logger.error(exc) + logger.error(f"Error in precache_data: {exc}") diff --git a/cache/revalidator.py b/cache/revalidator.py index 50213858..392e4dc1 100644 --- a/cache/revalidator.py +++ b/cache/revalidator.py @@ -1,48 +1,55 @@ import asyncio +from cache.cache import get_cached_author, cache_author, get_cached_topic, cache_topic from utils.logger import root_logger as logger -from cache.cache import get_cached_author, cache_author, cache_topic, get_cached_topic class CacheRevalidationManager: - """Управление периодической ревалидацией кэша.""" + def __init__(self, interval=60): + """Инициализация менеджера с заданным интервалом проверки (в секундах).""" + self.interval = interval + self.items_to_revalidate = {"authors": set(), "topics": set(), "shouts": set(), "reactions": set()} + self.lock = asyncio.Lock() + self.running = True - def __init__(self): - self.items_to_revalidate = {"authors": set(), "topics": set()} - self.revalidation_interval = 60 # Интервал ревалидации в секундах - self.loop = None - - def start(self): - self.loop = asyncio.get_event_loop() - self.loop.run_until_complete(self.revalidate_cache()) - self.loop.run_forever() - logger.info("[services.revalidator] started infinite loop") + async def start(self): + """Запуск фонового воркера для ревалидации кэша.""" + asyncio.create_task(self.revalidate_cache()) async def revalidate_cache(self): - """Периодическая ревалидация кэша.""" - while True: - await asyncio.sleep(self.revalidation_interval) - await self.process_revalidation() + """Циклическая проверка и ревалидация кэша каждые self.interval секунд.""" + try: + while self.running: + await asyncio.sleep(self.interval) + await self.process_revalidation() + except asyncio.CancelledError: + logger.info("Revalidation worker was stopped.") + except Exception as e: + logger.error(f"An error occurred in the revalidation worker: {e}") async def process_revalidation(self): - """Ревалидация кэша для отмеченных сущностей.""" - for entity_type, ids in self.items_to_revalidate.items(): - for entity_id in ids: - if entity_type == "authors": - # Ревалидация кэша автора - author = await get_cached_author(entity_id) - if author: - await cache_author(author) - elif entity_type == "topics": - # Ревалидация кэша темы - topic = await get_cached_topic(entity_id) - if topic: - await cache_topic(topic) - ids.clear() + """Обновление кэша для всех сущностей, требующих ревалидации.""" + async with self.lock: + # Ревалидация кэша авторов + for author_id in self.items_to_revalidate["authors"]: + author = await get_cached_author(author_id) + if author: + await cache_author(author) + self.items_to_revalidate["authors"].clear() + + # Ревалидация кэша тем + for topic_id in self.items_to_revalidate["topics"]: + topic = await get_cached_topic(topic_id) + if topic: + await cache_topic(topic) + self.items_to_revalidate["topics"].clear() def mark_for_revalidation(self, entity_id, entity_type): """Отметить сущность для ревалидации.""" self.items_to_revalidate[entity_type].add(entity_id) + def stop(self): + """Остановка фонового воркера.""" + self.running = False -# Инициализация менеджера ревалидации -revalidation_manager = CacheRevalidationManager() + +revalidation_manager = CacheRevalidationManager(interval=300) # Ревалидация каждые 5 минут diff --git a/cache/triggers.py b/cache/triggers.py index 444fc2e7..7c34c648 100644 --- a/cache/triggers.py +++ b/cache/triggers.py @@ -1,85 +1,68 @@ from sqlalchemy import event from orm.author import Author, AuthorFollower from orm.reaction import Reaction -from orm.shout import Shout, ShoutAuthor +from orm.shout import Shout, ShoutAuthor, ShoutReactionsFollower from orm.topic import Topic, TopicFollower from cache.revalidator import revalidation_manager from utils.logger import root_logger as logger -def after_update_handler(mapper, connection, target): - """Обработчик обновления сущности.""" - entity_type = "authors" if isinstance(target, Author) else "topics" if isinstance(target, Topic) else "shouts" - revalidation_manager.mark_for_revalidation(target.id, entity_type) +def mark_for_revalidation(entity, *args): + """Отметка сущности для ревалидации.""" + entity_type = ( + "authors" + if isinstance(entity, Author) + else "topics" + if isinstance(entity, Topic) + else "reactions" + if isinstance(entity, Reaction) + else "shouts" + if isinstance(entity, Shout) + else None + ) + if entity_type: + revalidation_manager.mark_for_revalidation(entity.id, entity_type) -def after_follower_insert_update_handler(mapper, connection, target): - """Обработчик добавления или обновления подписки.""" +def after_follower_handler(mapper, connection, target, is_delete=False): + """Обработчик добавления, обновления или удаления подписки.""" + entity_type = None if isinstance(target, AuthorFollower): - # Пометить автора и подписчика для ревалидации - revalidation_manager.mark_for_revalidation(target.author_id, "authors") - revalidation_manager.mark_for_revalidation(target.follower_id, "authors") + entity_type = "authors" elif isinstance(target, TopicFollower): - # Пометить тему и подписчика для ревалидации - revalidation_manager.mark_for_revalidation(target.topic_id, "topics") - revalidation_manager.mark_for_revalidation(target.follower_id, "authors") + entity_type = "topics" + elif isinstance(target, ShoutReactionsFollower): + entity_type = "shouts" - -def after_follower_delete_handler(mapper, connection, target): - """Обработчик удаления подписки.""" - if isinstance(target, AuthorFollower): - # Пометить автора и подписчика для ревалидации - revalidation_manager.mark_for_revalidation(target.author_id, "authors") - revalidation_manager.mark_for_revalidation(target.follower_id, "authors") - elif isinstance(target, TopicFollower): - # Пометить тему и подписчика для ревалидации - revalidation_manager.mark_for_revalidation(target.topic_id, "topics") - revalidation_manager.mark_for_revalidation(target.follower_id, "authors") - - -def after_reaction_update_handler(mapper, connection, reaction): - """Обработчик изменений реакций.""" - # Пометить shout для ревалидации - revalidation_manager.mark_for_revalidation(reaction.shout_id, "shouts") - # Пометить автора реакции для ревалидации - revalidation_manager.mark_for_revalidation(reaction.created_by, "authors") - - -def after_shout_author_insert_update_handler(mapper, connection, target): - """Обработчик добавления или обновления авторства публикации.""" - # Пометить shout и автора для ревалидации - revalidation_manager.mark_for_revalidation(target.shout_id, "shouts") - revalidation_manager.mark_for_revalidation(target.author_id, "authors") - - -def after_shout_author_delete_handler(mapper, connection, target): - """Обработчик удаления авторства публикации.""" - # Пометить shout и автора для ревалидации - revalidation_manager.mark_for_revalidation(target.shout_id, "shouts") - revalidation_manager.mark_for_revalidation(target.author_id, "authors") + if entity_type: + revalidation_manager.mark_for_revalidation( + target.author_id if entity_type == "authors" else target.topic_id, entity_type + ) + if not is_delete: + revalidation_manager.mark_for_revalidation(target.follower_id, "authors") def events_register(): """Регистрация обработчиков событий для всех сущностей.""" - event.listen(ShoutAuthor, "after_insert", after_shout_author_insert_update_handler) - event.listen(ShoutAuthor, "after_update", after_shout_author_insert_update_handler) - event.listen(ShoutAuthor, "after_delete", after_shout_author_delete_handler) + event.listen(ShoutAuthor, "after_insert", mark_for_revalidation) + event.listen(ShoutAuthor, "after_update", mark_for_revalidation) + event.listen(ShoutAuthor, "after_delete", mark_for_revalidation) - event.listen(AuthorFollower, "after_insert", after_follower_insert_update_handler) - event.listen(AuthorFollower, "after_update", after_follower_insert_update_handler) - event.listen(AuthorFollower, "after_delete", after_follower_delete_handler) - event.listen(TopicFollower, "after_insert", after_follower_insert_update_handler) - event.listen(TopicFollower, "after_update", after_follower_insert_update_handler) - event.listen(TopicFollower, "after_delete", after_follower_delete_handler) - event.listen(Reaction, "after_update", after_reaction_update_handler) + event.listen(AuthorFollower, "after_insert", after_follower_handler) + event.listen(AuthorFollower, "after_update", after_follower_handler) + event.listen(AuthorFollower, "after_delete", lambda *args: after_follower_handler(*args, is_delete=True)) - event.listen(Author, "after_update", after_update_handler) - event.listen(Topic, "after_update", after_update_handler) - event.listen(Shout, "after_update", after_update_handler) - event.listen( - Reaction, - "after_update", - lambda mapper, connection, target: revalidation_manager.mark_for_revalidation(target.shout, "shouts"), - ) + event.listen(TopicFollower, "after_insert", after_follower_handler) + event.listen(TopicFollower, "after_update", after_follower_handler) + event.listen(TopicFollower, "after_delete", lambda *args: after_follower_handler(*args, is_delete=True)) + + event.listen(ShoutReactionsFollower, "after_insert", after_follower_handler) + event.listen(ShoutReactionsFollower, "after_update", after_follower_handler) + event.listen(ShoutReactionsFollower, "after_delete", lambda *args: after_follower_handler(*args, is_delete=True)) + + event.listen(Reaction, "after_update", mark_for_revalidation) + event.listen(Author, "after_update", mark_for_revalidation) + event.listen(Topic, "after_update", mark_for_revalidation) + event.listen(Shout, "after_update", mark_for_revalidation) logger.info("Event handlers registered successfully.") diff --git a/cache/unread.py b/cache/unread.py deleted file mode 100644 index f887440f..00000000 --- a/cache/unread.py +++ /dev/null @@ -1,24 +0,0 @@ -import json - -from cache.rediscache import redis - - -async def get_unread_counter(chat_id: str, author_id: int) -> int: - r = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}") - if isinstance(r, str): - return int(r) - elif isinstance(r, int): - return r - else: - return 0 - - -async def get_total_unread_counter(author_id: int) -> int: - chats_set = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}") - s = 0 - if isinstance(chats_set, str): - chats_set = json.loads(chats_set) - if isinstance(chats_set, list): - for chat_id in chats_set: - s += await get_unread_counter(chat_id, author_id) - return s diff --git a/main.py b/main.py index 09fb37e8..2ec14568 100644 --- a/main.py +++ b/main.py @@ -14,7 +14,7 @@ from services.sentry import start_sentry from services.viewed import ViewedStorage from services.webhook import WebhookEndpoint from cache.precache import precache_data -from cache.rediscache import redis +from services.redis import redis from cache.revalidator import revalidation_manager from settings import DEV_SERVER_PID_FILE_NAME, MODE diff --git a/services/notify.py b/services/notify.py index 9d3c215d..e807d397 100644 --- a/services/notify.py +++ b/services/notify.py @@ -3,7 +3,7 @@ import json from orm.notification import Notification from services.db import local_session from utils.logger import root_logger as logger -from cache.rediscache import redis +from services.redis import redis def save_notification(action: str, entity: str, payload): diff --git a/cache/rediscache.py b/services/redis.py similarity index 97% rename from cache/rediscache.py rename to services/redis.py index 356fd886..b88e0ded 100644 --- a/cache/rediscache.py +++ b/services/redis.py @@ -9,7 +9,7 @@ logger = logging.getLogger("redis") logger.setLevel(logging.WARNING) -class RedisCache: +class RedisService: def __init__(self, uri=REDIS_URL): self._uri: str = uri self.pubsub_channels = [] @@ -58,6 +58,6 @@ class RedisCache: await self._client.publish(channel, data) -redis = RedisCache() +redis = RedisService() __all__ = ["redis"] diff --git a/services/search.py b/services/search.py index 48c855db..2d64274e 100644 --- a/services/search.py +++ b/services/search.py @@ -6,7 +6,7 @@ import os from opensearchpy import OpenSearch from utils.encoders import CustomJSONEncoder -from cache.rediscache import redis +from services.redis import redis # Set redis logging level to suppress DEBUG messages logger = logging.getLogger("search") diff --git a/services/viewed.py b/services/viewed.py index 478a2eb4..42d81ba3 100644 --- a/services/viewed.py +++ b/services/viewed.py @@ -13,7 +13,7 @@ from orm.author import Author from orm.shout import Shout, ShoutAuthor, ShoutTopic from orm.topic import Topic from services.db import local_session -from services.logger import root_logger as logger +from utils.logger import root_logger as logger GOOGLE_KEYFILE_PATH = os.environ.get("GOOGLE_KEYFILE_PATH") or "/dump/google-service.json" GOOGLE_PROPERTY_ID = os.environ.get("GOOGLE_PROPERTY_ID")