more optimizations

removed AuthourShoutStore
loadShouts:sort by viewed
loadShouts with_author_cations option added
This commit is contained in:
Igor Lobanov 2022-11-24 18:19:43 +01:00
parent 38d7dd719c
commit 747873a9d8
8 changed files with 87 additions and 86 deletions

View File

@ -20,7 +20,6 @@ from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat from services.stat.topicstat import TopicStat
from services.stat.viewed import ViewedStorage from services.stat.viewed import ViewedStorage
from services.zine.gittask import GitTask from services.zine.gittask import GitTask
from services.zine.shoutauthor import ShoutAuthorStorage
from settings import DEV_SERVER_STATUS_FILE_NAME from settings import DEV_SERVER_STATUS_FILE_NAME
import_module("resolvers") import_module("resolvers")
@ -40,8 +39,6 @@ async def start_up():
print(views_stat_task) print(views_stat_task)
reacted_storage_task = asyncio.create_task(ReactedStorage.worker()) reacted_storage_task = asyncio.create_task(ReactedStorage.worker())
print(reacted_storage_task) print(reacted_storage_task)
shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker())
print(shout_author_task)
topic_stat_task = asyncio.create_task(TopicStat.worker()) topic_stat_task = asyncio.create_task(TopicStat.worker())
print(topic_stat_task) print(topic_stat_task)
git_task = asyncio.create_task(GitTask.git_task_worker()) git_task = asyncio.create_task(GitTask.git_task_worker())

View File

@ -4,12 +4,26 @@ from sqlalchemy.orm import joinedload
from sqlalchemy.sql.expression import desc, asc, select, case from sqlalchemy.sql.expression import desc, asc, select, case
from base.orm import local_session from base.orm import local_session
from base.resolvers import query from base.resolvers import query
from orm.shout import Shout from orm import ViewedEntry
from orm.shout import Shout, ShoutAuthor
from orm.reaction import Reaction, ReactionKind from orm.reaction import Reaction, ReactionKind
from services.zine.shoutauthor import ShoutAuthorStorage
from services.stat.reacted import ReactedStorage from services.stat.reacted import ReactedStorage
def add_rating_column(q):
return q.join(Reaction).add_columns(sa.func.sum(case(
(Reaction.kind == ReactionKind.AGREE, 1),
(Reaction.kind == ReactionKind.DISAGREE, -1),
(Reaction.kind == ReactionKind.PROOF, 1),
(Reaction.kind == ReactionKind.DISPROOF, -1),
(Reaction.kind == ReactionKind.ACCEPT, 1),
(Reaction.kind == ReactionKind.REJECT, -1),
(Reaction.kind == ReactionKind.LIKE, 1),
(Reaction.kind == ReactionKind.DISLIKE, -1),
else_=0
)).label('rating'))
def apply_filters(q, filters, user=None): def apply_filters(q, filters, user=None):
filters = {} if filters is None else filters filters = {} if filters is None else filters
if filters.get("reacted") and user: if filters.get("reacted") and user:
@ -35,15 +49,21 @@ def apply_filters(q, filters, user=None):
@query.field("loadShout") @query.field("loadShout")
async def load_shout(_, info, slug): async def load_shout(_, info, slug):
with local_session() as session: with local_session() as session:
shout = session.query(Shout).options( q = select(Shout).options(
# TODO add cation # TODO add cation
joinedload(Shout.authors), joinedload(Shout.authors),
joinedload(Shout.topics), joinedload(Shout.topics),
).filter( )
q = add_rating_column(q)
q = q.filter(
Shout.slug == slug Shout.slug == slug
).filter( ).filter(
Shout.deletedAt.is_(None) Shout.deletedAt.is_(None)
).one() ).group_by(Shout.id)
[shout, rating] = session.execute(q).unique().one()
shout.stat = await ReactedStorage.get_shout_stat(shout.slug, rating)
return shout return shout
@ -84,22 +104,12 @@ async def load_shouts_by(_, info, options):
) )
user = info.context["request"].user user = info.context["request"].user
q = apply_filters(q, options.get("filters"), user) q = apply_filters(q, options.get("filters"), user)
q = q.join(Reaction).add_columns(sa.func.sum(case( q = add_rating_column(q)
(Reaction.kind == ReactionKind.AGREE, 1),
(Reaction.kind == ReactionKind.DISAGREE, -1),
(Reaction.kind == ReactionKind.PROOF, 1),
(Reaction.kind == ReactionKind.DISPROOF, -1),
(Reaction.kind == ReactionKind.ACCEPT, 1),
(Reaction.kind == ReactionKind.REJECT, -1),
(Reaction.kind == ReactionKind.LIKE, 1),
(Reaction.kind == ReactionKind.DISLIKE, -1),
else_=0
)).label('rating'))
o = options.get("order_by") o = options.get("order_by")
if o: if o:
q = q.add_columns(sa.func.count(Reaction.id).label(o))
if o == 'comments': if o == 'comments':
q = q.add_columns(sa.func.count(Reaction.id).label(o))
q = q.join(Reaction, Shout.slug == Reaction.shout) q = q.join(Reaction, Shout.slug == Reaction.shout)
q = q.filter(Reaction.body.is_not(None)) q = q.filter(Reaction.body.is_not(None))
elif o == 'reacted': elif o == 'reacted':
@ -108,12 +118,17 @@ async def load_shouts_by(_, info, options):
).add_columns( ).add_columns(
sa.func.max(Reaction.createdAt).label(o) sa.func.max(Reaction.createdAt).label(o)
) )
elif o == 'views':
q = q.join(ViewedEntry)
q = q.add_columns(sa.func.sum(ViewedEntry.amount).label(o))
order_by = o order_by = o
else: else:
order_by = Shout.createdAt order_by = Shout.createdAt
order_by_desc = True if options.get('order_by_desc') is None else options.get('order_by_desc') order_by_desc = True if options.get('order_by_desc') is None else options.get('order_by_desc')
with_author_captions = False if options.get('with_author_captions') is None else options.get('with_author_captions')
query_order_by = desc(order_by) if order_by_desc else asc(order_by) query_order_by = desc(order_by) if order_by_desc else asc(order_by)
offset = options.get("offset", 0) offset = options.get("offset", 0)
limit = options.get("limit", 10) limit = options.get("limit", 10)
@ -124,9 +139,25 @@ async def load_shouts_by(_, info, options):
for shout in shouts: for shout in shouts:
shout.stat = await ReactedStorage.get_shout_stat(shout.slug, shout.rating) shout.stat = await ReactedStorage.get_shout_stat(shout.slug, shout.rating)
del shout.rating del shout.rating
for author in shout.authors:
author.caption = await ShoutAuthorStorage.get_author_caption(shout.slug, author.slug) author_captions = {}
if with_author_captions:
author_captions_result = session.query(ShoutAuthor).where(
ShoutAuthor.shout.in_(map(lambda s: s.slug, shouts))).all()
for author_captions_result_item in author_captions_result:
if author_captions.get(author_captions_result_item.shout) is None:
author_captions[author_captions_result_item.shout] = {}
author_captions[
author_captions_result_item.shout
][
author_captions_result_item.user
] = author_captions_result_item.caption
for author in shout.authors:
author.caption = author_captions[shout.slug][author.slug]
return shouts return shouts

View File

@ -7,11 +7,11 @@ from auth.authenticate import login_required
from base.orm import local_session from base.orm import local_session
from base.resolvers import mutation, query from base.resolvers import mutation, query
from orm.reaction import Reaction from orm.reaction import Reaction
from orm.shout import ShoutAuthor
from orm.topic import Topic, TopicFollower from orm.topic import Topic, TopicFollower
from orm.user import AuthorFollower, Role, User, UserRating, UserRole from orm.user import AuthorFollower, Role, User, UserRating, UserRole
from services.stat.reacted import ReactedStorage from services.stat.reacted import ReactedStorage
from services.stat.topicstat import TopicStat from services.stat.topicstat import TopicStat
from services.zine.shoutauthor import ShoutAuthor
# from .community import followed_communities # from .community import followed_communities
from resolvers.inbox.unread import get_total_unread_counter from resolvers.inbox.unread import get_total_unread_counter

View File

@ -241,6 +241,7 @@ input LoadShoutsFilters {
input LoadShoutsOptions { input LoadShoutsOptions {
filters: LoadShoutsFilters filters: LoadShoutsFilters
with_author_captions: Boolean
limit: Int! limit: Int!
offset: Int offset: Int
order_by: String order_by: String

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
import time
from base.orm import local_session from base.orm import local_session
from orm.reaction import ReactionKind, Reaction from orm.reaction import ReactionKind, Reaction
from services.zine.topics import TopicStorage from services.zine.topics import TopicStorage
@ -175,6 +176,7 @@ class ReactedStorage:
@staticmethod @staticmethod
async def recount_changed(session): async def recount_changed(session):
start = time.time()
self = ReactedStorage self = ReactedStorage
async with self.lock: async with self.lock:
sss = list(self.modified_shouts) sss = list(self.modified_shouts)
@ -191,6 +193,9 @@ class ReactedStorage:
print("[stat.reacted] %d replies" % len(self.reacted["reactions"])) print("[stat.reacted] %d replies" % len(self.reacted["reactions"]))
self.modified_shouts = set([]) self.modified_shouts = set([])
end = time.time()
print("[stat.reacted] recount_changed took %fs " % (end - start))
@staticmethod @staticmethod
async def worker(): async def worker():
while True: while True:

View File

@ -1,9 +1,9 @@
import asyncio import asyncio
import time
from base.orm import local_session from base.orm import local_session
from orm.shout import Shout, ShoutTopic from orm.shout import Shout, ShoutTopic, ShoutAuthor
from orm.topic import TopicFollower from orm.topic import TopicFollower
from services.zine.shoutauthor import ShoutAuthorStorage from sqlalchemy.sql.expression import select
class TopicStat: class TopicStat:
@ -17,22 +17,24 @@ class TopicStat:
@staticmethod @staticmethod
async def load_stat(session): async def load_stat(session):
start = time.time()
self = TopicStat self = TopicStat
shout_topics = session.query(ShoutTopic).all() shout_topics = session.query(ShoutTopic, Shout).join(Shout).all()
all_shout_authors = session.query(ShoutAuthor).all()
print("[stat.topics] %d links for shouts" % len(shout_topics)) print("[stat.topics] %d links for shouts" % len(shout_topics))
for shout_topic in shout_topics: for [shout_topic, shout] in shout_topics:
tpc = shout_topic.topic tpc = shout_topic.topic
# shouts by topics # shouts by topics
shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first() # shout = session.query(Shout).where(Shout.slug == shout_topic.shout).first()
self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict()) self.shouts_by_topic[tpc] = self.shouts_by_topic.get(tpc, dict())
self.shouts_by_topic[tpc][shout.slug] = shout self.shouts_by_topic[tpc][shout.slug] = shout
# authors by topics # authors by topics
authors = await ShoutAuthorStorage.get_authors(shout.slug) shout_authors = filter(lambda asa: asa.shout == shout.slug, all_shout_authors)
self.authors_by_topic[tpc] = self.authors_by_topic.get(tpc, dict()) self.authors_by_topic[tpc] = self.authors_by_topic.get(tpc, dict())
for a in authors: for sa in shout_authors:
[aslug, acaption] = a self.authors_by_topic[tpc][sa.shout] = sa.caption
self.authors_by_topic[tpc][aslug] = acaption
self.followers_by_topic = {} self.followers_by_topic = {}
followings = session.query(TopicFollower).all() followings = session.query(TopicFollower).all()
@ -43,6 +45,9 @@ class TopicStat:
self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict()) self.followers_by_topic[topic] = self.followers_by_topic.get(topic, dict())
self.followers_by_topic[topic][userslug] = userslug self.followers_by_topic[topic][userslug] = userslug
end = time.time()
print("[stat.topics] load_stat took %fs " % (end - start))
@staticmethod @staticmethod
async def get_shouts(topic): async def get_shouts(topic):
self = TopicStat self = TopicStat
@ -52,6 +57,7 @@ class TopicStat:
@staticmethod @staticmethod
async def worker(): async def worker():
self = TopicStat self = TopicStat
first_run = True
while True: while True:
try: try:
with local_session() as session: with local_session() as session:
@ -59,4 +65,9 @@ class TopicStat:
await self.load_stat(session) await self.load_stat(session)
except Exception as err: except Exception as err:
raise Exception(err) raise Exception(err)
if first_run:
# sleep for period + 1 min after first run
# to distribute load on server by workers with the same period
await asyncio.sleep(60)
first_run = False
await asyncio.sleep(self.period) await asyncio.sleep(self.period)

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
import time
from datetime import timedelta, timezone, datetime from datetime import timedelta, timezone, datetime
from gql import Client, gql from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport from gql.transport.aiohttp import AIOHTTPTransport
@ -9,7 +10,6 @@ from orm.viewed import ViewedEntry
from ssl import create_default_context from ssl import create_default_context
from os import environ, path from os import environ, path
load_facts = gql(""" load_facts = gql("""
query getDomains { query getDomains {
domains { domains {
@ -82,8 +82,9 @@ class ViewedStorage:
self.disabled = True self.disabled = True
@staticmethod @staticmethod
async def update_pages(session): async def update_pages():
""" query all the pages from ackee sorted by views count """ """ query all the pages from ackee sorted by views count """
start = time.time()
self = ViewedStorage self = ViewedStorage
async with self.lock: async with self.lock:
try: try:
@ -104,6 +105,9 @@ class ViewedStorage:
except Exception as e: except Exception as e:
raise e raise e
end = time.time()
print("[stat.viewed] update_pages took %fs " % (end - start))
@staticmethod @staticmethod
async def get_facts(): async def get_facts():
self = ViewedStorage self = ViewedStorage
@ -176,9 +180,8 @@ class ViewedStorage:
async with self.lock: async with self.lock:
while True: while True:
try: try:
with local_session() as session: await self.update_pages()
await self.update_pages(session) failed = 0
failed = 0
except Exception: except Exception:
failed += 1 failed += 1
print("[stat.viewed] update failed #%d, wait 10 seconds" % failed) print("[stat.viewed] update failed #%d, wait 10 seconds" % failed)

View File

@ -1,47 +0,0 @@
import asyncio
from base.orm import local_session
from orm.shout import ShoutAuthor, Shout
class ShoutAuthorStorage:
authors_by_shout = {}
lock = asyncio.Lock()
period = 30 * 60 # sec
@staticmethod
async def load(session):
self = ShoutAuthorStorage
sas = session.query(ShoutAuthor).join(Shout).all()
for sa in sas:
self.authors_by_shout[sa.shout] = self.authors_by_shout.get(sa.shout, [])
self.authors_by_shout[sa.shout].append([sa.user, sa.caption])
print("[zine.authors] %d shouts indexed by authors" % len(self.authors_by_shout))
@staticmethod
async def get_authors(shout):
self = ShoutAuthorStorage
async with self.lock:
return self.authors_by_shout.get(shout, [])
@staticmethod
async def get_author_caption(shout, author):
self = ShoutAuthorStorage
async with self.lock:
for a in self.authors_by_shout.get(shout, []):
if author in a:
return a[1]
return {"error": "author caption not found"}
@staticmethod
async def worker():
self = ShoutAuthorStorage
while True:
try:
with local_session() as session:
async with self.lock:
await self.load(session)
print("[zine.authors] index by authors was updated")
except Exception as err:
print("[zine.authors] error indexing by author: %s" % (err))
await asyncio.sleep(self.period)