From 16ee157438f0d1ec016bf0acfa8aaa29cf62c2b1 Mon Sep 17 00:00:00 2001 From: tonyrewin Date: Sat, 1 Oct 2022 13:19:13 +0300 Subject: [PATCH] work-load-fixes --- resolvers/zine.py | 60 ++++++++++++++++-------------------- services/zine/shoutauthor.py | 11 ++++--- services/zine/shoutscache.py | 10 ++++++ services/zine/shouttopic.py | 38 ----------------------- 4 files changed, 42 insertions(+), 77 deletions(-) delete mode 100644 services/zine/shouttopic.py diff --git a/resolvers/zine.py b/resolvers/zine.py index b77e2204..ccacb435 100644 --- a/resolvers/zine.py +++ b/resolvers/zine.py @@ -5,7 +5,7 @@ from auth.authenticate import login_required from base.orm import local_session from base.resolvers import mutation, query from orm.collection import ShoutCollection -from orm.shout import Shout, ShoutAuthor, ShoutTopic +from orm.shout import Shout, ShoutTopic from orm.topic import Topic from resolvers.community import community_follow, community_unfollow from resolvers.profile import author_follow, author_unfollow @@ -117,23 +117,34 @@ async def get_search_results(_, _info, searchtext, offset, limit): return shouts +@query.field("shoutsByAuthors") +async def shouts_by_authors(_, _info, slugs, offset, limit): + shouts = [] + for author in slugs: + shouts.extend(ShoutsCache.get_by_author(author)) + shouts_prepared = [] + for s in shouts: + if bool(s.publishedAt): + for a in s.authors: + a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) + shouts_prepared.append(s) + shouts_prepared.sort(key=lambda s: s.publishedAt, reverse=True) + return shouts_prepared[offset : offset + limit] + + @query.field("shoutsByTopics") async def shouts_by_topics(_, _info, slugs, offset, limit): - # TODO: use ShoutTopicsStorage - with local_session() as session: - shouts = ( - session.query(Shout) - .join(ShoutTopic) - .where(and_(ShoutTopic.topic.in_(slugs), Shout.publishedAt.is_not(None))) - .order_by(desc(Shout.publishedAt)) - .limit(limit) - .offset(offset) - ) - + shouts = [] + for topic in slugs: + shouts.extend(ShoutsCache.get_by_topic(topic)) + shouts_prepared = [] for s in shouts: - for a in s.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - return shouts + if bool(s.publishedAt): + for a in s.authors: + a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) + shouts_prepared.append(s) + shouts_prepared.sort(key=lambda s: s.publishedAt, reverse=True) + return shouts_prepared[offset : offset + limit] @query.field("shoutsByCollection") @@ -153,25 +164,6 @@ async def shouts_by_collection(_, _info, collection, offset, limit): return shouts -@query.field("shoutsByAuthors") -async def shouts_by_authors(_, _info, slugs, offset, limit): - # TODO: use ShoutAuthorsStorage - with local_session() as session: - shouts = ( - session.query(Shout) - .join(ShoutAuthor) - .where(and_(ShoutAuthor.user.in_(slugs), Shout.publishedAt.is_not(None))) - .order_by(desc(Shout.publishedAt)) - .limit(limit) - .offset(offset) - ) - - for s in shouts: - for a in s.authors: - a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) - return shouts - - SINGLE_COMMUNITY = True diff --git a/services/zine/shoutauthor.py b/services/zine/shoutauthor.py index f4c8728a..6906c9ef 100644 --- a/services/zine/shoutauthor.py +++ b/services/zine/shoutauthor.py @@ -1,22 +1,23 @@ import asyncio from base.orm import local_session -from orm.shout import ShoutAuthor +from orm.shout import ShoutAuthor, Shout class ShoutAuthorStorage: authors_by_shout = {} + shouts_by_author = {} lock = asyncio.Lock() period = 30 * 60 # sec @staticmethod async def load(session): self = ShoutAuthorStorage - sas = session.query(ShoutAuthor).all() + sas = session.query(ShoutAuthor).join(Shout).all() for sa in sas: self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, []) self.authors_by_shout[sa.shout].append([sa.user, sa.caption]) - print("[zine.authors] %d shouts preprocessed" % len(self.authors_by_shout)) + print("[zine.shouts] %d shouts indexed by authors" % len(self.authors_by_shout)) @staticmethod async def get_authors(shout): @@ -41,7 +42,7 @@ class ShoutAuthorStorage: with local_session() as session: async with self.lock: await self.load(session) - print("[zine.authors] state updated") + print("[zine.shouts] index by authors was updated") except Exception as err: - print("[zine.authors] errror: %s" % (err)) + print("[zine.shouts] error indexing by author: %s" % (err)) await asyncio.sleep(self.period) diff --git a/services/zine/shoutscache.py b/services/zine/shoutscache.py index 90789ed3..d72dd50c 100644 --- a/services/zine/shoutscache.py +++ b/services/zine/shoutscache.py @@ -244,6 +244,16 @@ class ShoutsCache: print("[zine.cache] indexed by %d authors " % len(shouts_by_author.keys())) ShoutsCache.by_author = shouts_by_author + @staticmethod + async def get_by_author(author): + async with ShoutsCache.lock: + return ShoutsCache.by_author.get(author, []) + + @staticmethod + async def get_by_topic(topic): + async with ShoutsCache.lock: + return ShoutsCache.by_topic.get(topic, []) + @staticmethod async def get_top_published_before(daysago, offset, limit): shouts_by_rating = [] diff --git a/services/zine/shouttopic.py b/services/zine/shouttopic.py deleted file mode 100644 index b98e4242..00000000 --- a/services/zine/shouttopic.py +++ /dev/null @@ -1,38 +0,0 @@ -import asyncio - -from base.orm import local_session -from orm.shout import ShoutTopic - - -class ShoutTopicStorage: - topics_by_shout = {} - lock = asyncio.Lock() - period = 30 * 60 # sec - - @staticmethod - async def load(session): - self = ShoutTopicStorage - sas = session.query(ShoutTopic).all() - for sa in sas: - self.topics_by_shout[sa.shout] = self.topics_by_shout.get(sa.shout, []) - self.topics_by_shout[sa.shout].append([sa.user, sa.caption]) - print("[zine.topics] %d shouts preprocessed" % len(self.topics_by_shout)) - - @staticmethod - async def get_topics(shout): - self = ShoutTopicStorage - async with self.lock: - return self.topics_by_shout.get(shout, []) - - @staticmethod - async def worker(): - self = ShoutTopicStorage - while True: - try: - with local_session() as session: - async with self.lock: - await self.load(session) - print("[zine.topics] state updated") - except Exception as err: - print("[zine.topics] errror: %s" % (err)) - await asyncio.sleep(self.period)