diff --git a/main.py b/main.py index b2e1dd0b..c354efea 100644 --- a/main.py +++ b/main.py @@ -16,7 +16,7 @@ from base.redis import redis from base.resolvers import resolvers from resolvers.auth import confirm_email_handler from services.main import storages_init -from services.stat.reacted import ReactedStorage +# from services.stat.reacted import ReactedStorage from services.stat.topicstat import TopicStat from services.stat.viewed import ViewedStorage from services.zine.topics import TopicStorage @@ -41,8 +41,8 @@ async def start_up(): print(topics_random_work) views_stat_task = asyncio.create_task(ViewedStorage().worker()) print(views_stat_task) - reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) - print(reacted_storage_task) + # reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) + # print(reacted_storage_task) shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker()) print(shout_author_task) topic_stat_task = asyncio.create_task(TopicStat.worker()) diff --git a/migration/tables/comments.py b/migration/tables/comments.py index c93e3d63..0ca72915 100644 --- a/migration/tables/comments.py +++ b/migration/tables/comments.py @@ -8,7 +8,7 @@ from orm.reaction import Reaction, ReactionKind from orm.shout import ShoutReactionsFollower from orm.topic import TopicFollower from orm.user import User -from services.stat.reacted import ReactedStorage +# from services.stat.reacted import ReactedStorage ts = datetime.now(tz=timezone.utc) @@ -77,7 +77,7 @@ async def migrate(entry, storage): # creating reaction from old comment reaction = Reaction.create(**reaction_dict) session.add(reaction) - await ReactedStorage.react(reaction) + # await ReactedStorage.react(reaction) # creating shout's reactions following for reaction author following1 = session.query( @@ -148,7 +148,7 @@ async def migrate(entry, storage): ) session.add(following2) session.add(rr) - await ReactedStorage.react(rr) + # await ReactedStorage.react(rr) except Exception as e: print("[migration] comment rating error: %r" % re_reaction_dict) diff --git a/migration/tables/content_items.py b/migration/tables/content_items.py index 354f430c..8680d2b8 100644 --- a/migration/tables/content_items.py +++ b/migration/tables/content_items.py @@ -9,7 +9,7 @@ from orm.reaction import Reaction, ReactionKind from orm.shout import Shout, ShoutTopic, ShoutReactionsFollower from orm.user import User from orm.topic import TopicFollower -from services.stat.reacted import ReactedStorage +# from services.stat.reacted import ReactedStorage from services.stat.viewed import ViewedStorage OLD_DATE = "2016-03-05 22:22:00.350000" @@ -373,7 +373,7 @@ async def content_ratings_to_reactions(entry, slug): else: rea = Reaction.create(**reaction_dict) session.add(rea) - await ReactedStorage.react(rea) + # await ReactedStorage.react(rea) # shout_dict['ratings'].append(reaction_dict) session.commit() diff --git a/resolvers/zine/load.py b/resolvers/zine/load.py index ad8f8247..b10e90a1 100644 --- a/resolvers/zine/load.py +++ b/resolvers/zine/load.py @@ -29,7 +29,10 @@ def calc_reactions(q): (Reaction.body.is_not(None), 1), else_=0 ) - ).label('commented') + ).label('commented'), + sa.func.sum( + Reaction.id + ).label('reacted') ) @@ -72,7 +75,7 @@ async def load_shout(_, info, slug): Shout.deletedAt.is_(None) ).group_by(Shout.id) - [shout, rating, commented] = session.execute(q).unique().one() + [shout, rating, commented, reacted] = session.execute(q).unique().one() for a in shout.authors: a.caption = await ShoutAuthorStorage.get_author_caption(shout.slug, a.slug) viewed = await ViewedStorage.get_shout(shout.slug) @@ -80,7 +83,7 @@ async def load_shout(_, info, slug): "rating": rating, "viewed": viewed, "commented": commented, - # "reacted": reacted + "reacted": reacted } return shout @@ -146,12 +149,12 @@ async def load_shouts_by(_, info, options): shouts = [] with local_session() as session: - for [shout, rating, commented] in session.execute(q).unique(): + for [shout, rating, commented, reacted] in session.execute(q).unique(): shout.stat = { "rating": rating, "viewed": await ViewedStorage.get_shout(shout.slug), "commented": commented, - # "reacted": reacted + "reacted": reacted } # NOTE: no need authors captions in arrays # for author in shout.authors: diff --git a/resolvers/zine/reactions.py b/resolvers/zine/reactions.py index 6b99bc43..98faac31 100644 --- a/resolvers/zine/reactions.py +++ b/resolvers/zine/reactions.py @@ -8,16 +8,8 @@ from base.resolvers import mutation, query from orm.reaction import Reaction, ReactionKind from orm.shout import Shout, ShoutReactionsFollower from orm.user import User -from services.stat.reacted import ReactedStorage - - -async def get_reaction_stat(reaction_id): - return { - # "viewed": await ViewedStorage.get_reaction(reaction_id), - "reacted": len(await ReactedStorage.get_reaction(reaction_id)), - "rating": await ReactedStorage.get_reaction_rating(reaction_id), - "commented": len(await ReactedStorage.get_reaction_comments(reaction_id)), - } +# from services.stat.reacted import ReactedStorage +from resolvers.zine.load import calc_reactions def reactions_follow(user: User, slug: str, auto=False): @@ -142,13 +134,17 @@ async def create_reaction(_, info, inp): elif check_to_publish(session, user, reaction): set_published(session, reaction.shout, reaction.createdBy) - ReactedStorage.react(reaction) + # ReactedStorage.react(reaction) try: reactions_follow(user, inp["shout"], True) except Exception as e: print(f"[resolvers.reactions] error on reactions autofollowing: {e}") - reaction.stat = await get_reaction_stat(reaction.id) + reaction.stat = { + "commented": 0, + "reacted": 0, + "rating": 0 + } return {"reaction": reaction} @@ -160,11 +156,16 @@ async def update_reaction(_, info, inp): with local_session() as session: user = session.query(User).where(User.id == user_id).first() - reaction = session.query(Reaction).filter(Reaction.id == inp.id).first() + q = select(Reaction).filter(Reaction.id == inp.id) + q = calc_reactions(q) + + [reaction, rating, commented, reacted] = session.execute(q).unique().one() + if not reaction: return {"error": "invalid reaction id"} if reaction.createdBy != user.slug: return {"error": "access denied"} + reaction.body = inp["body"] reaction.updatedAt = datetime.now(tz=timezone.utc) if reaction.kind != inp["kind"]: @@ -173,8 +174,11 @@ async def update_reaction(_, info, inp): if inp.get("range"): reaction.range = inp.get("range") session.commit() - - reaction.stat = await get_reaction_stat(reaction.id) + reaction.stat = { + "commented": commented, + "reacted": reacted, + "rating": rating + } return {"reaction": reaction} @@ -198,6 +202,7 @@ async def delete_reaction(_, info, rid): def map_result_item(result_item): [user, shout, reaction] = result_item + print(reaction) reaction.createdBy = user reaction.shout = shout reaction.replyTo = reaction @@ -254,16 +259,24 @@ async def load_reactions_by(_, _info, by, limit=50, offset=0): ).order_by( order_way(order_field) ) - + q = calc_reactions(q) q = q.where(Reaction.deletedAt.is_(None)) q = q.limit(limit).offset(offset) - + reactions = [] with local_session() as session: - reactions = list(map(map_result_item, session.execute(q))) - for reaction in reactions: - reaction.stat = await get_reaction_stat(reaction.id) + for [ + [reaction, rating, commented, reacted], shout, reply + ] in list(map(map_result_item, session.execute(q))): + reaction.shout = shout + reaction.replyTo = reply + reaction.stat = { + "rating": rating, + "commented": commented, + "reacted": reacted + } + reactions.append(reaction) - if by.get("stat"): - reactions.sort(lambda r: r.stat.get(by["stat"]) or r.createdAt) + if by.get("stat"): + reactions.sort(lambda r: r.stat.get(by["stat"]) or r.createdAt) return reactions diff --git a/services/main.py b/services/main.py index 636f01db..1099e90e 100644 --- a/services/main.py +++ b/services/main.py @@ -1,4 +1,4 @@ -from services.stat.reacted import ReactedStorage +# from services.stat.reacted import ReactedStorage from services.auth.roles import RoleStorage from services.auth.users import UserStorage from services.zine.topics import TopicStorage @@ -10,7 +10,7 @@ from base.orm import local_session async def storages_init(): with local_session() as session: print('[main] initialize storages') - ReactedStorage.init(session) + # ReactedStorage.init(session) RoleStorage.init(session) UserStorage.init(session) TopicStorage.init(session) diff --git a/services/stat/reacted.py b/services/stat/reacted.py deleted file mode 100644 index 049da7ef..00000000 --- a/services/stat/reacted.py +++ /dev/null @@ -1,191 +0,0 @@ -import asyncio -import time -from base.orm import local_session -from orm.reaction import ReactionKind, Reaction -from services.zine.topics import TopicStorage - - -def kind_to_rate(kind) -> int: - if kind in [ - ReactionKind.AGREE, - ReactionKind.LIKE, - ReactionKind.PROOF, - ReactionKind.ACCEPT, - ]: - return 1 - elif kind in [ - ReactionKind.DISAGREE, - ReactionKind.DISLIKE, - ReactionKind.DISPROOF, - ReactionKind.REJECT, - ]: - return -1 - else: - return 0 - - -class ReactedStorage: - reacted = {"shouts": {}, "topics": {}, "reactions": {}, "authors": {}} - rating = {"shouts": {}, "topics": {}, "reactions": {}} - reactions = [] - to_flush = [] - period = 30 * 60 # sec - lock = asyncio.Lock() - modified_shouts = set([]) - - @staticmethod - async def get_shout(shout_slug): - self = ReactedStorage - async with self.lock: - return self.reacted["shouts"].get(shout_slug, []) - - @staticmethod - async def get_author(user_slug): - self = ReactedStorage - async with self.lock: - return self.reacted["authors"].get(user_slug, []) - - @staticmethod - async def get_followed_reactions(user_slug): - self = ReactedStorage - async with self.lock: - author_reactions = self.reacted["authors"].get(user_slug, []) - shouts = [] - for r in author_reactions: - if r.shout not in shouts: - shouts.append(r.shout) - return shouts - - @staticmethod - async def get_topic(topic_slug): - self = ReactedStorage - async with self.lock: - return self.reacted["topics"].get(topic_slug, []) - - @staticmethod - async def get_comments(shout_slug): - self = ReactedStorage - async with self.lock: - return list( - filter(lambda r: bool(r.body), self.reacted["shouts"].get(shout_slug, {})) - ) - - @staticmethod - async def get_topic_comments(topic_slug): - self = ReactedStorage - async with self.lock: - return list( - filter(lambda r: bool(r.body), self.reacted["topics"].get(topic_slug, [])) - ) - - @staticmethod - async def get_reaction_comments(reaction_id): - self = ReactedStorage - async with self.lock: - return list( - filter( - lambda r: bool(r.body), self.reacted["reactions"].get(reaction_id, {}) - ) - ) - - @staticmethod - async def get_reaction(reaction_id): - self = ReactedStorage - async with self.lock: - return self.reacted["reactions"].get(reaction_id, []) - - @staticmethod - async def get_rating(shout_slug): - self = ReactedStorage - rating = 0 - async with self.lock: - for r in self.reacted["shouts"].get(shout_slug, []): - rating = rating + kind_to_rate(r.kind) - return rating - - @staticmethod - async def get_topic_rating(topic_slug): - self = ReactedStorage - rating = 0 - async with self.lock: - for r in self.reacted["topics"].get(topic_slug, []): - rating = rating + kind_to_rate(r.kind) - return rating - - @staticmethod - async def get_reaction_rating(reaction_id): - self = ReactedStorage - rating = 0 - async with self.lock: - for r in self.reacted["reactions"].get(reaction_id, []): - rating = rating + kind_to_rate(r.kind) - return rating - - @staticmethod - async def react(reaction): - ReactedStorage.modified_shouts.add(reaction.shout) - - @staticmethod - async def recount(reactions): - self = ReactedStorage - for r in reactions: - # renew reactions by shout - self.reacted["shouts"][r.shout] = self.reacted["shouts"].get(r.shout, []) - self.reacted["shouts"][r.shout].append(r) - # renew reactions by author - self.reacted["authors"][r.createdBy] = self.reacted["authors"].get(r.createdBy, []) - self.reacted["authors"][r.createdBy].append(r) - # renew reactions by topic - shout_topics = await TopicStorage.get_topics_by_slugs([r.shout, ]) - for t in shout_topics: - self.reacted["topics"][t] = self.reacted["topics"].get(t, []) - self.reacted["topics"][t].append(r) - self.rating["topics"][t] = \ - self.rating["topics"].get(t, 0) + kind_to_rate(r.kind) - if r.replyTo: - # renew reactions replies - self.reacted["reactions"][r.replyTo] = \ - self.reacted["reactions"].get(r.replyTo, []) - self.reacted["reactions"][r.replyTo].append(r) - self.rating["reactions"][r.replyTo] = \ - self.rating["reactions"].get(r.replyTo, 0) + kind_to_rate(r.kind) - else: - # renew shout rating - self.rating["shouts"][r.shout] = \ - self.rating["shouts"].get(r.shout, 0) + kind_to_rate(r.kind) - - @staticmethod - def init(session): - self = ReactedStorage - all_reactions = session.query(Reaction).all() - self.modified_shouts = list(set([r.shout for r in all_reactions])) - print("[stat.reacted] %d shouts with reactions" % len(self.modified_shouts)) - - @staticmethod - async def recount_changed(session): - self = ReactedStorage - sss = list(self.modified_shouts) - c = 0 - for slug in sss: - siblings = session.query(Reaction).where(Reaction.shout == slug).all() - c += len(siblings) - await self.recount(siblings) - - print("[stat.reacted] %d reactions recounted" % c) - print("[stat.reacted] %d shouts modified" % len(self.modified_shouts)) - print("[stat.reacted] %d topics" % len(self.reacted["topics"].values())) - print("[stat.reacted] %d authors" % len(self.reacted["authors"].values())) - print("[stat.reacted] %d replies" % len(self.reacted["reactions"])) - self.modified_shouts = set([]) - - @staticmethod - async def worker(): - while True: - try: - with local_session() as session: - ts = time.time() - await ReactedStorage.recount_changed(session) - print("[stat.reacted] recount_changed took %fs " % (time.time() - ts)) - except Exception as err: - print("[stat.reacted] recount error %s" % (err)) - await asyncio.sleep(ReactedStorage.period) diff --git a/services/stat/topicstat.py b/services/stat/topicstat.py index 09ff7257..05a61388 100644 --- a/services/stat/topicstat.py +++ b/services/stat/topicstat.py @@ -17,9 +17,12 @@ class TopicStat: @staticmethod async def load_stat(session): + print("[stat.topics] ⎧ loading stat -------") + ts = time.time() self = TopicStat - shout_topics = session.query(ShoutTopic, Shout).join(Shout).all() - print("[stat.topics] %d links for shouts" % len(shout_topics)) + shout_topics = session.query(ShoutTopic, Shout).join(Shout).all() # ~ 10 secs + print("[stat.topics] ⎪ shout topics joined query took %fs " % (time.time() - ts)) + print("[stat.topics] ⎪ indexing %d links..." % len(shout_topics)) for [shout_topic, shout] in shout_topics: tpc = shout_topic.topic self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict()) @@ -35,7 +38,7 @@ class TopicStat: self.followers_by_topic = {} followings = session.query(TopicFollower).all() - print("[stat.topics] %d followings by users" % len(followings)) + print("[stat.topics] ⎪ indexing %d followings..." % len(followings)) for flw in followings: topic = flw.topic userslug = flw.follower @@ -58,7 +61,7 @@ class TopicStat: ts = time.time() async with self.lock: await self.load_stat(session) - print("[stat.topicstat] load_stat took %fs " % (time.time() - ts)) + print("[stat.topics] ⎩ load_stat took %fs " % (time.time() - ts)) except Exception as err: raise Exception(err) if first_run: diff --git a/services/stat/viewed.py b/services/stat/viewed.py index 85894ad2..a00dc0ff 100644 --- a/services/stat/viewed.py +++ b/services/stat/viewed.py @@ -76,9 +76,9 @@ class ViewedStorage: self.client = create_client({ "Authorization": "Bearer %s" % str(token) }, schema=schema_str) - print("[stat.viewed] authorized permanentely by ackee.discours.io: %s" % token) + print("[stat.viewed] * authorized permanentely by ackee.discours.io: %s" % token) else: - print("[stat.viewed] please set ACKEE_TOKEN") + print("[stat.viewed] * please set ACKEE_TOKEN") self.disabled = True @staticmethod @@ -89,7 +89,7 @@ class ViewedStorage: try: self.pages = await self.client.execute_async(load_pages) self.pages = self.pages["domains"][0]["statistics"]["pages"] - print("[stat.viewed] ackee pages updated") + print("[stat.viewed] ⎪ ackee pages updated") shouts = {} try: for page in self.pages: @@ -100,12 +100,12 @@ class ViewedStorage: await ViewedStorage.increment(slug, v) except Exception: pass - print("[stat.viewed] %d pages collected " % len(shouts.keys())) + print("[stat.viewed] ⎪ %d pages collected " % len(shouts.keys())) except Exception as e: raise e end = time.time() - print("[stat.viewed] update_pages took %fs " % (end - start)) + print("[stat.viewed] ⎪ update_pages took %fs " % (end - start)) @staticmethod async def get_facts(): @@ -179,21 +179,22 @@ class ViewedStorage: async with self.lock: while True: try: + print("[stat.viewed] ⎧ updating views...") await self.update_pages() failed = 0 except Exception: failed += 1 - print("[stat.viewed] update failed #%d, wait 10 seconds" % failed) + print("[stat.viewed] ⎪ update failed #%d, wait 10 seconds" % failed) if failed > 3: - print("[stat.viewed] not trying to update anymore") + print("[stat.viewed] ⎩ not trying to update anymore") break if failed == 0: when = datetime.now(timezone.utc) + timedelta(seconds=self.period) t = format(when.astimezone().isoformat()) - print("[stat.viewed] next update: %s" % ( + print("[stat.viewed] ⎩ next update: %s" % ( t.split("T")[0] + " " + t.split("T")[1].split(".")[0] )) await asyncio.sleep(self.period) else: await asyncio.sleep(10) - print("[stat.viewed] trying to update data again...") + print("[stat.viewed] ⎪ trying to update data again...") diff --git a/services/zine/shoutauthor.py b/services/zine/shoutauthor.py index 25505bc5..7d2a3d1d 100644 --- a/services/zine/shoutauthor.py +++ b/services/zine/shoutauthor.py @@ -16,7 +16,7 @@ class ShoutAuthorStorage: for sa in sas: self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, {}) self.authors_by_shout[sa.shout][sa.user] = sa.caption - print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout)) + print("[zine.authors] ⎧ %d shouts indexed by authors" % len(self.authors_by_shout)) @staticmethod async def get_author_caption(shout, author): @@ -43,7 +43,7 @@ class ShoutAuthorStorage: with local_session() as session: ts = time.time() await self.load_captions(session) - print("[zine.authors] load_captions took %fs " % (time.time() - ts)) + print("[zine.authors] ⎩ load_captions took %fs " % (time.time() - ts)) except Exception as err: - print("[zine.authors] error indexing by author: %s" % (err)) + print("[zine.authors] ⎩ error indexing by author: %s" % (err)) # await asyncio.sleep(self.period)