From 1d4fa4b977d65d0ce74b116a4e6465f11f0326e8 Mon Sep 17 00:00:00 2001 From: Untone Date: Wed, 7 Aug 2024 08:42:59 +0300 Subject: [PATCH] loop-fix-4 --- main.py | 2 ++ services/revalidator.py | 48 +++++++++++++++++++++++++++++++++++++++++ services/triggers.py | 46 +-------------------------------------- 3 files changed, 51 insertions(+), 45 deletions(-) create mode 100644 services/revalidator.py diff --git a/main.py b/main.py index 73f4ad3b..4f7ad996 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,7 @@ from services.search import search_service from services.sentry import start_sentry from services.viewed import ViewedStorage from services.webhook import WebhookEndpoint +from services.revalidator import revalidation_manager from settings import DEV_SERVER_PID_FILE_NAME, MODE import_module("resolvers") @@ -43,6 +44,7 @@ app = Starlette( search_service.info, start_sentry, start, + revalidation_manager.start, ], on_shutdown=[redis.disconnect], debug=True, diff --git a/services/revalidator.py b/services/revalidator.py new file mode 100644 index 00000000..d735b376 --- /dev/null +++ b/services/revalidator.py @@ -0,0 +1,48 @@ +import asyncio +from services.logger import root_logger as logger +from services.cache import get_cached_author, cache_author, cache_topic, get_cached_topic + + +class CacheRevalidationManager: + """Управление периодической ревалидацией кэша.""" + + def __init__(self): + self.items_to_revalidate = {"authors": set(), "topics": set()} + self.revalidation_interval = 60 # Интервал ревалидации в секундах + self.loop = None + + def start(self): + self.loop = asyncio.get_event_loop() + self.loop.run_until_complete(self.revalidate_cache()) + self.loop.run_forever() + logger.info("[services.revalidator] started infinite loop") + + async def revalidate_cache(self): + """Периодическая ревалидация кэша.""" + while True: + await asyncio.sleep(self.revalidation_interval) + await self.process_revalidation() + + async def process_revalidation(self): + """Ревалидация кэша для отмеченных сущностей.""" + for entity_type, ids in self.items_to_revalidate.items(): + for entity_id in ids: + if entity_type == "authors": + # Ревалидация кэша автора + author = await get_cached_author(entity_id) + if author: + await cache_author(author) + elif entity_type == "topics": + # Ревалидация кэша темы + topic = await get_cached_topic(entity_id) + if topic: + await cache_topic(topic) + ids.clear() + + def mark_for_revalidation(self, entity_id, entity_type): + """Отметить сущность для ревалидации.""" + self.items_to_revalidate[entity_type].add(entity_id) + + +# Инициализация менеджера ревалидации +revalidation_manager = CacheRevalidationManager() diff --git a/services/triggers.py b/services/triggers.py index 515fb3d3..13acb778 100644 --- a/services/triggers.py +++ b/services/triggers.py @@ -1,56 +1,12 @@ -import asyncio from sqlalchemy import event from orm.author import Author, AuthorFollower from orm.reaction import Reaction from orm.shout import Shout, ShoutAuthor from orm.topic import Topic, TopicFollower -from services.cache import cache_author, get_cached_author, cache_topic, get_cached_topic +from services.revalidator import revalidation_manager from services.logger import root_logger as logger -class CacheRevalidationManager: - """Управление периодической ревалидацией кэша.""" - - def __init__(self): - self.items_to_revalidate = {"authors": set(), "topics": set()} - self.revalidation_interval = 60 # Интервал ревалидации в секундах - - def start(self): - loop = asyncio.get_event_loop() - loop.run_until_complete(self.revalidate_cache()) - loop.run_forever() - - async def revalidate_cache(self): - """Периодическая ревалидация кэша.""" - while True: - await asyncio.sleep(self.revalidation_interval) - await self.process_revalidation() - - async def process_revalidation(self): - """Ревалидация кэша для отмеченных сущностей.""" - for entity_type, ids in self.items_to_revalidate.items(): - for entity_id in ids: - if entity_type == "authors": - # Ревалидация кэша автора - author = await get_cached_author(entity_id) - if author: - await cache_author(author) - elif entity_type == "topics": - # Ревалидация кэша темы - topic = await get_cached_topic(entity_id) - if topic: - await cache_topic(topic) - ids.clear() - - def mark_for_revalidation(self, entity_id, entity_type): - """Отметить сущность для ревалидации.""" - self.items_to_revalidate[entity_type].add(entity_id) - -# Инициализация и запуск менеджера ревалидации -revalidation_manager = CacheRevalidationManager() -revalidation_manager.start() - - def after_update_handler(mapper, connection, target): """Обработчик обновления сущности.""" entity_type = "authors" if isinstance(target, Author) else "topics" if isinstance(target, Topic) else "shouts"