fixed-storages
This commit is contained in:
parent
d898466ccd
commit
bc09f414e0
|
@ -163,32 +163,29 @@ class ReactedStorage:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def recount_changed(session):
|
async def recount_changed(session):
|
||||||
start = time.time()
|
|
||||||
self = ReactedStorage
|
self = ReactedStorage
|
||||||
async with self.lock:
|
sss = list(self.modified_shouts)
|
||||||
sss = list(self.modified_shouts)
|
c = 0
|
||||||
c = 0
|
for slug in sss:
|
||||||
for slug in sss:
|
siblings = session.query(Reaction).where(Reaction.shout == slug).all()
|
||||||
siblings = session.query(Reaction).where(Reaction.shout == slug).all()
|
c += len(siblings)
|
||||||
c += len(siblings)
|
await self.recount(siblings)
|
||||||
await self.recount(siblings)
|
|
||||||
|
|
||||||
print("[stat.reacted] %d reactions recounted" % c)
|
print("[stat.reacted] %d reactions recounted" % c)
|
||||||
print("[stat.reacted] %d shouts modified" % len(self.modified_shouts))
|
print("[stat.reacted] %d shouts modified" % len(self.modified_shouts))
|
||||||
print("[stat.reacted] %d topics" % len(self.reacted["topics"].values()))
|
print("[stat.reacted] %d topics" % len(self.reacted["topics"].values()))
|
||||||
print("[stat.reacted] %d authors" % len(self.reacted["authors"].values()))
|
print("[stat.reacted] %d authors" % len(self.reacted["authors"].values()))
|
||||||
print("[stat.reacted] %d replies" % len(self.reacted["reactions"]))
|
print("[stat.reacted] %d replies" % len(self.reacted["reactions"]))
|
||||||
self.modified_shouts = set([])
|
self.modified_shouts = set([])
|
||||||
|
|
||||||
end = time.time()
|
|
||||||
print("[stat.reacted] recount_changed took %fs " % (end - start))
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def worker():
|
async def worker():
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with local_session() as session:
|
with local_session() as session:
|
||||||
|
ts = time.time()
|
||||||
await ReactedStorage.recount_changed(session)
|
await ReactedStorage.recount_changed(session)
|
||||||
|
print("[stat.reacted] recount_changed took %fs " % (time.time() - ts))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print("[stat.reacted] recount error %s" % (err))
|
print("[stat.reacted] recount error %s" % (err))
|
||||||
await asyncio.sleep(ReactedStorage.period)
|
await asyncio.sleep(ReactedStorage.period)
|
||||||
|
|
|
@ -17,7 +17,6 @@ class TopicStat:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def load_stat(session):
|
async def load_stat(session):
|
||||||
start = time.time()
|
|
||||||
self = TopicStat
|
self = TopicStat
|
||||||
shout_topics = session.query(ShoutTopic, Shout).join(Shout).all()
|
shout_topics = session.query(ShoutTopic, Shout).join(Shout).all()
|
||||||
print("[stat.topics] %d links for shouts" % len(shout_topics))
|
print("[stat.topics] %d links for shouts" % len(shout_topics))
|
||||||
|
@ -43,9 +42,6 @@ class TopicStat:
|
||||||
self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict())
|
self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict())
|
||||||
self.followers_by_topic[topic][userslug] = userslug
|
self.followers_by_topic[topic][userslug] = userslug
|
||||||
|
|
||||||
end = time.time()
|
|
||||||
print("[stat.topics] load_stat took %fs " % (end - start))
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_shouts(topic):
|
async def get_shouts(topic):
|
||||||
self = TopicStat
|
self = TopicStat
|
||||||
|
@ -59,8 +55,10 @@ class TopicStat:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with local_session() as session:
|
with local_session() as session:
|
||||||
|
ts = time.time()
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
await self.load_stat(session)
|
await self.load_stat(session)
|
||||||
|
print("[stat.topicstat] load_stat took %fs " % (time.time() - ts))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
raise Exception(err)
|
raise Exception(err)
|
||||||
if first_run:
|
if first_run:
|
||||||
|
|
|
@ -86,24 +86,23 @@ class ViewedStorage:
|
||||||
""" query all the pages from ackee sorted by views count """
|
""" query all the pages from ackee sorted by views count """
|
||||||
start = time.time()
|
start = time.time()
|
||||||
self = ViewedStorage
|
self = ViewedStorage
|
||||||
async with self.lock:
|
try:
|
||||||
|
self.pages = await self.client.execute_async(load_pages)
|
||||||
|
self.pages = self.pages["domains"][0]["statistics"]["pages"]
|
||||||
|
print("[stat.viewed] ackee pages updated")
|
||||||
|
shouts = {}
|
||||||
try:
|
try:
|
||||||
self.pages = await self.client.execute_async(load_pages)
|
for page in self.pages:
|
||||||
self.pages = self.pages["domains"][0]["statistics"]["pages"]
|
p = page["value"].split("?")[0]
|
||||||
print("[stat.viewed] ackee pages updated")
|
slug = p.split('discours.io/')[-1]
|
||||||
shouts = {}
|
shouts[slug] = page["count"]
|
||||||
try:
|
for slug, v in shouts:
|
||||||
for page in self.pages:
|
await ViewedStorage.increment(slug, v)
|
||||||
p = page["value"].split("?")[0]
|
except Exception:
|
||||||
slug = p.split('discours.io/')[-1]
|
pass
|
||||||
shouts[slug] = page["count"]
|
print("[stat.viewed] %d pages collected " % len(shouts.keys()))
|
||||||
for slug, v in shouts:
|
except Exception as e:
|
||||||
await ViewedStorage.increment(slug, v)
|
raise e
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
print("[stat.viewed] %d pages collected " % len(shouts.keys()))
|
|
||||||
except Exception as e:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
end = time.time()
|
end = time.time()
|
||||||
print("[stat.viewed] update_pages took %fs " % (end - start))
|
print("[stat.viewed] update_pages took %fs " % (end - start))
|
||||||
|
@ -180,8 +179,10 @@ class ViewedStorage:
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
ts = time.time()
|
||||||
await self.update_pages()
|
await self.update_pages()
|
||||||
failed = 0
|
failed = 0
|
||||||
|
print("[stat.viewed] update_pages took %fs " % (time.time() - ts))
|
||||||
except Exception:
|
except Exception:
|
||||||
failed += 1
|
failed += 1
|
||||||
print("[stat.viewed] update failed #%d, wait 10 seconds" % failed)
|
print("[stat.viewed] update failed #%d, wait 10 seconds" % failed)
|
||||||
|
|
|
@ -1,46 +1,49 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import time
|
||||||
from base.orm import local_session
|
from base.orm import local_session
|
||||||
from orm.shout import ShoutAuthor, Shout
|
from orm.shout import ShoutAuthor
|
||||||
|
|
||||||
|
|
||||||
class ShoutAuthorStorage:
|
class ShoutAuthorStorage:
|
||||||
authors_by_shout = {}
|
authors_by_shout = {}
|
||||||
lock = asyncio.Lock()
|
lock = asyncio.Lock()
|
||||||
period = 30 * 60 # sec
|
# period = 30 * 60 # sec
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def load_captions(session):
|
async def load_captions(session):
|
||||||
self = ShoutAuthorStorage
|
self = ShoutAuthorStorage
|
||||||
sas = session.query(ShoutAuthor).join(Shout).all()
|
sas = session.query(ShoutAuthor).all()
|
||||||
for sa in sas:
|
for sa in sas:
|
||||||
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, [])
|
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, {})
|
||||||
self.authors_by_shout[sa.shout].append([sa.user, sa.caption])
|
self.authors_by_shout[sa.shout][sa.user] = sa.caption
|
||||||
print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout))
|
print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout))
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
async def get_authors(shout):
|
|
||||||
self = ShoutAuthorStorage
|
|
||||||
async with self.lock:
|
|
||||||
return self.authors_by_shout.get(shout, [])
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_author_caption(shout, author):
|
async def get_author_caption(shout, author):
|
||||||
self = ShoutAuthorStorage
|
self = ShoutAuthorStorage
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
for a in self.authors_by_shout.get(shout, []):
|
return self.authors_by_shout.get(shout, {}).get(author)
|
||||||
if author in a:
|
|
||||||
return a[1]
|
@staticmethod
|
||||||
return {"error": "author caption not found"}
|
async def set_author_caption(shout, author, caption):
|
||||||
|
self = ShoutAuthorStorage
|
||||||
|
async with self.lock:
|
||||||
|
self.authors_by_shout[shout] = self.authors_by_shout.get(shout, {})
|
||||||
|
self.authors_by_shout[shout][author] = caption
|
||||||
|
return {
|
||||||
|
"error": None,
|
||||||
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def worker():
|
async def worker():
|
||||||
self = ShoutAuthorStorage
|
self = ShoutAuthorStorage
|
||||||
while True:
|
async with self.lock:
|
||||||
|
# while True:
|
||||||
try:
|
try:
|
||||||
with local_session() as session:
|
with local_session() as session:
|
||||||
async with self.lock:
|
ts = time.time()
|
||||||
await self.load_captions(session)
|
await self.load_captions(session)
|
||||||
print("[zine.authors] index by authors was updated")
|
print("[zine.authors] load_captions took %fs " % (time.time() - ts))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print("[zine.authors] error indexing by author: %s" % (err))
|
print("[zine.authors] error indexing by author: %s" % (err))
|
||||||
await asyncio.sleep(self.period)
|
# await asyncio.sleep(self.period)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user