From da89b20e5c447ab5b0732c558bd3d2d16519e296 Mon Sep 17 00:00:00 2001 From: Untone Date: Sun, 26 May 2024 02:17:45 +0300 Subject: [PATCH 01/10] session-close-fix --- resolvers/author.py | 226 +++++++++++++++++++++--------------------- resolvers/follower.py | 7 +- resolvers/stat.py | 28 ++++-- 3 files changed, 136 insertions(+), 125 deletions(-) diff --git a/resolvers/author.py b/resolvers/author.py index 739e8a5f..1dc406bf 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -63,29 +63,30 @@ async def get_author(_, _info, slug="", author_id=0): author_query = author_query.filter(Author.id == author_id) else: raise ValueError("Author not found") - lookup_result = local_session().execute(author_query).first() - if lookup_result: - [found_author] = lookup_result - # logger.debug(found_author) - if found_author: - logger.debug(f"found author id: {found_author.id}") - author_id = found_author.id if found_author.id else author_id - if author_id: - cached_result = await redis.execute("GET", f"author:{author_id}") - if isinstance(cached_result, str): - author_dict = json.loads(cached_result) + with local_session() as session: + lookup_result = session.execute(author_query).first() + if lookup_result: + [found_author] = lookup_result + # logger.debug(found_author) + if found_author: + logger.debug(f"found author id: {found_author.id}") + author_id = found_author.id if found_author.id else author_id + if author_id: + cached_result = await redis.execute("GET", f"author:{author_id}") + if isinstance(cached_result, str): + author_dict = json.loads(cached_result) - # update stat from db - if not author_dict or not author_dict.get("stat"): - result = get_with_stat(author_query) - if not result: - raise ValueError("Author not found") - [author] = result - # use found author - if isinstance(author, Author): - logger.debug(f"update @{author.slug} with id {author.id}") - author_dict = author.dict() - await cache_author(author_dict) + # update stat from db + if not author_dict or not author_dict.get("stat"): + result = get_with_stat(author_query) + if not result: + raise ValueError("Author not found") + [author] = result + # use found author + if isinstance(author, Author): + logger.debug(f"update @{author.slug} with id {author.id}") + author_dict = author.dict() + await cache_author(author_dict) except ValueError: pass except Exception as exc: @@ -150,19 +151,20 @@ async def load_authors_by(_, _info, by, limit, offset): before = int(time.time()) - by["created_at"] authors_query = authors_query.filter(Author.created_at > before) authors_query = authors_query.limit(limit).offset(offset) - authors_nostat = local_session().execute(authors_query) - authors = [] - if authors_nostat: - for [a] in authors_nostat: - author_dict = None - if isinstance(a, Author): - author_id = a.id - if bool(author_id): - cached_result = await redis.execute("GET", f"author:{author_id}") - if isinstance(cached_result, str): - author_dict = json.loads(cached_result) - if not author_dict or not isinstance(author_dict.get("shouts"), int): - break + with local_session() as session: + authors_nostat = session.execute(authors_query) + authors = [] + if authors_nostat: + for [a] in authors_nostat: + author_dict = None + if isinstance(a, Author): + author_id = a.id + if bool(author_id): + cached_result = await redis.execute("GET", f"author:{author_id}") + if isinstance(cached_result, str): + author_dict = json.loads(cached_result) + if not author_dict or not isinstance(author_dict.get("shouts"), int): + break # order order = by.get("order") @@ -185,46 +187,47 @@ async def get_author_follows(_, _info, slug="", user=None, author_id=0): author_query = author_query.filter(Author.id == author_id) else: return {"error": "One of slug, user, or author_id must be provided"} - result = local_session().execute(author_query) - if result: - # logger.debug(result) - [author] = result - # logger.debug(author) - if author and isinstance(author, Author): - # logger.debug(author.dict()) - author_id = author.id if not author_id else author_id - topics = [] - authors = [] - if bool(author_id): - rkey = f"author:{author_id}:follows-authors" - logger.debug(f"getting {author_id} follows authors") - cached = await redis.execute("GET", rkey) - if not cached: - authors = author_follows_authors(author_id) # type: ignore - prepared = [author.dict() for author in authors] - await redis.execute( - "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder) - ) - elif isinstance(cached, str): - authors = json.loads(cached) + with local_session() as session: + result = session.execute(author_query) + if result: + # logger.debug(result) + [author] = result + # logger.debug(author) + if author and isinstance(author, Author): + # logger.debug(author.dict()) + author_id = author.id if not author_id else author_id + topics = [] + authors = [] + if bool(author_id): + rkey = f"author:{author_id}:follows-authors" + logger.debug(f"getting {author_id} follows authors") + cached = await redis.execute("GET", rkey) + if not cached: + authors = author_follows_authors(author_id) # type: ignore + prepared = [author.dict() for author in authors] + await redis.execute( + "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder) + ) + elif isinstance(cached, str): + authors = json.loads(cached) - rkey = f"author:{author_id}:follows-topics" - cached = await redis.execute("GET", rkey) - if cached and isinstance(cached, str): - topics = json.loads(cached) - if not cached: - topics = author_follows_topics(author_id) # type: ignore - prepared = [topic.dict() for topic in topics] - await redis.execute( - "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder) - ) - return { - "topics": topics, - "authors": authors, - "communities": [ - {"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""} - ], - } + rkey = f"author:{author_id}:follows-topics" + cached = await redis.execute("GET", rkey) + if cached and isinstance(cached, str): + topics = json.loads(cached) + if not cached: + topics = author_follows_topics(author_id) # type: ignore + prepared = [topic.dict() for topic in topics] + await redis.execute( + "SET", rkey, json.dumps(prepared, cls=CustomJSONEncoder) + ) + return { + "topics": topics, + "authors": authors, + "communities": [ + {"id": 1, "name": "Дискурс", "slug": "discours", "pic": ""} + ], + } except Exception: import traceback @@ -310,44 +313,45 @@ async def get_author_followers(_, _info, slug: str): try: author_alias = aliased(Author) author_query = select(author_alias).filter(author_alias.slug == slug) - result = local_session().execute(author_query).first() - followers = [] - if result: - [author] = result - author_id = author.id - cached = await redis.execute("GET", f"author:{author_id}:followers") - if cached: - followers_ids = [] - followers = [] - if isinstance(cached, str): - followers_cached = json.loads(cached) - if isinstance(followers_cached, list): - logger.debug( - f"@{slug} got {len(followers_cached)} followers cached" - ) - for fc in followers_cached: - if fc["id"] not in followers_ids and fc["id"] != author_id: - followers.append(fc) - followers_ids.append(fc["id"]) - return followers + with local_session() as session: + result = session.execute(author_query).first() + followers = [] + if result: + [author] = result + author_id = author.id + cached = await redis.execute("GET", f"author:{author_id}:followers") + if cached: + followers_ids = [] + followers = [] + if isinstance(cached, str): + followers_cached = json.loads(cached) + if isinstance(followers_cached, list): + logger.debug( + f"@{slug} got {len(followers_cached)} followers cached" + ) + for fc in followers_cached: + if fc["id"] not in followers_ids and fc["id"] != author_id: + followers.append(fc) + followers_ids.append(fc["id"]) + return followers - author_follower_alias = aliased(AuthorFollower, name="af") - followers_query = select(Author).join( - author_follower_alias, - and_( - author_follower_alias.author == author_id, - author_follower_alias.follower == Author.id, - Author.id != author_id, # exclude the author from the followers - ), - ) - followers = get_with_stat(followers_query) - if isinstance(followers, list): - followers_ids = [r.id for r in followers] - for follower in followers: - if follower.id not in followers_ids: - await cache_follow_author_change(follower.dict(), author.dict()) - followers_ids.append(follower.id) - logger.debug(f"@{slug} cache updated with {len(followers)} followers") + author_follower_alias = aliased(AuthorFollower, name="af") + followers_query = select(Author).join( + author_follower_alias, + and_( + author_follower_alias.author == author_id, + author_follower_alias.follower == Author.id, + Author.id != author_id, # exclude the author from the followers + ), + ) + followers = get_with_stat(followers_query) + if isinstance(followers, list): + followers_ids = [r.id for r in followers] + for follower in followers: + if follower.id not in followers_ids: + await cache_follow_author_change(follower.dict(), author.dict()) + followers_ids.append(follower.id) + logger.debug(f"@{slug} cache updated with {len(followers)} followers") return followers except Exception as exc: import traceback diff --git a/resolvers/follower.py b/resolvers/follower.py index 8efcb577..1b24219b 100644 --- a/resolvers/follower.py +++ b/resolvers/follower.py @@ -73,8 +73,8 @@ async def follow(_, info, what, slug): _topic_dict = await cache_by_slug(what, slug) elif what == "COMMUNITY": - # FIXME: when more communities - follows = local_session().execute(select(Community)) + with local_session() as session: + follows = session.execute(select(Community)) elif what == "SHOUT": error = reactions_follow(follower_id, slug) @@ -122,7 +122,8 @@ async def unfollow(_, info, what, slug): _topic_dict = await cache_by_slug(what, slug) elif what == "COMMUNITY": - follows = local_session().execute(select(Community)) + with local_session() as session: + follows = session.execute(select(Community)) elif what == "SHOUT": error = reactions_unfollow(follower_id, slug) diff --git a/resolvers/stat.py b/resolvers/stat.py index 7a5ed004..79fa23e7 100644 --- a/resolvers/stat.py +++ b/resolvers/stat.py @@ -52,13 +52,13 @@ def get_topic_shouts_stat(topic_id: int): ) ) ) - result = local_session().execute(q).first() + with local_session() as session: + result = session.execute(q).first() return result[0] if result else 0 def get_topic_authors_stat(topic_id: int): - # authors - q = ( + count_query = ( select(func.count(distinct(ShoutAuthor.author))) .select_from(join(ShoutTopic, Shout, ShoutTopic.shout == Shout.id)) .join(ShoutAuthor, ShoutAuthor.shout == Shout.id) @@ -70,7 +70,10 @@ def get_topic_authors_stat(topic_id: int): ) ) ) - result = local_session().execute(q).first() + + # Выполняем запрос и получаем результат + with local_session() as session: + result = session.execute(count_query).first() return result[0] if result else 0 @@ -79,7 +82,8 @@ def get_topic_followers_stat(topic_id: int): q = select(func.count(distinct(aliased_followers.follower))).filter( aliased_followers.topic == topic_id ) - result = local_session().execute(q).first() + with local_session() as session: + result = session.execute(q).first() return result[0] if result else 0 @@ -106,8 +110,8 @@ def get_topic_comments_stat(topic_id: int): ShoutTopic.topic == topic_id ) q = q.outerjoin(sub_comments, ShoutTopic.shout == sub_comments.c.shout_id) - - result = local_session().execute(q).first() + with local_session() as session: + result = session.execute(q).first() return result[0] if result else 0 @@ -141,7 +145,8 @@ def get_author_authors_stat(author_id: int): aliased_authors.author != author_id, ) ) - result = local_session().execute(q).first() + with local_session() as session: + result = session.execute(q).first() return result[0] if result else 0 @@ -150,7 +155,8 @@ def get_author_followers_stat(author_id: int): q = select(func.count(distinct(aliased_followers.follower))).filter( aliased_followers.author == author_id ) - result = local_session().execute(q).first() + with local_session() as session: + result = session.execute(q).first() return result[0] if result else 0 @@ -172,8 +178,8 @@ def get_author_comments_stat(author_id: int): .subquery() ) q = select(sub_comments.c.comments_count).filter(sub_comments.c.id == author_id) - - result = local_session().execute(q).first() + with local_session() as session: + result = session.execute(q).first() return result[0] if result else 0 From 3f6f7f1aa0bdf28a9eb63e1c92517f3de76a67c2 Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 18:30:28 +0300 Subject: [PATCH 02/10] get-followers-fix --- resolvers/author.py | 90 +++++++++++++++++++++++---------------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/resolvers/author.py b/resolvers/author.py index 1dc406bf..8f76f9ec 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -9,7 +9,7 @@ from orm.shout import ShoutAuthor, ShoutTopic from orm.topic import Topic from resolvers.stat import author_follows_authors, author_follows_topics, get_with_stat from services.auth import login_required -from services.cache import cache_author, cache_follow_author_change +from services.cache import cache_author from services.db import local_session from services.encoders import CustomJSONEncoder from services.logger import root_logger as logger @@ -72,7 +72,9 @@ async def get_author(_, _info, slug="", author_id=0): logger.debug(f"found author id: {found_author.id}") author_id = found_author.id if found_author.id else author_id if author_id: - cached_result = await redis.execute("GET", f"author:{author_id}") + cached_result = await redis.execute( + "GET", f"author:{author_id}" + ) if isinstance(cached_result, str): author_dict = json.loads(cached_result) @@ -160,10 +162,14 @@ async def load_authors_by(_, _info, by, limit, offset): if isinstance(a, Author): author_id = a.id if bool(author_id): - cached_result = await redis.execute("GET", f"author:{author_id}") + cached_result = await redis.execute( + "GET", f"author:{author_id}" + ) if isinstance(cached_result, str): author_dict = json.loads(cached_result) - if not author_dict or not isinstance(author_dict.get("shouts"), int): + if not author_dict or not isinstance( + author_dict.get("shouts"), int + ): break # order @@ -312,50 +318,46 @@ async def get_author_followers(_, _info, slug: str): logger.debug(f"getting followers for @{slug}") try: author_alias = aliased(Author) - author_query = select(author_alias).filter(author_alias.slug == slug) - with local_session() as session: - result = session.execute(author_query).first() - followers = [] - if result: - [author] = result - author_id = author.id - cached = await redis.execute("GET", f"author:{author_id}:followers") - if cached: - followers_ids = [] - followers = [] - if isinstance(cached, str): - followers_cached = json.loads(cached) - if isinstance(followers_cached, list): - logger.debug( - f"@{slug} got {len(followers_cached)} followers cached" - ) - for fc in followers_cached: - if fc["id"] not in followers_ids and fc["id"] != author_id: - followers.append(fc) - followers_ids.append(fc["id"]) - return followers + author_query = select(author_alias).filter_by(slug=slug) + async with local_session() as session: + result = await session.execute(author_query).first() + if not result: + return [] - author_follower_alias = aliased(AuthorFollower, name="af") - followers_query = select(Author).join( - author_follower_alias, - and_( - author_follower_alias.author == author_id, - author_follower_alias.follower == Author.id, - Author.id != author_id, # exclude the author from the followers - ), + author = result[0] + author_id = author.id + + cached = await redis.execute("GET", f"author:{author_id}:followers") + if cached: + followers_cached = json.loads(cached) + if isinstance(followers_cached, list): + logger.debug( + f"@{slug} got {len(followers_cached)} followers cached" + ) + followers = [fc for fc in followers_cached if fc["id"] != author_id] + return followers + + author_follower_alias = aliased(AuthorFollower, name="af") + followers_query = select(Author).join( + author_follower_alias, + and_( + author_follower_alias.author == author_id, + author_follower_alias.follower == Author.id, + Author.id != author_id, + ), + ) + followers_result = await session.execute(followers_query) + followers = followers_result.scalars().all() + + if isinstance(followers, list): + followers_data = [follower.dict() for follower in followers] + await redis.execute( + "SET", f"author:{author_id}:followers", json.dumps(followers_data) ) - followers = get_with_stat(followers_query) - if isinstance(followers, list): - followers_ids = [r.id for r in followers] - for follower in followers: - if follower.id not in followers_ids: - await cache_follow_author_change(follower.dict(), author.dict()) - followers_ids.append(follower.id) - logger.debug(f"@{slug} cache updated with {len(followers)} followers") - return followers + logger.debug(f"@{slug} cache updated with {len(followers)} followers") + return followers except Exception as exc: import traceback - logger.error(exc) logger.error(traceback.format_exc()) return [] From f43624ca3d2640e83ac037545a21c8fd8aaca98e Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 19:29:51 +0300 Subject: [PATCH 03/10] async-fix --- resolvers/author.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resolvers/author.py b/resolvers/author.py index 8f76f9ec..918fa9b2 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -319,7 +319,7 @@ async def get_author_followers(_, _info, slug: str): try: author_alias = aliased(Author) author_query = select(author_alias).filter_by(slug=slug) - async with local_session() as session: + with local_session() as session: result = await session.execute(author_query).first() if not result: return [] @@ -358,6 +358,7 @@ async def get_author_followers(_, _info, slug: str): return followers except Exception as exc: import traceback + logger.error(exc) logger.error(traceback.format_exc()) return [] From 9d9adfbdfa00d276b6df44c9d02c147107d9b277 Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 19:36:25 +0300 Subject: [PATCH 04/10] async-fix-2 --- resolvers/author.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resolvers/author.py b/resolvers/author.py index 918fa9b2..d0f24073 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -320,7 +320,7 @@ async def get_author_followers(_, _info, slug: str): author_alias = aliased(Author) author_query = select(author_alias).filter_by(slug=slug) with local_session() as session: - result = await session.execute(author_query).first() + result = session.execute(author_query).first() if not result: return [] From c90783f4615c67b1ba88fc2588a3a2231937abec Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 19:38:34 +0300 Subject: [PATCH 05/10] async-fix-3 --- resolvers/author.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resolvers/author.py b/resolvers/author.py index d0f24073..b4a78264 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -346,7 +346,7 @@ async def get_author_followers(_, _info, slug: str): Author.id != author_id, ), ) - followers_result = await session.execute(followers_query) + followers_result = session.execute(followers_query) followers = followers_result.scalars().all() if isinstance(followers, list): From 7b72963b241eea4d5a8dda6b81ce2899d8f95d8c Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 19:39:48 +0300 Subject: [PATCH 06/10] reply-to-fix --- resolvers/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resolvers/reader.py b/resolvers/reader.py index 66788752..2caa5412 100644 --- a/resolvers/reader.py +++ b/resolvers/reader.py @@ -327,7 +327,7 @@ async def load_shouts_unrated(_, info, limit: int = 50, offset: int = 0): Reaction, and_( Reaction.shout == Shout.id, - Reaction.replyTo.is_(None), + Reaction.reply_to.is_(None), Reaction.kind.in_( [ReactionKind.LIKE.value, ReactionKind.DISLIKE.value] ), From 01d2d90df19b39daeb6cec37afb552b10c5befc8 Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 19:55:56 +0300 Subject: [PATCH 07/10] cached-empty-fix --- resolvers/author.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resolvers/author.py b/resolvers/author.py index b4a78264..76747196 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -330,7 +330,7 @@ async def get_author_followers(_, _info, slug: str): cached = await redis.execute("GET", f"author:{author_id}:followers") if cached: followers_cached = json.loads(cached) - if isinstance(followers_cached, list): + if isinstance(followers_cached, list) and len(followers_cached) > 0: logger.debug( f"@{slug} got {len(followers_cached)} followers cached" ) From d93311541eb19b52637957fa179ba9b61325498a Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 19:57:22 +0300 Subject: [PATCH 08/10] cached-empty-fix-2 --- resolvers/author.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resolvers/author.py b/resolvers/author.py index 76747196..f3a277a4 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -334,7 +334,7 @@ async def get_author_followers(_, _info, slug: str): logger.debug( f"@{slug} got {len(followers_cached)} followers cached" ) - followers = [fc for fc in followers_cached if fc["id"] != author_id] + followers = [fc for fc in followers_cached if str(fc["id"]) != str(author_id)] return followers author_follower_alias = aliased(AuthorFollower, name="af") From 9a94e5ac56c5ea078319ff6108b4e6582b2c63c2 Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 19:59:16 +0300 Subject: [PATCH 09/10] cached-empty-fix-3 --- resolvers/author.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/resolvers/author.py b/resolvers/author.py index f3a277a4..f453a39e 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -334,8 +334,7 @@ async def get_author_followers(_, _info, slug: str): logger.debug( f"@{slug} got {len(followers_cached)} followers cached" ) - followers = [fc for fc in followers_cached if str(fc["id"]) != str(author_id)] - return followers + return followers_cached author_follower_alias = aliased(AuthorFollower, name="af") followers_query = select(Author).join( From a9ab2e8bb29f6e00d1180ad993139788d23c8eea Mon Sep 17 00:00:00 2001 From: Untone Date: Mon, 27 May 2024 20:03:07 +0300 Subject: [PATCH 10/10] cached-empty-fix-4 --- resolvers/author.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/resolvers/author.py b/resolvers/author.py index f453a39e..abaa29f8 100644 --- a/resolvers/author.py +++ b/resolvers/author.py @@ -328,13 +328,15 @@ async def get_author_followers(_, _info, slug: str): author_id = author.id cached = await redis.execute("GET", f"author:{author_id}:followers") - if cached: - followers_cached = json.loads(cached) - if isinstance(followers_cached, list) and len(followers_cached) > 0: - logger.debug( - f"@{slug} got {len(followers_cached)} followers cached" - ) - return followers_cached + cached_author = await redis.execute("GET", f"author:{author_id}") + if cached and cached_author: + followers = json.loads(cached) + author_dict = json.loads(cached_author) + if isinstance(followers, list) and str(len(followers)) == str( + author_dict["stat"]["followers"] + ): + logger.debug(f"@{slug} got {len(followers)} followers cached") + return followers author_follower_alias = aliased(AuthorFollower, name="af") followers_query = select(Author).join(