From a034eda220bc746ae212f9b84d237bd9eabbffca Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sun, 4 Dec 2022 17:03:55 +0300 Subject: [PATCH] ws-subs --- auth/authenticate.py | 1 - main.py | 20 +++++++++--- resolvers/create/editor.py | 2 +- resolvers/inbox/load.py | 2 ++ resolvers/inbox/messages.py | 6 ++-- schema.graphql | 1 + services/inbox/helpers.py | 14 ++++++++ services/inbox/presence.py | 43 +++++++++++++++++++++++++ services/{inbox.py => inbox/storage.py} | 13 -------- settings.py | 3 +- 10 files changed, 83 insertions(+), 22 deletions(-) create mode 100644 services/inbox/helpers.py create mode 100644 services/inbox/presence.py rename services/{inbox.py => inbox/storage.py} (72%) diff --git a/auth/authenticate.py b/auth/authenticate.py index e11e821e..3e587225 100644 --- a/auth/authenticate.py +++ b/auth/authenticate.py @@ -3,7 +3,6 @@ from typing import Optional, Tuple from graphql.type import GraphQLResolveInfo from sqlalchemy.orm import joinedload, exc -from sqlalchemy import select, and_ from starlette.authentication import AuthenticationBackend from starlette.requests import HTTPConnection diff --git a/main.py b/main.py index 1c77edd7..c9d5ff2a 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,11 @@ from resolvers.auth import confirm_email_handler from services.main import storages_init from services.stat.viewed import ViewedStorage from services.zine.gittask import GitTask -from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_ID +from settings import DEV_SERVER_STATUS_FILE_NAME, SENTRY_DSN +from ariadne.asgi.handlers import GraphQLTransportWSHandler +from services.inbox.presence import on_connect, on_disconnect +# from services.inbox.sse import sse_messages + import_module("resolvers") schema = make_executable_schema(load_schema_from_path("schema.graphql"), resolvers) # type: ignore @@ -39,7 +43,7 @@ async def start_up(): print(git_task) try: import sentry_sdk - sentry_sdk.init("https://%s@testsentry.discours.io/2" % SENTRY_ID) + sentry_sdk.init(SENTRY_DSN) except Exception as e: print('[sentry] init error') print(e) @@ -63,7 +67,8 @@ async def shutdown(): routes = [ Route("/oauth/{provider}", endpoint=oauth_login), Route("/oauth-authorize", endpoint=oauth_authorize), - Route("/confirm/{token}", endpoint=confirm_email_handler) + Route("/confirm/{token}", endpoint=confirm_email_handler), + # Route("/chat/{chat_id}", endpoint=sse_messages) ] app = Starlette( @@ -73,7 +78,14 @@ app = Starlette( middleware=middleware, routes=routes, ) -app.mount("/", GraphQL(schema, debug=True)) +app.mount("/", GraphQL( + schema, + debug=True, + websocket_handler=GraphQLTransportWSHandler( + on_connect=on_connect, + on_disconnect=on_disconnect + ) +)) dev_app = app = Starlette( debug=True, diff --git a/resolvers/create/editor.py b/resolvers/create/editor.py index 82ccaf57..84d744a4 100644 --- a/resolvers/create/editor.py +++ b/resolvers/create/editor.py @@ -13,7 +13,7 @@ from orm.user import User from resolvers.zine.reactions import reactions_follow, reactions_unfollow from services.zine.gittask import GitTask from resolvers.inbox.chats import create_chat -from services.inbox import MessagesStorage +from services.inbox.storage import MessagesStorage from orm.collab import Collab diff --git a/resolvers/inbox/load.py b/resolvers/inbox/load.py index 5ef3ccc6..ec032029 100644 --- a/resolvers/inbox/load.py +++ b/resolvers/inbox/load.py @@ -33,6 +33,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): auth: AuthCredentials = info.context["request"].auth cids = await redis.execute("SMEMBERS", "chats_by_user/" + str(auth.user_id)) + onliners = await redis.execute("SMEMBERS", "users-online") if cids: cids = list(cids)[offset:offset + limit] if not cids: @@ -56,6 +57,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): "userpic": a.userpic, "name": a.name, "lastSeen": a.lastSeen, + "online": a.id in onliners }) chats.append(c) return { diff --git a/resolvers/inbox/messages.py b/resolvers/inbox/messages.py index 9ffd341a..cc9304a5 100644 --- a/resolvers/inbox/messages.py +++ b/resolvers/inbox/messages.py @@ -6,7 +6,8 @@ from auth.authenticate import login_required from auth.credentials import AuthCredentials from base.redis import redis from base.resolvers import mutation, subscription -from services.inbox import ChatFollowing, MessageResult, MessagesStorage +from services.inbox.helpers import ChatFollowing, MessageResult +from services.inbox.storage import MessagesStorage @mutation.field("createMessage") @@ -18,7 +19,7 @@ async def create_message(_, info, chat: str, body: str, replyTo=None): chat = await redis.execute("GET", f"chats/{chat}") if not chat: return { - "error": "chat not exist" + "error": "chat is not exist" } else: chat = dict(json.loads(chat)) @@ -153,6 +154,7 @@ async def message_generator(obj, info): chat = await redis.execute("GET", f"chats/{chat_id}") updated[chat_id] = chat['updatedAt'] user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True) + for chat_id in user_following_chats_sorted: following_chat = ChatFollowing(chat_id) await MessagesStorage.register_chat(following_chat) diff --git a/schema.graphql b/schema.graphql index 4c8261af..647c9774 100644 --- a/schema.graphql +++ b/schema.graphql @@ -29,6 +29,7 @@ type ChatMember { name: String! userpic: String lastSeen: DateTime + online: Boolean # invitedAt: DateTime # invitedBy: String # user slug # TODO: keep invite databit diff --git a/services/inbox/helpers.py b/services/inbox/helpers.py new file mode 100644 index 00000000..56223160 --- /dev/null +++ b/services/inbox/helpers.py @@ -0,0 +1,14 @@ +import asyncio + + +class MessageResult: + def __init__(self, status, message): + self.status = status + self.message = message + + +class ChatFollowing: + queue = asyncio.Queue() + + def __init__(self, chat_id): + self.chat_id = chat_id diff --git a/services/inbox/presence.py b/services/inbox/presence.py new file mode 100644 index 00000000..b6d710ea --- /dev/null +++ b/services/inbox/presence.py @@ -0,0 +1,43 @@ +from base.exceptions import Unauthorized +from auth.tokenstorage import SessionToken +from base.redis import redis + + +async def set_online_status(user_id, status): + if user_id: + if status: + await redis.execute("SADD", "users-online", user_id) + else: + await redis.execute("SREM", "users-online", user_id) + + +async def on_connect(websocket, params): + if not isinstance(params, dict): + websocket.scope["connection_params"] = {} + return + token = params.get('token') + if not token: + raise Unauthorized("Please login") + else: + payload = await SessionToken.verify(token) + if payload and payload.user_id: + websocket.scope["user_id"] = payload.user_id + await set_online_status(payload.user_id, True) + + +async def on_disconnect(websocket): + user_id = websocket.scope.get("user_id") + await set_online_status(user_id, False) + + +# FIXME: not used yet +def context_value(request): + context = {} + print(f"[inbox.presense] request debug: {request}") + if request.scope["type"] == "websocket": + # request is an instance of WebSocket + context.update(request.scope["connection_params"]) + else: + context["token"] = request.META.get("authorization") + + return context diff --git a/services/inbox.py b/services/inbox/storage.py similarity index 72% rename from services/inbox.py rename to services/inbox/storage.py index d8222fa3..dd6e5fcf 100644 --- a/services/inbox.py +++ b/services/inbox/storage.py @@ -1,13 +1,6 @@ import asyncio -class ChatFollowing: - queue = asyncio.Queue() - - def __init__(self, chat_id): - self.chat_id = chat_id - - class MessagesStorage: lock = asyncio.Lock() chats = [] @@ -28,9 +21,3 @@ class MessagesStorage: for chat in MessagesStorage.chats: if message_result.message["chatId"] == chat.chat_id: chat.queue.put_nowait(message_result) - - -class MessageResult: - def __init__(self, status, message): - self.status = status - self.message = message diff --git a/settings.py b/settings.py index 196bc822..c712ddd1 100644 --- a/settings.py +++ b/settings.py @@ -26,6 +26,7 @@ FRONTEND_URL = environ.get("FRONTEND_URL") or "http://localhost:3000" SHOUTS_REPO = "content" SESSION_TOKEN_HEADER = "Authorization" +SENTRY_DSN = environ.get("SENTRY_DSN") + # for local development DEV_SERVER_STATUS_FILE_NAME = 'dev-server-status.txt' -SENTRY_ID = environ.get("SENTRY_ID")