From 34580f267f946725d21a97c0d310e2ff72a36892 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Wed, 2 Nov 2022 12:23:14 +0300 Subject: [PATCH] inbox --- base/exceptions.py | 2 ++ resolvers/inbox.py | 71 +++++++++++----------------------------------- schema.graphql | 13 +++------ services/inbox.py | 36 +++++++++++++++++++++++ 4 files changed, 58 insertions(+), 64 deletions(-) create mode 100644 services/inbox.py diff --git a/base/exceptions.py b/base/exceptions.py index cdfaf893..1f3344e7 100644 --- a/base/exceptions.py +++ b/base/exceptions.py @@ -1,6 +1,8 @@ from graphql.error import GraphQLError +# TODO: remove traceback from logs for defined exceptions + class BaseHttpException(GraphQLError): code = 500 message = "500 Server error" diff --git a/resolvers/inbox.py b/resolvers/inbox.py index 164b7793..51f727a9 100644 --- a/resolvers/inbox.py +++ b/resolvers/inbox.py @@ -1,4 +1,3 @@ -import asyncio import json import uuid from datetime import datetime @@ -6,41 +5,7 @@ from datetime import datetime from auth.authenticate import login_required from base.redis import redis from base.resolvers import mutation, query, subscription - - -class ChatFollowing: - queue = asyncio.Queue() - - def __init__(self, chat_id): - self.chat_id = chat_id - - -class MessagesStorage: - lock = asyncio.Lock() - chats = [] - - @staticmethod - async def register_chat(chat): - async with MessagesStorage.lock: - MessagesStorage.chats.append(chat) - - @staticmethod - async def remove_chat(chat): - async with MessagesStorage.lock: - MessagesStorage.chats.remove(chat) - - @staticmethod - async def put(message_result): - async with MessagesStorage.lock: - for chat in MessagesStorage.chats: - if message_result.message["chatId"] == chat.chat_id: - chat.queue.put_nowait(message_result) - - -class MessageResult: - def __init__(self, status, message): - self.status = status - self.message = message +from services.inbox import MessageResult, MessagesStorage, ChatFollowing async def get_unread_counter(user_slug): @@ -57,21 +22,18 @@ async def get_unread_counter(user_slug): return unread -async def add_user_to_chat(user_slug, chat_id, chat=None): +async def add_user_to_chat(user_slug: str, chat_id: int, chat=None): chats = await redis.execute("GET", f"chats_by_user/{user_slug}") - if not chats: - chats = set() + if chats: + chats = list(json.loads(chats)) else: - chats = set(json.loads(chats)) - chats.add(str(chat_id)) - chats = list(chats) + chats = [] + if chat_id not in chats: + chats.append(chat_id) await redis.execute("SET", f"chats_by_user/{user_slug}", json.dumps(chats)) - - if chat: - users = set(chat["users"]) - users.add(user_slug) - chat["users"] = list(users) - await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) + if user_slug not in chat["users"]: + chat["users"].append(user_slug) + await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) @mutation.field("inviteChat") @@ -84,14 +46,15 @@ async def invite_to_chat(_, info, invited, chat_id): @mutation.field("createChat") @login_required -async def create_chat(_, info, description, title=""): +async def create_chat(_, info, description="", title=""): user = info.context["request"].user chat_id = uuid.uuid4() chat = { "title": title, "description": description, - "createdAt": str(datetime.now), + "createdAt": str(datetime.now().timestamp()), + "updatedAt": str(datetime.now().timestamp()), "createdBy": user.slug, "id": str(chat_id), "users": [user.slug], @@ -99,20 +62,19 @@ async def create_chat(_, info, description, title=""): await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) await redis.execute("SET", f"chats/{chat_id}/next_message_id", 0) - await add_user_to_chat(user.slug, chat_id) - return {"chatId": chat_id} + return chat -async def load_messages(chatId, size, page): +async def load_messages(chatId: int, size: int, page: int): message_ids = await redis.lrange( f"chats/{chatId}/message_ids", size * (page - 1), size * page - 1 ) messages = [] if message_ids: message_keys = [ - f"chats/{chatId}/messages/{id.decode('UTF-8')}" for id in message_ids + f"chats/{chatId}/messages/{mid}" for mid in message_ids ] messages = await redis.mget(*message_keys) messages = [json.loads(msg) for msg in messages] @@ -123,7 +85,6 @@ async def load_messages(chatId, size, page): @login_required async def user_chats(_, info): user = info.context["request"].user - chats = await redis.execute("GET", f"chats_by_user/{user.slug}") if not chats: chats = list() diff --git a/schema.graphql b/schema.graphql index 06296c03..6e561b2b 100644 --- a/schema.graphql +++ b/schema.graphql @@ -19,11 +19,6 @@ type ChatUpdatedResult { message: Message } -type CreateChatResult { - chatId: String - error: String -} - type EnterChatResult { chat: Chat messages: [Message] @@ -145,7 +140,7 @@ enum FollowingEntity { type Mutation { # inbox - createChat(description: String): CreateChatResult! + createChat(description: String): Chat! inviteChat(chatId: String!, userslug: String!): Result! createMessage(chatId: String!, body: String!, replyTo: Int): MessageResult! updateMessage(chatId: String!, id: Int!, body: String!): MessageResult! @@ -488,13 +483,13 @@ type Token { type Message { author: String! - chatRoom: Int! + chatId: Int! body: String! createdAt: DateTime! id: Int! replyTo: Int updatedAt: DateTime! - visibleForUsers: [Int]! + visibleForUsers: [Int] } type Chat { @@ -505,5 +500,5 @@ type Chat { title: String description: String users: [User]! - messages: [Message] + messages: [Message]! } diff --git a/services/inbox.py b/services/inbox.py new file mode 100644 index 00000000..d8222fa3 --- /dev/null +++ b/services/inbox.py @@ -0,0 +1,36 @@ +import asyncio + + +class ChatFollowing: + queue = asyncio.Queue() + + def __init__(self, chat_id): + self.chat_id = chat_id + + +class MessagesStorage: + lock = asyncio.Lock() + chats = [] + + @staticmethod + async def register_chat(chat): + async with MessagesStorage.lock: + MessagesStorage.chats.append(chat) + + @staticmethod + async def remove_chat(chat): + async with MessagesStorage.lock: + MessagesStorage.chats.remove(chat) + + @staticmethod + async def put(message_result): + async with MessagesStorage.lock: + for chat in MessagesStorage.chats: + if message_result.message["chatId"] == chat.chat_id: + chat.queue.put_nowait(message_result) + + +class MessageResult: + def __init__(self, status, message): + self.status = status + self.message = message