From e69046a1f85b1720134665b47ff23d3b8a197fbb Mon Sep 17 00:00:00 2001
From: Untone
Date: Wed, 21 Feb 2024 17:37:58 +0300
Subject: [PATCH] cache-fixed

---
 main.py                                     |  5 +-
 pyproject.toml                              |  1 -
 resolvers/author.py                         | 17 +++-
 resolvers/follower.py                       | 10 +--
 .../author_events.py => services/follows.py | 67 +++++++++++++------
 services/rediscache.py                      |  7 --
 6 files changed, 63 insertions(+), 44 deletions(-)
 rename resolvers/author_events.py => services/follows.py (66%)

diff --git a/main.py b/main.py
index 675598dd..6e0f4f0a 100644
--- a/main.py
+++ b/main.py
@@ -7,7 +7,7 @@ from ariadne.asgi import GraphQL
 from starlette.applications import Starlette
 from starlette.routing import Route
 
-from resolvers.author_events import update_cache, scheduled_cache_update
+from services.follows import FollowsCached
 from services.rediscache import redis
 from services.schema import resolvers
 from services.search import search_service
@@ -37,8 +37,7 @@ app = Starlette(
     on_startup=[
         redis.connect,
         ViewedStorage.init,
-        update_cache,
-        scheduled_cache_update,
+        FollowsCached.worker,
         search_service.info,
         # start_sentry,
         start,
diff --git a/pyproject.toml b/pyproject.toml
index 606d31b2..7c17ae01 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -22,7 +22,6 @@ opensearch-py = "^2.4.2"
 httpx = "^0.26.0"
 dogpile-cache = "^1.3.1"
 colorlog = "^6.8.2"
-aiocron = "^1.8"
 
 [tool.poetry.group.dev.dependencies]
 ruff = "^0.2.1"
diff --git a/resolvers/author.py b/resolvers/author.py
index ed3f6482..11381c18 100644
--- a/resolvers/author.py
+++ b/resolvers/author.py
@@ -1,3 +1,4 @@
+import json
 import time
 from typing import List
 
@@ -192,17 +193,19 @@ async def get_author(_, _info, slug="", author_id=None):
 
 async def get_author_by_user_id(user_id: str):
     redis_key = f"user:{user_id}:author"
-    res = await redis.hget(redis_key)
-    if isinstance(res, dict) and res.get("id"):
-        logger.debug(f"got cached author: {res}")
-        return res
+    res = await redis.execute('GET', redis_key)
+    if isinstance(res, str):
+        author = json.loads(res)
+        if author.get("id"):
+            logger.debug(f"got cached author: {author}")
+            return author
 
     logger.info(f"getting author id for {user_id}")
     q = select(Author).filter(Author.user == user_id)
     author = await load_author_with_stats(q)
-    await redis.hset(redis_key, **author.dict())
-
-    return author
+    if author:
+        await redis.execute('set', redis_key, json.dumps(author.dict()))
+    return author
 
 
 @query.field("get_author_id")
diff --git a/resolvers/follower.py b/resolvers/follower.py
index 9eed33ef..fbd837a0 100644
--- a/resolvers/follower.py
+++ b/resolvers/follower.py
@@ -1,3 +1,4 @@
+import json
 from typing import List
 
 from sqlalchemy import select, or_
@@ -131,13 +132,14 @@ def query_follows(user_id: str):
 async def get_follows_by_user_id(user_id: str):
     if user_id:
         redis_key = f"user:{user_id}:follows"
-        res = await redis.hget(redis_key)
-        if res:
-            return res
+        res = await redis.execute('GET', redis_key)
+        if isinstance(res, str):
+            follows = json.loads(res)
+            return follows
 
     logger.debug(f"getting follows for {user_id}")
     follows = query_follows(user_id)
-    await redis.hset(redis_key, **follows)
+    await redis.execute('SET', redis_key, json.dumps(follows))
 
     return follows
diff --git a/resolvers/author_events.py b/services/follows.py
similarity index 66%
rename from resolvers/author_events.py
rename to services/follows.py
index c1602d3d..e85c4b6a 100644
--- a/resolvers/author_events.py
+++ b/services/follows.py
@@ -1,37 +1,22 @@
 import asyncio
-from aiocron import crontab
 from sqlalchemy import select, event
+import json
 
 from orm.author import Author, AuthorFollower
 from orm.topic import Topic, TopicFollower
 from resolvers.author import add_author_stat_columns, get_author_follows
 from resolvers.topic import add_topic_stat_columns
+from services.logger import root_logger as logger
 from services.db import local_session
 from services.rediscache import redis
 from services.viewed import ViewedStorage
 
 
-async def update_cache():
-    with local_session() as session:
-        for author in session.query(Author).all():
-            redis_key = f"user:{author.user}:author"
-            await redis.hset(redis_key, **vars(author))
-            follows = await get_author_follows(None, None, user=author.user)
-            if isinstance(follows, dict):
-                redis_key = f"user:{author.user}:follows"
-                await redis.hset(redis_key, **follows)
-
-
-@crontab("*/10 * * * *", func=update_cache)
-async def scheduled_cache_update():
-    pass
-
-
 @event.listens_for(Author, "after_insert")
 @event.listens_for(Author, "after_update")
 def after_author_update(mapper, connection, target):
     redis_key = f"user:{target.user}:author"
-    asyncio.create_task(redis.hset(redis_key, **vars(target)))
+    asyncio.create_task(redis.execute('set', redis_key, json.dumps(vars(target))))
 
 
 @event.listens_for(TopicFollower, "after_insert")
@@ -64,8 +49,10 @@ def after_author_follower_delete(mapper, connection, target):
 
 async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert):
     redis_key = f"user:{user_id}:follows"
-    follows = await redis.hget(redis_key)
-    if not follows:
+    follows_str = await redis.execute('GET', redis_key)
+    if follows_str:
+        follows = json.loads(follows_str)
+    else:
         follows = {
             "topics": [],
             "authors": [],
@@ -80,8 +67,7 @@ async def update_follows_for_user(connection, user_id, entity_type, entity, is_insert):
             follows[f"{entity_type}s"] = [
                 e for e in follows[f"{entity_type}s"] if e["id"] != entity.id
             ]
-
-    await redis.hset(redis_key, **vars(follows))
+    await redis.execute('set', redis_key, json.dumps(follows))
 
 
 async def handle_author_follower_change(connection, author_id, follower_id, is_insert):
@@ -126,3 +112,40 @@ async def handle_topic_follower_change(connection, topic_id, follower_id, is_insert):
         await update_follows_for_user(
             connection, follower.user, "topic", topic, is_insert
         )
+
+
+class FollowsCached:
+    lock = asyncio.Lock()
+
+    @staticmethod
+    async def update_cache():
+        with local_session() as session:
+            for author in session.query(Author).all():
+                if isinstance(author, Author):
+                    redis_key = f"user:{author.user}:author"
+                    author_dict = author.dict()
+                    if isinstance(author_dict, dict):
+                        filtered_author_dict = {k: v for k, v in author_dict.items() if v is not None}
+                        await redis.execute('set', redis_key, json.dumps(filtered_author_dict))
+                    follows = await get_author_follows(None, None, user=author.user)
+                    if isinstance(follows, dict):
+                        filtered_follows = {k: v for k, v in follows.items() if v is not None}
+                        redis_key = f"user:{author.user}:follows"
+                        await redis.execute('set', redis_key, json.dumps(filtered_follows))
+
+
+    @staticmethod
+    async def worker():
+        """Asynchronous cache update task"""
+        self = FollowsCached
+        while True:
+            try:
+                await self.update_cache()
+                await asyncio.sleep(10 * 60 * 60)
+            except asyncio.CancelledError:
+                # Handle cancellation due to SIGTERM
+                logger.info("Cancellation requested. Cleaning up...")
+                # Perform any necessary cleanup before exiting the loop
+                break
+            except Exception as exc:
+                logger.error(exc)
diff --git a/services/rediscache.py b/services/rediscache.py
index bc2f819f..2f0f112a 100644
--- a/services/rediscache.py
+++ b/services/rediscache.py
@@ -52,13 +52,6 @@ class RedisCache:
 
         return await self._client.publish(channel, data)
 
-    async def hset(self, hash_key: str, fields_values: dict):
-        return await self._client.hset(hash_key, mapping=fields_values)
-
-
-    async def hget(self, hash_key: str):
-        return await self._client.hget(hash_key)
-
 
 
 redis = RedisCache()
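
Note: after this patch every caller goes through a single generic call, redis.execute('GET' | 'SET', key, value), while the hset/hget helpers are deleted from services/rediscache.py. The execute method itself lies outside the hunks above, so the following is only a minimal sketch of the dispatcher the patch appears to assume; the class internals, connection URL, and error handling here are illustrative guesses, not the project's actual implementation. Only execute_command and from_url are real redis-py (>= 4.2) asyncio API.

    # Hypothetical sketch of the generic command dispatcher assumed by this patch.
    import redis.asyncio as aredis

    class RedisCache:
        def __init__(self, uri: str = "redis://127.0.0.1:6379/0"):  # URI is a placeholder
            self._uri = uri
            self._client = None

        async def connect(self):
            # decode_responses=True makes GET return str, which matches the
            # isinstance(res, str) checks in the resolvers above
            self._client = aredis.Redis.from_url(self._uri, decode_responses=True)

        async def execute(self, command: str, *args):
            # Forward an arbitrary Redis command ('GET', 'SET', ...) by name
            if self._client:
                return await self._client.execute_command(command, *args)

Storing each author and follows list as one JSON string instead of a Redis hash keeps reads and writes to a single round-trip and sidesteps hset's restriction that field values be flat strings.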