From 7ea2a911ef1051862f2e9bd8b76f1a6e33e847a6 Mon Sep 17 00:00:00 2001 From: Untone Date: Sun, 26 Nov 2023 22:36:02 +0300 Subject: [PATCH] worker-fix --- resolvers/listener.py | 7 ++----- services/rediscache.py | 29 ++++++++++++----------------- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/resolvers/listener.py b/resolvers/listener.py index 753499d..ab48783 100644 --- a/resolvers/listener.py +++ b/resolvers/listener.py @@ -1,4 +1,5 @@ import json +import asyncio from orm.notification import Notification from services.db import local_session @@ -19,9 +20,5 @@ async def handle_reaction(notification: dict[str, str | int]): async def reactions_worker(): async for message in redis.listen("reaction"): - message = await message if message: - msg_data = message.get("data") - if msg_data: - msg = json.loads(msg_data) - await handle_reaction(msg) + await handle_reaction(message) diff --git a/services/rediscache.py b/services/rediscache.py index 69cc8b7..92f6ae0 100644 --- a/services/rediscache.py +++ b/services/rediscache.py @@ -1,4 +1,5 @@ import asyncio +import json import redis.asyncio as aredis from settings import REDIS_URL @@ -47,25 +48,19 @@ class RedisCache: return await self._client.publish(channel, data) - async def lrange(self, key, start, stop): - if self._client: - print(f"[redis] LRANGE {key} {start} {stop}") - return await self._client.lrange(key, start, stop) - - async def mget(self, key, *keys): - if self._client: - print(f"[redis] MGET {key} {keys}") - return await self._client.mget(key, *keys) - async def listen(self, channel): - pubsub = self._client.pubsub() - await pubsub.subscribe(channel) + if self._client: + pubsub = self._client.pubsub() + await pubsub.subscribe(channel) - while True: - message = pubsub.get_message() - if message: - yield message - await asyncio.sleep(0.1) + try: + while True: + message = await pubsub.get_message() + if message: + yield json.loads(message['data']) + await asyncio.sleep(0.1) + finally: + await pubsub.unsubscribe(channel) redis = RedisCache()