diff --git a/main.py b/main.py index 4550028b..3e1deaae 100644 --- a/main.py +++ b/main.py @@ -20,7 +20,6 @@ from services.stat.reacted import ReactedStorage from services.stat.topicstat import TopicStat from services.stat.viewed import ViewedStorage from services.zine.gittask import GitTask -from services.zine.shoutauthor import ShoutAuthorStorage from settings import DEV_SERVER_STATUS_FILE_NAME import_module("resolvers") @@ -40,8 +39,6 @@ async def start_up(): print(views_stat_task) reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) print(reacted_storage_task) - shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker()) - print(shout_author_task) topic_stat_task = asyncio.create_task(TopicStat.worker()) print(topic_stat_task) git_task = asyncio.create_task(GitTask.git_task_worker()) diff --git a/resolvers/zine/load.py b/resolvers/zine/load.py index f77cb730..fe13d431 100644 --- a/resolvers/zine/load.py +++ b/resolvers/zine/load.py @@ -4,12 +4,26 @@ from sqlalchemy.orm import joinedload from sqlalchemy.sql.expression import desc, asc, select, case from base.orm import local_session from base.resolvers import query -from orm.shout import Shout +from orm import ViewedEntry +from orm.shout import Shout, ShoutAuthor from orm.reaction import Reaction, ReactionKind -from services.zine.shoutauthor import ShoutAuthorStorage from services.stat.reacted import ReactedStorage +def add_rating_column(q): + return q.join(Reaction).add_columns(sa.func.sum(case( + (Reaction.kind == ReactionKind.AGREE, 1), + (Reaction.kind == ReactionKind.DISAGREE, -1), + (Reaction.kind == ReactionKind.PROOF, 1), + (Reaction.kind == ReactionKind.DISPROOF, -1), + (Reaction.kind == ReactionKind.ACCEPT, 1), + (Reaction.kind == ReactionKind.REJECT, -1), + (Reaction.kind == ReactionKind.LIKE, 1), + (Reaction.kind == ReactionKind.DISLIKE, -1), + else_=0 + )).label('rating')) + + def apply_filters(q, filters, user=None): filters = {} if filters is None else filters if filters.get("reacted") and user: @@ -35,15 +49,21 @@ def apply_filters(q, filters, user=None): @query.field("loadShout") async def load_shout(_, info, slug): with local_session() as session: - shout = session.query(Shout).options( + q = select(Shout).options( # TODO add cation joinedload(Shout.authors), joinedload(Shout.topics), - ).filter( + ) + q = add_rating_column(q) + q = q.filter( Shout.slug == slug ).filter( Shout.deletedAt.is_(None) - ).one() + ).group_by(Shout.id) + + [shout, rating] = session.execute(q).unique().one() + + shout.stat = await ReactedStorage.get_shout_stat(shout.slug, rating) return shout @@ -84,22 +104,12 @@ async def load_shouts_by(_, info, options): ) user = info.context["request"].user q = apply_filters(q, options.get("filters"), user) - q = q.join(Reaction).add_columns(sa.func.sum(case( - (Reaction.kind == ReactionKind.AGREE, 1), - (Reaction.kind == ReactionKind.DISAGREE, -1), - (Reaction.kind == ReactionKind.PROOF, 1), - (Reaction.kind == ReactionKind.DISPROOF, -1), - (Reaction.kind == ReactionKind.ACCEPT, 1), - (Reaction.kind == ReactionKind.REJECT, -1), - (Reaction.kind == ReactionKind.LIKE, 1), - (Reaction.kind == ReactionKind.DISLIKE, -1), - else_=0 - )).label('rating')) + q = add_rating_column(q) o = options.get("order_by") if o: - q = q.add_columns(sa.func.count(Reaction.id).label(o)) if o == 'comments': + q = q.add_columns(sa.func.count(Reaction.id).label(o)) q = q.join(Reaction, Shout.slug == Reaction.shout) q = q.filter(Reaction.body.is_not(None)) elif o == 'reacted': @@ -108,12 +118,17 @@ async def load_shouts_by(_, info, options): ).add_columns( sa.func.max(Reaction.createdAt).label(o) ) + elif o == 'views': + q = q.join(ViewedEntry) + q = q.add_columns(sa.func.sum(ViewedEntry.amount).label(o)) order_by = o else: order_by = Shout.createdAt order_by_desc = True if options.get('order_by_desc') is None else options.get('order_by_desc') + with_author_captions = False if options.get('with_author_captions') is None else options.get('with_author_captions') + query_order_by = desc(order_by) if order_by_desc else asc(order_by) offset = options.get("offset", 0) limit = options.get("limit", 10) @@ -124,9 +139,25 @@ async def load_shouts_by(_, info, options): for shout in shouts: shout.stat = await ReactedStorage.get_shout_stat(shout.slug, shout.rating) - del shout.rating - for author in shout.authors: - author.caption = await ShoutAuthorStorage.get_author_caption(shout.slug, author.slug) + + author_captions = {} + + if with_author_captions: + author_captions_result = session.query(ShoutAuthor).where( + ShoutAuthor.shout.in_(map(lambda s: s.slug, shouts))).all() + + for author_captions_result_item in author_captions_result: + if author_captions.get(author_captions_result_item.shout) is None: + author_captions[author_captions_result_item.shout] = {} + + author_captions[ + author_captions_result_item.shout + ][ + author_captions_result_item.user + ] = author_captions_result_item.caption + + for author in shout.authors: + author.caption = author_captions[shout.slug][author.slug] return shouts diff --git a/resolvers/zine/profile.py b/resolvers/zine/profile.py index d2db5ccb..de4041c1 100644 --- a/resolvers/zine/profile.py +++ b/resolvers/zine/profile.py @@ -7,11 +7,11 @@ from auth.authenticate import login_required from base.orm import local_session from base.resolvers import mutation, query from orm.reaction import Reaction +from orm.shout import ShoutAuthor from orm.topic import Topic, TopicFollower from orm.user import AuthorFollower, Role, User, UserRating, UserRole from services.stat.reacted import ReactedStorage from services.stat.topicstat import TopicStat -from services.zine.shoutauthor import ShoutAuthor # from .community import followed_communities from resolvers.inbox.unread import get_total_unread_counter diff --git a/schema.graphql b/schema.graphql index adce2273..e1d30873 100644 --- a/schema.graphql +++ b/schema.graphql @@ -241,6 +241,7 @@ input LoadShoutsFilters { input LoadShoutsOptions { filters: LoadShoutsFilters + with_author_captions: Boolean limit: Int! offset: Int order_by: String diff --git a/services/stat/reacted.py b/services/stat/reacted.py index ee3612fd..ef5a2953 100644 --- a/services/stat/reacted.py +++ b/services/stat/reacted.py @@ -1,4 +1,5 @@ import asyncio +import time from base.orm import local_session from orm.reaction import ReactionKind, Reaction from services.zine.topics import TopicStorage @@ -175,6 +176,7 @@ class ReactedStorage: @staticmethod async def recount_changed(session): + start = time.time() self = ReactedStorage async with self.lock: sss = list(self.modified_shouts) @@ -191,6 +193,9 @@ class ReactedStorage: print("[stat.reacted] %d replies" % len(self.reacted["reactions"])) self.modified_shouts = set([]) + end = time.time() + print("[stat.reacted] recount_changed took %fs " % (end - start)) + @staticmethod async def worker(): while True: diff --git a/services/stat/topicstat.py b/services/stat/topicstat.py index 034967f5..c95d0850 100644 --- a/services/stat/topicstat.py +++ b/services/stat/topicstat.py @@ -1,9 +1,9 @@ import asyncio - +import time from base.orm import local_session -from orm.shout import Shout, ShoutTopic +from orm.shout import Shout, ShoutTopic, ShoutAuthor from orm.topic import TopicFollower -from services.zine.shoutauthor import ShoutAuthorStorage +from sqlalchemy.sql.expression import select class TopicStat: @@ -17,22 +17,24 @@ class TopicStat: @staticmethod async def load_stat(session): + start = time.time() self = TopicStat - shout_topics = session.query(ShoutTopic).all() + shout_topics = session.query(ShoutTopic, Shout).join(Shout).all() + all_shout_authors = session.query(ShoutAuthor).all() print("[stat.topics] %d links for shouts" % len(shout_topics)) - for shout_topic in shout_topics: + for [shout_topic, shout] in shout_topics: tpc = shout_topic.topic # shouts by topics - shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first() + # shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first() self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict()) self.shouts_by_topic[tpc][shout.slug] = shout # authors by topics - authors = await ShoutAuthorStorage.get_authors(shout.slug) + shout_authors = filter(lambda asa: asa.shout == shout.slug, all_shout_authors) + self.authors_by_topic[tpc] = self.authors_by_topic.get(tpc, dict()) - for a in authors: - [aslug, acaption] = a - self.authors_by_topic[tpc][aslug] = acaption + for sa in shout_authors: + self.authors_by_topic[tpc][sa.shout] = sa.caption self.followers_by_topic = {} followings = session.query(TopicFollower).all() @@ -43,6 +45,9 @@ class TopicStat: self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict()) self.followers_by_topic[topic][userslug] = userslug + end = time.time() + print("[stat.topics] load_stat took %fs " % (end - start)) + @staticmethod async def get_shouts(topic): self = TopicStat @@ -52,6 +57,7 @@ class TopicStat: @staticmethod async def worker(): self = TopicStat + first_run = True while True: try: with local_session() as session: @@ -59,4 +65,9 @@ class TopicStat: await self.load_stat(session) except Exception as err: raise Exception(err) + if first_run: + # sleep for period + 1 min after first run + # to distribute load on server by workers with the same period + await asyncio.sleep(60) + first_run = False await asyncio.sleep(self.period) diff --git a/services/stat/viewed.py b/services/stat/viewed.py index ece3e8c1..8a49c707 100644 --- a/services/stat/viewed.py +++ b/services/stat/viewed.py @@ -1,4 +1,5 @@ import asyncio +import time from datetime import timedelta, timezone, datetime from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport @@ -9,7 +10,6 @@ from orm.viewed import ViewedEntry from ssl import create_default_context from os import environ, path - load_facts = gql(""" query getDomains { domains { @@ -82,8 +82,9 @@ class ViewedStorage: self.disabled = True @staticmethod - async def update_pages(session): + async def update_pages(): """ query all the pages from ackee sorted by views count """ + start = time.time() self = ViewedStorage async with self.lock: try: @@ -104,6 +105,9 @@ class ViewedStorage: except Exception as e: raise e + end = time.time() + print("[stat.viewed] update_pages took %fs " % (end - start)) + @staticmethod async def get_facts(): self = ViewedStorage @@ -176,9 +180,8 @@ class ViewedStorage: async with self.lock: while True: try: - with local_session() as session: - await self.update_pages(session) - failed = 0 + await self.update_pages() + failed = 0 except Exception: failed += 1 print("[stat.viewed] update failed #%d, wait 10 seconds" % failed) diff --git a/services/zine/shoutauthor.py b/services/zine/shoutauthor.py deleted file mode 100644 index ca224959..00000000 --- a/services/zine/shoutauthor.py +++ /dev/null @@ -1,47 +0,0 @@ -import asyncio - -from base.orm import local_session -from orm.shout import ShoutAuthor, Shout - - -class ShoutAuthorStorage: - authors_by_shout = {} - lock = asyncio.Lock() - period = 30 * 60 # sec - - @staticmethod - async def load(session): - self = ShoutAuthorStorage - sas = session.query(ShoutAuthor).join(Shout).all() - for sa in sas: - self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, []) - self.authors_by_shout[sa.shout].append([sa.user, sa.caption]) - print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout)) - - @staticmethod - async def get_authors(shout): - self = ShoutAuthorStorage - async with self.lock: - return self.authors_by_shout.get(shout, []) - - @staticmethod - async def get_author_caption(shout, author): - self = ShoutAuthorStorage - async with self.lock: - for a in self.authors_by_shout.get(shout, []): - if author in a: - return a[1] - return {"error": "author caption not found"} - - @staticmethod - async def worker(): - self = ShoutAuthorStorage - while True: - try: - with local_session() as session: - async with self.lock: - await self.load(session) - print("[zine.authors] index by authors was updated") - except Exception as err: - print("[zine.authors] error indexing by author: %s" % (err)) - await asyncio.sleep(self.period)