diff --git a/resolvers/chats.py b/resolvers/chats.py index c7638bf..4e3756c 100644 --- a/resolvers/chats.py +++ b/resolvers/chats.py @@ -1,7 +1,7 @@ import json import uuid from datetime import datetime, timezone - +from validators.chat import Chat from services.auth import login_required from services.redis import redis from services.schema import mutation @@ -9,7 +9,7 @@ from services.schema import mutation @mutation.field("updateChat") @login_required -async def update_chat(_, info, chat_new): +async def update_chat(_, info, chat_new: Chat): """ updating chat requires info["request"].user.slug to be in chat["admins"] @@ -20,11 +20,11 @@ async def update_chat(_, info, chat_new): """ author_id = info.context["author_id"] chat_id = chat_new["id"] - chat = (await redis.execute("GET", f"chats/{chat_id}")) + chat = await redis.execute("GET", f"chats/{chat_id}") if not chat: return {"error": "chat not exist"} else: - chat = json.loads(chat) + chat: Chat = json.loads(chat) if author_id in chat["admins"]: chat.update( @@ -33,11 +33,10 @@ async def update_chat(_, info, chat_new): "description": chat_new.get("description", chat["description"]), "updatedAt": int(datetime.now(tz=timezone.utc).timestamp()), "admins": chat_new.get("admins", chat.get("admins") or []), - "users": chat_new.get("users", chat["users"]), + "members": chat_new.get("members", chat["members"]), } ) await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) - await redis.execute("COMMIT") return {"error": None, "chat": chat} @@ -74,7 +73,7 @@ async def create_chat(_, info, title="", members=None): return {"chat": chat, "error": "existed"} chat_id = str(uuid.uuid4()) - chat = { + chat: Chat = { "id": chat_id, "members": members, "title": title, @@ -84,8 +83,8 @@ async def create_chat(_, info, title="", members=None): "admins": members if (len(members) == 2 and title == "") else [], } - for m in members: - await redis.execute("SADD", f"chats_by_author/{m}", chat_id) + for member_id in members: + await redis.execute("SADD", f"chats_by_author/{member_id}", chat_id) await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0)) @@ -99,19 +98,10 @@ async def delete_chat(_, info, chat_id: str): chat = await redis.execute("GET", f"/chats/{chat_id}") if chat: - chat = dict(json.loads(chat)) + chat: Chat = json.loads(chat) if author_id in chat["admins"]: await redis.execute("DEL", f"chats/{chat_id}") await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id) - await redis.execute("COMMIT") else: return {"error": "chat not exist"} - -chats_resolvers = { - "Mutation": { - "deleteChat": delete_chat, - "createChat": create_chat, - "updateChat": update_chat, - }, -} diff --git a/resolvers/load.py b/resolvers/load.py index 56262c7..4b5a8dd 100644 --- a/resolvers/load.py +++ b/resolvers/load.py @@ -4,13 +4,15 @@ from services.core import get_author, get_network from services.redis import redis from services.auth import login_required from services.schema import query -from validators.chat import Message +from validators.chat import Message, Chat, ChatMember from .chats import create_chat from .unread import get_unread_counter import asyncio # NOTE: not an API handler -async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[str]] = None) -> List[Message]: +async def load_messages( + chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[str]] = None +) -> List[Message]: """load :limit messages for :chat_id with :offset""" if ids is None: ids = [] @@ -18,9 +20,11 @@ async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids: Opti try: message_ids = [] + ids if limit: - mids = (await redis.lrange( - f"chats/{chat_id}/message_ids", offset, offset + limit - )) or [] + mids = ( + await redis.lrange( + f"chats/{chat_id}/message_ids", offset, offset + limit + ) + ) or [] mids = [mid for mid in mids] message_ids += mids if message_ids: @@ -40,18 +44,21 @@ async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids: Opti print(f"Error loading messages for chat {chat_id}: {e}") return messages + @query.field("loadChats") @login_required -async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Union[List[Dict[str, Any]], None]]: +async def load_chats( + _, info, limit: int = 50, offset: int = 0 +) -> Dict[str, Union[List[Dict[str, Any]], None]]: """load :limit chats of current user with :offset""" author_id = info.context["author_id"] cids = (await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or [] members_online = (await redis.execute("SMEMBERS", "authors-online")) or [] - cids = list(cids)[offset:(offset + limit)] + cids = list(cids)[offset : (offset + limit)] chats = [] lock = asyncio.Lock() if len(cids) == 0: - print(f"[resolvers.load] no chats for user with id={author_id}, create one with Discours (id=2)") + print(f"[resolvers.load] no chats for user with id={author_id}") r = await create_chat(None, info, members=[2]) # member with id = 2 is discours print(f"[resolvers.load] created chat: {r}") cids.append(r["chat"]["id"]) @@ -60,7 +67,7 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni c = await redis.execute("GET", f"chats/{cid}") print(f"[resolvers.load] redis GET by {cid}: {c}") if c: - c = json.loads(c) + c: Chat = json.loads(c) c["messages"] = await load_messages(cid, 5, 0) c["unread"] = await get_unread_counter(cid, author_id) member_ids = c["members"].copy() @@ -80,7 +87,9 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): """load :limit messages of :chat_id with :offset""" author_id = info.context["author_id"] - user_chats = (await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))) or [] + user_chats = ( + await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id)) + ) or [] user_chats = [c for c in user_chats] if user_chats: messages = [] @@ -92,7 +101,10 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): # everyone's messages in filtered chat messages = await load_messages(by_chat, limit, offset) return { - "messages": sorted([m for m in messages if m.get("createdAt")], key=lambda m: m.get("createdAt")), + "messages": sorted( + [m for m in messages if m.get("createdAt")], + key=lambda m: m.get("createdAt"), + ), "error": None, } else: @@ -104,18 +116,10 @@ async def load_recipients(_, _info, limit=50, offset=0): """load possible chat participants""" onliners = (await redis.execute("SMEMBERS", "authors-online")) or [] members = [] - all_authors = await get_network(limit, offset) + all_authors: List[ChatMember] = await get_network(limit, offset) for a in all_authors: - members.append( - { - "id": a.id, - "slug": a.slug, - "userpic": a.userpic, - "name": a.name, - "lastSeen": a.lastSeen, - "online": a.id in onliners, - } - ) + a["online"] = a["id"] in onliners + members.append(a) # NOTE: maybe sort members here diff --git a/resolvers/messages.py b/resolvers/messages.py index bfe272d..a47d915 100644 --- a/resolvers/messages.py +++ b/resolvers/messages.py @@ -26,9 +26,11 @@ async def create_message(_, info, chat: str, body: str, reply_to=None): else: chat_dict = json.loads(chat_data) print(chat_dict) - message_id = (await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id")) or 0 + message_id = ( + await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id") + ) or 0 message_id = int(message_id) - new_message = { + new_message: Message = { "chat": chat_dict["id"], "id": message_id, "author": author_id, @@ -41,9 +43,13 @@ async def create_message(_, info, chat: str, body: str, reply_to=None): await redis.execute("SET", f"chats/{chat_dict['id']}", json.dumps(chat)) print(f"[inbox] creating message {new_message}") await redis.execute( - "SET", f"chats/{chat_dict['id']}/messages/{message_id}", json.dumps(new_message) + "SET", + f"chats/{chat_dict['id']}/messages/{message_id}", + json.dumps(new_message), + ) + await redis.execute( + "LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id) ) - await redis.execute("LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id)) await redis.execute( "SET", f"chats/{chat_dict['id']}/next_message_id", str(message_id + 1) ) @@ -147,13 +153,3 @@ async def mark_as_read(_, info, chat_id: str, messages: List[int]): ) return {"error": None} - - -messages_resolvers = { - "Mutation": { - "markAsRead": mark_as_read, - "deleteMessage": delete_message, - "updateMessage": update_message, - "createMessage": create_message, - } -} diff --git a/resolvers/search.py b/resolvers/search.py index 93e9366..2ffc848 100644 --- a/resolvers/search.py +++ b/resolvers/search.py @@ -12,7 +12,9 @@ from services.schema import query @login_required async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0): result = [] + # TODO: maybe redis scan? + author_id = info.context["author_id"] talk_before = await redis.execute("GET", f"/chats_by_author/{author_id}") if talk_before: @@ -34,9 +36,13 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0 @query.field("searchMessages") @login_required -async def search_in_chats(_, info, by: Dict[str, Union[str, int]], limit: int, offset: int) -> Dict[str, Union[List[Dict[str, Any]], None]]: +async def search_in_chats( + _, info, by: Dict[str, Union[str, int]], limit: int, offset: int +) -> Dict[str, Union[List[Dict[str, Any]], None]]: author_id = info.context["author_id"] - lookup_chats = set((await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or []) + lookup_chats = set( + (await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or [] + ) messages_set = set([]) by_member = by.get("author") @@ -59,20 +65,16 @@ async def search_in_chats(_, info, by: Dict[str, Union[str, int]], limit: int, o if body_like: mmm = list(filter(lambda mx: body_like in mx["body"], mmm)) if days_ago: - mmm = list(filter( - lambda msg: int(datetime.now(tz=timezone.utc)) - int(msg["createdAt"]) - < timedelta(days=days_ago), - mmm, - )) + mmm = list( + filter( + lambda msg: int(datetime.now(tz=timezone.utc)) + - int(msg["createdAt"]) + < timedelta(days=days_ago), + mmm, + ) + ) messages_set.union(set(mmm)) messages_sorted = sorted(list(messages_set)) return {"messages": messages_sorted, "error": None} - -search_resolvers = { - "Query": { - "searchMessages": search_in_chats, - "searchRecipients": search_recipients, - } -} \ No newline at end of file diff --git a/services/auth.py b/services/auth.py index f7f07f3..096eb6b 100644 --- a/services/auth.py +++ b/services/auth.py @@ -15,15 +15,18 @@ async def check_auth(req): query_type = "mutation" if INTERNAL_AUTH_SERVER else "query" operation = "GetUserId" - headers = { - "Authorization": 'Bearer ' + token, - "Content-Type": "application/json" - } + headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"} gql = { - "query": query_type + " " + operation + " { " + query_name + " { user { id } } " + " }", + "query": query_type + + " " + + operation + + " { " + + query_name + + " { user { id } } " + + " }", "operationName": operation, - "variables": None + "variables": None, } async with AsyncClient() as client: @@ -36,7 +39,10 @@ async def check_auth(req): user_id = ( r.get("data", {}).get(query_name, {}).get("user", {}).get("id", None) if INTERNAL_AUTH_SERVER - else r.get("data", {}).get(query_name, {}).get("user", {}).get("id", None) + else r.get("data", {}) + .get(query_name, {}) + .get("user", {}) + .get("id", None) ) is_authenticated = user_id is not None return is_authenticated, user_id diff --git a/services/core.py b/services/core.py index 80fdfb0..51fcaaf 100644 --- a/services/core.py +++ b/services/core.py @@ -11,12 +11,16 @@ async def get_author(author_id): gql = { "query": "query GetAuthorById($author_id: Int!) { getAuthorById(author_id: $author_id) { id slug userpic name lastSeen } }", "operation": "GetAuthorById", - "variables": {"author_id": author_id} + "variables": {"author_id": author_id}, } async with AsyncClient() as client: try: - response = await client.post(API_BASE, headers=headers, data=json.dumps(gql)) - print(f"[services.core] get_author response: {response.status_code} {response.text}") + response = await client.post( + API_BASE, headers=headers, data=json.dumps(gql) + ) + print( + f"[services.core] get_author response: {response.status_code} {response.text}" + ) if response.status_code != 200: return None r = response.json() @@ -27,21 +31,19 @@ async def get_author(author_id): return None -async def get_network(author_id:int, limit:int = 50, offset:int = 0) -> list: +async def get_network(author_id: int, limit: int = 50, offset: int = 0) -> list: gql = { "query": "query LoadAuthors($author_id: Int!, $limit: Int, $offset: Int) { authorFollowings(author_id: $author_id, limit: $limit, offset: $offset) { id slug userpic name } }", "operation": "LoadAuthors", - "variables": { - "author_id": author_id, - "limit": limit, - "offset": offset - } + "variables": {"author_id": author_id, "limit": limit, "offset": offset}, } followings = [] try: async with AsyncClient() as client: - response = await client.post(API_BASE, headers=headers, data=json.dumps(gql)) + response = await client.post( + API_BASE, headers=headers, data=json.dumps(gql) + ) if response.status_code != 200: return [] r = response.json() @@ -60,15 +62,14 @@ async def get_followers(author_id, amount): gql = { "query": "query LoadAuthors($author_id: Int!, $limit: Int, $offset: Int) { authorFollowers(author_id: $author_id, limit: $limit) { id slug userpic name } }", "operation": "LoadAuthors", - "variables": { - "author_id": author_id, - "limit": amount - } + "variables": {"author_id": author_id, "limit": amount}, } followers = [] try: async with AsyncClient() as client: - response = await client.post(API_BASE, headers=headers, data=json.dumps(gql)) + response = await client.post( + API_BASE, headers=headers, data=json.dumps(gql) + ) if response.status_code != 200: return [] r = response.json() diff --git a/services/presence.py b/services/presence.py index 108fecf..11856ca 100644 --- a/services/presence.py +++ b/services/presence.py @@ -1,8 +1,9 @@ import json from services.redis import redis +from validators.chat import Message -async def notify_message(message, chat_id: str): +async def notify_message(message: Message, chat_id: str): channel_name = f"chat:{chat_id}" data = {**message, "kind": "new_message"} try: diff --git a/services/redis.py b/services/redis.py index 6b76d50..5282471 100644 --- a/services/redis.py +++ b/services/redis.py @@ -1,6 +1,7 @@ import redis.asyncio as aredis from settings import REDIS_URL + class RedisCache: def __init__(self, uri=REDIS_URL): self._uri: str = uri @@ -54,6 +55,7 @@ class RedisCache: print(f"[redis] MGET {key} {keys}") return await self._client.mget(key, *keys) + redis = RedisCache() __all__ = ["redis"] diff --git a/validators/chat.py b/validators/chat.py index 43f4d91..be7185e 100644 --- a/validators/chat.py +++ b/validators/chat.py @@ -1,10 +1,29 @@ -from typing import Dict, Optional +from typing import Dict, Optional, List class Message(Dict): - chat: str id: int + chat: str author: int body: str createdAt: int replyTo: Optional[int] - updatedAt: Optional[int] \ No newline at end of file + createdAt: int + updatedAt: Optional[int] + +class Chat(Dict): + id: str + members: List[int] + admins: List[int] + title: str + updatedAt: int + createdAt: int + createdBy: int + description: Optional[str] + +class ChatMember(Dict): + id: int + slug: str + name: str + userpic: Optional[str] + lastSeen: int + online: Optional[bool] \ No newline at end of file