From 29478aa04b4c2c84da4064b9091a9671b28fa275 Mon Sep 17 00:00:00 2001 From: Untone Date: Fri, 13 Oct 2023 12:33:31 +0300 Subject: [PATCH] redis-multi-exec-fix-6 --- requirements.txt | 2 +- services/redis.py | 36 ++++++++++++++---------------------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/requirements.txt b/requirements.txt index 5945b3a..fc4bb1c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ sentry-sdk -aredis +redis[hiredis] ariadne starlette uvicorn diff --git a/services/redis.py b/services/redis.py index e10f15b..2afb24f 100644 --- a/services/redis.py +++ b/services/redis.py @@ -1,51 +1,43 @@ -import asyncio -import aredis +import redis.asyncio as redis from settings import REDIS_URL - class RedisCache: def __init__(self, uri=REDIS_URL): self._uri: str = uri self.pubsub_channels = [] self._redis = None - self._pubsub = None - self.loop = asyncio.get_event_loop() async def connect(self): - self._redis = aredis.StrictRedis.from_url(self._uri, decode_responses=True, loop=self.loop) - await self._redis.connection_pool.get_connection() - self._pubsub = self._redis.pubsub() - response = await self.execute('PING') - print(f"[redis] PING response: {response}") + self._redis = redis.Redis.from_url(self._uri, decode_responses=True) async def disconnect(self): - self._redis.connection_pool.re - self._redis = None - self._pubsub = None + await self._redis.aclose() async def execute(self, command, *args, **kwargs): - while not self._redis: - await asyncio.sleep(1) + if not self._redis: + await self.connect() try: print("[redis] " + command + " " + " ".join(args)) return await self._redis.execute_command(command, *args, **kwargs) except Exception as e: print(f"[redis] error: {e}") - raise + return None async def subscribe(self, *channels): if not self._redis: await self.connect() - for channel in channels: - await self._pubsub.subscribe(channel) - self.pubsub_channels.append(channel) + async with self._redis.pubsub() as pubsub: + for channel in channels: + await pubsub.subscribe(channel) + self.pubsub_channels.append(channel) async def unsubscribe(self, *channels): if not self._redis: return - for channel in channels: - await self._pubsub.unsubscribe(channel) - self.pubsub_channels.remove(channel) + async with self._redis.pubsub() as pubsub: + for channel in channels: + await pubsub.unsubscribe(channel) + self.pubsub_channels.remove(channel) async def publish(self, channel, data): if not self._redis: