-reactions.storage, +collectionShouts.query, fixes

This commit is contained in:
tonyrewin 2022-08-13 12:48:07 +03:00
parent 5859a4db40
commit f0b625af53
19 changed files with 277 additions and 381 deletions

View File

@ -12,11 +12,11 @@ from auth.email import email_authorize
from base.redis import redis
from base.resolvers import resolvers
from resolvers.zine import ShoutsCache
from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage
from services.zine.gittask import GitTask
from services.stat.topicstat import TopicStat
from services.zine.shoutauthor import ShoutAuthorStorage
from services.zine.reactions import ReactionsStorage
import asyncio
import_module('resolvers')
@ -30,8 +30,8 @@ middleware = [
async def start_up():
await redis.connect()
viewed_storage_task = asyncio.create_task(ViewedStorage.worker())
reacted_storage_task = asyncio.create_task(ReactedStorage.worker())
shouts_cache_task = asyncio.create_task(ShoutsCache.worker())
reaction_stat_task = asyncio.create_task(ReactionsStorage.worker())
shout_author_task = asyncio.create_task(ShoutAuthorStorage.worker())
topic_stat_task = asyncio.create_task(TopicStat.worker())
git_task = asyncio.create_task(GitTask.git_task_worker())

View File

@ -1,7 +1,6 @@
from datetime import datetime
from dateutil.parser import parse as date_parse
from orm import Reaction, User
from orm import reaction
from base.orm import local_session
from migration.html2text import html2text
from orm.reaction import ReactionKind

View File

@ -6,6 +6,7 @@ from orm.topic import Topic, TopicFollower
from orm.notification import Notification
from orm.shout import Shout
from orm.reaction import Reaction
from services.stat.reacted import ReactedStorage
from services.zine.topics import TopicStorage
from services.auth.users import UserStorage
from services.stat.viewed import ViewedStorage
@ -24,6 +25,7 @@ Role.init_table()
with local_session() as session:
ViewedStorage.init(session)
ReactedStorage.init(session)
RoleStorage.init(session)
UserStorage.init(session)
TopicStorage.init(session)

View File

@ -10,16 +10,42 @@ class ReactionKind(enum.Enum):
DISAGREE = 2 # -1
PROOF = 3 # +1
DISPROOF = 4 # -1
ASK = 5 # +0
ASK = 5 # +0 bookmark
PROPOSE = 6 # +0
QUOTE = 7 # +0
QUOTE = 7 # +0 bookmark
COMMENT = 8 # +0
ACCEPT = 9 # +1
REJECT = 0 # -1
LIKE = 11 # +1
DISLIKE = 12 # -1
# TYPE = <reaction index> # rating change guess
# TYPE = <reaction index> # rating diff
def kind_to_rate(kind) -> int:
if kind in [
ReactionKind.AGREE,
ReactionKind.LIKE,
ReactionKind.PROOF,
ReactionKind.ACCEPT
]: return 1
elif kind in [
ReactionKind.DISAGREE,
ReactionKind.DISLIKE,
ReactionKind.DISPROOF,
ReactionKind.REJECT
]: return -1
else: return 0
def get_bookmarked(reactions):
c = 0
for r in reactions:
c += 1 if r.kind in [ ReactionKind.QUOTE, ReactionKind.ASK] else 0
return c
def get_rating(reactions):
rating = 0
for r in reactions:
rating += kind_to_rate(r.kind)
return rating
class Reaction(Base):
__tablename__ = 'reaction'
@ -38,13 +64,15 @@ class Reaction(Base):
@property
async def stat(self):
reacted = 0
reacted = []
try:
with local_session() as session:
reacted = session.query(Reaction).filter(Reaction.replyTo == self.id).count()
reacted = session.query(Reaction).filter(Reaction.replyTo == self.id).all()
except Exception as e:
print(e)
return {
"viewed": await ViewedStorage.get_reaction(self.slug),
"reacted": reacted
"viewed": await ViewedStorage.get_reaction(self.id),
"reacted": reacted.count(),
"rating": get_rating(reacted),
"bookmarked": get_bookmarked(reacted)
}

View File

@ -3,10 +3,10 @@ from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, Boolean
from sqlalchemy.orm import relationship
from orm.user import User
from orm.topic import Topic, ShoutTopic
from orm.reaction import Reaction
from services.zine.reactions import ReactionsStorage
from orm.reaction import Reaction, get_bookmarked
from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage
from base.orm import Base
from base.orm import Base, local_session
class ShoutReactionsFollower(Base):
@ -63,7 +63,15 @@ class Shout(Base):
@property
async def stat(self):
reacted = []
try:
with local_session() as session:
reacted = session.query(Reaction).where(Reaction.shout == self.slug).all()
except Exception as e:
print(e)
return {
"viewed": await ViewedStorage.get_shout(self.slug),
"reacted": await ReactionsStorage.by_shout(self.slug)
"reacted": await ReactedStorage.get_shout(self.slug),
"rating": await ReactedStorage.get_rating(self.slug),
"bookmarked": get_bookmarked(reacted)
}

View File

@ -1,10 +1,10 @@
from orm.collection import Collection, CollectionFollower
from orm.collection import Collection
from base.orm import local_session
from orm.user import User
from base.resolvers import mutation, query
from auth.authenticate import login_required
from datetime import datetime
from typing import Collection, List
from typing import Collection
from sqlalchemy import and_
@mutation.field("createCollection")

View File

@ -1,3 +1,4 @@
from sqlalchemy import desc
from orm.reaction import Reaction
from base.orm import local_session
from orm.shout import ShoutReactionsFollower
@ -5,8 +6,7 @@ from orm.user import User
from base.resolvers import mutation, query
from auth.authenticate import login_required
from datetime import datetime
from services.zine.reactions import ReactionsStorage
from services.stat.viewed import ViewedStorage
from services.stat.reacted import ReactedStorage
def reactions_follow(user, slug, auto=False):
with local_session() as session:
@ -49,8 +49,10 @@ def reactions_unfollow(user, slug):
async def create_reaction(_, info, inp):
user = info.context["request"].user
reaction = Reaction.create(**inp)
# TODO: filter allowed reaction kinds
reaction = Reaction.create(**inp)
ReactedStorage.increment(reaction.shout, reaction.replyTo)
try:
reactions_follow(user, inp['shout'], True)
except Exception as e:
@ -101,26 +103,34 @@ async def delete_reaction(_, info, id):
return {}
@query.field("reactionsByShout")
def get_shout_reactions(_, info, slug):
#offset = page * size
#end = offset + size
return ReactionsStorage.reactions_by_shout.get(slug, []) #[offset:end]
def get_shout_reactions(_, info, slug, page, size):
offset = page * size
reactions = []
with local_session() as session:
reactions = session.query(Reaction).filter(Reaction.shout == slug).limit(size).offset(offset).all()
return reactions
@query.field("reactionsAll")
def get_all_reactions(_, info, page=1, size=10):
offset = page * size
end = offset + size
return ReactionsStorage.reactions[offset:end]
reactions = []
with local_session() as session:
stmt = session.query(Reaction).\
filter(Reaction.deletedAt == None).\
order_by(desc("createdAt")).\
offset(offset).limit(size)
reactions = []
for row in session.execute(stmt):
reaction = row.Reaction
reactions.append(reaction)
reactions.sort(key=lambda x: x.createdAt, reverse=True)
return reactions
@query.field("reactionsByAuthor")
def get_reactions_by_author(_, info, slug, page=1, size=50):
offset = page * size
end = offset + size
return ReactionsStorage.reactions_by_author.get(slug, [])[offset:end]
@mutation.field("viewReaction")
async def view_reaction(_, info, reaction):
await ViewedStorage.inc_reaction(reaction)
return {"error" : ""}
reactions = []
with local_session() as session:
reactions = session.query(Reaction).filter(Reaction.createdBy == slug).limit(size).offset(offset).all()
return reactions

View File

@ -1,3 +1,4 @@
from orm.collection import ShoutCollection
from orm.shout import Shout, ShoutAuthor, ShoutTopic
from orm.topic import Topic
from base.orm import local_session
@ -81,6 +82,18 @@ async def shouts_by_topics(_, info, slugs, page, size):
offset(page * size)
return shouts
@query.field("shoutsByCollection")
async def shouts_by_topics(_, info, collection, page, size):
page = page - 1
with local_session() as session:
shouts = session.query(Shout).\
join(ShoutCollection, ShoutCollection.collection == collection).\
where(and_(ShoutCollection.shout == Shout.slug, Shout.publishedAt != None)).\
order_by(desc(Shout.publishedAt)).\
limit(size).\
offset(page * size)
return shouts
@query.field("shoutsByAuthors")
async def shouts_by_authors(_, info, slugs, page, size):
page = page - 1

View File

@ -254,6 +254,7 @@ type Query {
# collection
getCollection(author: String!, slug: String!): Collection!
shoutsByCollection(collection: String, page: Int, size: Int): [Shout]!
# communities
getCommunity(slug: String): Community!
@ -408,8 +409,10 @@ type Shout {
}
type Stat {
viewed: Int!
reacted: Int!
viewed: Int
reacted: Int
rating: Int
bookmarked: Int
}
type Community {
@ -435,6 +438,7 @@ type TopicStat {
followers: Int!
authors: Int!
viewed: Int!
reacted: Int!
}
type Topic {

View File

@ -13,7 +13,7 @@ class RoleStorage:
roles = session.query(Role).\
options(selectinload(Role.permissions)).all()
self.roles = dict([(role.id, role) for role in roles])
print('[service.auth] %d roles' % len(roles))
print('[auth.roles] %d precached' % len(roles))
@staticmethod

View File

@ -13,7 +13,7 @@ class UserStorage:
users = session.query(User).\
options(selectinload(User.roles), selectinload(User.ratings)).all()
self.users = dict([(user.id, user) for user in users])
print('[service.auth] %d users' % len(self.users))
print('[auth.users] %d precached' % len(self.users))
@staticmethod
async def get_user(id):

View File

@ -1,9 +1,12 @@
import asyncio
from datetime import datetime
from typing_extensions import Self
from sqlalchemy.types import Enum
from sqlalchemy import Column, DateTime, ForeignKey, Integer
from sqlalchemy.orm.attributes import flag_modified
from base.orm import Base, local_session
from orm.reaction import ReactionKind
from orm.reaction import ReactionKind, kind_to_rate
from orm.topic import ShoutTopic
class ReactedByDay(Base):
__tablename__ = "reacted_by_day"
@ -12,42 +15,50 @@ class ReactedByDay(Base):
reaction = Column(ForeignKey("reaction.id"), primary_key = True)
shout = Column(ForeignKey('shout.slug'), primary_key=True)
reply = Column(ForeignKey('reaction.id'), primary_key=True, nullable=True)
kind = Column(ReactionKind, primary_key=True)
kind: int = Column(Enum(ReactionKind), nullable=False, comment="Reaction kind")
day = Column(DateTime, primary_key=True, default=datetime.now)
class ReactedStorage:
reacted = {
'shouts': {
'total': 0,
'today': 0,
'month': 0,
# TODO: need an opionated metrics list
},
'topics': {} # TODO: get sum reactions for all shouts in topic
'shouts': {},
'topics': {},
'reactions': {}
}
this_day_reactions = {}
rating = {
'shouts': {},
'topics': {},
'reactions': {}
}
reactions = []
to_flush = []
period = 30*60 # sec
lock = asyncio.Lock()
@staticmethod
def init(session):
def prepare(session):
self = ReactedStorage
reactions = session.query(ReactedByDay).all()
for reaction in reactions:
all_reactions = session.query(ReactedByDay).all()
day_start = datetime.now().replace(hour=0, minute=0, second=0)
for reaction in all_reactions:
day = reaction.day
shout = reaction.shout
value = reaction.value
if shout:
old_value = self.reacted['shouts'].get(shout, 0)
self.reacted['shouts'][shout] = old_value + value
if not shout in self.this_day_reactions:
self.this_day_reactions[shout] = reaction
this_day_reaction = self.this_day_reactions[shout]
if this_day_reaction.day < reaction.day:
self.this_day_reactions[shout] = reaction
topics = session.query(ShoutTopic.topic).where(ShoutTopic.shout == shout).all()
kind = reaction.kind
self.reacted['shouts'][shout] = self.reacted['shouts'].get(shout, 0) + 1
self.rating['shouts'][shout] = self.rating['shouts'].get(shout, 0) + kind_to_rate(kind)
for t in topics:
self.reacted['topics'][t] = self.reacted['topics'].get(t, 0) + 1 # reactions amount
self.rating['topics'][t] = self.rating['topics'].get(t, 0) + kind_to_rate(kind) # rating
if reaction.reply:
self.reacted['reactions'][reaction.reply] = self.reacted['reactions'].get(reaction.reply, 0) + 1
self.rating['reactions'][reaction.reply] = self.rating['reactions'].get(reaction.reply, 0) + kind_to_rate(reaction.kind)
print('[stat.reacted] %d shouts reacted' % len(self.reacted['shouts']))
print('[stat.reacted] %d reactions reacted' % len(self.reacted['reactions']))
print('[service.reacted] watching %d shouts' % len(reactions))
@staticmethod
async def get_shout(shout_slug):
@ -55,7 +66,24 @@ class ReactedStorage:
async with self.lock:
return self.reacted['shouts'].get(shout_slug, 0)
# NOTE: this method is never called
@staticmethod
async def get_topic(topic_slug):
self = ReactedStorage
async with self.lock:
return self.reacted['topics'].get(topic_slug, 0)
@staticmethod
async def get_rating(shout_slug):
self = ReactedStorage
async with self.lock:
return self.reacted['shouts'].get(shout_slug, 0)
@staticmethod
async def get_topic_rating(topic_slug):
self = ReactedStorage
async with self.lock:
return self.rating['topics'].get(topic_slug, 0)
@staticmethod
async def get_reaction(reaction_id):
self = ReactedStorage
@ -63,63 +91,35 @@ class ReactedStorage:
return self.reacted['reactions'].get(reaction_id, 0)
@staticmethod
async def inc_shout(shout_slug):
async def get_reaction_rating(reaction_id):
self = ReactedStorage
async with self.lock:
this_day_reaction = self.this_day_reactions.get(shout_slug)
day_start = datetime.now().replace(hour=0, minute=0, second=0)
if not this_day_reaction or this_day_reaction.day < day_start:
if this_day_reaction and getattr(this_day_reaction, "modified", False):
self.to_flush.append(this_day_reaction)
this_day_reaction = ReactedByDay.create(shout=shout_slug, value=1)
self.this_day_reactions[shout_slug] = this_day_reaction
else:
this_day_reaction.value = this_day_reaction.value + 1
this_day_reaction.modified = True
old_value = self.reacted['shouts'].get(shout_slug, 0)
self.reacted['shotus'][shout_slug] = old_value + 1
return self.rating['reactions'].get(reaction_id, 0)
@staticmethod
async def inc_reaction(shout_slug, reaction_id):
async def increment(shout_slug, kind, reply_id = None):
self = ReactedStorage
reaction: ReactedByDay = None
async with self.lock:
this_day_reaction = self.this_day_reactions.get(reaction_id)
day_start = datetime.now().replace(hour=0, minute=0, second=0)
if not this_day_reaction or this_day_reaction.day < day_start:
if this_day_reaction and getattr(this_day_reaction, "modified", False):
self.to_flush.append(this_day_reaction)
this_day_reaction = ReactedByDay.create(
shout=shout_slug, reaction=reaction_id, value=1)
self.this_day_reactions[shout_slug] = this_day_reaction
else:
this_day_reaction.value = this_day_reaction.value + 1
this_day_reaction.modified = True
old_value = self.reacted['shouts'].get(shout_slug, 0)
self.reacted['shouts'][shout_slug] = old_value + 1
old_value = self.reacted['reactions'].get(shout_slug, 0)
self.reacted['reaction'][reaction_id] = old_value + 1
@staticmethod
async def flush_changes(session):
self = ReactedStorage
async with self.lock:
for reaction in self.this_day_reactions.values():
if getattr(reaction, "modified", False):
session.add(reaction)
flag_modified(reaction, "value")
reaction.modified = False
for reaction in self.to_flush:
session.add(reaction)
self.to_flush.clear()
session.commit()
reaction = ReactedByDay.create(shout=shout_slug, kind=kind, reply=reply_id)
self.reacted['shouts'][shout_slug] = self.reacted['shouts'].get(shout_slug, [])
self.reacted['shouts'][shout_slug].append(reaction)
if reply_id:
self.reacted['reaction'][reply_id] = self.reacted['reactions'].get(shout_slug, [])
self.reacted['reaction'][reply_id].append(reaction)
@staticmethod
async def worker():
while True:
try:
with local_session() as session:
await ReactedStorage.flush_changes(session)
print("[service.reacted] service flushed changes")
ReactedStorage.prepare(session)
print("[stat.reacted] updated")
except Exception as err:
print("[service.reacted] errror: %s" % (err))
print("[stat.reacted] error: %s" % (err))
raise err
await asyncio.sleep(ReactedStorage.period)
@staticmethod
def init(session):
ReactedStorage.prepare(session)

View File

@ -1,5 +1,7 @@
import asyncio
from base.orm import local_session
from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedStorage
from services.zine.shoutauthor import ShoutAuthorStorage
from orm.topic import ShoutTopic, TopicFollower
from typing import Dict
@ -8,7 +10,6 @@ class TopicStat:
shouts_by_topic = {}
authors_by_topic = {}
followers_by_topic = {}
reactions_by_topic = {}
lock = asyncio.Lock()
period = 30*60 #sec
@ -32,8 +33,8 @@ class TopicStat:
else:
self.authors_by_topic[topic] = set(authors)
print('[service.topicstat] authors sorted')
print('[service.topicstat] shouts sorted')
print('[stat.topics] authors sorted')
print('[stat.topics] shouts sorted')
self.followers_by_topic = {}
followings = session.query(TopicFollower)
@ -44,7 +45,7 @@ class TopicStat:
self.followers_by_topic[topic].append(user)
else:
self.followers_by_topic[topic] = [user]
print('[service.topicstat] followers sorted')
print('[stat.topics] followers sorted')
@staticmethod
async def get_shouts(topic):
@ -59,13 +60,14 @@ class TopicStat:
shouts = self.shouts_by_topic.get(topic, [])
followers = self.followers_by_topic.get(topic, [])
authors = self.authors_by_topic.get(topic, [])
reactions = self.reactions_by_topic.get(topic, [])
return {
"shouts" : len(shouts),
"authors" : len(authors),
"followers" : len(followers),
"reactions" : len(reactions)
"viewed": ViewedStorage.get_topic(topic),
"reacted" : ReactedStorage.get_topic(topic),
"rating" : ReactedStorage.get_topic_rating(topic),
}
@staticmethod
@ -76,8 +78,8 @@ class TopicStat:
with local_session() as session:
async with self.lock:
await self.load_stat(session)
print("[service.topicstat] updated")
print("[stat.topics] updated")
except Exception as err:
print("[service.topicstat] errror: %s" % (err))
print("[stat.topics] errror: %s" % (err))
await asyncio.sleep(self.period)

View File

@ -3,6 +3,7 @@ from datetime import datetime
from sqlalchemy import Column, DateTime, ForeignKey, Integer
from sqlalchemy.orm.attributes import flag_modified
from base.orm import Base, local_session
from orm.topic import ShoutTopic
class ViewedByDay(Base):
@ -17,7 +18,7 @@ class ViewedByDay(Base):
class ViewedStorage:
viewed = {
'shouts': {},
'topics': {} # TODO: get sum views for all shouts in topic
'topics': {}
}
this_day_views = {}
to_flush = []
@ -31,17 +32,21 @@ class ViewedStorage:
for view in views:
shout = view.shout
topics = session.query(ShoutTopic.topic).filter(ShoutTopic.shout == shout).all()
value = view.value
if shout:
old_value = self.viewed['shouts'].get(shout, 0)
self.viewed['shouts'][shout] = old_value + value
for t in topics:
old_topic_value = self.viewed['topics'].get(t, 0)
self.viewed['topics'][t] = old_topic_value + value
if not shout in self.this_day_views:
self.this_day_views[shout] = view
this_day_view = self.this_day_views[shout]
if this_day_view.day < view.day:
self.this_day_views[shout] = view
print('[service.viewed] watching %d shouts' % len(views))
print('[stat.viewed] watching %d shouts' % len(views))
@staticmethod
async def get_shout(shout_slug):
@ -49,15 +54,14 @@ class ViewedStorage:
async with self.lock:
return self.viewed['shouts'].get(shout_slug, 0)
# NOTE: this method is never called
@staticmethod
async def get_reaction(reaction_id):
async def get_topic(topic_slug):
self = ViewedStorage
async with self.lock:
return self.viewed['reactions'].get(reaction_id, 0)
return self.viewed['topics'].get(topic_slug, 0)
@staticmethod
async def inc_shout(shout_slug):
async def increment(shout_slug):
self = ViewedStorage
async with self.lock:
this_day_view = self.this_day_views.get(shout_slug)
@ -71,27 +75,14 @@ class ViewedStorage:
this_day_view.value = this_day_view.value + 1
this_day_view.modified = True
old_value = self.viewed['shouts'].get(shout_slug, 0)
self.viewed['shotus'][shout_slug] = old_value + 1
@staticmethod
async def inc_reaction(shout_slug, reaction_id):
self = ViewedStorage
async with self.lock:
this_day_view = self.this_day_views.get(reaction_id)
day_start = datetime.now().replace(hour=0, minute=0, second=0)
if not this_day_view or this_day_view.day < day_start:
if this_day_view and getattr(this_day_view, "modified", False):
self.to_flush.append(this_day_view)
this_day_view = ViewedByDay.create(
shout=shout_slug, reaction=reaction_id, value=1)
self.this_day_views[shout_slug] = this_day_view
else:
this_day_view.value = this_day_view.value + 1
this_day_view.modified = True
old_value = self.viewed['shouts'].get(shout_slug, 0)
self.viewed['shouts'][shout_slug] = old_value + 1
old_value = self.viewed['reactions'].get(shout_slug, 0)
self.viewed['reaction'][reaction_id] = old_value + 1
with local_session() as session:
topics = session.query(ShoutTopic.topic).where(ShoutTopic.shout == shout_slug).all()
for t in topics:
old_topic_value = self.viewed['topics'].get(t, 0)
self.viewed['topics'][t] = old_topic_value + 1
flag_modified(this_day_view, "value")
@staticmethod
async def flush_changes(session):
@ -113,7 +104,7 @@ class ViewedStorage:
try:
with local_session() as session:
await ViewedStorage.flush_changes(session)
print("[service.viewed] service flushed changes")
print("[stat.viewed] service flushed changes")
except Exception as err:
print("[service.viewed] errror: %s" % (err))
print("[stat.viewed] errror: %s" % (err))
await asyncio.sleep(ViewedStorage.period)

View File

@ -53,10 +53,10 @@ class GitTask:
@staticmethod
async def git_task_worker():
print("[resolvers.git] worker start")
print("[service.git] worker start")
while True:
task = await GitTask.queue.get()
try:
task.execute()
except Exception as err:
print("[resolvers.git] worker error: %s" % (err))
print("[service.git] worker error: %s" % (err))

View File

@ -1,159 +0,0 @@
import asyncio
from sqlalchemy import and_, desc, func
from sqlalchemy.orm import joinedload
from base.orm import local_session
from orm.reaction import Reaction, ReactionKind
from orm.topic import ShoutTopic
def kind_to_rate(kind) -> int:
if kind in [
ReactionKind.AGREE,
ReactionKind.LIKE,
ReactionKind.PROOF,
ReactionKind.ACCEPT
]: return 1
elif kind in [
ReactionKind.DISAGREE,
ReactionKind.DISLIKE,
ReactionKind.DISPROOF,
ReactionKind.REJECT
]: return -1
else: return 0
class ReactionsStorage:
limit = 200
reactions = []
rating_by_shout = {}
reactions_by_shout = {}
reactions_by_topic = {} # TODO: get sum reactions for all shouts in topic
reactions_by_author = {}
lock = asyncio.Lock()
period = 3*60 # 3 mins
@staticmethod
async def prepare_all(session):
stmt = session.query(Reaction).\
filter(Reaction.deletedAt == None).\
order_by(desc("createdAt")).\
limit(ReactionsStorage.limit)
reactions = []
for row in session.execute(stmt):
reaction = row.Reaction
reactions.append(reaction)
reactions.sort(key=lambda x: x.createdAt, reverse=True)
async with ReactionsStorage.lock:
print("[service.reactions] %d recently published reactions " % len(reactions))
ReactionsStorage.reactions = reactions
@staticmethod
async def prepare_by_author(session):
try:
by_authors = session.query(Reaction.createdBy, func.count('*').label("count")).\
where(and_(Reaction.deletedAt == None)).\
group_by(Reaction.createdBy).all()
except Exception as e:
print(e)
by_authors = {}
async with ReactionsStorage.lock:
ReactionsStorage.reactions_by_author = dict([stat for stat in by_authors])
print("[service.reactions] %d reacted users" % len(by_authors))
@staticmethod
async def prepare_by_shout(session):
try:
by_shouts = session.query(Reaction.shout, func.count('*').label("count")).\
where(and_(Reaction.deletedAt == None)).\
group_by(Reaction.shout).all()
except Exception as e:
print(e)
by_shouts = {}
async with ReactionsStorage.lock:
ReactionsStorage.reactions_by_shout = dict([stat for stat in by_shouts])
print("[service.reactions] %d reacted shouts" % len(by_shouts))
@staticmethod
async def calc_ratings(session):
rating_by_shout = {}
for shout in ReactionsStorage.reactions_by_shout.keys():
rating_by_shout[shout] = 0
shout_reactions_by_kinds = session.query(Reaction).\
where(and_(Reaction.deletedAt == None, Reaction.shout == shout)).\
group_by(Reaction.kind, Reaction.id).all()
for reaction in shout_reactions_by_kinds:
rating_by_shout[shout] += kind_to_rate(reaction.kind)
async with ReactionsStorage.lock:
ReactionsStorage.rating_by_shout = rating_by_shout
@staticmethod
async def prepare_by_topic(session):
# TODO: optimize
by_topics = session.query(Reaction, func.count('*').label("count")).\
options(
joinedload(ShoutTopic),
joinedload(Reaction.shout)
).\
join(ShoutTopic, ShoutTopic.shout == Reaction.shout).\
filter(Reaction.deletedAt == None).\
group_by(ShoutTopic.topic).all()
reactions_by_topic = {}
for t, reactions in by_topics:
if not reactions_by_topic.get(t):
reactions_by_topic[t] = 0
for r in reactions:
reactions_by_topic[t] += r.count
async with ReactionsStorage.lock:
ReactionsStorage.reactions_by_topic = reactions_by_topic
@staticmethod
async def recent():
async with ReactionsStorage.lock:
return ReactionsStorage.reactions
@staticmethod
async def total():
async with ReactionsStorage.lock:
return len(ReactionsStorage.reactions)
@staticmethod
async def by_shout(shout):
async with ReactionsStorage.lock:
stat = ReactionsStorage.reactions_by_shout.get(shout)
stat = stat if stat else 0
return stat
@staticmethod
async def shout_rating(shout):
async with ReactionsStorage.lock:
return ReactionsStorage.rating_by_shout.get(shout)
@staticmethod
async def by_author(slug):
async with ReactionsStorage.lock:
stat = ReactionsStorage.reactions_by_author.get(slug)
stat = stat if stat else 0
return stat
@staticmethod
async def by_topic(topic):
async with ReactionsStorage.lock:
stat = ReactionsStorage.reactions_by_topic.get(topic)
stat = stat if stat else 0
return stat
@staticmethod
async def worker():
while True:
try:
with local_session() as session:
await ReactionsStorage.prepare_all(session)
print("[service.reactions] all reactions prepared")
await ReactionsStorage.prepare_by_shout(session)
print("[service.reactions] reactions by shouts prepared")
await ReactionsStorage.calc_ratings(session)
print("[service.reactions] reactions ratings prepared")
await ReactionsStorage.prepare_by_topic(session)
print("[service.reactions] reactions topics prepared")
except Exception as err:
print("[service.reactions] errror: %s" % (err))
await asyncio.sleep(ReactionsStorage.period)

View File

@ -20,8 +20,7 @@ class ShoutAuthorStorage:
self.authors_by_shout[shout].append(user)
else:
self.authors_by_shout[shout] = [user]
print('[service.shoutauthor] %d authors ' % len(self.authors_by_shout))
# FIXME: [service.shoutauthor] 4251 authors
print('[zine.authors] %d shouts preprocessed' % len(self.authors_by_shout))
@staticmethod
async def get_authors(shout):
@ -37,7 +36,7 @@ class ShoutAuthorStorage:
with local_session() as session:
async with self.lock:
await self.load(session)
print("[service.shoutauthor] updated")
print("[zine.authors] state updated")
except Exception as err:
print("[service.shoutauthor] errror: %s" % (err))
print("[zine.authors] errror: %s" % (err))
await asyncio.sleep(self.period)

View File

@ -2,12 +2,11 @@
import asyncio
from datetime import datetime, timedelta
from sqlalchemy import and_, desc, func, select
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy.orm import selectinload
from base.orm import local_session
from orm.reaction import Reaction
from orm.shout import Shout
from orm.topic import Topic
from services.zine.reactions import ReactionsStorage
from services.stat.reacted import ReactedStorage
from services.stat.viewed import ViewedByDay
@ -27,11 +26,11 @@ class ShoutsCache:
shouts = []
for row in session.execute(stmt):
shout = row.Shout
shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0
shout.rating = await ReactedStorage.get_rating(shout.slug) or 0
shouts.append(shout)
async with ShoutsCache.lock:
ShoutsCache.recent_published = shouts
print("[service.shoutscache] %d recently published shouts " % len(shouts))
print("[zine.cache] %d recently published shouts " % len(shouts))
@staticmethod
async def prepare_recent_all():
@ -47,11 +46,11 @@ class ShoutsCache:
for row in session.execute(stmt):
shout = row.Shout
# shout.topics = [t.slug for t in shout.topics]
shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0
shout.rating = await ReactedStorage.get_rating(shout.slug) or 0
shouts.append(shout)
async with ShoutsCache.lock:
ShoutsCache.recent_all = shouts
print("[service.shoutscache] %d recently created shouts " % len(shouts))
print("[zine.cache] %d recently created shouts " % len(shouts))
@staticmethod
async def prepare_recent_reacted():
@ -70,11 +69,11 @@ class ShoutsCache:
for row in session.execute(stmt):
shout = row.Shout
# shout.topics = [t.slug for t in shout.topics]
shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0
shout.rating = await ReactedStorage.get_rating(shout.slug) or 0
shouts.append(shout)
async with ShoutsCache.lock:
ShoutsCache.recent_reacted = shouts
print("[service.shoutscache] %d recently reacted shouts " % len(shouts))
print("[zine.cache] %d recently reacted shouts " % len(shouts))
@staticmethod
@ -93,11 +92,11 @@ class ShoutsCache:
# with rating synthetic counter
for row in session.execute(stmt):
shout = row.Shout
shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0
shout.rating = await ReactedStorage.get_rating(shout.slug) or 0
shouts.append(shout)
shouts.sort(key = lambda shout: shout.rating, reverse = True)
async with ShoutsCache.lock:
print("[service.shoutscache] %d top shouts " % len(shouts))
print("[zine.cache] %d top shouts " % len(shouts))
ShoutsCache.top_overall = shouts
@staticmethod
@ -114,11 +113,11 @@ class ShoutsCache:
shouts = []
for row in session.execute(stmt):
shout = row.Shout
shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0
shout.rating = await ReactedStorage.get_rating(shout.slug) or 0
shouts.append(shout)
shouts.sort(key = lambda shout: shout.rating, reverse = True)
async with ShoutsCache.lock:
print("[service.shoutscache] %d top month shouts " % len(shouts))
print("[zine.cache] %d top month shouts " % len(shouts))
ShoutsCache.top_month = shouts
@staticmethod
@ -135,11 +134,11 @@ class ShoutsCache:
shouts = []
for row in session.execute(stmt):
shout = row.Shout
shout.rating = await ReactionsStorage.shout_rating(shout.slug) or 0
shout.rating = await ReactedStorage.get_rating(shout.slug) or 0
shouts.append(shout)
# shouts.sort(key = lambda shout: shout.viewed, reverse = True)
async with ShoutsCache.lock:
print("[service.shoutscache] %d top viewed shouts " % len(shouts))
print("[zine.cache] %d top viewed shouts " % len(shouts))
ShoutsCache.top_viewed = shouts
@staticmethod
@ -152,8 +151,8 @@ class ShoutsCache:
await ShoutsCache.prepare_recent_published()
await ShoutsCache.prepare_recent_all()
await ShoutsCache.prepare_recent_reacted()
print("[service.shoutscache] updated")
print("[zine.cache] updated")
except Exception as err:
print("[service.shoutscache] error: %s" % (err))
print("[zine.cache] error: %s" % (err))
raise err
await asyncio.sleep(ShoutsCache.period)

View File

@ -12,9 +12,9 @@ class TopicStorage:
topics = session.query(Topic)
self.topics = dict([(topic.slug, topic) for topic in topics])
for topic in self.topics.values():
self.load_parents(topic) # TODO: test
self.load_parents(topic)
print('[service.topics] %d ' % len(self.topics.keys()))
print('[zine.topics] %d ' % len(self.topics.keys()))
@staticmethod
def load_parents(topic):