From 79f3be67e055f7691379fceac8041449c671554a Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Wed, 2 Nov 2022 16:36:10 +0300 Subject: [PATCH] inbox reworked --- resolvers/inbox.py | 218 ++++++++++++++++++++++++++------------------- schema.graphql | 28 +++--- 2 files changed, 144 insertions(+), 102 deletions(-) diff --git a/resolvers/inbox.py b/resolvers/inbox.py index 45f02c6f..7ad62f4e 100644 --- a/resolvers/inbox.py +++ b/resolvers/inbox.py @@ -1,5 +1,6 @@ import json import uuid +import asyncio from datetime import datetime from auth.authenticate import login_required @@ -8,14 +9,14 @@ from base.resolvers import mutation, query, subscription from services.inbox import MessageResult, MessagesStorage, ChatFollowing -async def get_unread_counter(chat_id, user_slug): +async def get_unread_counter(chat_id: str, user_slug: str): try: return int(await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}")) except Exception: return 0 -async def get_total_unread_counter(user_slug): +async def get_total_unread_counter(user_slug: str): chats = await redis.execute("GET", f"chats_by_user/{user_slug}") if not chats: return 0 @@ -29,51 +30,50 @@ async def get_total_unread_counter(user_slug): return unread -async def add_user_to_chat(user_slug: str, chat_id: int, chat=None): - chats = await redis.execute("GET", f"chats_by_user/{user_slug}") - if chats: - chats = list(json.loads(chats)) +async def add_user_to_chat(user_slug: str, chat): + chats_ids = await redis.execute("GET", f"chats_by_user/{user_slug}") + if chats_ids: + chats_ids = list(json.loads(chats_ids)) else: - chats = [] - if chat_id not in chats: - chats.append(chat_id) - await redis.execute("SET", f"chats_by_user/{user_slug}", json.dumps(chats)) + chats_ids = [] + if chat.id not in chats_ids: + chats_ids.append(chat.id) + await redis.execute("SET", f"chats_by_user/{user_slug}", json.dumps(chats_ids)) if user_slug not in chat["users"]: chat["users"].append(user_slug) - await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) + chat["updatedAt"] = datetime.now().timestamp() + await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat)) + return chat @mutation.field("inviteChat") -async def invite_to_chat(_, info, invited, chat_id): +async def invite_to_chat(_, info, invited: str, chat_id: str): user = info.context["request"].user chat = await redis.execute("GET", f"chats/{chat_id}") - if user.slug in chat['users']: - add_user_to_chat(invited, chat_id, chat) - return { - "error": None, - "chat": chat - } - - -@mutation.field("createChat") -@login_required -async def create_chat(_, info, description="", title=""): - user = info.context["request"].user - - chat_id = uuid.uuid4() - chat = { - "title": title, - "description": description, - "createdAt": str(datetime.now().timestamp()), - "updatedAt": str(datetime.now().timestamp()), - "createdBy": user.slug, - "id": str(chat_id), - "users": [user.slug], + if user.slug not in chat['users']: + # TODO: check right to invite here + chat = await add_user_to_chat(invited, chat_id, chat) + return { + "error": None, + "chat": chat } - await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) - await redis.execute("SET", f"chats/{chat_id}/next_message_id", 0) - await add_user_to_chat(user.slug, chat_id) + +@mutation.field("updateChat") +@login_required +async def update_chat(_, info, chat_new): + user = info.context["request"].user + + chat = await redis.execute("GET", f"chats/{chat_new.id}") + chat.update({ + "title": chat_new.title, + "description": chat_new.description, + "updatedAt": datetime.now().timestamp(), + }) + + await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat)) + await redis.execute("SET", f"chats/{chat.id}/next_message_id", 0) + chat = await add_user_to_chat(user.slug, chat.id) return { "error": None, @@ -81,10 +81,36 @@ async def create_chat(_, info, description="", title=""): } -async def load_messages(chatId: int, size: int, page: int): +@mutation.field("createChat") +@login_required +async def create_chat(_, info, title="", members=[]): + user = info.context["request"].user + if user.slug not in members: + members.append(user.slug) + chat_id = str(uuid.uuid4()) + chat = { + "title": title, + "createdAt": datetime.now().timestamp(), + "updatedAt": datetime.now().timestamp(), + "createdBy": user.slug, + "id": chat_id, + "users": members, + } + + await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) + await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0)) + chat = await add_user_to_chat(user.slug, chat_id) + + return { + "error": None, + "chat": chat + } + + +async def load_messages(chatId: int, offset: int, amount: int): messages = [] message_ids = await redis.lrange( - f"chats/{chatId}/message_ids", size * (page - 1), size * page - 1 + f"chats/{chatId}/message_ids", 0 - offset - amount, 0 - offset ) if message_ids: message_keys = [ @@ -98,17 +124,18 @@ async def load_messages(chatId: int, size: int, page: int): } +async def get_chats_by_user(slug: str): + chats = await redis.execute("GET", f"chats_by_user/{slug}") + return chats or [] + + @query.field("myChats") @login_required async def user_chats(_, info): user = info.context["request"].user - chats = await redis.execute("GET", f"chats_by_user/{user.slug}") - if not chats: - chats = [] - else: - chats = json.loads(chats) + chats = await get_chats_by_user(user.slug) for c in chats: - c['messages'] = await load_messages(c['id'], 50, 1) + c['messages'] = await load_messages(c['id']) c['unread'] = await get_unread_counter(c['id'], user.slug) return { "chats": chats, @@ -116,20 +143,19 @@ async def user_chats(_, info): } -@query.field("enterChat") +@mutation.field("enterChat") @login_required -async def enter_chat(_, info, chatId): +async def enter_chat(_, info, chat_id: str): user = info.context["request"].user - - chat = await redis.execute("GET", f"chats/{chatId}") + chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return { "error": "chat not exist" } else: chat = json.loads(chat) - await add_user_to_chat(user.slug, chatId, chat) - chat['messages'] = await load_messages(chatId, 50, 1) + chat = await add_user_to_chat(user.slug, chat_id, chat) + chat['messages'] = await load_messages(chat_id) return { "chat": chat, "error": None @@ -138,37 +164,38 @@ async def enter_chat(_, info, chatId): @mutation.field("createMessage") @login_required -async def create_message(_, info, chatId, body, replyTo=None): +async def create_message(_, info, chat_id: str, body: str, replyTo=None): user = info.context["request"].user - chat = await redis.execute("GET", f"chats/{chatId}") + chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return { - "error": "chat not exist"} + "error": "chat not exist" + } - message_id = await redis.execute("GET", f"chats/{chatId}/next_message_id") + message_id = await redis.execute("GET", f"chats/{chat_id}/next_message_id") message_id = int(message_id) new_message = { - "chatId": chatId, + "chatId": chat_id, "id": message_id, "author": user.slug, "body": body, "replyTo": replyTo, - "createdAt": datetime.now().isoformat(), + "createdAt": datetime.now().timestamp(), } await redis.execute( - "SET", f"chats/{chatId}/messages/{message_id}", json.dumps(new_message) + "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(new_message) ) - await redis.execute("LPUSH", f"chats/{chatId}/message_ids", str(message_id)) - await redis.execute("SET", f"chats/{chatId}/next_message_id", str(message_id + 1)) + await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id)) + await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1)) chat = json.loads(chat) users = chat["users"] for user_slug in users: await redis.execute( - "LPUSH", f"chats/{chatId}/unread/{user_slug}", str(message_id) + "LPUSH", f"chats/{chat_id}/unread/{user_slug}", str(message_id) ) result = MessageResult("NEW", new_message) @@ -182,12 +209,14 @@ async def create_message(_, info, chatId, body, replyTo=None): @query.field("loadChat") @login_required -async def get_messages(_, info, chatId, size, page): - chat = await redis.execute("GET", f"chats/{chatId}") +async def load_chat_messages(_, info, chat_id: str, offset: int = 0, amount: int = 50): + chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: - return {"error": "chat not exist"} + return { + "error": "chat not exist" + } - messages = await load_messages(chatId, size, page) + messages = await load_messages(chat_id, offset, amount) return { "messages": messages, @@ -197,14 +226,14 @@ async def get_messages(_, info, chatId, size, page): @mutation.field("updateMessage") @login_required -async def update_message(_, info, chatId, id, body): +async def update_message(_, info, chat_id: str, message_id: int, body: str): user = info.context["request"].user - chat = await redis.execute("GET", f"chats/{chatId}") + chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return {"error": "chat not exist"} - message = await redis.execute("GET", f"chats/{chatId}/messages/{id}") + message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}") if not message: return {"error": "message not exist"} @@ -213,9 +242,9 @@ async def update_message(_, info, chatId, id, body): return {"error": "access denied"} message["body"] = body - message["updatedAt"] = datetime.now().isoformat() + message["updatedAt"] = datetime.now().timestamp() - await redis.execute("SET", f"chats/{chatId}/messages/{id}", json.dumps(message)) + await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)) result = MessageResult("UPDATED", message) await MessagesStorage.put(result) @@ -228,27 +257,27 @@ async def update_message(_, info, chatId, id, body): @mutation.field("deleteMessage") @login_required -async def delete_message(_, info, chatId, id): +async def delete_message(_, info, chat_id: str, message_id: int): user = info.context["request"].user - chat = await redis.execute("GET", f"chats/{chatId}") + chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return {"error": "chat not exist"} + chat = json.loads(chat) - message = await redis.execute("GET", f"chats/{chatId}/messages/{id}") + message = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") if not message: return {"error": "message not exist"} message = json.loads(message) if message["author"] != user.slug: return {"error": "access denied"} - await redis.execute("LREM", f"chats/{chatId}/message_ids", 0, str(id)) - await redis.execute("DEL", f"chats/{chatId}/messages/{id}") + await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id)) + await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}") - chat = json.loads(chat) users = chat["users"] for user_slug in users: - await redis.execute("LREM", f"chats/{chatId}/unread/{user_slug}", 0, str(id)) + await redis.execute("LREM", f"chats/{chat_id}/unread/{user_slug}", 0, str(message_id)) result = MessageResult("DELETED", message) await MessagesStorage.put(result) @@ -258,10 +287,10 @@ async def delete_message(_, info, chatId, id): @mutation.field("markAsRead") @login_required -async def mark_as_read(_, info, chatId, ids): +async def mark_as_read(_, info, chat_id: str, messages: [int]): user = info.context["request"].user - chat = await redis.execute("GET", f"chats/{chatId}") + chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return {"error": "chat not exist"} @@ -270,27 +299,34 @@ async def mark_as_read(_, info, chatId, ids): if user.slug not in users: return {"error": "access denied"} - for id in ids: - await redis.execute("LREM", f"chats/{chatId}/unread/{user.slug}", 0, str(id)) + for message_id in messages: + await redis.execute("LREM", f"chats/{chat_id}/unread/{user.slug}", 0, str(message_id)) return { "error": None } -@subscription.source("chatUpdated") +@subscription.source("newMessage") @login_required -async def message_generator(obj, info, chatId): +async def message_generator(obj, info): try: - following_chat = ChatFollowing(chatId) - await MessagesStorage.register_chat(following_chat) + user = info.context["request"].user + user_following_chats = await get_chats_by_user(user.slug) # chat ids + tasks = [] + updated = {} + for chat_id in user_following_chats: + chat = await redis.execute("GET", f"chats/{chat_id}") + updated[chat_id] = chat['updatedAt'] + user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True) + for chat_id in user_following_chats_sorted: + following_chat = ChatFollowing(chat_id) + await MessagesStorage.register_chat(following_chat) + chat_task = following_chat.queue.get() + tasks.append(chat_task) + while True: - msg = await following_chat.queue.get() + msg = await asyncio.gather(*tasks) yield msg finally: await MessagesStorage.remove_chat(following_chat) - - -@subscription.field("chatUpdated") -def message_resolver(message, info, chatId): - return message diff --git a/schema.graphql b/schema.graphql index e727f422..dce30611 100644 --- a/schema.graphql +++ b/schema.graphql @@ -77,7 +77,7 @@ input ShoutInput { subtitle: String versionOf: String visibleForRoles: [String] # role ids are strings - visibleForUsers: [Int] + visibleForUsers: [String] } input ProfileInput { @@ -117,6 +117,12 @@ input ReactionInput { replyTo: Int } +input ChatInput { + id: String! + title: String + description: String +} + enum FollowingEntity { TOPIC AUTHOR @@ -128,8 +134,10 @@ enum FollowingEntity { type Mutation { # inbox - createChat(description: String): Chat! + createChat(title: String, members: [String]!): Result! + updateChat(chat: ChatInput!): Result! inviteChat(chatId: String!, userslug: String!): Result! + enterChat(chatId: String!): Result! createMessage(chatId: String!, body: String!, replyTo: Int): Result! updateMessage(chatId: String!, id: Int!, body: String!): Result! deleteMessage(chatId: String!, id: Int!): Result! @@ -190,8 +198,7 @@ type Mutation { type Query { # inbox myChats: [Chat]! - enterChat(chatId: String!): Chat! - loadChat(chatId: String!, size: Int!, page: Int!): [Message]! + loadChat(chatId: String!, offset: Int, amount: Int): [Message]! # auth isEmailUsed(email: String!): Boolean! @@ -258,7 +265,7 @@ type Query { ############################################ Subscription type Subscription { - chatUpdated(chatId: String!): Result! + newMessage(chats: [Int!]): Message! onlineUpdated: [User!]! shoutUpdated: Shout! userUpdated: User! @@ -471,20 +478,19 @@ type Token { type Message { author: String! - chatId: Int! + chatId: String! body: String! - createdAt: DateTime! + createdAt: Int! id: Int! replyTo: Int - updatedAt: DateTime! - visibleForUsers: [Int] + updatedAt: Int } type Chat { id: Int! - createdAt: DateTime! + createdAt: Int! createdBy: User! - updatedAt: DateTime! + updatedAt: Int! title: String description: String users: [User]!