work-load-fixes

This commit is contained in:
tonyrewin 2022-10-01 13:19:13 +03:00
parent 67b3a5d529
commit 16ee157438
4 changed files with 42 additions and 77 deletions

View File

@ -5,7 +5,7 @@ from auth.authenticate import login_required
from base.orm import local_session from base.orm import local_session
from base.resolvers import mutation, query from base.resolvers import mutation, query
from orm.collection import ShoutCollection from orm.collection import ShoutCollection
from orm.shout import Shout, ShoutAuthor, ShoutTopic from orm.shout import Shout, ShoutTopic
from orm.topic import Topic from orm.topic import Topic
from resolvers.community import community_follow, community_unfollow from resolvers.community import community_follow, community_unfollow
from resolvers.profile import author_follow, author_unfollow from resolvers.profile import author_follow, author_unfollow
@ -117,23 +117,34 @@ async def get_search_results(_, _info, searchtext, offset, limit):
return shouts return shouts
@query.field("shoutsByTopics") @query.field("shoutsByAuthors")
async def shouts_by_topics(_, _info, slugs, offset, limit): async def shouts_by_authors(_, _info, slugs, offset, limit):
# TODO: use ShoutTopicsStorage shouts = []
with local_session() as session: for author in slugs:
shouts = ( shouts.extend(ShoutsCache.get_by_author(author))
session.query(Shout) shouts_prepared = []
.join(ShoutTopic)
.where(and_(ShoutTopic.topic.in_(slugs), Shout.publishedAt.is_not(None)))
.order_by(desc(Shout.publishedAt))
.limit(limit)
.offset(offset)
)
for s in shouts: for s in shouts:
if bool(s.publishedAt):
for a in s.authors: for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug) a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
return shouts shouts_prepared.append(s)
shouts_prepared.sort(key=lambda s: s.publishedAt, reverse=True)
return shouts_prepared[offset : offset + limit]
@query.field("shoutsByTopics")
async def shouts_by_topics(_, _info, slugs, offset, limit):
shouts = []
for topic in slugs:
shouts.extend(ShoutsCache.get_by_topic(topic))
shouts_prepared = []
for s in shouts:
if bool(s.publishedAt):
for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
shouts_prepared.append(s)
shouts_prepared.sort(key=lambda s: s.publishedAt, reverse=True)
return shouts_prepared[offset : offset + limit]
@query.field("shoutsByCollection") @query.field("shoutsByCollection")
@ -153,25 +164,6 @@ async def shouts_by_collection(_, _info, collection, offset, limit):
return shouts return shouts
@query.field("shoutsByAuthors")
async def shouts_by_authors(_, _info, slugs, offset, limit):
# TODO: use ShoutAuthorsStorage
with local_session() as session:
shouts = (
session.query(Shout)
.join(ShoutAuthor)
.where(and_(ShoutAuthor.user.in_(slugs), Shout.publishedAt.is_not(None)))
.order_by(desc(Shout.publishedAt))
.limit(limit)
.offset(offset)
)
for s in shouts:
for a in s.authors:
a.caption = await ShoutAuthorStorage.get_author_caption(s.slug, a.slug)
return shouts
SINGLE_COMMUNITY = True SINGLE_COMMUNITY = True

View File

@ -1,22 +1,23 @@
import asyncio import asyncio
from base.orm import local_session from base.orm import local_session
from orm.shout import ShoutAuthor from orm.shout import ShoutAuthor, Shout
class ShoutAuthorStorage: class ShoutAuthorStorage:
authors_by_shout = {} authors_by_shout = {}
shouts_by_author = {}
lock = asyncio.Lock() lock = asyncio.Lock()
period = 30 * 60 # sec period = 30 * 60 # sec
@staticmethod @staticmethod
async def load(session): async def load(session):
self = ShoutAuthorStorage self = ShoutAuthorStorage
sas = session.query(ShoutAuthor).all() sas = session.query(ShoutAuthor).join(Shout).all()
for sa in sas: for sa in sas:
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, []) self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, [])
self.authors_by_shout[sa.shout].append([sa.user, sa.caption]) self.authors_by_shout[sa.shout].append([sa.user, sa.caption])
print("[zine.authors] %d shouts preprocessed" % len(self.authors_by_shout)) print("[zine.shouts] %d shouts indexed by authors" % len(self.authors_by_shout))
@staticmethod @staticmethod
async def get_authors(shout): async def get_authors(shout):
@ -41,7 +42,7 @@ class ShoutAuthorStorage:
with local_session() as session: with local_session() as session:
async with self.lock: async with self.lock:
await self.load(session) await self.load(session)
print("[zine.authors] state updated") print("[zine.shouts] index by authors was updated")
except Exception as err: except Exception as err:
print("[zine.authors] errror: %s" % (err)) print("[zine.shouts] error indexing by author: %s" % (err))
await asyncio.sleep(self.period) await asyncio.sleep(self.period)

View File

@ -244,6 +244,16 @@ class ShoutsCache:
print("[zine.cache] indexed by %d authors " % len(shouts_by_author.keys())) print("[zine.cache] indexed by %d authors " % len(shouts_by_author.keys()))
ShoutsCache.by_author = shouts_by_author ShoutsCache.by_author = shouts_by_author
@staticmethod
async def get_by_author(author):
async with ShoutsCache.lock:
return ShoutsCache.by_author.get(author, [])
@staticmethod
async def get_by_topic(topic):
async with ShoutsCache.lock:
return ShoutsCache.by_topic.get(topic, [])
@staticmethod @staticmethod
async def get_top_published_before(daysago, offset, limit): async def get_top_published_before(daysago, offset, limit):
shouts_by_rating = [] shouts_by_rating = []

View File

@ -1,38 +0,0 @@
import asyncio
from base.orm import local_session
from orm.shout import ShoutTopic
class ShoutTopicStorage:
topics_by_shout = {}
lock = asyncio.Lock()
period = 30 * 60 # sec
@staticmethod
async def load(session):
self = ShoutTopicStorage
sas = session.query(ShoutTopic).all()
for sa in sas:
self.topics_by_shout[sa.shout] = self.topics_by_shout.get(sa.shout, [])
self.topics_by_shout[sa.shout].append([sa.user, sa.caption])
print("[zine.topics] %d shouts preprocessed" % len(self.topics_by_shout))
@staticmethod
async def get_topics(shout):
self = ShoutTopicStorage
async with self.lock:
return self.topics_by_shout.get(shout, [])
@staticmethod
async def worker():
self = ShoutTopicStorage
while True:
try:
with local_session() as session:
async with self.lock:
await self.load(session)
print("[zine.topics] state updated")
except Exception as err:
print("[zine.topics] errror: %s" % (err))
await asyncio.sleep(self.period)