diff --git a/resolvers/chats.py b/resolvers/chats.py index fc4dfe5..c7638bf 100644 --- a/resolvers/chats.py +++ b/resolvers/chats.py @@ -20,10 +20,11 @@ async def update_chat(_, info, chat_new): """ author_id = info.context["author_id"] chat_id = chat_new["id"] - chat = await redis.execute("GET", f"chats/{chat_id}") + chat = (await redis.execute("GET", f"chats/{chat_id}")) if not chat: return {"error": "chat not exist"} - chat = dict(json.loads(chat)) + else: + chat = json.loads(chat) if author_id in chat["admins"]: chat.update( diff --git a/resolvers/load.py b/resolvers/load.py index 3a92d98..d51cb4a 100644 --- a/resolvers/load.py +++ b/resolvers/load.py @@ -1,15 +1,16 @@ import json +from typing import Any, Dict, List, Optional, Union from services.core import get_author, get_network from services.redis import redis from services.auth import login_required from services.schema import query +from validators.chat import Message from .chats import create_chat from .unread import get_unread_counter import asyncio - # NOTE: not an API handler -async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids=None): +async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids: Optional[List[str]] = None) -> List[Message]: """load :limit messages for :chat_id with :offset""" if ids is None: ids = [] @@ -17,22 +18,22 @@ async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids=None) try: message_ids = [] + ids if limit: - mids = await redis.lrange( + mids = (await redis.lrange( f"chats/{chat_id}/message_ids", offset, offset + limit - ) + )) or [] mids = [mid for mid in mids] message_ids += mids if message_ids: message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids] - messages = await redis.mget(*message_keys) - messages = [json.loads(msg.decode("utf-8")) for msg in messages] + messages = (await redis.mget(*message_keys)) or [] replies = [] for m in messages: - rt = m.get("replyTo") - if rt: - rt = int(rt) - if rt not in message_ids: - replies.append(rt) + if m: + rt = json.loads(m).get("replyTo") + if rt: + rt = int(rt) + if rt not in message_ids: + replies.append(rt) if replies: messages += await load_messages(chat_id, offset, limit, replies) except Exception as e: @@ -41,7 +42,7 @@ async def load_messages(chat_id: str, limit: int = 5, offset: int = 0, ids=None) @query.field("loadChats") @login_required -async def load_chats(_, info, limit: int = 50, offset: int = 0): +async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Union[List[Dict[str, Any]], None]]: """load :limit chats of current user with :offset""" author_id = info.context["author_id"] cids = (await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or [] @@ -79,8 +80,8 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0): async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): """load :limit messages of :chat_id with :offset""" author_id = info.context["author_id"] - user_chats = await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id)) - user_chats = [c for c in user_chats] + user_chats = (await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))) or [] + user_chats = [c.decode() for c in user_chats] if user_chats: messages = [] by_chat = by.get("chat") @@ -91,7 +92,7 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): # everyone's messages in filtered chat messages = await load_messages(by_chat, limit, offset) return { - "messages": sorted(list(messages), key=lambda m: m["createdAt"]), + "messages": sorted([m for m in messages if m.get("createdAt")], key=lambda m: m.get("createdAt")), "error": None, } else: diff --git a/resolvers/messages.py b/resolvers/messages.py index e5d9872..4f68a0f 100644 --- a/resolvers/messages.py +++ b/resolvers/messages.py @@ -1,5 +1,7 @@ import json from datetime import datetime, timezone +from typing import List +from validators.chat import Message from services.auth import login_required from services.presence import notify_message from services.redis import redis @@ -17,15 +19,15 @@ async def create_message(_, info, chat: str, body: str, reply_to=None): """ author_id = info.context["author_id"] - chat = await redis.execute("GET", f"chats/{chat}") - if not chat: + chat_dict = await redis.execute("GET", f"chats/{chat}") + if not chat_dict: return {"error": "chat is not exist"} else: - chat = vars(json.loads(chat)) - message_id = await redis.execute("GET", f"chats/{chat['id']}/next_message_id") + chat_dict = vars(json.loads(chat)) + message_id = (await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id")) or 0 message_id = int(message_id) new_message = { - "chat": chat["id"], + "chat": chat_dict["id"], "id": message_id, "author": author_id, "body": body, @@ -33,28 +35,28 @@ async def create_message(_, info, chat: str, body: str, reply_to=None): } if reply_to: new_message["replyTo"] = reply_to - chat["updatedAt"] = new_message["createdAt"] - await redis.execute("SET", f"chats/{chat['id']}", json.dumps(chat)) + chat_dict["updatedAt"] = new_message["createdAt"] + await redis.execute("SET", f"chats/{chat_dict['id']}", json.dumps(chat)) print(f"[inbox] creating message {new_message}") await redis.execute( - "SET", f"chats/{chat['id']}/messages/{message_id}", json.dumps(new_message) + "SET", f"chats/{chat_dict['id']}/messages/{message_id}", json.dumps(new_message) ) - await redis.execute("LPUSH", f"chats/{chat['id']}/message_ids", str(message_id)) + await redis.execute("LPUSH", f"chats/{chat_dict['id']}/message_ids", str(message_id)) await redis.execute( - "SET", f"chats/{chat['id']}/next_message_id", str(message_id + 1) + "SET", f"chats/{chat_dict['id']}/next_message_id", str(message_id + 1) ) - members = chat["members"] + members = chat_dict["members"] for member_id in members: await redis.execute( - "LPUSH", f"chats/{chat['id']}/unread/{member_id}", str(message_id) + "LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id) ) # result = FollowingResult("NEW", "chat", new_message) # await FollowingManager.push("chat", result) # subscribe on updates - await notify_message(new_message, chat["id"]) + await notify_message(new_message, chat_dict["id"]) return {"message": new_message, "error": None} @@ -103,7 +105,7 @@ async def delete_message(_, info, chat_id: str, message_id: int): message = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") if not message: return {"error": "message not exist"} - message = json.loads(message) + message: Message = json.loads(message) if message["author"] != author_id: return {"error": "access denied"} @@ -125,7 +127,7 @@ async def delete_message(_, info, chat_id: str, message_id: int): @mutation.field("markAsRead") @login_required -async def mark_as_read(_, info, chat_id: str, messages: [int]): +async def mark_as_read(_, info, chat_id: str, messages: List[int]): author_id = info.context["author_id"] chat = await redis.execute("GET", f"chats/{chat_id}") diff --git a/resolvers/search.py b/resolvers/search.py index b3c563a..93e9366 100644 --- a/resolvers/search.py +++ b/resolvers/search.py @@ -1,4 +1,5 @@ import json +from typing import Dict, Union, List, Any from datetime import datetime, timezone, timedelta from services.auth import login_required from services.core import get_network @@ -33,9 +34,9 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0 @query.field("searchMessages") @login_required -async def search_in_chats(_, info, by, limit, offset): +async def search_in_chats(_, info, by: Dict[str, Union[str, int]], limit: int, offset: int) -> Dict[str, Union[List[Dict[str, Any]], None]]: author_id = info.context["author_id"] - lookup_chats = set(await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) + lookup_chats = set((await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or []) messages_set = set([]) by_member = by.get("author") @@ -44,7 +45,6 @@ async def search_in_chats(_, info, by, limit, offset): # pre-filter lookup chats if by_member: - # all author's chats where reqeusting author is participating lookup_chats = filter( lambda ca: by_member in ca["members"], list(lookup_chats), @@ -52,27 +52,27 @@ async def search_in_chats(_, info, by, limit, offset): # load the messages from lookup chats for c in lookup_chats: - chat_id = c.decode("utf-8") + chat_id = c.decode() mmm = await load_messages(chat_id, limit, offset) if by_member: - mmm = filter(lambda mx: mx["author"] == by_member, mmm) + mmm = list(filter(lambda mx: mx["author"] == by_member, mmm)) if body_like: - mmm = filter(lambda mx: body_like in mx["body"], mmm) + mmm = list(filter(lambda mx: body_like in mx["body"], mmm)) if days_ago: - mmm = filter( - lambda msg: datetime.now(tz=timezone.utc) - int(msg["createdAt"]) + mmm = list(filter( + lambda msg: int(datetime.now(tz=timezone.utc)) - int(msg["createdAt"]) < timedelta(days=days_ago), mmm, - ) + )) messages_set.union(set(mmm)) - messages_sorted = list(messages_set).sort() - return {"messages": messages_sorted, "error": None} + messages_sorted = sorted(list(messages_set)) + return {"messages": messages_sorted, "error": None} search_resolvers = { "Query": { "searchMessages": search_in_chats, "searchRecipients": search_recipients, } -} +} \ No newline at end of file diff --git a/resolvers/unread.py b/resolvers/unread.py index a81733b..3bd061d 100644 --- a/resolvers/unread.py +++ b/resolvers/unread.py @@ -4,18 +4,15 @@ import json from services.auth import login_required -async def get_unread_counter(chat_id: str, author_id: int): +async def get_unread_counter(chat_id: str, author_id: int) -> int: try: - unread = await redis.execute( - "LLEN", f"chats/{chat_id}/unread/{author_id}" - ) - if unread: - return unread + unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{author_id}") + return unread or 0 except Exception: return 0 -async def get_total_unread_counter(author_id: int): +async def get_total_unread_counter(author_id: int) -> int: chats = await redis.execute("GET", f"chats_by_author/{author_id}") unread = 0 if chats: diff --git a/services/core.py b/services/core.py index c93a7bb..80fdfb0 100644 --- a/services/core.py +++ b/services/core.py @@ -39,7 +39,6 @@ async def get_network(author_id:int, limit:int = 50, offset:int = 0) -> list: } followings = [] - followers = [] try: async with AsyncClient() as client: response = await client.post(API_BASE, headers=headers, data=json.dumps(gql)) @@ -49,12 +48,11 @@ async def get_network(author_id:int, limit:int = 50, offset:int = 0) -> list: followings = r.get("data", {}).get("authorFollowings", []) more_amount = limit - len(followings) if more_amount > 0: - followers = get_followers(author_id, more_amount) + followers = await get_followers(author_id, more_amount) + followings.extend(followers) except Exception as e: print(e) - followings.extend(followers) - return followings diff --git a/services/redis.py b/services/redis.py index 9f3f56f..6b76d50 100644 --- a/services/redis.py +++ b/services/redis.py @@ -11,25 +11,25 @@ class RedisCache: self._client = aredis.Redis.from_url(self._uri, decode_responses=True) async def disconnect(self): - await self._client.aclose() + if self._client: + await self._client.close() async def execute(self, command, *args, **kwargs): - if not self._client: - await self.connect() - try: - print("[redis] " + command + " " + " ".join(args)) - return await self._client.execute_command(command, *args, **kwargs) - except Exception as e: - print(f"[redis] error: {e}") + if self._client: + try: + print("[redis] " + command + " " + " ".join(args)) + r = await self._client.execute_command(command, *args, **kwargs) + return r + except Exception as e: + print(f"[redis] error: {e}") return None async def subscribe(self, *channels): - if not self._client: - await self.connect() - async with self._client.pubsub() as pubsub: - for channel in channels: - await pubsub.subscribe(channel) - self.pubsub_channels.append(channel) + if self._client: + async with self._client.pubsub() as pubsub: + for channel in channels: + await pubsub.subscribe(channel) + self.pubsub_channels.append(channel) async def unsubscribe(self, *channels): if not self._client: @@ -45,12 +45,14 @@ class RedisCache: await self._client.publish(channel, data) async def lrange(self, key, start, stop): - print(f"[redis] LRANGE {key} {start} {stop}") - return await self._client.lrange(key, start, stop) + if self._client: + print(f"[redis] LRANGE {key} {start} {stop}") + return await self._client.lrange(key, start, stop) async def mget(self, key, *keys): - print(f"[redis] MGET {key} {keys}") - return await self._client.mget(key, *keys) + if self._client: + print(f"[redis] MGET {key} {keys}") + return await self._client.mget(key, *keys) redis = RedisCache() diff --git a/validators/chat.py b/validators/chat.py new file mode 100644 index 0000000..43f4d91 --- /dev/null +++ b/validators/chat.py @@ -0,0 +1,10 @@ +from typing import Dict, Optional + +class Message(Dict): + chat: str + id: int + author: int + body: str + createdAt: int + replyTo: Optional[int] + updatedAt: Optional[int] \ No newline at end of file