revalidation-manager
Some checks failed
Deploy on push / deploy (push) Failing after 9s

This commit is contained in:
Untone 2024-08-06 21:44:33 +03:00
parent a577b5510d
commit 3981fa3181

View File

@ -1,130 +1,130 @@
import asyncio
from sqlalchemy import event, select
from sqlalchemy import event
from orm.author import Author, AuthorFollower
from orm.reaction import Reaction
from orm.shout import Shout, ShoutAuthor
from orm.topic import Topic, TopicFollower
from resolvers.stat import get_with_stat
from services.cache import cache_author, cache_follows, cache_topic, get_cached_author
from services.cache import cache_manager # Предполагается, что этот менеджер уже реализован
from services.logger import root_logger as logger
# Инициализация семафора для ограничения одновременных задач
semaphore = asyncio.Semaphore(10)
class CacheRevalidationManager:
"""Управление периодической ревалидацией кэша."""
def __init__(self):
self.items_to_revalidate = {"authors": set(), "topics": set(), "shouts": set()}
self.revalidation_interval = 60 # Интервал ревалидации в секундах
async def revalidate_cache(self):
"""Периодическая ревалидация кэша."""
while True:
await asyncio.sleep(self.revalidation_interval)
await self.process_revalidation()
async def process_revalidation(self):
"""Ревалидация кэша для отмеченных сущностей."""
for entity_type, ids in self.items_to_revalidate.items():
for entity_id in ids:
if entity_type == "authors":
# Ревалидация кэша автора
author = await cache_manager.get_author(entity_id)
if author:
await cache_manager.cache_author(author)
elif entity_type == "topics":
# Ревалидация кэша темы
topic = await cache_manager.get_topic(entity_id)
if topic:
await cache_manager.cache_topic(topic)
elif entity_type == "shouts":
# Ревалидация кэша shout
shout = await cache_manager.get_shout(entity_id)
if shout:
await cache_manager.cache_shout(shout)
ids.clear()
def mark_for_revalidation(self, entity_id, entity_type):
"""Отметить сущность для ревалидации."""
self.items_to_revalidate[entity_type].add(entity_id)
async def run_background_task(coro):
"""Запуск фоновой асинхронной задачи с контролем одновременности."""
async with semaphore:
try:
await coro
except Exception as e:
logger.error(f"Error in background task: {e}")
# Инициализация и запуск менеджера ревалидации
revalidation_manager = CacheRevalidationManager()
asyncio.create_task(revalidation_manager.revalidate_cache())
async def batch_cache_updates(authors, topics, followers):
"""Обновление кэша для авторов, тем и подписчиков."""
tasks = (
[cache_author(author) for author in authors]
+ [cache_topic(topic) for topic in topics]
+ [
cache_follows(follower["id"], follower["type"], follower["item_id"], follower["is_insert"])
for follower in followers
]
)
await asyncio.gather(*tasks)
def after_update_handler(mapper, connection, target):
"""Обработчик обновления сущности."""
entity_type = "authors" if isinstance(target, Author) else "topics" if isinstance(target, Topic) else "shouts"
revalidation_manager.mark_for_revalidation(target.id, entity_type)
async def handle_author_follower_change(author_id: int, follower_id: int, is_insert: bool):
"""Обработка изменений в подписках авторов."""
# Получение данных с кэша или через запрос, если необходимо
author = await get_cached_author(author_id)
follower = await get_cached_author(follower_id)
if author and follower:
authors = [author.dict()]
followers = [{"id": follower.id, "type": "author", "item_id": author.id, "is_insert": is_insert}]
await batch_cache_updates(authors, [], followers)
def after_follower_insert_update_handler(mapper, connection, target):
"""Обработчик добавления или обновления подписки."""
if isinstance(target, AuthorFollower):
# Пометить автора и подписчика для ревалидации
revalidation_manager.mark_for_revalidation(target.author_id, "authors")
revalidation_manager.mark_for_revalidation(target.follower_id, "authors")
elif isinstance(target, TopicFollower):
# Пометить тему и подписчика для ревалидации
revalidation_manager.mark_for_revalidation(target.topic_id, "topics")
revalidation_manager.mark_for_revalidation(target.follower_id, "authors")
async def after_shout_update(_mapper, _connection, shout: Shout):
"""После обновления shout, обновить информацию об авторах в кэше."""
authors_updated = await get_with_stat(
select(Author).join(ShoutAuthor, ShoutAuthor.author == Author.id).filter(ShoutAuthor.shout == shout.id)
)
if authors_updated:
await batch_cache_updates([author.dict() for author in authors_updated], [], [])
def after_follower_delete_handler(mapper, connection, target):
"""Обработчик удаления подписки."""
if isinstance(target, AuthorFollower):
# Пометить автора и подписчика для ревалидации
revalidation_manager.mark_for_revalidation(target.author_id, "authors")
revalidation_manager.mark_for_revalidation(target.follower_id, "authors")
elif isinstance(target, TopicFollower):
# Пометить тему и подписчика для ревалидации
revalidation_manager.mark_for_revalidation(target.topic_id, "topics")
revalidation_manager.mark_for_revalidation(target.follower_id, "authors")
async def handle_topic_follower_change(topic_id: int, follower_id: int, is_insert: bool):
queries = [select(Topic).filter(Topic.id == topic_id), select(Author).filter(Author.id == follower_id)]
topic_result, follower_result = await asyncio.gather(*(get_with_stat(query) for query in queries))
if topic_result and follower_result:
topics = [topic_result[0].dict()]
followers = [
{"id": follower_result[0].id, "type": "topic", "item_id": topic_result[0].id, "is_insert": is_insert}
]
await batch_cache_updates([], topics, followers)
def after_reaction_update_handler(mapper, connection, reaction):
"""Обработчик изменений реакций."""
# Пометить shout для ревалидации
revalidation_manager.mark_for_revalidation(reaction.shout_id, "shouts")
# Пометить автора реакции для ревалидации
revalidation_manager.mark_for_revalidation(reaction.created_by, "authors")
async def after_author_update(_mapper, _connection, author: Author):
# Обновление кэша для автора
author_dict = author.dict()
await cache_author(author_dict)
logger.info(f"Author updated and cached: {author.id}")
def after_shout_author_insert_update_handler(mapper, connection, target):
"""Обработчик добавления или обновления авторства публикации."""
# Пометить shout и автора для ревалидации
revalidation_manager.mark_for_revalidation(target.shout_id, "shouts")
revalidation_manager.mark_for_revalidation(target.author_id, "authors")
async def after_author_follower_insert(_mapper, _connection, target: AuthorFollower):
logger.info(f"Author follower inserted: {target}")
await handle_author_follower_change(target.author, target.follower, True)
async def after_author_follower_delete(_mapper, _connection, target: AuthorFollower):
logger.info(f"Author follower deleted: {target}")
await handle_author_follower_change(target.author, target.follower, False)
async def after_topic_follower_insert(_mapper, _connection, target: TopicFollower):
logger.info(f"Topic follower inserted: {target}")
await handle_topic_follower_change(target.topic, target.follower, True)
async def after_topic_follower_delete(_mapper, _connection, target: TopicFollower):
logger.info(f"Topic follower deleted: {target}")
await handle_topic_follower_change(target.topic, target.follower, False)
async def after_reaction_update(mapper, connection, reaction: Reaction):
# Получение данных автора реакции и автора, на чью реакцию было отвечено, одновременно
author_query = select(Author).where(Author.id == reaction.created_by)
replied_author_query = (
select(Author).join(Reaction, Author.id == Reaction.created_by).where(Reaction.id == reaction.reply_to)
)
results = await asyncio.gather(get_with_stat(author_query), get_with_stat(replied_author_query))
authors = [result[0].dict() for result in results if result]
# Кэширование данных авторов
if authors:
await asyncio.gather(*(cache_author(author) for author in authors))
# Обновление информации о shout, если связанный с реакцией
if reaction.shout:
shout_query = select(Shout).where(Shout.id == reaction.shout)
shout_result = await connection.execute(shout_query)
shout = shout_result.scalar_one_or_none()
if shout:
await after_shout_update(mapper, connection, shout)
def after_shout_author_delete_handler(mapper, connection, target):
"""Обработчик удаления авторства публикации."""
# Пометить shout и автора для ревалидации
revalidation_manager.mark_for_revalidation(target.shout_id, "shouts")
revalidation_manager.mark_for_revalidation(target.author_id, "authors")
def events_register():
"""Регистрация обработчиков событий SQLAlchemy."""
event.listen(Shout, "after_insert", after_shout_update)
event.listen(Shout, "after_update", after_shout_update)
event.listen(Reaction, "after_insert", after_reaction_update)
event.listen(Reaction, "after_update", after_reaction_update)
event.listen(Author, "after_insert", after_author_update)
event.listen(Author, "after_update", after_author_update)
event.listen(AuthorFollower, "after_insert", after_author_follower_insert)
event.listen(AuthorFollower, "after_delete", after_author_follower_delete)
event.listen(TopicFollower, "after_insert", after_topic_follower_insert)
event.listen(TopicFollower, "after_delete", after_topic_follower_delete)
"""Регистрация обработчиков событий для всех сущностей."""
event.listen(ShoutAuthor, "after_insert", after_shout_author_insert_update_handler)
event.listen(ShoutAuthor, "after_update", after_shout_author_insert_update_handler)
event.listen(ShoutAuthor, "after_delete", after_shout_author_delete_handler)
event.listen(AuthorFollower, "after_insert", after_follower_insert_update_handler)
event.listen(AuthorFollower, "after_update", after_follower_insert_update_handler)
event.listen(AuthorFollower, "after_delete", after_follower_delete_handler)
event.listen(TopicFollower, "after_insert", after_follower_insert_update_handler)
event.listen(TopicFollower, "after_update", after_follower_insert_update_handler)
event.listen(TopicFollower, "after_delete", after_follower_delete_handler)
event.listen(Reaction, "after_update", after_reaction_update_handler)
event.listen(Author, "after_update", after_update_handler)
event.listen(Topic, "after_update", after_update_handler)
event.listen(Shout, "after_update", after_update_handler)
event.listen(
Reaction,
"after_update",
lambda mapper, connection, target: revalidation_manager.mark_for_revalidation(target.shout, "shouts"),
)
logger.info("Event handlers registered successfully.")