Merge remote-tracking branch 'origin/main' into storages-to-qeuries

This commit is contained in:
Igor Lobanov
2022-11-28 13:54:22 +01:00
30 changed files with 605 additions and 430 deletions

View File

@@ -20,11 +20,13 @@ class SearchService:
cached = await redis.execute("GET", text)
if not cached:
async with SearchService.lock:
by = {
options = {
"title": text,
"body": text
"body": text,
"limit": limit,
"offset": offset
}
payload = await load_shouts_by(None, None, by, limit, offset)
payload = await load_shouts_by(None, None, options)
await redis.execute("SET", text, json.dumps(payload))
return payload
else:

View File

@@ -76,9 +76,9 @@ class ViewedStorage:
self.client = create_client({
"Authorization": "Bearer %s" % str(token)
}, schema=schema_str)
print("[stat.viewed] authorized permanentely by ackee.discours.io: %s" % token)
print("[stat.viewed] * authorized permanentely by ackee.discours.io: %s" % token)
else:
print("[stat.viewed] please set ACKEE_TOKEN")
print("[stat.viewed] * please set ACKEE_TOKEN")
self.disabled = True
@staticmethod
@@ -86,27 +86,26 @@ class ViewedStorage:
""" query all the pages from ackee sorted by views count """
start = time.time()
self = ViewedStorage
async with self.lock:
try:
self.pages = await self.client.execute_async(load_pages)
self.pages = self.pages["domains"][0]["statistics"]["pages"]
print("[stat.viewed] ⎪ ackee pages updated")
shouts = {}
try:
self.pages = await self.client.execute_async(load_pages)
self.pages = self.pages["domains"][0]["statistics"]["pages"]
print("[stat.viewed] ackee pages updated")
shouts = {}
try:
for page in self.pages:
p = page["value"].split("?")[0]
slug = p.split('discours.io/')[-1]
shouts[slug] = page["count"]
for slug, v in shouts:
await ViewedStorage.increment(slug, v)
except Exception:
pass
print("[stat.viewed] %d pages collected " % len(shouts.keys()))
except Exception as e:
raise e
for page in self.pages:
p = page["value"].split("?")[0]
slug = p.split('discours.io/')[-1]
shouts[slug] = page["count"]
for slug, v in shouts:
await ViewedStorage.increment(slug, v)
except Exception:
pass
print("[stat.viewed] ⎪ %d pages collected " % len(shouts.keys()))
except Exception as e:
raise e
end = time.time()
print("[stat.viewed] update_pages took %fs " % (end - start))
print("[stat.viewed] update_pages took %fs " % (end - start))
@staticmethod
async def get_facts():
@@ -180,21 +179,22 @@ class ViewedStorage:
async with self.lock:
while True:
try:
print("[stat.viewed] ⎧ updating views...")
await self.update_pages()
failed = 0
except Exception:
failed += 1
print("[stat.viewed] update failed #%d, wait 10 seconds" % failed)
print("[stat.viewed] update failed #%d, wait 10 seconds" % failed)
if failed > 3:
print("[stat.viewed] not trying to update anymore")
print("[stat.viewed] not trying to update anymore")
break
if failed == 0:
when = datetime.now(timezone.utc) + timedelta(seconds=self.period)
t = format(when.astimezone().isoformat())
print("[stat.viewed] next update: %s" % (
print("[stat.viewed] next update: %s" % (
t.split("T")[0] + " " + t.split("T")[1].split(".")[0]
))
await asyncio.sleep(self.period)
else:
await asyncio.sleep(10)
print("[stat.viewed] trying to update data again...")
print("[stat.viewed] trying to update data again...")

View File

@@ -0,0 +1,49 @@
import asyncio
import time
from base.orm import local_session
from orm.shout import ShoutAuthor
class ShoutAuthorStorage:
authors_by_shout = {}
lock = asyncio.Lock()
# period = 30 * 60 # sec
@staticmethod
async def load_captions(session):
self = ShoutAuthorStorage
sas = session.query(ShoutAuthor).all()
for sa in sas:
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, {})
self.authors_by_shout[sa.shout][sa.user] = sa.caption
print("[zine.authors] ⎧ %d shouts indexed by authors" % len(self.authors_by_shout))
@staticmethod
async def get_author_caption(shout, author):
self = ShoutAuthorStorage
async with self.lock:
return self.authors_by_shout.get(shout, {}).get(author)
@staticmethod
async def set_author_caption(shout, author, caption):
self = ShoutAuthorStorage
async with self.lock:
self.authors_by_shout[shout] = self.authors_by_shout.get(shout, {})
self.authors_by_shout[shout][author] = caption
return {
"error": None,
}
@staticmethod
async def worker():
self = ShoutAuthorStorage
async with self.lock:
# while True:
try:
with local_session() as session:
ts = time.time()
await self.load_captions(session)
print("[zine.authors] ⎩ load_captions took %fs " % (time.time() - ts))
except Exception as err:
print("[zine.authors] ⎩ error indexing by author: %s" % (err))
# await asyncio.sleep(self.period)

97
services/zine/topics.py Normal file
View File

@@ -0,0 +1,97 @@
import asyncio
from base.orm import local_session
from orm.topic import Topic
from orm.shout import Shout
import sqlalchemy as sa
from sqlalchemy import select
class TopicStorage:
topics = {}
lock = asyncio.Lock()
random_topics = []
@staticmethod
def init(session):
self = TopicStorage
topics = session.query(Topic)
self.topics = dict([(topic.slug, topic) for topic in topics])
for tpc in self.topics.values():
# self.load_parents(tpc)
pass
print("[zine.topics] %d precached" % len(self.topics.keys()))
# @staticmethod
# def load_parents(topic):
# self = TopicStorage
# parents = []
# for parent in self.topics.values():
# if topic.slug in parent.children:
# parents.append(parent.slug)
# topic.parents = parents
# return topic
@staticmethod
def get_random_topics(amount):
return TopicStorage.random_topics[0:amount]
@staticmethod
def renew_topics_random():
with local_session() as session:
q = select(Topic).join(Shout).group_by(Topic.id).having(sa.func.count(Shout.id) > 2).order_by(
sa.func.random()).limit(50)
TopicStorage.random_topics = list(map(
lambda result_item: result_item.Topic, session.execute(q)
))
@staticmethod
async def worker():
self = TopicStorage
async with self.lock:
while True:
try:
self.renew_topics_random()
except Exception as err:
print("[zine.topics] error %s" % (err))
await asyncio.sleep(300) # 5 mins
@staticmethod
async def get_topics_all():
self = TopicStorage
async with self.lock:
return list(self.topics.values())
@staticmethod
async def get_topics_by_slugs(slugs):
self = TopicStorage
async with self.lock:
if not slugs:
return self.topics.values()
topics = filter(lambda topic: topic.slug in slugs, self.topics.values())
return list(topics)
@staticmethod
async def get_topics_by_community(community):
self = TopicStorage
async with self.lock:
topics = filter(
lambda topic: topic.community == community, self.topics.values()
)
return list(topics)
@staticmethod
async def get_topics_by_author(author):
self = TopicStorage
async with self.lock:
topics = filter(
lambda topic: topic.community == author, self.topics.values()
)
return list(topics)
@staticmethod
async def update_topic(topic):
self = TopicStorage
async with self.lock:
self.topics[topic.slug] = topic
# self.load_parents(topic)