diff --git a/resolvers/listener.py b/resolvers/listener.py index 753499d..ab48783 100644 --- a/resolvers/listener.py +++ b/resolvers/listener.py @@ -1,4 +1,5 @@ import json +import asyncio from orm.notification import Notification from services.db import local_session @@ -19,9 +20,5 @@ async def handle_reaction(notification: dict[str, str | int]): async def reactions_worker(): async for message in redis.listen("reaction"): - message = await message if message: - msg_data = message.get("data") - if msg_data: - msg = json.loads(msg_data) - await handle_reaction(msg) + await handle_reaction(message) diff --git a/services/rediscache.py b/services/rediscache.py index 69cc8b7..92f6ae0 100644 --- a/services/rediscache.py +++ b/services/rediscache.py @@ -1,4 +1,5 @@ import asyncio +import json import redis.asyncio as aredis from settings import REDIS_URL @@ -47,25 +48,19 @@ class RedisCache: return await self._client.publish(channel, data) - async def lrange(self, key, start, stop): - if self._client: - print(f"[redis] LRANGE {key} {start} {stop}") - return await self._client.lrange(key, start, stop) - - async def mget(self, key, *keys): - if self._client: - print(f"[redis] MGET {key} {keys}") - return await self._client.mget(key, *keys) - async def listen(self, channel): - pubsub = self._client.pubsub() - await pubsub.subscribe(channel) + if self._client: + pubsub = self._client.pubsub() + await pubsub.subscribe(channel) - while True: - message = pubsub.get_message() - if message: - yield message - await asyncio.sleep(0.1) + try: + while True: + message = await pubsub.get_message() + if message: + yield json.loads(message['data']) + await asyncio.sleep(0.1) + finally: + await pubsub.unsubscribe(channel) redis = RedisCache()