From 871f7e1d69df4bd037c64adff9bbc72bb9ddb8c7 Mon Sep 17 00:00:00 2001 From: Untone Date: Thu, 16 Nov 2023 17:52:39 +0300 Subject: [PATCH] notify-message-chat --- resolvers/chats.py | 18 ++++++++++++------ resolvers/messages.py | 43 +++++++++++++++++++++++++------------------ services/presence.py | 16 +++++++++++++--- 3 files changed, 50 insertions(+), 27 deletions(-) diff --git a/resolvers/chats.py b/resolvers/chats.py index a67d4d5..529006c 100644 --- a/resolvers/chats.py +++ b/resolvers/chats.py @@ -6,7 +6,7 @@ from services.auth import login_required from services.rediscache import redis from services.schema import mutation from validators.chat import Chat, ChatUpdate - +from services.presence import notify_chat @mutation.field("updateChat") @login_required @@ -31,12 +31,15 @@ async def update_chat(_, info, chat_new: ChatUpdate): { "title": chat_new.get("title", chat["title"]), "description": chat_new.get("description", chat["description"]), - "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), + "updated_at": int(datetime.now(tz=timezone.utc).timestamp()), "admins": chat_new.get("admins", chat.get("admins") or []), "members": chat_new.get("members", chat["members"]), } ) - await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) + + await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) + for member_id in chat["members"]: + await notify_chat(chat, member_id, "update") return {"error": None, "chat": chat} @@ -70,14 +73,15 @@ async def create_chat(_, info, title="", members=None): "members": members, "title": title, "description": "", - "createdBy": author_id, - "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), - "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), + "created_by": author_id, + "created_at": int(datetime.now(tz=timezone.utc).timestamp()), + "updated_at": int(datetime.now(tz=timezone.utc).timestamp()), "admins": members if (len(members) == 2 and title == "") else [], } for member_id in members: await redis.execute("SADD", f"chats_by_author/{member_id}", chat_id) + await notify_chat(chat, member_id, "create") print(f"\n\n[resolvers.chats] creating: {chat}\n\n") @@ -98,5 +102,7 @@ async def delete_chat(_, info, chat_id: str): if author_id in chat["admins"]: await redis.execute("DEL", f"chats/{chat_id}") await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id) + for member_id in chat["members"]: + await notify_chat(chat, member_id, "delete") else: return {"error": "chat not exist"} diff --git a/resolvers/messages.py b/resolvers/messages.py index 8f554d8..cc2d4b3 100644 --- a/resolvers/messages.py +++ b/resolvers/messages.py @@ -30,40 +30,40 @@ async def create_message(_, info, chat: str, body: str, reply_to=None): # Получение ID следующего сообщения message_id = await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id") message_id = int(message_id) if message_id else 0 - + chat_id = chat_dict['id'] # Создание нового сообщения new_message: Message = { - "chat": chat_dict["id"], + "chat": chat_id, "id": message_id, "author": author_id, "body": body, - "createdAt": int(datetime.now(tz=timezone.utc).timestamp()), - "updatedAt": None, + "created_at": int(datetime.now(tz=timezone.utc).timestamp()), + "updated_at": None, } # Если есть ответ, добавляем его в сообщение if reply_to: - new_message["replyTo"] = reply_to + new_message["reply_to"] = reply_to # Обновление времени последнего обновления чата - chat_dict["updatedAt"] = new_message["createdAt"] + chat_dict["updated_at"] = new_message["created_at"] # Запись обновленных данных чата обратно в Redis - await redis.execute("SET", f"chats/{chat_dict['id']}", json.dumps(chat_dict)) + await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict)) print(f"[inbox] creating message {new_message}") # Запись нового сообщения в Redis await redis.execute( "SET", - f"chats/{chat_dict['id']}/messages/{message_id}", + f"chats/{chat_id}/messages/{message_id}", json.dumps(new_message), ) # Добавление ID нового сообщения в список ID сообщений чата - await redis.execute("LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id)) + await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id)) # Обновление ID следующего сообщения - await redis.execute("SET", f"chats/{chat_dict['id']}/next_message_id", str(message_id + 1)) + await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1)) # Добавление нового сообщения в список непрочитанных сообщений для каждого участника чата members = chat_dict["members"] @@ -71,7 +71,8 @@ async def create_message(_, info, chat: str, body: str, reply_to=None): await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id)) # Отправка уведомления о новом сообщении - await notify_message(new_message, chat_dict["id"], "create") + new_message["chat"] = chat_id + await notify_message(new_message, "create") return {"message": new_message, "error": None} @@ -94,12 +95,14 @@ async def update_message(_, info, chat_id: str, message_id: int, body: str): return {"error": "access denied"} message["body"] = body - message["updatedAt"] = int(datetime.now(tz=timezone.utc).timestamp()) + message["updated_at"] = int(datetime.now(tz=timezone.utc).timestamp()) await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)) - # TODO: use presence service to notify about updated message - await notify_message(message, chat_id, "update") + + # Отправка уведомления + message["chat"] = chat_id + await notify_message(message, "update") return {"message": message, "error": None} @@ -128,8 +131,8 @@ async def delete_message(_, info, chat_id: str, message_id: int): for member_id in members: await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id)) - # TODO: use presence service to notify about deleted message - await notify_message(message, chat_id, "delete") + message["chat"] = chat_id + await notify_message(message, "delete") return {} @@ -151,7 +154,11 @@ async def mark_as_read(_, info, chat_id: str, messages: List[int]): for message_id in messages: await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)) - # TODO: seen per author - # await notify_message(message, chat_id, "seen") + message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") + if not message_data: + return {"error": "message not exist"} + message: Message = json.loads(message_data) + + await notify_message(message, "seen") return {"error": None} diff --git a/services/presence.py b/services/presence.py index 3bbef50..416ff7f 100644 --- a/services/presence.py +++ b/services/presence.py @@ -1,14 +1,24 @@ import json from services.rediscache import redis -from validators.chat import Message +from validators.chat import Message, ChatUpdate -async def notify_message(message: Message, chat_id: str, action="create"): - channel_name = f"chat:{chat_id}" +async def notify_message(message: Message, action="create"): + channel_name = f"message:{message["chat"]}" data = {"payload": message, "action": action} try: await redis.publish(channel_name, json.dumps(data)) print(f"[services.presence] ok {data}") except Exception as e: print(f"Failed to publish to channel {channel_name}: {e}") + + +async def notify_chat(chat: ChatUpdate, member_id, action="create"): + channel_name = f"chat:{member_id}" + data = {"payload": chat, "action": action} + try: + await redis.publish(channel_name, json.dumps(data)) + print(f"[services.presence] ok {data}") + except Exception as e: + print(f"Failed to publish to channel {channel_name}: {e}")