From ce02ce01300b7055d352285eafb8a5d63f5134be Mon Sep 17 00:00:00 2001 From: Untone Date: Sat, 23 Dec 2023 09:11:04 +0300 Subject: [PATCH] cached-storage-authors --- main.py | 45 +++++++++++++------------ resolvers/load.py | 32 +++++++++--------- resolvers/search.py | 81 +++++++++++++++++++++------------------------ services/core.py | 64 +++++++++++++++++++++++++++++++++-- settings.py | 4 +-- 5 files changed, 141 insertions(+), 85 deletions(-) diff --git a/main.py b/main.py index 47d9022..3dc627a 100644 --- a/main.py +++ b/main.py @@ -8,42 +8,43 @@ from sentry_sdk.integrations.aiohttp import AioHttpIntegration from sentry_sdk.integrations.ariadne import AriadneIntegration from sentry_sdk.integrations.redis import RedisIntegration from starlette.applications import Starlette - +import asyncio from services.rediscache import redis from services.schema import resolvers from settings import DEV_SERVER_PID_FILE_NAME, MODE, SENTRY_DSN +from services.core import CacheStorage + import_module("resolvers") schema = make_executable_schema(load_schema_from_path("inbox.graphql"), resolvers) # type: ignore async def start_up(): + await redis.connect() + + await CacheStorage.init() + if MODE == "dev": - if exists(DEV_SERVER_PID_FILE_NAME): - await redis.connect() - return - else: + if not exists(DEV_SERVER_PID_FILE_NAME): with open(DEV_SERVER_PID_FILE_NAME, "w", encoding="utf-8") as f: f.write(str(os.getpid())) else: - await redis.connect() + # startup sentry monitoring services + try: + import sentry_sdk - # startup sentry monitoring services - try: - import sentry_sdk - - sentry_sdk.init( - SENTRY_DSN, - enable_tracing=True, - integrations=[ - AriadneIntegration(), - RedisIntegration(), - AioHttpIntegration(), - ], - ) - except Exception as e: - print("[sentry] init error") - print(e) + sentry_sdk.init( + SENTRY_DSN, + enable_tracing=True, + integrations=[ + AriadneIntegration(), + RedisIntegration(), + AioHttpIntegration(), + ], + ) + except Exception as e: + print("[sentry] init error") + print(e) async def shutdown(): diff --git a/resolvers/load.py b/resolvers/load.py index 18f1f1b..536c69d 100644 --- a/resolvers/load.py +++ b/resolvers/load.py @@ -5,14 +5,17 @@ from typing import Any, Dict, List, Optional, Union from models.chat import ChatPayload, Message from resolvers.chats import create_chat from services.auth import login_required -from services.core import get_all_authors +from services.core import CacheStorage from services.rediscache import redis from services.schema import query async def get_unread_counter(chat_id: str, member_id: int) -> int: unread = await redis.execute("LLEN", f"chats/{chat_id}/unread/{member_id}") - return unread or 0 + if isinstance(unread, int): + return unread + else: + return 0 # NOTE: not an API handler @@ -55,9 +58,9 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni chats = [] if author_id: cids = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}") - if cids: + if isinstance(cids, list): members_online = (await redis.execute("SMEMBERS", "authors-online")) or [] - cids = list(cids)[offset : (offset + limit)] + cids = cids[offset : (offset + limit)] lock = asyncio.Lock() if len(cids) == 0: print(f"[resolvers.load] no chats for user with id={author_id}") @@ -65,21 +68,18 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni print(f"[resolvers.load] created chat: {r['chat_id']}") cids.append(r["chat"]["id"]) - authors = get_all_authors() - authors_by_id = {a["id"]: a for a in authors} - for cid in cids: async with lock: - chat_str: str = await redis.execute("GET", f"chats/{cid}") - print(f"[resolvers.load] redis GET by {cid}: {chat_str}") - if chat_str: + chat_str = await redis.execute("GET", f"chats/{cid}") + if isinstance(chat_str, str): + print(f"[resolvers.load] redis GET by {cid}: {chat_str}") c: ChatPayload = json.loads(chat_str) - c["messages"] = await load_messages(cid, 5, 0) + c["messages"] = (await load_messages(cid, 5, 0)) or [] c["unread"] = await get_unread_counter(cid, author_id) member_ids = c["members"].copy() c["members"] = [] for member_id in member_ids: - a = authors_by_id.get(member_id) + a = CacheStorage.authors_by_id.get(member_id) if a: a["online"] = a.get("id") in members_online c["members"].append(a) @@ -92,9 +92,9 @@ async def load_chats(_, info, limit: int = 50, offset: int = 0) -> Dict[str, Uni async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): """load :limit messages of :chat_id with :offset""" author_id = info.context["author_id"] - author_chats = (await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))) or [] - author_chats = [c for c in author_chats] - if author_chats: + author_chats = (await redis.execute("SMEMBERS", "chats_by_author/" + str(author_id))) + if isinstance(author_chats, list): + author_chats = [c for c in author_chats] messages = [] by_chat = by.get("chat") if by_chat in author_chats: @@ -105,7 +105,7 @@ async def load_messages_by(_, info, by, limit: int = 10, offset: int = 0): messages = await load_messages(by_chat, limit, offset) return { "messages": sorted( - [m for m in messages if m.get("created_at")], + [m for m in messages if m and m.get("created_at")], key=lambda m: m.get("created_at"), ), "error": None, diff --git a/resolvers/search.py b/resolvers/search.py index e8dfbe6..5982b3a 100644 --- a/resolvers/search.py +++ b/resolvers/search.py @@ -1,10 +1,10 @@ import json -from datetime import datetime, timedelta, timezone +import time from typing import Any, Dict, List, Union from resolvers.load import load_messages from services.auth import login_required -from services.core import get_all_authors +from services.core import CacheStorage from services.rediscache import redis from services.schema import query @@ -18,22 +18,21 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0 author_id = info.context["author_id"] - authors = get_all_authors() - authors_by_id = {a["id"]: a for a in authors} - existed_chats = await redis.execute("SMEMBERS", f"/chats_by_author/{author_id}") - if existed_chats: - for chat_id in list(json.loads(existed_chats))[offset : (offset + limit)]: + if isinstance(existed_chats, str): + chats_list = list(json.loads(existed_chats)) + for chat_id in chats_list[offset : (offset + limit)]: members_ids = await redis.execute("GET", f"/chats/{chat_id}/members") - for member_id in members_ids: - author = authors_by_id.get(member_id) - if author: - if author["name"].startswith(text): - result.add(author) + if isinstance(members_ids, list): + for member_id in members_ids: + author = CacheStorage.authors_by_id.get(member_id) + if author: + if author["name"].startswith(text): + result.add(author) more_amount = limit - len(result) if more_amount > 0: - result.update(authors_by_id.values()[0:more_amount]) + result.update(CacheStorage.authors[0:more_amount]) return {"members": list(result), "error": None} @@ -42,39 +41,35 @@ async def search_recipients(_, info, text: str, limit: int = 50, offset: int = 0 async def search_messages( _, info, by: Dict[str, Union[str, int]], limit: int, offset: int ) -> Dict[str, Union[List[Dict[str, Any]], None]]: - author_id = info.context["author_id"] - lookup_chats = set((await redis.execute("SMEMBERS", f"chats_by_author/{author_id}")) or []) messages_set = set([]) + author_id = info.context["author_id"] + lookup_chats = await redis.execute("SMEMBERS", f"chats_by_author/{author_id}") + if isinstance(lookup_chats, list): + lookup_chats = set(lookup_chats) - by_member = by.get("author") - body_like = by.get("body") - days_ago = by.get("days") - - # pre-filter lookup chats - if by_member: - lookup_chats = filter( - lambda ca: by_member in ca["members"], - list(lookup_chats), - ) - - # load the messages from lookup chats - for c in lookup_chats: - chat_id = c.decode() - mmm = await load_messages(chat_id, limit, offset) + # pre-filter lookup chats + by_member = by.get("author") if by_member: - mmm = list(filter(lambda mx: mx["author"] == by_member, mmm)) - if body_like: - mmm = list(filter(lambda mx: body_like in mx["body"], mmm)) - if days_ago: - mmm = list( - filter( - lambda msg: int(datetime.now(tz=timezone.utc)) - int(msg["created_at"]) - < int(timedelta(days=days_ago)), - mmm, - ) + lookup_chats = filter( + lambda ca: by_member in ca["members"], + list(lookup_chats), ) - messages_set.union(set(mmm)) + # load the messages from lookup chats + for c in lookup_chats: + chat_id = c.decode() + mmm = await load_messages(chat_id, limit, offset) + filter_method = None + if by_member: + filter_method = lambda mx: mx and mx["created_by"] == by_member + body_like = by.get("body") or "" + if isinstance(body_like, str): + filter_method = lambda mx: mx and body_like in mx["body"] + days_ago = int(by.get("days") or "0") + if days_ago: + filter_method = lambda mx: mx and (int(time.time()) - mx["created_by"] < days_ago * 24 * 60 * 60) + if filter_method: + mmm = list(filter(filter_method, mmm)) + messages_set |= set(mmm) - messages_sorted = sorted(list(messages_set)) - return {"messages": messages_sorted, "error": None} + return {"messages": sorted(list(messages_set)), "error": None} diff --git a/services/core.py b/services/core.py index 8ddaf85..91fed73 100644 --- a/services/core.py +++ b/services/core.py @@ -1,6 +1,8 @@ from typing import List - +import asyncio import requests +from datetime import datetime, timezone, timedelta + from models.member import ChatMember from settings import API_BASE @@ -8,7 +10,7 @@ from settings import API_BASE def _request_endpoint(query_name, body) -> dict: print(f"[services.core] requesting {query_name}...") response = requests.post(API_BASE, headers={"Content-Type": "application/json"}, json=body) - print(f"[services.core] {query_name} response: <{response.status_code}> {response.text[:65]}..") + print(f"[services.core] {query_name} response: <{response.status_code}> {response.text}..") if response.status_code == 200: try: @@ -55,3 +57,61 @@ def get_my_followed() -> List[ChatMember]: result = _request_endpoint(query_name, gql) return result.get("authors", []) + + +class CacheStorage: + lock = asyncio.Lock() + period = 5 * 60 # every 5 mins + client = None + authors = [] + authors_by_user = {} + authors_by_id = {} + + @staticmethod + async def init(): + """graphql client connection using permanent token""" + self = CacheStorage + async with self.lock: + task = asyncio.create_task(self.worker()) + print(task) + + @staticmethod + async def update_authors(): + self = CacheStorage + async with self.lock: + result = get_all_authors() + print(f"[services.core] loaded {len(result)}") + if result: + CacheStorage.authors = result + for a in result: + self.authors_by_user[a.user] = a + self.authors_by_id[a.id] = a + + + @staticmethod + async def worker(): + """async task worker""" + failed = 0 + self = CacheStorage + while True: + try: + print("[services.core] - updating views...") + await self.update_authors() + failed = 0 + except Exception: + failed += 1 + print("[services.core] - update failed #%d, wait 10 seconds" % failed) + if failed > 3: + print("[services.core] - not trying to update anymore") + break + if failed == 0: + when = datetime.now(timezone.utc) + timedelta(seconds=self.period) + t = format(when.astimezone().isoformat()) + print( + "[services.core] ⎩ next update: %s" + % (t.split("T")[0] + " " + t.split("T")[1].split(".")[0]) + ) + await asyncio.sleep(self.period) + else: + await asyncio.sleep(10) + print("[services.core] - trying to update data again") diff --git a/settings.py b/settings.py index 97dad6d..c0318ae 100644 --- a/settings.py +++ b/settings.py @@ -2,8 +2,8 @@ from os import environ PORT = 80 REDIS_URL = environ.get("REDIS_URL") or "redis://127.0.0.1" -API_BASE = environ.get("API_BASE") or "https://v2.discours.io/" -AUTH_URL = environ.get("AUTH_URL") or "https://v2.discours.io/" +API_BASE = environ.get("API_BASE") or "https://core.discours.io/" +AUTH_URL = environ.get("AUTH_URL") or "https://auth.discours.io/" MODE = environ.get("MODE") or "production" SENTRY_DSN = environ.get("SENTRY_DSN") DEV_SERVER_PID_FILE_NAME = "dev-server.pid"