diff --git a/services/stat/reacted.py b/services/stat/reacted.py index 3ae5684e..049da7ef 100644 --- a/services/stat/reacted.py +++ b/services/stat/reacted.py @@ -163,32 +163,29 @@ class ReactedStorage: @staticmethod async def recount_changed(session): - start = time.time() self = ReactedStorage - async with self.lock: - sss = list(self.modified_shouts) - c = 0 - for slug in sss: - siblings = session.query(Reaction).where(Reaction.shout == slug).all() - c += len(siblings) - await self.recount(siblings) + sss = list(self.modified_shouts) + c = 0 + for slug in sss: + siblings = session.query(Reaction).where(Reaction.shout == slug).all() + c += len(siblings) + await self.recount(siblings) - print("[stat.reacted] %d reactions recounted" % c) - print("[stat.reacted] %d shouts modified" % len(self.modified_shouts)) - print("[stat.reacted] %d topics" % len(self.reacted["topics"].values())) - print("[stat.reacted] %d authors" % len(self.reacted["authors"].values())) - print("[stat.reacted] %d replies" % len(self.reacted["reactions"])) - self.modified_shouts = set([]) - - end = time.time() - print("[stat.reacted] recount_changed took %fs " % (end - start)) + print("[stat.reacted] %d reactions recounted" % c) + print("[stat.reacted] %d shouts modified" % len(self.modified_shouts)) + print("[stat.reacted] %d topics" % len(self.reacted["topics"].values())) + print("[stat.reacted] %d authors" % len(self.reacted["authors"].values())) + print("[stat.reacted] %d replies" % len(self.reacted["reactions"])) + self.modified_shouts = set([]) @staticmethod async def worker(): while True: try: with local_session() as session: + ts = time.time() await ReactedStorage.recount_changed(session) + print("[stat.reacted] recount_changed took %fs " % (time.time() - ts)) except Exception as err: print("[stat.reacted] recount error %s" % (err)) await asyncio.sleep(ReactedStorage.period) diff --git a/services/stat/topicstat.py b/services/stat/topicstat.py index 3a5689e6..09ff7257 100644 --- a/services/stat/topicstat.py +++ b/services/stat/topicstat.py @@ -17,7 +17,6 @@ class TopicStat: @staticmethod async def load_stat(session): - start = time.time() self = TopicStat shout_topics = session.query(ShoutTopic, Shout).join(Shout).all() print("[stat.topics] %d links for shouts" % len(shout_topics)) @@ -43,9 +42,6 @@ class TopicStat: self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict()) self.followers_by_topic[topic][userslug] = userslug - end = time.time() - print("[stat.topics] load_stat took %fs " % (end - start)) - @staticmethod async def get_shouts(topic): self = TopicStat @@ -59,8 +55,10 @@ class TopicStat: while True: try: with local_session() as session: + ts = time.time() async with self.lock: await self.load_stat(session) + print("[stat.topicstat] load_stat took %fs " % (time.time() - ts)) except Exception as err: raise Exception(err) if first_run: diff --git a/services/stat/viewed.py b/services/stat/viewed.py index 8a49c707..e8e24340 100644 --- a/services/stat/viewed.py +++ b/services/stat/viewed.py @@ -86,24 +86,23 @@ class ViewedStorage: """ query all the pages from ackee sorted by views count """ start = time.time() self = ViewedStorage - async with self.lock: + try: + self.pages = await self.client.execute_async(load_pages) + self.pages = self.pages["domains"][0]["statistics"]["pages"] + print("[stat.viewed] ackee pages updated") + shouts = {} try: - self.pages = await self.client.execute_async(load_pages) - self.pages = self.pages["domains"][0]["statistics"]["pages"] - print("[stat.viewed] ackee pages updated") - shouts = {} - try: - for page in self.pages: - p = page["value"].split("?")[0] - slug = p.split('discours.io/')[-1] - shouts[slug] = page["count"] - for slug, v in shouts: - await ViewedStorage.increment(slug, v) - except Exception: - pass - print("[stat.viewed] %d pages collected " % len(shouts.keys())) - except Exception as e: - raise e + for page in self.pages: + p = page["value"].split("?")[0] + slug = p.split('discours.io/')[-1] + shouts[slug] = page["count"] + for slug, v in shouts: + await ViewedStorage.increment(slug, v) + except Exception: + pass + print("[stat.viewed] %d pages collected " % len(shouts.keys())) + except Exception as e: + raise e end = time.time() print("[stat.viewed] update_pages took %fs " % (end - start)) @@ -180,8 +179,10 @@ class ViewedStorage: async with self.lock: while True: try: + ts = time.time() await self.update_pages() failed = 0 + print("[stat.viewed] update_pages took %fs " % (time.time() - ts)) except Exception: failed += 1 print("[stat.viewed] update failed #%d, wait 10 seconds" % failed) diff --git a/services/zine/shoutauthor.py b/services/zine/shoutauthor.py index 477ff384..25505bc5 100644 --- a/services/zine/shoutauthor.py +++ b/services/zine/shoutauthor.py @@ -1,46 +1,49 @@ import asyncio +import time from base.orm import local_session -from orm.shout import ShoutAuthor, Shout +from orm.shout import ShoutAuthor class ShoutAuthorStorage: authors_by_shout = {} lock = asyncio.Lock() - period = 30 * 60 # sec + # period = 30 * 60 # sec @staticmethod async def load_captions(session): self = ShoutAuthorStorage - sas = session.query(ShoutAuthor).join(Shout).all() + sas = session.query(ShoutAuthor).all() for sa in sas: - self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, []) - self.authors_by_shout[sa.shout].append([sa.user, sa.caption]) + self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, {}) + self.authors_by_shout[sa.shout][sa.user] = sa.caption print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout)) - @staticmethod - async def get_authors(shout): - self = ShoutAuthorStorage - async with self.lock: - return self.authors_by_shout.get(shout, []) - @staticmethod async def get_author_caption(shout, author): self = ShoutAuthorStorage async with self.lock: - for a in self.authors_by_shout.get(shout, []): - if author in a: - return a[1] - return {"error": "author caption not found"} + return self.authors_by_shout.get(shout, {}).get(author) + + @staticmethod + async def set_author_caption(shout, author, caption): + self = ShoutAuthorStorage + async with self.lock: + self.authors_by_shout[shout] = self.authors_by_shout.get(shout, {}) + self.authors_by_shout[shout][author] = caption + return { + "error": None, + } @staticmethod async def worker(): self = ShoutAuthorStorage - while True: + async with self.lock: + # while True: try: with local_session() as session: - async with self.lock: - await self.load_captions(session) - print("[zine.authors] index by authors was updated") + ts = time.time() + await self.load_captions(session) + print("[zine.authors] load_captions took %fs " % (time.time() - ts)) except Exception as err: print("[zine.authors] error indexing by author: %s" % (err)) - await asyncio.sleep(self.period) + # await asyncio.sleep(self.period)