inbox reworked

This commit is contained in:
tonyrewin 2022-11-02 16:36:10 +03:00
parent 32c9d134e5
commit 79f3be67e0
2 changed files with 144 additions and 102 deletions

View File

@ -1,5 +1,6 @@
import json import json
import uuid import uuid
import asyncio
from datetime import datetime from datetime import datetime
from auth.authenticate import login_required from auth.authenticate import login_required
@ -8,14 +9,14 @@ from base.resolvers import mutation, query, subscription
from services.inbox import MessageResult, MessagesStorage, ChatFollowing from services.inbox import MessageResult, MessagesStorage, ChatFollowing
async def get_unread_counter(chat_id, user_slug): async def get_unread_counter(chat_id: str, user_slug: str):
try: try:
return int(await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}")) return int(await redis.execute("LLEN", f"chats/{chat_id}/unread/{user_slug}"))
except Exception: except Exception:
return 0 return 0
async def get_total_unread_counter(user_slug): async def get_total_unread_counter(user_slug: str):
chats = await redis.execute("GET", f"chats_by_user/{user_slug}") chats = await redis.execute("GET", f"chats_by_user/{user_slug}")
if not chats: if not chats:
return 0 return 0
@ -29,26 +30,51 @@ async def get_total_unread_counter(user_slug):
return unread return unread
async def add_user_to_chat(user_slug: str, chat_id: int, chat=None): async def add_user_to_chat(user_slug: str, chat):
chats = await redis.execute("GET", f"chats_by_user/{user_slug}") chats_ids = await redis.execute("GET", f"chats_by_user/{user_slug}")
if chats: if chats_ids:
chats = list(json.loads(chats)) chats_ids = list(json.loads(chats_ids))
else: else:
chats = [] chats_ids = []
if chat_id not in chats: if chat.id not in chats_ids:
chats.append(chat_id) chats_ids.append(chat.id)
await redis.execute("SET", f"chats_by_user/{user_slug}", json.dumps(chats)) await redis.execute("SET", f"chats_by_user/{user_slug}", json.dumps(chats_ids))
if user_slug not in chat["users"]: if user_slug not in chat["users"]:
chat["users"].append(user_slug) chat["users"].append(user_slug)
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) chat["updatedAt"] = datetime.now().timestamp()
await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat))
return chat
@mutation.field("inviteChat") @mutation.field("inviteChat")
async def invite_to_chat(_, info, invited, chat_id): async def invite_to_chat(_, info, invited: str, chat_id: str):
user = info.context["request"].user user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}") chat = await redis.execute("GET", f"chats/{chat_id}")
if user.slug in chat['users']: if user.slug not in chat['users']:
add_user_to_chat(invited, chat_id, chat) # TODO: check right to invite here
chat = await add_user_to_chat(invited, chat_id, chat)
return {
"error": None,
"chat": chat
}
@mutation.field("updateChat")
@login_required
async def update_chat(_, info, chat_new):
user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_new.id}")
chat.update({
"title": chat_new.title,
"description": chat_new.description,
"updatedAt": datetime.now().timestamp(),
})
await redis.execute("SET", f"chats/{chat.id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat.id}/next_message_id", 0)
chat = await add_user_to_chat(user.slug, chat.id)
return { return {
"error": None, "error": None,
"chat": chat "chat": chat
@ -57,23 +83,23 @@ async def invite_to_chat(_, info, invited, chat_id):
@mutation.field("createChat") @mutation.field("createChat")
@login_required @login_required
async def create_chat(_, info, description="", title=""): async def create_chat(_, info, title="", members=[]):
user = info.context["request"].user user = info.context["request"].user
if user.slug not in members:
chat_id = uuid.uuid4() members.append(user.slug)
chat_id = str(uuid.uuid4())
chat = { chat = {
"title": title, "title": title,
"description": description, "createdAt": datetime.now().timestamp(),
"createdAt": str(datetime.now().timestamp()), "updatedAt": datetime.now().timestamp(),
"updatedAt": str(datetime.now().timestamp()),
"createdBy": user.slug, "createdBy": user.slug,
"id": str(chat_id), "id": chat_id,
"users": [user.slug], "users": members,
} }
await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat))
await redis.execute("SET", f"chats/{chat_id}/next_message_id", 0) await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0))
await add_user_to_chat(user.slug, chat_id) chat = await add_user_to_chat(user.slug, chat_id)
return { return {
"error": None, "error": None,
@ -81,10 +107,10 @@ async def create_chat(_, info, description="", title=""):
} }
async def load_messages(chatId: int, size: int, page: int): async def load_messages(chatId: int, offset: int, amount: int):
messages = [] messages = []
message_ids = await redis.lrange( message_ids = await redis.lrange(
f"chats/{chatId}/message_ids", size * (page - 1), size * page - 1 f"chats/{chatId}/message_ids", 0 - offset - amount, 0 - offset
) )
if message_ids: if message_ids:
message_keys = [ message_keys = [
@ -98,17 +124,18 @@ async def load_messages(chatId: int, size: int, page: int):
} }
async def get_chats_by_user(slug: str):
chats = await redis.execute("GET", f"chats_by_user/{slug}")
return chats or []
@query.field("myChats") @query.field("myChats")
@login_required @login_required
async def user_chats(_, info): async def user_chats(_, info):
user = info.context["request"].user user = info.context["request"].user
chats = await redis.execute("GET", f"chats_by_user/{user.slug}") chats = await get_chats_by_user(user.slug)
if not chats:
chats = []
else:
chats = json.loads(chats)
for c in chats: for c in chats:
c['messages'] = await load_messages(c['id'], 50, 1) c['messages'] = await load_messages(c['id'])
c['unread'] = await get_unread_counter(c['id'], user.slug) c['unread'] = await get_unread_counter(c['id'], user.slug)
return { return {
"chats": chats, "chats": chats,
@ -116,20 +143,19 @@ async def user_chats(_, info):
} }
@query.field("enterChat") @mutation.field("enterChat")
@login_required @login_required
async def enter_chat(_, info, chatId): async def enter_chat(_, info, chat_id: str):
user = info.context["request"].user user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chat_id}")
chat = await redis.execute("GET", f"chats/{chatId}")
if not chat: if not chat:
return { return {
"error": "chat not exist" "error": "chat not exist"
} }
else: else:
chat = json.loads(chat) chat = json.loads(chat)
await add_user_to_chat(user.slug, chatId, chat) chat = await add_user_to_chat(user.slug, chat_id, chat)
chat['messages'] = await load_messages(chatId, 50, 1) chat['messages'] = await load_messages(chat_id)
return { return {
"chat": chat, "chat": chat,
"error": None "error": None
@ -138,37 +164,38 @@ async def enter_chat(_, info, chatId):
@mutation.field("createMessage") @mutation.field("createMessage")
@login_required @login_required
async def create_message(_, info, chatId, body, replyTo=None): async def create_message(_, info, chat_id: str, body: str, replyTo=None):
user = info.context["request"].user user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chatId}") chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat: if not chat:
return { return {
"error": "chat not exist"} "error": "chat not exist"
}
message_id = await redis.execute("GET", f"chats/{chatId}/next_message_id") message_id = await redis.execute("GET", f"chats/{chat_id}/next_message_id")
message_id = int(message_id) message_id = int(message_id)
new_message = { new_message = {
"chatId": chatId, "chatId": chat_id,
"id": message_id, "id": message_id,
"author": user.slug, "author": user.slug,
"body": body, "body": body,
"replyTo": replyTo, "replyTo": replyTo,
"createdAt": datetime.now().isoformat(), "createdAt": datetime.now().timestamp(),
} }
await redis.execute( await redis.execute(
"SET", f"chats/{chatId}/messages/{message_id}", json.dumps(new_message) "SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(new_message)
) )
await redis.execute("LPUSH", f"chats/{chatId}/message_ids", str(message_id)) await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id))
await redis.execute("SET", f"chats/{chatId}/next_message_id", str(message_id + 1)) await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1))
chat = json.loads(chat) chat = json.loads(chat)
users = chat["users"] users = chat["users"]
for user_slug in users: for user_slug in users:
await redis.execute( await redis.execute(
"LPUSH", f"chats/{chatId}/unread/{user_slug}", str(message_id) "LPUSH", f"chats/{chat_id}/unread/{user_slug}", str(message_id)
) )
result = MessageResult("NEW", new_message) result = MessageResult("NEW", new_message)
@ -182,12 +209,14 @@ async def create_message(_, info, chatId, body, replyTo=None):
@query.field("loadChat") @query.field("loadChat")
@login_required @login_required
async def get_messages(_, info, chatId, size, page): async def load_chat_messages(_, info, chat_id: str, offset: int = 0, amount: int = 50):
chat = await redis.execute("GET", f"chats/{chatId}") chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat: if not chat:
return {"error": "chat not exist"} return {
"error": "chat not exist"
}
messages = await load_messages(chatId, size, page) messages = await load_messages(chat_id, offset, amount)
return { return {
"messages": messages, "messages": messages,
@ -197,14 +226,14 @@ async def get_messages(_, info, chatId, size, page):
@mutation.field("updateMessage") @mutation.field("updateMessage")
@login_required @login_required
async def update_message(_, info, chatId, id, body): async def update_message(_, info, chat_id: str, message_id: int, body: str):
user = info.context["request"].user user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chatId}") chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat: if not chat:
return {"error": "chat not exist"} return {"error": "chat not exist"}
message = await redis.execute("GET", f"chats/{chatId}/messages/{id}") message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}")
if not message: if not message:
return {"error": "message not exist"} return {"error": "message not exist"}
@ -213,9 +242,9 @@ async def update_message(_, info, chatId, id, body):
return {"error": "access denied"} return {"error": "access denied"}
message["body"] = body message["body"] = body
message["updatedAt"] = datetime.now().isoformat() message["updatedAt"] = datetime.now().timestamp()
await redis.execute("SET", f"chats/{chatId}/messages/{id}", json.dumps(message)) await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message))
result = MessageResult("UPDATED", message) result = MessageResult("UPDATED", message)
await MessagesStorage.put(result) await MessagesStorage.put(result)
@ -228,27 +257,27 @@ async def update_message(_, info, chatId, id, body):
@mutation.field("deleteMessage") @mutation.field("deleteMessage")
@login_required @login_required
async def delete_message(_, info, chatId, id): async def delete_message(_, info, chat_id: str, message_id: int):
user = info.context["request"].user user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chatId}") chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat: if not chat:
return {"error": "chat not exist"} return {"error": "chat not exist"}
chat = json.loads(chat)
message = await redis.execute("GET", f"chats/{chatId}/messages/{id}") message = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}")
if not message: if not message:
return {"error": "message not exist"} return {"error": "message not exist"}
message = json.loads(message) message = json.loads(message)
if message["author"] != user.slug: if message["author"] != user.slug:
return {"error": "access denied"} return {"error": "access denied"}
await redis.execute("LREM", f"chats/{chatId}/message_ids", 0, str(id)) await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id))
await redis.execute("DEL", f"chats/{chatId}/messages/{id}") await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}")
chat = json.loads(chat)
users = chat["users"] users = chat["users"]
for user_slug in users: for user_slug in users:
await redis.execute("LREM", f"chats/{chatId}/unread/{user_slug}", 0, str(id)) await redis.execute("LREM", f"chats/{chat_id}/unread/{user_slug}", 0, str(message_id))
result = MessageResult("DELETED", message) result = MessageResult("DELETED", message)
await MessagesStorage.put(result) await MessagesStorage.put(result)
@ -258,10 +287,10 @@ async def delete_message(_, info, chatId, id):
@mutation.field("markAsRead") @mutation.field("markAsRead")
@login_required @login_required
async def mark_as_read(_, info, chatId, ids): async def mark_as_read(_, info, chat_id: str, messages: [int]):
user = info.context["request"].user user = info.context["request"].user
chat = await redis.execute("GET", f"chats/{chatId}") chat = await redis.execute("GET", f"chats/{chat_id}")
if not chat: if not chat:
return {"error": "chat not exist"} return {"error": "chat not exist"}
@ -270,27 +299,34 @@ async def mark_as_read(_, info, chatId, ids):
if user.slug not in users: if user.slug not in users:
return {"error": "access denied"} return {"error": "access denied"}
for id in ids: for message_id in messages:
await redis.execute("LREM", f"chats/{chatId}/unread/{user.slug}", 0, str(id)) await redis.execute("LREM", f"chats/{chat_id}/unread/{user.slug}", 0, str(message_id))
return { return {
"error": None "error": None
} }
@subscription.source("chatUpdated") @subscription.source("newMessage")
@login_required @login_required
async def message_generator(obj, info, chatId): async def message_generator(obj, info):
try: try:
following_chat = ChatFollowing(chatId) user = info.context["request"].user
user_following_chats = await get_chats_by_user(user.slug) # chat ids
tasks = []
updated = {}
for chat_id in user_following_chats:
chat = await redis.execute("GET", f"chats/{chat_id}")
updated[chat_id] = chat['updatedAt']
user_following_chats_sorted = sorted(user_following_chats, key=lambda x: updated[x], reverse=True)
for chat_id in user_following_chats_sorted:
following_chat = ChatFollowing(chat_id)
await MessagesStorage.register_chat(following_chat) await MessagesStorage.register_chat(following_chat)
chat_task = following_chat.queue.get()
tasks.append(chat_task)
while True: while True:
msg = await following_chat.queue.get() msg = await asyncio.gather(*tasks)
yield msg yield msg
finally: finally:
await MessagesStorage.remove_chat(following_chat) await MessagesStorage.remove_chat(following_chat)
@subscription.field("chatUpdated")
def message_resolver(message, info, chatId):
return message

View File

@ -77,7 +77,7 @@ input ShoutInput {
subtitle: String subtitle: String
versionOf: String versionOf: String
visibleForRoles: [String] # role ids are strings visibleForRoles: [String] # role ids are strings
visibleForUsers: [Int] visibleForUsers: [String]
} }
input ProfileInput { input ProfileInput {
@ -117,6 +117,12 @@ input ReactionInput {
replyTo: Int replyTo: Int
} }
input ChatInput {
id: String!
title: String
description: String
}
enum FollowingEntity { enum FollowingEntity {
TOPIC TOPIC
AUTHOR AUTHOR
@ -128,8 +134,10 @@ enum FollowingEntity {
type Mutation { type Mutation {
# inbox # inbox
createChat(description: String): Chat! createChat(title: String, members: [String]!): Result!
updateChat(chat: ChatInput!): Result!
inviteChat(chatId: String!, userslug: String!): Result! inviteChat(chatId: String!, userslug: String!): Result!
enterChat(chatId: String!): Result!
createMessage(chatId: String!, body: String!, replyTo: Int): Result! createMessage(chatId: String!, body: String!, replyTo: Int): Result!
updateMessage(chatId: String!, id: Int!, body: String!): Result! updateMessage(chatId: String!, id: Int!, body: String!): Result!
deleteMessage(chatId: String!, id: Int!): Result! deleteMessage(chatId: String!, id: Int!): Result!
@ -190,8 +198,7 @@ type Mutation {
type Query { type Query {
# inbox # inbox
myChats: [Chat]! myChats: [Chat]!
enterChat(chatId: String!): Chat! loadChat(chatId: String!, offset: Int, amount: Int): [Message]!
loadChat(chatId: String!, size: Int!, page: Int!): [Message]!
# auth # auth
isEmailUsed(email: String!): Boolean! isEmailUsed(email: String!): Boolean!
@ -258,7 +265,7 @@ type Query {
############################################ Subscription ############################################ Subscription
type Subscription { type Subscription {
chatUpdated(chatId: String!): Result! newMessage(chats: [Int!]): Message!
onlineUpdated: [User!]! onlineUpdated: [User!]!
shoutUpdated: Shout! shoutUpdated: Shout!
userUpdated: User! userUpdated: User!
@ -471,20 +478,19 @@ type Token {
type Message { type Message {
author: String! author: String!
chatId: Int! chatId: String!
body: String! body: String!
createdAt: DateTime! createdAt: Int!
id: Int! id: Int!
replyTo: Int replyTo: Int
updatedAt: DateTime! updatedAt: Int
visibleForUsers: [Int]
} }
type Chat { type Chat {
id: Int! id: Int!
createdAt: DateTime! createdAt: Int!
createdBy: User! createdBy: User!
updatedAt: DateTime! updatedAt: Int!
title: String title: String
description: String description: String
users: [User]! users: [User]!