core/services/zine/shoutscache.py
2022-11-13 07:34:02 +03:00

286 lines
11 KiB
Python

import asyncio
from datetime import datetime, timedelta
from sqlalchemy import and_, desc, func, select
from sqlalchemy.orm import selectinload
from base.orm import local_session
from orm.reaction import Reaction, ReactionKind
from orm.shout import Shout
from services.stat.reacted import ReactedStorage
async def get_shout_stat(slug):
return {
# TODO: use ackee as datasource
"viewed": 0, # await ViewedStorage.get_shout(slug),
"reacted": len(await ReactedStorage.get_shout(slug)),
"commented": len(await ReactedStorage.get_comments(slug)),
"rating": await ReactedStorage.get_rating(slug),
}
async def prepare_shouts(session, stmt):
shouts = []
print(stmt)
for s in list(map(lambda r: r.Shout, session.execute(stmt))):
s.stat = await get_shout_stat(s.slug)
shouts.append(s)
return shouts
LAYOUTS = ['audio', 'video', 'image', 'literature']
class ShoutsCache:
# limit = 200
period = 60 * 60 # 1 hour
lock = asyncio.Lock()
recent_published = []
recent_all = []
recent_reacted = []
recent_commented = []
top_month = []
top_overall = []
top_commented = []
by_author = {}
by_topic = {}
by_layout = {}
@staticmethod
async def prepare_recent_published():
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(Shout)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics)
)
.where(Shout.deletedAt.is_(None))
.filter(Shout.publishedAt.is_not(None))
.group_by(Shout.id)
.order_by(desc("publishedAt"))
# .limit(ShoutsCache.limit)
),
)
async with ShoutsCache.lock:
for s in shouts:
for a in s.authors:
ShoutsCache.by_author[a.slug] = ShoutsCache.by_author.get(a.slug, {})
ShoutsCache.by_author[a.slug][s.slug] = s
for t in s.topics:
ShoutsCache.by_topic[t.slug] = ShoutsCache.by_topic.get(t.slug, {})
ShoutsCache.by_topic[t.slug][s.slug] = s
if s.layout in LAYOUTS:
ShoutsCache.by_layout[s.layout] = ShoutsCache.by_layout.get(s.layout, [])
ShoutsCache.by_layout[s.layout].append(s)
print("[zine.cache] indexed by %d topics " % len(ShoutsCache.by_topic.keys()))
print("[zine.cache] indexed by %d authors " % len(ShoutsCache.by_author.keys()))
print("[zine.cache] indexed by %d layouts " % len(ShoutsCache.by_layout.keys()))
ShoutsCache.recent_published = shouts
print("[zine.cache] %d recently published shouts " % len(shouts))
@staticmethod
async def prepare_recent_all():
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(Shout)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics)
)
.where(Shout.deletedAt.is_(None))
.group_by(Shout.id)
.order_by(desc("createdAt"))
# .limit(ShoutsCache.limit)
)
)
async with ShoutsCache.lock:
ShoutsCache.recent_all = shouts
print("[zine.cache] %d recently created shouts " % len(ShoutsCache.recent_all))
@staticmethod
async def prepare_recent_reacted():
with local_session() as session:
reactions = session.query(Reaction).order_by(Reaction.createdAt).all()
# .limit(ShoutsCache.limit)
reacted_slugs = set([])
for r in reactions:
reacted_slugs.add(r.shout)
shouts = await prepare_shouts(
session,
(
select(
Shout,
Reaction.createdAt.label('reactedAt')
)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions),
)
.join(Reaction)
.where(and_(Shout.deletedAt.is_(None), Shout.slug.in_(reacted_slugs)))
.filter(Shout.publishedAt.is_not(None))
.group_by(Shout.id, "reactedAt")
.order_by(desc("reactedAt"))
# .limit(ShoutsCache.limit)
)
)
async with ShoutsCache.lock:
ShoutsCache.recent_reacted = shouts
print("[zine.cache] %d recently reacted shouts " % len(shouts))
@staticmethod
async def prepare_recent_commented():
with local_session() as session:
reactions = session.query(Reaction).order_by(Reaction.createdAt).all()
# .limit(ShoutsCache.limit)
commented_slugs = set([])
for r in reactions:
if r.body and len(r.body) > 0:
commented_slugs.add(r.shout)
shouts = await prepare_shouts(
session,
(
select(
Shout,
Reaction.createdAt.label('reactedAt')
)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions),
)
.join(Reaction)
.where(and_(Shout.deletedAt.is_(None), Shout.slug.in_(commented_slugs)))
.group_by(Shout.id, "reactedAt")
.order_by(desc("reactedAt"))
# .limit(ShoutsCache.limit)
)
)
async with ShoutsCache.lock:
ShoutsCache.recent_commented = shouts
print("[zine.cache] %d recently commented shouts " % len(shouts))
@staticmethod
async def prepare_top_overall():
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(
Shout,
func.sum(Reaction.id).label('reacted')
)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions),
)
.join(Reaction, Reaction.kind == ReactionKind.LIKE)
.where(Shout.deletedAt.is_(None))
.filter(Shout.publishedAt.is_not(None))
.group_by(Shout.id)
.order_by(desc("reacted"))
# .limit(ShoutsCache.limit)
),
)
shouts.sort(key=lambda s: s.stat["rating"], reverse=True)
async with ShoutsCache.lock:
print("[zine.cache] %d top rated published " % len(shouts))
ShoutsCache.top_overall = shouts
@staticmethod
async def prepare_top_month():
month_ago = datetime.now() - timedelta(days=30)
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(Shout)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions),
)
.join(Reaction)
.where(Shout.deletedAt.is_(None))
.filter(Shout.publishedAt > month_ago)
.group_by(Shout.id)
# .limit(ShoutsCache.limit)
),
)
shouts.sort(key=lambda s: s.stat["rating"], reverse=True)
async with ShoutsCache.lock:
ShoutsCache.top_month = shouts
print("[zine.cache] %d top month published " % len(ShoutsCache.top_month))
@staticmethod
async def prepare_top_commented():
month_ago = datetime.now() - timedelta(days=30)
with local_session() as session:
shouts = await prepare_shouts(
session,
(
select(
Shout,
func.sum(Reaction.id).label("commented")
)
.options(
selectinload(Shout.authors),
selectinload(Shout.topics),
selectinload(Shout.reactions)
)
.join(Reaction, func.length(Reaction.body) > 0)
.where(Shout.deletedAt.is_(None))
.filter(Shout.publishedAt > month_ago)
.group_by(Shout.id)
.order_by(desc("commented"))
# .limit(ShoutsCache.limit)
),
)
shouts.sort(key=lambda s: s.stat["commented"], reverse=True)
async with ShoutsCache.lock:
ShoutsCache.top_commented = shouts
print("[zine.cache] %d last month top commented shouts " % len(ShoutsCache.top_commented))
@staticmethod
async def get_top_published_before(daysago, offset, limit):
shouts_by_rating = []
before = datetime.now() - timedelta(days=daysago)
for s in ShoutsCache.recent_published:
if s.publishedAt >= before:
shouts_by_rating.append(s)
shouts_by_rating.sort(lambda s: s.stat["rating"], reverse=True)
return shouts_by_rating
@staticmethod
async def get_all_authors_slugs():
slugs = ShoutsCache.by_author.keys()
return slugs
@staticmethod
async def worker():
while True:
try:
await ShoutsCache.prepare_top_month()
await ShoutsCache.prepare_top_overall()
await ShoutsCache.prepare_top_commented()
await ShoutsCache.prepare_recent_published()
await ShoutsCache.prepare_recent_all()
await ShoutsCache.prepare_recent_reacted()
await ShoutsCache.prepare_recent_commented()
print("[zine.cache] periodical update")
except Exception as err:
print("[zine.cache] error: %s" % (err))
raise err
await asyncio.sleep(ShoutsCache.period)