notify-message-chat
All checks were successful
deploy / deploy (push) Successful in 1m17s

This commit is contained in:
Untone 2023-11-16 17:52:39 +03:00
parent 1b94353850
commit 871f7e1d69
3 changed files with 50 additions and 27 deletions

View File

@ -6,7 +6,7 @@ from services.auth import login_required
from services.rediscache import redis from services.rediscache import redis
from services.schema import mutation from services.schema import mutation
from validators.chat import Chat, ChatUpdate from validators.chat import Chat, ChatUpdate
from services.presence import notify_chat
@mutation.field("updateChat") @mutation.field("updateChat")
@login_required @login_required
@ -31,12 +31,15 @@ async def update_chat(_, info, chat_new: ChatUpdate):
{ {
"title": chat_new.get("title", chat["title"]), "title": chat_new.get("title", chat["title"]),
"description": chat_new.get("description", chat["description"]), "description": chat_new.get("description", chat["description"]),
"updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), "updated_at": int(datetime.now(tz=timezone.utc).timestamp()),
"admins": chat_new.get("admins", chat.get("admins") or []), "admins": chat_new.get("admins", chat.get("admins") or []),
"members": chat_new.get("members", chat["members"]), "members": chat_new.get("members", chat["members"]),
} }
) )
await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat))
for member_id in chat["members"]:
await notify_chat(chat, member_id, "update")
return {"error": None, "chat": chat} return {"error": None, "chat": chat}
@ -70,14 +73,15 @@ async def create_chat(_, info, title="", members=None):
"members": members, "members": members,
"title": title, "title": title,
"description": "", "description": "",
"createdBy": author_id, "created_by": author_id,
"createdAt": int(datetime.now(tz=timezone.utc).timestamp()), "created_at": int(datetime.now(tz=timezone.utc).timestamp()),
"updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), "updated_at": int(datetime.now(tz=timezone.utc).timestamp()),
"admins": members if (len(members) == 2 and title == "") else [], "admins": members if (len(members) == 2 and title == "") else [],
} }
for member_id in members: for member_id in members:
await redis.execute("SADD", f"chats_by_author/{member_id}", chat_id) await redis.execute("SADD", f"chats_by_author/{member_id}", chat_id)
await notify_chat(chat, member_id, "create")
print(f"\n\n[resolvers.chats] creating: {chat}\n\n") print(f"\n\n[resolvers.chats] creating: {chat}\n\n")
@ -98,5 +102,7 @@ async def delete_chat(_, info, chat_id: str):
if author_id in chat["admins"]: if author_id in chat["admins"]:
await redis.execute("DEL", f"chats/{chat_id}") await redis.execute("DEL", f"chats/{chat_id}")
await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id) await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id)
for member_id in chat["members"]:
await notify_chat(chat, member_id, "delete")
else: else:
return {"error": "chat not exist"} return {"error": "chat not exist"}

View File

@ -30,40 +30,40 @@ async def create_message(_, info, chat: str, body: str, reply_to=None):
# Получение ID следующего сообщения # Получение ID следующего сообщения
message_id = await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id") message_id = await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id")
message_id = int(message_id) if message_id else 0 message_id = int(message_id) if message_id else 0
chat_id = chat_dict['id']
# Создание нового сообщения # Создание нового сообщения
new_message: Message = { new_message: Message = {
"chat": chat_dict["id"], "chat": chat_id,
"id": message_id, "id": message_id,
"author": author_id, "author": author_id,
"body": body, "body": body,
"createdAt": int(datetime.now(tz=timezone.utc).timestamp()), "created_at": int(datetime.now(tz=timezone.utc).timestamp()),
"updatedAt": None, "updated_at": None,
} }
# Если есть ответ, добавляем его в сообщение # Если есть ответ, добавляем его в сообщение
if reply_to: if reply_to:
new_message["replyTo"] = reply_to new_message["reply_to"] = reply_to
# Обновление времени последнего обновления чата # Обновление времени последнего обновления чата
chat_dict["updatedAt"] = new_message["createdAt"] chat_dict["updated_at"] = new_message["created_at"]
# Запись обновленных данных чата обратно в Redis # Запись обновленных данных чата обратно в Redis
await redis.execute("SET", f"chats/{chat_dict['id']}", json.dumps(chat_dict)) await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict))
print(f"[inbox] creating message {new_message}") print(f"[inbox] creating message {new_message}")
# Запись нового сообщения в Redis # Запись нового сообщения в Redis
await redis.execute( await redis.execute(
"SET", "SET",
f"chats/{chat_dict['id']}/messages/{message_id}", f"chats/{chat_id}/messages/{message_id}",
json.dumps(new_message), json.dumps(new_message),
) )
# Добавление ID нового сообщения в список ID сообщений чата # Добавление ID нового сообщения в список ID сообщений чата
await redis.execute("LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id)) await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id))
# Обновление ID следующего сообщения # Обновление ID следующего сообщения
await redis.execute("SET", f"chats/{chat_dict['id']}/next_message_id", str(message_id + 1)) await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1))
# Добавление нового сообщения в список непрочитанных сообщений для каждого участника чата # Добавление нового сообщения в список непрочитанных сообщений для каждого участника чата
members = chat_dict["members"] members = chat_dict["members"]
@ -71,7 +71,8 @@ async def create_message(_, info, chat: str, body: str, reply_to=None):
await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id)) await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id))
# Отправка уведомления о новом сообщении # Отправка уведомления о новом сообщении
await notify_message(new_message, chat_dict["id"], "create") new_message["chat"] = chat_id
await notify_message(new_message, "create")
return {"message": new_message, "error": None} return {"message": new_message, "error": None}
@ -94,12 +95,14 @@ async def update_message(_, info, chat_id: str, message_id: int, body: str):
return {"error": "access denied"} return {"error": "access denied"}
message["body"] = body message["body"] = body
message["updatedAt"] = int(datetime.now(tz=timezone.utc).timestamp()) message["updated_at"] = int(datetime.now(tz=timezone.utc).timestamp())
await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)) await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
# TODO: use presence service to notify about updated message
await notify_message(message, chat_id, "update") # Отправка уведомления
message["chat"] = chat_id
await notify_message(message, "update")
return {"message": message, "error": None} return {"message": message, "error": None}
@ -128,8 +131,8 @@ async def delete_message(_, info, chat_id: str, message_id: int):
for member_id in members: for member_id in members:
await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id)) await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id))
# TODO: use presence service to notify about deleted message message["chat"] = chat_id
await notify_message(message, chat_id, "delete") await notify_message(message, "delete")
return {} return {}
@ -151,7 +154,11 @@ async def mark_as_read(_, info, chat_id: str, messages: List[int]):
for message_id in messages: for message_id in messages:
await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)) await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id))
# TODO: seen per author message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}")
# await notify_message(message, chat_id, "seen") if not message_data:
return {"error": "message not exist"}
message: Message = json.loads(message_data)
await notify_message(message, "seen")
return {"error": None} return {"error": None}

View File

@ -1,14 +1,24 @@
import json import json
from services.rediscache import redis from services.rediscache import redis
from validators.chat import Message from validators.chat import Message, ChatUpdate
async def notify_message(message: Message, chat_id: str, action="create"): async def notify_message(message: Message, action="create"):
channel_name = f"chat:{chat_id}" channel_name = f"message:{message["chat"]}"
data = {"payload": message, "action": action} data = {"payload": message, "action": action}
try: try:
await redis.publish(channel_name, json.dumps(data)) await redis.publish(channel_name, json.dumps(data))
print(f"[services.presence] ok {data}") print(f"[services.presence] ok {data}")
except Exception as e: except Exception as e:
print(f"Failed to publish to channel {channel_name}: {e}") print(f"Failed to publish to channel {channel_name}: {e}")
async def notify_chat(chat: ChatUpdate, member_id, action="create"):
channel_name = f"chat:{member_id}"
data = {"payload": chat, "action": action}
try:
await redis.publish(channel_name, json.dumps(data))
print(f"[services.presence] ok {data}")
except Exception as e:
print(f"Failed to publish to channel {channel_name}: {e}")