diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 0fffef0..8429e27 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,4 +1,5 @@ [0.0.2] +- services: redis listen generating method - dx: migrated to strawberry - dx: added sentry integration - dx: added mypy plugins diff --git a/README.md b/README.md index 42a7bc3..290ffae 100644 --- a/README.md +++ b/README.md @@ -15,11 +15,13 @@ ## Как разрабатывать локально -Установить +1 Читаем доки [strawberry](https://strawberry.rocks/docs/general/schema-basics) + +2 Устанавливаем локальные хранилища - Redis - Postgres -Затем +3 Запуск локального сервера ```shell poetry env use 3.12 diff --git a/main.py b/main.py index 83419b3..b64ca99 100644 --- a/main.py +++ b/main.py @@ -7,7 +7,7 @@ from strawberry.asgi import GraphQL from starlette.applications import Starlette from services.rediscache import redis -from resolvers.listener import start as listener_start, stop as listener_stop +from resolvers.listener import reactions_worker from resolvers.schema import schema from settings import DEV_SERVER_PID_FILE_NAME, SENTRY_DSN, MODE @@ -22,7 +22,7 @@ async def start_up(): f.write(str(os.getpid())) else: await redis.connect() - notification_service_task = asyncio.create_task(listener_start()) + notification_service_task = asyncio.create_task(reactions_worker()) print(f"[main] {notification_service_task}") try: @@ -45,7 +45,6 @@ async def start_up(): async def shutdown(): - listener_stop() await redis.disconnect() diff --git a/resolvers/listener.py b/resolvers/listener.py index 803785a..fe6401b 100644 --- a/resolvers/listener.py +++ b/resolvers/listener.py @@ -5,7 +5,7 @@ from services.db import local_session from services.rediscache import redis -def handle_reaction(notification: dict[str, str | int]): +async def handle_reaction(notification: dict[str, str | int]): """создаеёт новое хранимое уведомление""" try: with local_session() as session: @@ -17,20 +17,8 @@ def handle_reaction(notification: dict[str, str | int]): print(f"[listener.handle_reaction] error: {str(e)}") -def stop(pubsub): - pubsub.unsubscribe() - pubsub.close() - - -def start(): - pubsub = redis.pubsub() - pubsub.subscribe("reaction") - try: - # Бесконечный цикл прослушивания - while True: - msg = pubsub.get_message() - handle_reaction(json.loads(msg["data"])) - except Exception: - pass - finally: - stop(pubsub) +async def reactions_worker(): + async for message in redis.listen("reaction"): + msg = json.loads(message["data"]) + if msg: + await handle_reaction(msg) diff --git a/services/rediscache.py b/services/rediscache.py index 5282471..e5bff34 100644 --- a/services/rediscache.py +++ b/services/rediscache.py @@ -1,3 +1,5 @@ +import asyncio + import redis.asyncio as aredis from settings import REDIS_URL @@ -55,6 +57,16 @@ class RedisCache: print(f"[redis] MGET {key} {keys}") return await self._client.mget(key, *keys) + async def listen(self, channel): + pubsub = self._client.pubsub() + pubsub.subscribe(channel) + + while True: + message = pubsub.get_message() + if message: + yield message + await asyncio.sleep(0.1) + redis = RedisCache()