diff --git a/requirements.txt b/requirements.txt index 801121e9..2ef836aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,4 @@ python-dateutil~=2.8.2 beautifulsoup4~=4.11.1 lxml sentry-sdk>=0.10.2 +sse_starlette diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index 4fce0f94..0db75f04 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -119,7 +119,6 @@ async def load_recipients(_, info, limit=50, offset=0): try: chat_users += await followed_authors(auth.user_id) - print("[resolvers.inbox] ") limit = limit - len(chat_users) except Exception: pass diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index 03285b67..36cde3ac 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -143,10 +143,14 @@ async def mark_as_read(_, info, chat_id: str, messages: [int]): @subscription.source("newMessage") @login_required async def message_generator(obj, info): - try: - auth: AuthCredentials = info.context["request"].auth + print(f"[resolvers.messages] generator {info}") + auth: AuthCredentials = info.context["request"].auth + return await messages_generator_by_user(auth.user_id) - user_following_chats = await redis.execute("GET", f"chats_by_user/{auth.user_id}") + +async def messages_generator_by_user(user_id): + try: + user_following_chats = await redis.execute("GET", f"chats_by_user/{user_id}") if user_following_chats: user_following_chats = list(json.loads(user_following_chats)) # chat ids else: @@ -170,3 +174,4 @@ async def message_generator(obj, info): yield msg finally: await MessagesStorage.remove_chat(following_chat) + diff --git a/services/inbox/sse.py b/services/inbox/sse.py index 32e4b8b9..e3dde165 100644 --- a/services/inbox/sse.py +++ b/services/inbox/sse.py @@ -1,9 +1,13 @@ from sse_starlette.sse import EventSourceResponse -from resolvers.inbox.messages import message_generator +from starlette.requests import Request +from resolvers.inbox.messages import messages_generator_by_user +from base.exceptions import Unauthorized -async def sse_messages(request): - print(f'[SSE] {request}') +async def sse_messages(request: Request): + print(f'[SSE] {request.scope}') # https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md - - return EventSourceResponse(message_generator) + if request['user']: + return EventSourceResponse(messages_generator_by_user(request['user'].user_id)) + else: + raise Unauthorized("Please login")