From 9b03e625f4a028b5f557dad41a718ac1539bc5ed Mon Sep 17 00:00:00 2001 From: Untone Date: Tue, 23 Jan 2024 23:13:49 +0300 Subject: [PATCH] logs-fixes-debug --- resolvers/chats.py | 37 ++++++---- resolvers/load.py | 5 +- resolvers/messages.py | 159 +++++++++++++++++++++-------------------- resolvers/search.py | 11 +-- services/auth.py | 10 ++- services/core.py | 33 +++++---- services/rediscache.py | 6 -- 7 files changed, 141 insertions(+), 120 deletions(-) diff --git a/resolvers/chats.py b/resolvers/chats.py index 21122f0..343dbc2 100644 --- a/resolvers/chats.py +++ b/resolvers/chats.py @@ -1,6 +1,7 @@ import json import time import uuid +import logging from models.chat import Chat, ChatUpdate from services.auth import login_required @@ -8,6 +9,8 @@ from services.presence import notify_chat from services.rediscache import redis from services.schema import mutation +logger = logging.getLogger("[resolvers.chats] ") +logger.setLevel(logging.DEBUG) @mutation.field("update_chat") @login_required @@ -25,7 +28,7 @@ async def update_chat(_, info, chat_new: ChatUpdate): chat_str = await redis.execute("GET", f"chats/{chat_id}") if not chat_str: return {"error": "chat not exist"} - else: + elif isinstance(chat_str, str): chat: Chat = json.loads(chat_str) if author_id in chat["admins"]: chat.update( @@ -50,7 +53,7 @@ async def update_chat(_, info, chat_new: ChatUpdate): async def create_chat(_, info, title="", members=None): members = members or [] author_id = info.context["author_id"] - chat = None + chat: Chat if author_id: if author_id not in members: members.append(int(author_id)) @@ -58,15 +61,19 @@ async def create_chat(_, info, title="", members=None): # NOTE: private chats has no title # reuse private chat created before if exists if len(members) == 2 and title == "": - chatset1 = set((await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}")) or []) - chatset2 = set((await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}")) or []) - for c in chatset1.intersection(chatset2): - chat_data = await redis.execute("GET", f"chats/{c}") - if chat_data: - chat = json.loads(chat_data) - if chat["title"] == "": - print("[inbox] createChat found old chat") - return {"chat": chat, "error": "existed"} + chatdata1 = await redis.execute("SMEMBERS", f"chats_by_author/{members[0]}") + chatdata2 = await redis.execute("SMEMBERS", f"chats_by_author/{members[1]}") + if isinstance(chatdata1, list) and isinstance(chatdata2, list): + chatset1 = set(chatdata1) + chatset2 = set(chatdata2) + + for c in chatset1.intersection(chatset2): + chat_data = await redis.execute("GET", f"chats/{c}") + if isinstance(chat_data, str): + chat = json.loads(chat_data) + if chat["title"] == "": + logger.info("[inbox] createChat found old chat") + return {"chat": chat, "error": "existed"} chat_id = str(uuid.uuid4()) chat: Chat = { @@ -89,7 +96,8 @@ async def create_chat(_, info, title="", members=None): await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat)) await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(0)) - return {"error": None, "chat": chat} + return {"error": None, "chat": chat} + return {"error": "no chat was created"} @mutation.field("delete_chat") @@ -97,12 +105,11 @@ async def create_chat(_, info, title="", members=None): async def delete_chat(_, info, chat_id: str): author_id = info.context["author_id"] chat_str = await redis.execute("GET", f"chats/{chat_id}") - if chat_str: + if isinstance(chat_str, str): chat: Chat = json.loads(chat_str) if author_id in chat["admins"]: await redis.execute("DEL", f"chats/{chat_id}") await redis.execute("SREM", f"chats_by_author/{author_id}", chat_id) for member_id in chat["members"]: await notify_chat(chat, member_id, "delete") - else: - return {"error": "chat not exist"} + return {"error": "chat not exist"} diff --git a/resolvers/load.py b/resolvers/load.py index 536c69d..06bd0fd 100644 --- a/resolvers/load.py +++ b/resolvers/load.py @@ -27,8 +27,9 @@ async def load_messages( try: message_ids = [] + (ids or []) if limit: - mids = (await redis.lrange(f"chats/{chat_id}/message_ids", offset, offset + limit)) or [] - message_ids += mids + mids = await redis.execute("LRANGE", f"chats/{chat_id}/message_ids", offset, offset + limit) + if isinstance(mids, list): + message_ids.extend(mids) if message_ids: message_keys = [f"chats/{chat_id}/messages/{mid}" for mid in message_ids] messages = (await redis.mget(*message_keys)) or [] diff --git a/resolvers/messages.py b/resolvers/messages.py index 03ca47a..d4d2b83 100644 --- a/resolvers/messages.py +++ b/resolvers/messages.py @@ -21,59 +21,61 @@ async def create_message(_, info, chat_id: str, body: str, reply_to=None): # Если данных чата нет, возвращаем ошибку if not chat_data: return {"error": "chat is not exist"} - else: + elif isinstance(chat_data, str): # Преобразование данных чата из строки JSON в словарь chat_dict = json.loads(chat_data) - print(chat_dict) + chat_id = chat_dict["id"] # Получение ID следующего сообщения message_id = await redis.execute("GET", f"chats/{chat_dict['id']}/next_message_id") - message_id = int(message_id) if message_id else 0 - chat_id = chat_dict["id"] - # Создание нового сообщения - new_message: Message = { - "chat_id": chat_id, - "id": message_id, - "created_by": author_id, - "body": body, - "created_at": int(time.time()), - "updated_at": None, - } + if isinstance(message_id, str) or isinstance(message_id, int): + message_id = int(message_id) if message_id else 0 + # Создание нового сообщения + new_message: Message = { + "chat_id": chat_id, + "id": message_id, + "created_by": author_id, + "body": body, + "created_at": int(time.time()), + "updated_at": None, + "reply_to": None + } - # Если есть ответ, добавляем его в сообщение - if reply_to: - new_message["reply_to"] = reply_to + # Если есть ответ, добавляем его в сообщение + if reply_to: + new_message["reply_to"] = reply_to - # Обновление времени последнего обновления чата - chat_dict["updated_at"] = new_message["created_at"] + # Обновление времени последнего обновления чата + chat_dict["updated_at"] = new_message["created_at"] - # Запись обновленных данных чата обратно в Redis - await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict)) - print(f"[inbox] creating message {new_message}") + # Запись обновленных данных чата обратно в Redis + await redis.execute("SET", f"chats/{chat_id}", json.dumps(chat_dict)) + print(f"[inbox] creating message {new_message}") - # Запись нового сообщения в Redis - await redis.execute( - "SET", - f"chats/{chat_id}/messages/{message_id}", - json.dumps(new_message), - ) + # Запись нового сообщения в Redis + await redis.execute( + "SET", + f"chats/{chat_id}/messages/{message_id}", + json.dumps(new_message), + ) - # Добавление ID нового сообщения в список ID сообщений чата - await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id)) + # Добавление ID нового сообщения в список ID сообщений чата + await redis.execute("LPUSH", f"chats/{chat_id}/message_ids", str(message_id)) - # Обновление ID следующего сообщения - await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1)) + # Обновление ID следующего сообщения + await redis.execute("SET", f"chats/{chat_id}/next_message_id", str(message_id + 1)) - # Добавление нового сообщения в список непрочитанных сообщений для каждого участника чата - members = chat_dict["members"] - for member_id in members: - await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id)) + # Добавление нового сообщения в список непрочитанных сообщений для каждого участника чата + members = chat_dict["members"] + for member_id in members: + await redis.execute("LPUSH", f"chats/{chat_dict['id']}/unread/{member_id}", str(message_id)) - # Отправка уведомления о новом сообщении - new_message["chat_id"] = chat_id - await notify_message(new_message, "create") + # Отправка уведомления о новом сообщении + new_message["chat_id"] = chat_id + await notify_message(new_message, "create") - return {"message": new_message, "error": None} + return {"message": new_message, "error": None} + return {"error": "cannot create message"} @mutation.field("update_message") @@ -94,24 +96,24 @@ async def update_message(_, info, message): message = await redis.execute("GET", f"chats/{chat_id}/messages/{message_id}") if not message: return {"error": "message not exist"} + elif isinstance(message, str): + message = json.loads(message) + if message["created_by"] != author_id: + return {"error": "access denied"} - message = json.loads(message) - if message["created_by"] != author_id: - return {"error": "access denied"} + if body: + message["body"] = body + message["updated_at"] = int(time.time()) - if body: - message["body"] = body - message["updated_at"] = int(time.time()) + await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)) - await redis.execute("SET", f"chats/{chat_id}/messages/{message_id}", json.dumps(message)) + # Отправка уведомления + message["chat_id"] = chat_id + await notify_message(message, "update") - # Отправка уведомления - message["chat_id"] = chat_id - await notify_message(message, "update") + return {"message": message, "error": None} - return {"message": message, "error": None} - else: - return {"message": message, "error": "cannot update, no message_id"} + return {"message": message, "error": "cannot update"} @mutation.field("delete_message") @@ -122,24 +124,26 @@ async def delete_message(_, info, chat_id: str, message_id: int): chat_str = await redis.execute("GET", f"chats/{chat_id}") if not chat_str: return {"error": "chat not exist"} - chat = json.loads(chat_str) + elif isinstance(chat_str, str): + chat = json.loads(chat_str) - message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") - if not message_data: - return {"error": "message not exist"} - message: Message = json.loads(message_data) - if message["author"] != author_id: - return {"error": "access denied"} + message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") + if not message_data: + return {"error": "message not exist"} + elif isinstance(message_data, str): + message: Message = json.loads(message_data) + if message["created_by"] != author_id: + return {"error": "access denied"} - await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id)) - await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}") + await redis.execute("LREM", f"chats/{chat_id}/message_ids", 0, str(message_id)) + await redis.execute("DEL", f"chats/{chat_id}/messages/{str(message_id)}") - members = chat["members"] - for member_id in members: - await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id)) + members = chat["members"] + for member_id in members: + await redis.execute("LREM", f"chats/{chat_id}/unread/{member_id}", 0, str(message_id)) - message["chat_id"] = chat_id - await notify_message(message, "delete") + message["chat_id"] = chat_id + await notify_message(message, "delete") return {} @@ -152,19 +156,20 @@ async def mark_as_read(_, info, chat_id: str, message_id: int): chat_str = await redis.execute("GET", f"chats/{chat_id}") if not chat_str: return {"error": "chat not exist"} + if isinstance(chat_str, str): + chat = json.loads(chat_str) + members = set(chat["members"]) + if author_id not in members: + return {"error": "access denied"} - chat = json.loads(chat_str) - members = set(chat["members"]) - if author_id not in members: - return {"error": "access denied"} + await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)) - await redis.execute("LREM", f"chats/{chat_id}/unread/{author_id}", 0, str(message_id)) + message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") + if not message_data: + return {"error": "message not exist"} + elif isinstance(message_data, str): + message: Message = json.loads(message_data) - message_data = await redis.execute("GET", f"chats/{chat_id}/messages/{str(message_id)}") - if not message_data: - return {"error": "message not exist"} - message: Message = json.loads(message_data) - - await notify_message(message, "seen") + await notify_message(message, "seen") return {"error": None} diff --git a/resolvers/search.py b/resolvers/search.py index 5982b3a..b014f50 100644 --- a/resolvers/search.py +++ b/resolvers/search.py @@ -19,11 +19,11 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0 author_id = info.context["author_id"] existed_chats = await redis.execute("SMEMBERS", f"/chats_by_author/{author_id}") - if isinstance(existed_chats, str): - chats_list = list(json.loads(existed_chats)) - for chat_id in chats_list[offset : (offset + limit)]: - members_ids = await redis.execute("GET", f"/chats/{chat_id}/members") - if isinstance(members_ids, list): + if isinstance(existed_chats, set): + chats_list = list(existed_chats) + for chat_id in chats_list[offset: (offset + limit)]: + members_ids = await redis.execute("SMEMBERS", f"/chats/{chat_id}/members") + if isinstance(members_ids, set): for member_id in members_ids: author = CacheStorage.authors_by_id.get(member_id) if author: @@ -36,6 +36,7 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0 return {"members": list(result), "error": None} + @query.field("search_messages") @login_required async def search_messages( diff --git a/services/auth.py b/services/auth.py index 23a5ea3..6e9cf0d 100644 --- a/services/auth.py +++ b/services/auth.py @@ -5,6 +5,10 @@ from starlette.exceptions import HTTPException from services.core import get_author_by_user from settings import AUTH_URL +import logging + +logger = logging.getLogger("[services.auth] ") +logger.setLevel(logging.DEBUG) async def check_auth(req) -> str | None: token = req.headers.get("Authorization") @@ -38,14 +42,14 @@ async def check_auth(req) -> str | None: data = await response.json() errors = data.get("errors") if errors: - print(f"[services.auth] errors: {errors}") + logger.error(f"{errors}") else: user_id = data.get("data", {}).get(query_name, {}).get("claims", {}).get("sub") - print(f"[services.auth] got user_id: {user_id}") + logger.info(f"[services.auth] got user_id: {user_id}") return user_id except Exception as e: # Handling and logging exceptions during authentication check - print(f"[services.auth] {e}") + logger.error(e) if not user_id: raise HTTPException(status_code=401, detail="Unauthorized") diff --git a/services/core.py b/services/core.py index c70c124..a94a623 100644 --- a/services/core.py +++ b/services/core.py @@ -6,21 +6,30 @@ from datetime import datetime, timezone, timedelta from models.member import ChatMember from settings import API_BASE +import time +import logging + +logger = logging.getLogger("[services.core] ") +logger.setLevel(logging.DEBUG) + def _request_endpoint(query_name, body) -> dict: - print(f"[services.core] requesting {query_name}...") + ts1 = time.time() + logger.debug(f"requesting {query_name}...") response = requests.post(API_BASE, headers={"Content-Type": "application/json"}, json=body) - print(f"[services.core] {query_name} response: <{response.status_code}> {response.text[:32]}..") + ts2 = time.time() + logger.debug(f"{query_name} response in {ts1-ts2} secs: <{response.status_code}> {response.text[:32]}..") + if response.status_code == 200: try: r = response.json() result = r.get("data", {}).get(query_name, {}) if result: - print(f"[services.core] entries amount in result: {len(result)} ") + logger.info(f"entries amount in result: {len(result)} ") return result except ValueError as e: - print(f"[services.core] Error decoding JSON response: {e}") + logger.error(f"Error decoding JSON response: {e}") return {} @@ -42,7 +51,7 @@ def get_author_by_user(user: str): gql = { "query": f"query {operation}($user: String!) {{ {query_name}(user: $user){{ id }} }}", "operationName": operation, - "variables": {"user": user}, + "variables": {"user": user.strip()}, } return _request_endpoint(query_name, gql) @@ -74,14 +83,14 @@ class CacheStorage: self = CacheStorage async with self.lock: task = asyncio.create_task(self.worker()) - print(task) + logger.info(task) @staticmethod async def update_authors(): self = CacheStorage async with self.lock: result = get_all_authors() - print(f"[services.core] loaded {len(result)}") + logger.info(f"cache loaded {len(result)}") if result: CacheStorage.authors = result for a in result: @@ -95,20 +104,20 @@ class CacheStorage: self = CacheStorage while True: try: - print("[services.core] - updating profiles data...") + logger.info(" - updating profiles data...") await self.update_authors() failed = 0 except Exception as er: failed += 1 - print(f"[services.core] {er} - update failed #{failed}, wait 10 seconds") + logger.error(f"{er} - update failed #{failed}, wait 10 seconds") if failed > 3: - print("[services.core] - not trying to update anymore") + logger.error(" - not trying to update anymore") break if failed == 0: when = datetime.now(timezone.utc) + timedelta(seconds=self.period) t = format(when.astimezone().isoformat()) - print("[services.core] ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])) + logger.info(" ⎩ next update: %s" % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0])) await asyncio.sleep(self.period) else: await asyncio.sleep(10) - print("[services.core] - trying to update data again") + logger.info(" - trying to update data again") diff --git a/services/rediscache.py b/services/rediscache.py index d878609..1f5f753 100644 --- a/services/rediscache.py +++ b/services/rediscache.py @@ -24,7 +24,6 @@ class RedisCache: return r except Exception as e: print(f"[redis] error: {e}") - return None async def subscribe(self, *channels): if self._client: @@ -46,11 +45,6 @@ class RedisCache: return await self._client.publish(channel, data) - async def lrange(self, key, start, stop): - if self._client: - print(f"[redis] LRANGE {key} {start} {stop}") - return await self._client.lrange(key, start, stop) - async def mget(self, key, *keys): if self._client: print(f"[redis] MGET {key} {keys}")